elixir-lang / gen_stage

Producer and consumer actors with back-pressure for Elixir
http://hexdocs.pm/gen_stage
1.48k stars 192 forks source link

Issue changing demand on-the-fly #295

Closed manuel-rubio closed 1 year ago

manuel-rubio commented 1 year ago

I was experimenting because the documentation says that switching the demand mode from :forward to :accumulate while not all consumers are ready yet can help avoid distributing messages prematurely, and that switching it back to :forward resumes normal operation. However, I found that the internal `events` key is set to `[1000]` when I start the producer directly in :accumulate mode, but it is `[]` when I switch from :forward to :accumulate.

The worst part is that when I accumulated events inside the producer, the buffer showed that it held events, but when I switched back to :forward they were not pushed to the consumers. Even when stopping and restarting the consumers, I only received the buffered events one at a time.

The reproduction is a bit long:

Mix.install([{:gen_stage, ">= 0.0.0"}])

defmodule Producer do
  # Broadcast producer used by the reproduction script. It is registered under
  # its module name, so the client functions below address it without a pid.
  use GenStage

  ## Client API

  # Starts the producer with an empty state and registers it as `Producer`.
  def start_link do
    GenStage.start_link(__MODULE__, [], name: __MODULE__)
  end

  # Asynchronously hands `msg` to the producer, which emits it as one event.
  def cast(msg) do
    GenStage.cast(__MODULE__, msg)
  end

  ## Server callbacks

  # Every event is broadcast to all subscribed consumers.
  @impl GenStage
  @doc false
  def init([] = initial_state) do
    {:producer, initial_state, dispatcher: GenStage.BroadcastDispatcher}
  end

  # Each cast message becomes a single-element batch of events.
  @impl GenStage
  @doc false
  def handle_cast(event, state) do
    {:noreply, [event], state}
  end

  # Demand is only logged. Events are produced solely through `cast/1`.
  @impl GenStage
  @doc false
  def handle_demand(demand, state) do
    report = {self(), :demand, demand, state}
    IO.inspect(report)
    {:noreply, [], state}
  end
end

defmodule Consumer do
  # Consumer used by the reproduction script: it subscribes to a producer and
  # prints every batch of events it receives.
  use GenStage

  ## Client API

  # Starts a consumer that subscribes to `producer` during init.
  def start_link(producer) do
    GenStage.start_link(__MODULE__, [producer])
  end

  # Starts two consumers, in order, both subscribed to `Producer`, and returns
  # their pids as a 2-tuple. Fails with a MatchError if either start fails.
  def start_all do
    {:ok, first} = start_link(Producer)
    {:ok, second} = start_link(Producer)
    {first, second}
  end

  ## Server callbacks

  @impl GenStage
  @doc false
  def init([producer]) do
    subscription_opts = [subscribe_to: [producer]]
    {:consumer, [], subscription_opts}
  end

  # Logs the batch together with its origin. A consumer never emits events.
  @impl GenStage
  @doc false
  def handle_events(events, from, state) do
    entry = {:events, from, events, state}
    IO.inspect(entry)
    {:noreply, [], state}
  end
end

# Reproduction steps:
# 1. Subscribe two consumers, then switch the producer to :accumulate.
# 2. Stop both consumers and cast five events into the producer.
# 3. Subscribe two fresh consumers and print the producer's internal buffer.
{:ok, producer} = Producer.start_link()
{consumer1, consumer2} = Consumer.start_all()
GenStage.demand(producer, :accumulate)

Enum.each([consumer1, consumer2], &GenStage.stop/1)

Enum.each([:a1, :a2, :a3, :a4, :a5], &Producer.cast/1)

# Presumably gives the asynchronous casts time to be handled before new
# consumers subscribe.
Process.sleep(200)
{_consumer3, _consumer4} = Consumer.start_all()

IO.inspect(:sys.get_state(producer).buffer, label: "BUFFER")
josevalim commented 1 year ago

@manuel-rubio can you please convert this into a failing test? It would help speed up a fix a lot. :)

