-
Notifications
You must be signed in to change notification settings - Fork 54
Description
I am trying to understand how DAG works for conditional and parallel operators,
specifically say, the below design
func1 = cb.register(lambda _, x: x * x, 'square')
func2 = cb.register(lambda _, x: x + 1, 'increment')
func3 = cb.register(lambda _, x: x % 2, 'choice')
# based on value returned by func5, I need to call either square or half
edge1 = ('choice', 'square')
edge2 = ('choice', 'increment')
cb.register_dag('branch', ['choice','square','increment'], [edge1, edge2])
cb.call_dag('branch', { 'choice': random.randint(1,2) }).get()
Though the DAG is correct, I am not getting the expected functionality. For e.g. When choice returns 0, I want square to be called and vice-versa, not both. Can you please point to references to achieve this on cloudburst? I would also like to achieve switch case functionality using this pattern.
On adding a sink function, it expects 3 parameters(1 dummy and 2 from each upstream function), which is not what I want, as only 1 path must execute.
Similarly for fork-join pattern, this is the design
func4 = func5 = cb.register(lambda _, x: x, 'fork')
func5 = cb.register(lambda _, x,y: x + y, 'merge')
edge1 = ('fork', 'square')
edge2 = ('fork', 'increment')
edge3 = ('square', 'merge')
edge4 = ('increment', 'merge')
cb.register_dag('f-m', ['fork','square','increment', 'merge'], [edge1, edge2, edge3, edge4])
cb.call_dag('f-m', { 'fork': random.randint(1,2) }).get()
This gives me expected results, What I wanted to know is if they execute in parallel, as their upstream functions complete at the same time.