warpstreamlabs / bento

Fancy stream processing made operationally mundane. This repository is a fork of the original project before the license was changed.
https://warpstreamlabs.github.io/bento/

Workflow Processor - DAG Execution Ordering #17

Open jem-davies opened 3 months ago

jem-davies commented 3 months ago

Bento's provided workflow processor executes a DAG of nodes, "performing them in parallel where possible".

However, the current implementation uses this dependency solver, which takes the following approach: resolve the DAG into a series of steps, where the steps are performed sequentially but the nodes within a step are performed in parallel.

This means there can be a situation where a node is left waiting for all the nodes in the previous step to finish, even though all of its own dependencies are already ready.

Consider the following DAG, from the workflow processor docs:

      /--> B -------------|--> D
     /                   /
A --|          /--> E --|
     \--> C --|          \
               \----------|--> F

The dependency solver would resolve the DAG into: [ [ A ], [ B, C ], [ E ], [ D, F ] ]. When we consider node E, we can see that its full dependency chain is A -> C -> E. However, the step before [ E ] contains node B, so in the current Bento workflow implementation E would not execute until B completes, even though E has no dependency on B.

jem-davies commented 3 months ago

#16

richardartoul commented 3 months ago

@jem-davies your PR makes sense to me, at least at a high level. Do you want to:

  1. Add it as a new workflow processor type (in which case you probably need to add some tests to your PR?) OR
  2. Replace the existing one, since it implements the same semantics but with an improved ability to proceed early. In which case, does your new version pass all the existing tests?

Also thanks for the contribution, this looks like you spent a lot of time on it.

richardartoul commented 3 months ago

Ok I see you called out some of the questions in your PR:

  1. unit tests
  2. writing docs
  3. implementing the ability to restart DAG execution at particular places, as described in the workflow documentation under structured metadata
  4. currently there is no logic to process batches of more than one message

Seems like making this a completely new workflow implementation may be the safest way to start then, and letting people opt into the new one if it solves a problem for them the original one doesn't?

jem-davies commented 3 months ago

@richardartoul Thanks for the response.

I think that because I have made a change to the way the config is structured:

OLD:


pipeline:
  processors:
    - workflow:
        order: [ [ A ], [ B ] ]

        branches:
          A:
            processors:
              #...

          B:
            processors:
              #...

NEW:


pipeline:
  processors:
    - workflow_v2:

        branches:
          A:
            processors:
              #...

          B:
            dependency_list: ["A"]
            processors:
              #...

I have started to think that the most appropriate way would be to do a new workflowV2 (need to check the camel case / snake case what have you) ...

Because I think that the DAG can't be fully expressed as an array of arrays - with that representation the controller can't always start nodes at the moment they become ready to run.

The original workflow also has the ability to infer the DAG from the request_map - I am thinking that this should be removed now and the DAG should be an explicit part of the config. What do you think about that specifically?

Yes I do need to do more work on this but was just wanting some interim feedback - thanks for quick response.

jem-davies commented 3 months ago

Restarting DAG execution - this is good, we want this I think.

EDIT: something to implement on the draft PR

richardartoul commented 3 months ago

I think Benthos (Bento now..) / YAML use snake_case so sticking with that convention makes sense to me. I also agree about an explicit dependency graph.

Are you implementing this to solve a specific problem you have, or you just noticed the deficiency and thought you'd fix it?

jem-davies commented 3 months ago

It was noticed as part of a solution that was being worked on: a node in the DAG may not start even when it is ready.

This isn't a problem in so far that it still works, but it is potentially sub-optimal. In the provided configs I have put a longer sleep on Node B, which blocks E when it shouldn't, to highlight the issue.

I have made some attempts at getting into doing some open source contributions - thought this would be a good place to start really.

Also currently working on a Neo4j plugin for Bento / Redpanda Connect - I am assigned to an issue on Redpanda Connect that was actually raised by another person. The Neo4j Go SDK was crap (seen other people complain about it on HackerNews) but they have reworked it, so it might be a bit better now.

richardartoul commented 3 months ago

Cool, makes sense. Well, we're totally happy for you to keep working on this and have it be a workflow_v2 processor - does that unblock you? Let me know if I can help with anything or answer any other questions.

Also maybe join our slack/discord or shoot me an email so we can talk further.

jem-davies commented 3 months ago

yes that unblocks me thanks for your input 😄

I am on the discord - I answered the guy's question earlier - dog with melon on its head.