botify-labs / simpleflow

Python library for dataflow programming.
https://botify-labs.github.io/simpleflow/
MIT License

Giving multiple blocking dependencies to tasks #173

Open ampelmann opened 7 years ago

ampelmann commented 7 years ago

Today, we have the canvas module to create dependencies through groups and chains. The problem is that there is no way to give one of them multiple dependencies.

Example:

class MyWorkflow(Workflow):
    def run(self):
        chain1 = Chain(Group(a, b, c), Group(d, e, f))
        chain2 = Chain(Group(h, i, j), Group(k, l, m))
        my_group = Group(chain1, chain2)
        self.submit(my_group)

I'd like to run the two chains in parallel, but the last part of my second chain needs Group(a, b, c) to be completed.

Note that when the workflow is built dynamically (feature iteration at Botify, for example), chain2 doesn't necessarily have access to chain1.

First proposal

We could give an id to a task, chain or group, and then have a function that takes this id and returns the corresponding future's status:

class MyWorkflow(Workflow):
    def run(self):
        chain1 = Chain(Group(a, b, c, id="abc"), Group(d, e, f))
        chain2 = Chain(Group(h, i, j), self.wait_by_id("abc"), Group(k, l, m))
        my_group = Group(chain1, chain2)
        self.submit(my_group)

More ideas are welcome! @jbbarth @ybastide @AsoSunag

ybastide commented 7 years ago

An alternative:

class MyWorkflow(Workflow):
    def run(self):
        chain1 = Chain(Group(a, b, c), self.signal("abc"), Group(d, e, f))
        chain2 = Chain(Group(h, i, j), self.wait_signal("abc"), Group(k, l, m))
        my_group = Group(chain1, chain2)
        self.submit(my_group)

ampelmann commented 7 years ago

@ybastide Is that the official signal concept from SWF?

ybastide commented 7 years ago

That's what I'd use, but only after a POC to check that it applies here.

AsoSunag commented 7 years ago

Using the signal concept from SWF, can we send a signal from one child workflow and receive it in another child workflow?

ybastide commented 7 years ago

Yes, as long as the sender knows the receiver's workflow ID...

... But this is not something we pass around. If the ID is deterministic, the sender can compute it; other ideas welcome :-).
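For illustration, here is a minimal sketch of the "deterministic ID" idea: if child workflow IDs are derived from values both siblings already know (parent run ID, task name), the sender can recompute the receiver's ID before calling SignalWorkflowExecution. All names here are hypothetical, not simpleflow's actual API:

```python
import hashlib

def child_workflow_id(parent_workflow_id, task_name):
    # Deterministic: the same inputs always yield the same ID, so any
    # sibling workflow can recompute it to target a signal, without the
    # ID ever being passed around explicitly.
    digest = hashlib.sha1(
        f"{parent_workflow_id}:{task_name}".encode()
    ).hexdigest()[:8]
    return f"{parent_workflow_id}--{task_name}-{digest}"

# Sender and receiver sides compute the same ID independently:
sender_view = child_workflow_id("my-workflow-run-42", "chain2")
receiver_view = child_workflow_id("my-workflow-run-42", "chain2")
assert sender_view == receiver_view
```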

jbbarth commented 7 years ago

I like the signal approach proposed by @ybastide!

Knowing the workflow ID is just a matter of deducing it from the input or from the workflow execution parameters (tag list? big up to #172 and #94!). It should definitely be deterministic in most cases imho. Or it may be something we discover during the workflow execution, wrapped into the future (we can imagine a future.id that would behave like future.result: return the activity/workflow ID if already known, raise ExecutionBlocked otherwise).
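A rough sketch of that future.id idea, assuming a simpleflow-like Future and an ExecutionBlocked exception; the class below is illustrative, not simpleflow's real implementation:

```python
class ExecutionBlocked(Exception):
    """Raised when a value is requested before the executor knows it."""

class Future:
    def __init__(self):
        self._id = None  # activity/workflow ID, filled in once scheduled

    def set_id(self, value):
        self._id = value

    @property
    def id(self):
        # Behaves like future.result: return the ID if already known,
        # otherwise raise ExecutionBlocked so the decider retries on replay.
        if self._id is None:
            raise ExecutionBlocked("workflow ID not known yet")
        return self._id

f = Future()
try:
    f.id  # not scheduled yet: blocks the execution
except ExecutionBlocked:
    pass
f.set_id("child-workflow-abc")
assert f.id == "child-workflow-abc"
```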

As for your first proposal @ampelmann, do you think it would be OK with signals instead? Managing such an ID is definitely possible, but I'd prefer something generic if possible (that works with simple tasks, workflows, or group/chains).

ybastide commented 7 years ago

I'm vaguely imagining:

def signal(self, name, input=None, workflow_id=None, run_id=None, domain=None):
    SignalWorkflowExecution(signalName=name, input=input, domain=domain or self.get_domain_somewhat(), ...)

def wait_signal(self, name):
    f = self.executor.get_signal_future(name)  # completed on WorkflowExecutionSignaled
    return f.result
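
To make the decider side concrete, here is a rough sketch of how get_signal_future could be completed during history replay, assuming events shaped like SWF's WorkflowExecutionSignaled; everything here is hypothetical pending the POC:

```python
class SignalFuture:
    def __init__(self, name):
        self.name = name
        self.finished = False
        self.result = None

class ExecutorSketch:
    def __init__(self):
        self._signal_futures = {}

    def get_signal_future(self, name):
        # One future per signal name; completed during history replay.
        return self._signal_futures.setdefault(name, SignalFuture(name))

    def replay(self, history):
        for event in history:
            if event["eventType"] == "WorkflowExecutionSignaled":
                attrs = event["workflowExecutionSignaledEventAttributes"]
                future = self.get_signal_future(attrs["signalName"])
                future.finished = True
                future.result = attrs.get("input")

executor = ExecutorSketch()
pending = executor.get_signal_future("abc")
executor.replay([{
    "eventType": "WorkflowExecutionSignaled",
    "workflowExecutionSignaledEventAttributes": {
        "signalName": "abc", "input": "42",
    },
}])
assert pending.finished and pending.result == "42"
```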
ampelmann commented 7 years ago

I think it's a very good idea to use signals, and it also covers the child workflow needs, which my first proposal could not.

Let's POC that!