dashbitco / flow

Computational parallel flows on top of GenStage
https://hexdocs.pm/flow

provide from/into/through_flows() to connect flows #72

Closed sunaku closed 5 years ago

sunaku commented 5 years ago

Hello,

I need to connect my flow to three different downstream flows. However, I'm unable to instantiate the downstream flows because all Flow constructors require knowledge of the upstream source (which must be either already-running stages passed to Flow.from_stages or GenStage modules passed as child specifications to Flow.from_specs).

However, if there were a way to construct a non-materialized flow (e.g. Flow.new()), then I could do this:

file_parser = File.stream!(file) |> Flow.from_enumerable() # this is my existing flow

database_writer = Flow.new() |> Flow.filter(..) |> Flow.map(..) |> Flow.partition() |> Flow.reduce(..)
data_summarizer = Flow.new() |> Flow.flat_map(..) |> Flow.partition() |> Flow.reduce(..)
stats_collector = Flow.new() |> Flow.emit_and_reduce(..) |> Flow.on_trigger(..)

overall = file_parser |> Flow.through_flows(database_writer, data_summarizer, stats_collector)

In the overall flow, events coming out of the upstream flow are copied into each of the downstream flows. As a bonus, this lets us further connect the downstream flows together into a more complex graph like this:

file_parser = File.stream!(file) |> Flow.from_enumerable() # this is my existing flow

database_writer = Flow.new() |> Flow.filter(..) |> Flow.map(..) |> Flow.partition() |> Flow.reduce(..)
data_summarizer = Flow.new() |> Flow.flat_map(..) |> Flow.partition() |> Flow.reduce(..)
stats_collector = Flow.new() |> Flow.emit_and_reduce(..) |> Flow.on_trigger(..)

data_summarizer = data_summarizer |> Flow.into_flows(database_writer)
stats_collector = stats_collector |> Flow.into_flows(database_writer)

overall = file_parser |> Flow.through_flows(database_writer, data_summarizer, stats_collector)

Thanks for your consideration.

josevalim commented 5 years ago

We don't support such constructs because breaking the flows apart like that would drastically reduce our ability to optimize them or, at best, require a big rewrite of how Flow works. You can instead compose them unidirectionally by moving each of those into a function that receives a flow, and then doing this:

file_parser |> database_writer() |> data_summarizer() |> stats_collector()
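A minimal sketch of that kind of composition, assuming Elixir 1.12+ (for `Mix.install`) and Flow `~> 1.0`. The module and function names (`Stages`, `keep_even/1`, `double/1`, `run/1`) are hypothetical stand-ins for the real stages. Each one receives a flow and returns a flow, so the result is still a single flow rather than several flows wired together.

```elixir
# Assumes Elixir 1.12+ (Mix.install) and network access to fetch Flow.
Mix.install([{:flow, "~> 1.0"}])

defmodule Stages do
  # Hypothetical stage: keeps only even integers.
  def keep_even(flow), do: Flow.filter(flow, &(rem(&1, 2) == 0))

  # Hypothetical stage: doubles every event.
  def double(flow), do: Flow.map(flow, &(&1 * 2))

  # Builds one flow by piping it through the stage functions, then runs it.
  # Flow does not guarantee event order, so the result is sorted.
  def run(enumerable) do
    enumerable
    |> Flow.from_enumerable()
    |> keep_even()
    |> double()
    |> Enum.sort()
  end
end

IO.inspect(Stages.run(1..10))
```

Because every stage is an ordinary function over a flow, stages can be reordered or reused by changing the pipe chain in `run/1`.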

sunaku commented 5 years ago

I see now, thanks! :+1: In essence, I would have to perform two steps in each of my "downstream flows":

  1. pass the original input through to the next "downstream flow"
  2. do the actual work for the "downstream flow" and emit results

file_parser() |> data_summarizer() |> stats_collector() |> database_writer()

This way, all of the original output events from the file_parser make it all the way through the entire chain. Similarly, additional events generated by each of my "downstream flows" also make it all the way through. Eventually, everything ends up being written to the database by the database_writer "flow". :ok_hand: Fantastic!
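A rough sketch of this pass-through pattern, assuming Elixir 1.12+ (for `Mix.install`) and Flow `~> 1.0`. Everything named here is hypothetical: the `PassThrough` module, the `{:line, _}`, `{:summary, _}` and `{:stat, _}` event tags, and the toy computations (line length and word count) that stand in for the real work. The final `Enum.to_list/1` stands in for the database writer.

```elixir
# Assumes Elixir 1.12+ (Mix.install) and network access to fetch Flow.
Mix.install([{:flow, "~> 1.0"}])

defmodule PassThrough do
  # Hypothetical stage: forwards every event unchanged and, for each raw
  # line, additionally emits a {:summary, line_length} event.
  def data_summarizer(flow) do
    Flow.flat_map(flow, fn
      {:line, text} = event -> [event, {:summary, String.length(text)}]
      other -> [other]
    end)
  end

  # Hypothetical stage: forwards every event unchanged and, for each raw
  # line, additionally emits a {:stat, word_count} event.
  def stats_collector(flow) do
    Flow.flat_map(flow, fn
      {:line, text} = event -> [event, {:stat, length(String.split(text))}]
      other -> [other]
    end)
  end

  # Tags each input line, chains the stages, and collects every event
  # (original and derived) at the end of the chain.
  def run(lines) do
    lines
    |> Flow.from_enumerable()
    |> Flow.map(&{:line, &1})
    |> data_summarizer()
    |> stats_collector()
    |> Enum.to_list()
  end
end

IO.inspect(Enum.sort(PassThrough.run(["a b", "cde"])))
```

Tagging events lets each stage act only on the raw lines while leaving events produced by earlier stages untouched, so a real sink could pattern-match on the tag to decide what to write.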