dashbitco / flow

Computational parallel flows on top of GenStage
https://hexdocs.pm/flow

Using a Flow with a GenStage producer causes errors #9

Closed rschmukler closed 7 years ago

rschmukler commented 7 years ago

When using a Flow as the enumerable that supplies events inside a GenStage producer, the producer process can receive consumer messages, which causes it to log errors. Interestingly enough, most of the messages do get correctly routed to the consumer, but there appears to be some issue with the producer also receiving them. It appears to be some sort of race condition, as removing the Process.sleep causes it to work fine for me on my machine. Additionally, the error seems to happen only at the beginning, and the events run fine afterwards.

Here is some example code that can recreate the issue:

defmodule FlowTest do
  @moduledoc """
  Documentation for FlowTest.
  """

  def run do
    {:ok, producer} = Producer.start_link()
    {:ok, consumer} = Consumer.start_link()

    GenStage.sync_subscribe(consumer, to: producer, max_demand: 200, min_demand: 100)
  end
end

defmodule Producer do
  use GenStage

  #########################
  # Public API
  #########################

  def start_link() do
    GenStage.start_link(__MODULE__, :ok)
  end

  #########################
  # GenStage Callbacks
  #########################

  def init(_) do
    {:producer, %{cont: build_flow()}}
  end

  def handle_demand(demand, %{cont: cont} = state) do
    case cont.({:cont, {[], demand}}) do
      {:suspended, {list, 0}, cont} ->
        {:noreply, :lists.reverse(list), %{state | cont: cont}}

      {_finished, {list, _}} ->
        IO.puts "Done!"
        {:noreply, :lists.reverse(list), %{}}
    end
  end

  #########################
  # Private  Helper
  #########################
  defp build_flow() do
    flow =
      0..1_000_000
      |> Flow.from_enumerable(consumers: :permanent)
      |> Flow.map(fn val ->
        # Simulate a computation
        Process.sleep(10)
        val + 1
      end)

    # Largely borrowed from the GenStage.Streamer implementation
    &Enumerable.reduce(flow, &1, fn
      x, {acc, 1} ->
        {:suspend, {[x | acc], 0}}
      x, {acc, demand} ->
        {:cont, {[x | acc], demand - 1}}
    end)
  end

end

defmodule Consumer do
  use GenStage

  def start_link() do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(_) do
    {:consumer, :no_state}
  end

  def handle_events(events, _from, state) do
    IO.puts "Got #{length events} events"
    {:noreply, [], state}
  end
end

Here are some of the error messages that come through:

13:16:59.215 [error] GenStage producer Producer received $gen_consumer message: {:"$gen_consumer",
 {#PID<0.165.0>, {#Reference<0.0.1.90>, #Reference<0.0.1.93>}},
 [1001, 1002, 1003, 1004, 1005, 1006, 1007, 1008, 1009, 1010, 1011, 1012, 1013,
  1014, 1015, 1016, 1017, 1018, 1019, 1020, 1021, 1022, 1023, 1024, 1025, 1026,
  1027, 1028, 1029, 1030, 1031, 1032, 1033, 1034, 1035, 1036, 1037, 1038, 1039,
  1040, 1041, 1042, 1043, 1044, 1045, 1046, 1047, 1048, ...]}

13:16:59.215 [error] GenStage producer Producer received $gen_consumer message: {:"$gen_consumer",
 {#PID<0.166.0>, {#Reference<0.0.1.90>, #Reference<0.0.1.94>}},
 [2001, 2002, 2003, 2004, 2005, 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013,
  2014, 2015, 2016, 2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024, 2025, 2026,
  2027, 2028, 2029, 2030, 2031, 2032, 2033, 2034, 2035, 2036, 2037, 2038, 2039,
  2040, 2041, 2042, 2043, 2044, 2045, 2046, 2047, 2048, ...]}

13:16:59.215 [error] GenStage producer Producer received $gen_consumer message: {:"$gen_consumer",
 {#PID<0.167.0>, {#Reference<0.0.1.90>, #Reference<0.0.1.95>}},
 [3001, 3002, 3003, 3004, 3005, 3006, 3007, 3008, 3009, 3010, 3011, 3012, 3013,
  3014, 3015, 3016, 3017, 3018, 3019, 3020, 3021, 3022, 3023, 3024, 3025, 3026,
  3027, 3028, 3029, 3030, 3031, 3032, 3033, 3034, 3035, 3036, 3037, 3038, 3039,
  3040, 3041, 3042, 3043, 3044, 3045, 3046, 3047, 3048, ...]}

13:16:59.216 [error] GenStage producer Producer received $gen_consumer message: {:"$gen_consumer",
 {#PID<0.168.0>, {#Reference<0.0.1.90>, #Reference<0.0.1.96>}},
 [4001, 4002, 4003, 4004, 4005, 4006, 4007, 4008, 4009, 4010, 4011, 4012, 4013,
  4014, 4015, 4016, 4017, 4018, 4019, 4020, 4021, 4022, 4023, 4024, 4025, 4026,
  4027, 4028, 4029, 4030, 4031, 4032, 4033, 4034, 4035, 4036, 4037, 4038, 4039,
  4040, 4041, 4042, 4043, 4044, 4045, 4046, 4047, 4048, ...]}

13:16:59.216 [error] GenStage producer Producer received $gen_consumer message: {:"$gen_consumer",
 {#PID<0.169.0>, {#Reference<0.0.1.90>, #Reference<0.0.1.97>}},
 [5001, 5002, 5003, 5004, 5005, 5006, 5007, 5008, 5009, 5010, 5011, 5012, 5013,
  5014, 5015, 5016, 5017, 5018, 5019, 5020, 5021, 5022, 5023, 5024, 5025, 5026,
  5027, 5028, 5029, 5030, 5031, 5032, 5033, 5034, 5035, 5036, 5037, 5038, 5039,
  5040, 5041, 5042, 5043, 5044, 5045, 5046, 5047, 5048, ...]}

13:16:59.216 [error] GenStage producer Producer received $gen_consumer message: {:"$gen_consumer",
 {#PID<0.170.0>, {#Reference<0.0.1.90>, #Reference<0.0.1.98>}},
 [6001, 6002, 6003, 6004, 6005, 6006, 6007, 6008, 6009, 6010, 6011, 6012, 6013,
  6014, 6015, 6016, 6017, 6018, 6019, 6020, 6021, 6022, 6023, 6024, 6025, 6026,
  6027, 6028, 6029, 6030, 6031, 6032, 6033, 6034, 6035, 6036, 6037, 6038, 6039,
  6040, 6041, 6042, 6043, 6044, 6045, 6046, 6047, 6048, ...]}

13:16:59.216 [error] GenStage producer Producer received $gen_consumer message: {:"$gen_consumer",
 {#PID<0.171.0>, {#Reference<0.0.1.90>, #Reference<0.0.1.99>}},
 [7001, 7002, 7003, 7004, 7005, 7006, 7007, 7008, 7009, 7010, 7011, 7012, 7013,
  7014, 7015, 7016, 7017, 7018, 7019, 7020, 7021, 7022, 7023, 7024, 7025, 7026,
  7027, 7028, 7029, 7030, 7031, 7032, 7033, 7034, 7035, 7036, 7037, 7038, 7039,
  7040, 7041, 7042, 7043, 7044, 7045, 7046, 7047, 7048, ...]}

josevalim commented 7 years ago

If you check out GenStage.stream, which is what a Flow relies on when it is used as an Enumerable, you will see that it hijacks the current process inbox. So you have the GenStage producer and the flow racing for the same messages in the process inbox. When you halt the flow, the messages it was not able to consume as a consumer remain in the inbox and are then picked up by the GenStage producer instead, which is why it reports unexpected $gen_consumer messages. The approach you are trying is not going to work. Why don't you connect the consumer directly to the Flow?
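
For illustration, here is a minimal sketch of what connecting the consumer directly to the Flow could look like, using Flow.into_stages/2 instead of enumerating the flow inside a producer. The module names DirectConsumer and DirectFlow, the smaller input range, and the "~> 1.0" version requirement are assumptions made for this example, not something taken from the report above.

```elixir
# Standalone script sketch. The version requirement is an assumption;
# inside a Mix project, declare :flow in mix.exs instead of using Mix.install.
Mix.install([{:flow, "~> 1.0"}])

defmodule DirectConsumer do
  # Hypothetical consumer, equivalent to the Consumer module in the report.
  use GenStage

  def start_link() do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    {:consumer, :no_state}
  end

  def handle_events(events, _from, state) do
    IO.puts("Got #{length(events)} events")
    {:noreply, [], state}
  end
end

defmodule DirectFlow do
  # Hypothetical entry point: the flow itself acts as the producer,
  # so no process has to enumerate the flow from its own inbox.
  def run do
    {:ok, consumer} = DirectConsumer.start_link()

    0..10_000
    |> Flow.from_enumerable()
    |> Flow.map(fn val ->
      # Simulate a computation
      Process.sleep(10)
      val + 1
    end)
    # Starts the flow as separate processes and subscribes the consumer to it,
    # with the same demand options used in the original sync_subscribe call.
    |> Flow.into_stages([{consumer, max_demand: 200, min_demand: 100}])
  end
end
```

Calling DirectFlow.run() should return {:ok, pid} for the started flow, with events delivered to DirectConsumer.handle_events/3. Because the flow stages run in their own processes, there is no second receiver competing for the same inbox.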