dashbitco / flow

Computational parallel flows on top of GenStage
https://hexdocs.pm/flow

Combining `Flow.shuffle/2` with `Flow.start_link/2` raises an error about unknown option `:dispatcher` #116

Closed linusdm closed 1 year ago

linusdm commented 1 year ago

I'm hitting an issue when I try to combine `Flow.shuffle/2` with `Flow.start_link/2`. For example, if I start this simple flow as a child in Kino/Livebook with

```elixir
flow =
  1..3
  |> Flow.from_enumerable()
  |> Flow.shuffle()

Kino.start_child(%{id: MyFlow, start: {Flow, :start_link, [flow]}})
```

the error `unknown options [dispatcher: GenStage.DemandDispatcher]` is raised.

Running the same flow from the current process does not hit this error and yields the expected result:

```elixir
[1, 2, 3] =
  1..3
  |> Flow.from_enumerable()
  |> Flow.shuffle()
  |> Enum.to_list()
```

I only hit the error with Flow 1.2.0 and the most recent version, 1.2.1. Version 1.1.0 seems to be fine. Could this be a regression introduced in 1.2.0?

I understand that `GenStage.DemandDispatcher` is used with `Flow.shuffle/2` (as opposed to `GenStage.PartitionDispatcher` with `Flow.partition/2`). But I'm confused as to why the `:dispatcher` option would be unknown at that point.

Full error:

```elixir
{:error,
 {{:badmatch,
   {:error,
    {{:bad_opts, "unknown options [dispatcher: GenStage.DemandDispatcher]"},
     {:child, :undefined, #Reference<0.3176698714.92274699.142312>,
      {GenStage, :start_link,
       [
         Flow.MapReducer,
         {:consumer,
          [
            subscribe_to: [{#PID<0.870.0>, [partition: 0, cancel: :transient, cancel: :transient]}],
            dispatcher: GenStage.DemandDispatcher
          ], {0, 16}, #Function<3.74786200/3 in Flow.Window.Count.materialize/5>,
          #Function<1.74786200/0 in Flow.Window.Count.materialize/5>,
          #Function<2.74786200/4 in Flow.Window.Count.materialize/5>},
         []
       ]}, :temporary, false, 5000, :worker, [Flow.MapReducer]}}}},
  [
    {Flow.Materialize, :"-start_stages/7-fun-2-", 12, [file: 'lib/flow/materialize.ex', line: 100]},
    {Enum, :reduce_range, 5, [file: 'lib/enum.ex', line: 4299]},
    {Flow.Materialize, :start_stages, 7, [file: 'lib/flow/materialize.ex', line: 92]},
    {Flow.Materialize, :materialize, 5, [file: 'lib/flow/materialize.ex', line: 31]},
    {Flow.Coordinator, :init, 1, [file: 'lib/flow/coordinator.ex', line: 27]},
    {:gen_server, :init_it, 2, [file: 'gen_server.erl', line: 851]},
    {:gen_server, :init_it, 6, [file: 'gen_server.erl', line: 814]},
    {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 240]}
  ]}}
```

josevalim commented 1 year ago

I will investigate but note there is no benefit in shuffling after a from_enumerable. Shuffle is only useful when shuffling across multiple flows or increasing/decreasing the number of stages after. If there is no processing before it, there is no benefit.
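To illustrate the case described above, here is a minimal sketch in which `Flow.shuffle/2` sits between two processing steps and reduces the number of stages. The stage counts (4 and 2) and the mapping functions are arbitrary choices for illustration, not taken from the issue, and the `Mix.install/1` version requirement is an assumption.

```elixir
# Assumption: run as a standalone script; the version requirement is illustrative.
Mix.install([{:flow, "~> 1.2"}])

result =
  1..100
  # Four mapper stages do the first round of work.
  |> Flow.from_enumerable(stages: 4)
  |> Flow.map(&(&1 * 2))
  # Shuffle hands events to a new layer of two stages without hashing them.
  |> Flow.shuffle(stages: 2)
  |> Flow.map(&(&1 + 1))
  # Event order across stages is not guaranteed, so sort before inspecting.
  |> Enum.sort()

IO.inspect(length(result), label: "events")
```

Because the events are only redistributed, not routed by key, this only pays off when the later steps need a different degree of parallelism than the earlier ones.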

linusdm commented 1 year ago

Thx! Is it still useful when configuring a window, without changing the number of stages after?

I was changing to shuffle instead of partition because I have an embarrassingly parallel problem at hand. My line of thinking was that if I can avoid the hashing in the partition step, I could save some time. But maybe I'm misinterpreting this part in the documentation:

> However, notice that unnecessary partitioning will increase memory usage and reduce throughput with no benefit whatsoever. Flow takes care of using all cores regardless of the number of times you call partition. You should only partition when the problem you are trying to solve requires you to route the data around. Such as the problem presented in Flow's module documentation. If you can solve a problem without using partition at all, that is typically preferred.

josevalim commented 1 year ago

You can configure the window on from_enumerable, no?
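A rough sketch of that suggestion, assuming the `:window` option of `Flow.from_enumerable/2` applies to the stages that follow it. The window size (10 events), the input range, and the summing reducer are hypothetical choices for illustration.

```elixir
# Assumption: run as a standalone script; the version requirement is illustrative.
Mix.install([{:flow, "~> 1.2"}])

# A count-based window that closes after every 10 events seen by a stage.
window = Flow.Window.count(10)

sums =
  1..100
  # The window is passed directly here, so no shuffle or partition is needed.
  |> Flow.from_enumerable(window: window)
  |> Flow.reduce(fn -> 0 end, fn event, acc -> event + acc end)
  # When a window triggers, emit its partial sum and reset the accumulator.
  |> Flow.on_trigger(fn sum -> {[sum], 0} end)
  |> Enum.to_list()

# How events are spread across stages and windows may vary between runs,
# but the partial sums should always add up to the sum of the input.
IO.inspect(Enum.sum(sums), label: "total")
```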

linusdm commented 1 year ago

^ yes. I had some misconceptions about Flow.shuffle/2.

For what it's worth, I saw the same error when merging flows with `Flow.merge/3` and a `GenStage.DemandDispatcher`, which might be a more realistic use case. Like this:

```elixir
flow =
  Flow.merge(
    [
      Flow.from_enumerable(1..3),
      Flow.from_enumerable(4..6)
    ],
    GenStage.DemandDispatcher
  )

Kino.start_child(%{id: MyFlow, start: {Flow, :start_link, [flow]}})
```

Using `GenStage.PartitionDispatcher` as the second argument to `Flow.merge/2` does not raise an error.

It seems a GenStage consumer stage is being started with the forbidden `:dispatcher` option. That option only applies to `:producer` and `:producer_consumer` stages, which sounds reasonable: a consumer is last in the chain, so it has nothing to dispatch.
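The same rejection can probably be reproduced with plain GenStage, outside of Flow. The sketch below uses a hypothetical module named `BadConsumer` whose `init/1` returns a `:dispatcher` option for a `:consumer` stage. It is started with `GenStage.start/2` rather than `start_link/2` so that the failing stage does not take the calling process down with it.

```elixir
# Assumption: run as a standalone script; the version requirement is illustrative.
Mix.install([{:gen_stage, "~> 1.2"}])

defmodule BadConsumer do
  # Hypothetical module, only here to demonstrate the option check.
  use GenStage

  @impl true
  def init(:ok) do
    # :dispatcher is only meaningful for :producer and :producer_consumer stages.
    {:consumer, :ok, dispatcher: GenStage.DemandDispatcher}
  end

  @impl true
  def handle_events(_events, _from, state) do
    {:noreply, [], state}
  end
end

# Expected to come back as an error tuple complaining about the unknown option,
# in the same shape as the :bad_opts term in the trace above.
result = GenStage.start(BadConsumer, :ok)
IO.inspect(result)
```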

Sorry if I'm stating the obvious... I find Flow quite interesting and I'm learning a lot.

I'll try to add a unit test that triggers this situation in a PR.