For the test, I think you only need to cast a1, a2, and a3 initially, then cast a4 after Consumer.start_all, and assert that a1 is consumed before a4. The test file already has producers that can handle casts. Thank you!

josevalim commented 1 year ago

I think this is a fix:

diff --git a/lib/gen_stage.ex b/lib/gen_stage.ex
index 8d299d7..5194234 100644
--- a/lib/gen_stage.ex
+++ b/lib/gen_stage.ex
@@ -1339,9 +1339,9 @@ defmodule GenStage do
   Sets the demand mode for a producer.

   When `:forward`, the demand is always forwarded to the `c:handle_demand/2`
-  callback. When `:accumulate`, demand is accumulated until its mode is
-  set to `:forward`. This is useful as a synchronization mechanism, where
-  the demand is accumulated until all consumers are subscribed. Defaults
+  callback. When `:accumulate`, both demand and events are accumulated until
+  its mode is set to `:forward`. This is useful as a synchronization mechanism,
+  where the demand is accumulated until all consumers are subscribed. Defaults
   to `:forward`.

   This command is asynchronous.
@@ -2261,11 +2261,11 @@ defmodule GenStage do

     if is_list(events) do
       fold_fun = fn
-        d, {:noreply, %{state: state} = stage} ->
-          noreply_callback(:handle_demand, [d, state], stage)
+        event, {:noreply, stage} ->
+          handle_accumulated_event(event, stage)

-        d, {:noreply, %{state: state} = stage, _} ->
-          noreply_callback(:handle_demand, [d, state], stage)
+        event, {:noreply, stage, _} ->
+          handle_accumulated_event(event, stage)

         _, {:stop, _, _} = acc ->
           acc
@@ -2285,6 +2285,14 @@ defmodule GenStage do
     end
   end

+  defp handle_accumulated_event({:demand, d}, stage) do
+    take_from_buffer_or_handle_demand(d, stage)
+  end
+
+  defp handle_accumulated_event({:dispatch, events, length}, stage) do
+    {:noreply, dispatch_events(events, length, stage)}
+  end
+
   defp producer_subscribe(opts, from, stage) do
     %{mod: mod, state: state, dispatcher_mod: dispatcher_mod, dispatcher_state: dispatcher_state} =
       stage
@@ -2370,16 +2378,20 @@ defmodule GenStage do
         take_pc_events(queue, counter, stage)

       %{} ->
-        case take_from_buffer(counter, %{stage | dispatcher_state: dispatcher_state}) do
-          {:ok, 0, stage} ->
-            {:noreply, stage}
+        take_from_buffer_or_handle_demand(counter, %{stage | dispatcher_state: dispatcher_state})
+    end
+  end

-          {:ok, counter, %{events: :forward, state: state} = stage} ->
-            noreply_callback(:handle_demand, [counter, state], stage)
+  defp take_from_buffer_or_handle_demand(counter, stage) do
+    case take_from_buffer(counter, stage) do
+      {:ok, 0, stage} ->
+        {:noreply, stage}

-          {:ok, counter, %{events: events} = stage} when is_list(events) ->
-            {:noreply, %{stage | events: [counter | events]}}
-        end
+      {:ok, counter, %{events: :forward, state: state} = stage} ->
+        noreply_callback(:handle_demand, [counter, state], stage)
+
+      {:ok, counter, %{events: events} = stage} when is_list(events) ->
+        {:noreply, %{stage | events: [{:demand, counter} | events]}}
     end
   end

@@ -2387,6 +2399,12 @@ defmodule GenStage do
     stage
   end

+  # We don't dispatch when we are accumulating demand
+  defp dispatch_events(to_dispatch, length, %{events: events, type: :producer} = stage)
+       when is_list(events) do
+    %{stage | events: [{:dispatch, to_dispatch, length} | events]}
+  end
+
   defp dispatch_events(events, _length, %{type: :consumer} = stage) do
     error_msg =
       ~c"GenStage consumer ~tp cannot dispatch events (an empty list must be returned): ~tp~n"

Feel free to use it if you send a PR, otherwise just a test will help!

manuel-rubio commented 1 year ago

@josevalim ping ;-)