Trouble with producer that initially has no events

myronmarston commented 8 years ago

I've been playing around with gen_stage today to prototype some stuff and I'm having trouble with an example that feels like it should be really simple. I'm trying to build a really simple worker pool -- the idea is that I start up a single producer and N consumers. The consumers are workers: they demand N jobs (0-arity functions) and then work on them. An enqueue_jobs function is provided so you can enqueue work. Here's what I've got:

# worker_pool.exs
alias Experimental.GenStage
require Logger

defmodule JobWorkerPool do
  @moduledoc """
  A minimal worker pool: one `JobProducer` stage holding a queue of jobs
  (0-arity functions) and `worker_count` `Worker` consumer stages that run them.

  NOTE: this is the version from the original report and it intentionally keeps
  the behaviour under discussion: demand that arrives while the queue is empty
  is not remembered, and `enqueue_jobs/2` never dispatches anything itself.
  """

  @doc """
  Starts the producer and `worker_count` workers, subscribing every worker to
  the producer with `subscribe_options` (e.g. `max_demand: 10`).

  Returns `{:ok, producer_pid}`.
  """
  def start_link(worker_count, subscribe_options) do
    {:ok, producer_pid} = GenStage.start_link(__MODULE__.JobProducer, :ok)
    subscribe_options = Keyword.put(subscribe_options, :to, producer_pid)

    Enum.each(1..worker_count, fn _ ->
      {:ok, consumer_pid} = GenStage.start_link(__MODULE__.Worker, :ok)
      GenStage.sync_subscribe(consumer_pid, subscribe_options)
    end)

    {:ok, producer_pid}
  end

  @doc """
  Synchronously appends `jobs` (a list of 0-arity functions) to the queue of
  the producer identified by `pid`. Crashes unless the producer replies `:ok`.
  """
  def enqueue_jobs(pid, jobs) do
    :ok = GenStage.call(pid, {:enqueue_jobs, jobs})
  end

  defmodule JobProducer do
    @moduledoc """
    Producer stage whose state is an Erlang `:queue` of pending jobs.
    """
    use GenStage

    def init(:ok), do: {:producer, :queue.new()}

    # Stores the jobs but emits no events (`[]`), so jobs enqueued after the
    # consumers have already sent their demand just sit in the queue.
    def handle_call({:enqueue_jobs, jobs}, _from, queue) do
      Logger.info("Enqueued #{length jobs} jobs")
      queue = Enum.reduce(jobs, queue, &:queue.in(&1, &2))
      {:reply, :ok, [], queue}
    end

    # Serves up to `demand` jobs from the queue. Any part of the demand that
    # cannot be served right now is dropped rather than stored.
    def handle_demand(demand, queue) do
      Logger.info("Handling #{demand} demand with a queue of size #{:queue.len(queue)}")
      {reversed_jobs, queue} = take_jobs(queue, demand, [])

      # Workaround (enabled via the FILL_IN_FAKE_JOBS env var): when there is
      # nothing to serve, answer the demand with placeholder `:fake_job` events.
      if System.get_env("FILL_IN_FAKE_JOBS") && Enum.empty?(reversed_jobs) do
        fake_jobs = Enum.map(1..demand, fn _ -> :fake_job end)
        {:noreply, fake_jobs, queue}
      else
        {:noreply, Enum.reverse(reversed_jobs), queue}
      end
    end

    # Pops at most `n` jobs off `queue`, accumulating them in reverse order.
    # Returns `{reversed_jobs, remaining_queue}`.
    defp take_jobs(queue, 0, jobs), do: {jobs, queue}
    defp take_jobs(queue, n, jobs) when n > 0 do
      case :queue.out(queue) do
        {:empty, ^queue} -> {jobs, queue}
        {{:value, job}, queue} -> take_jobs(queue, n - 1, [job | jobs])
      end
    end
  end

  defmodule Worker do
    @moduledoc """
    Consumer stage that executes every job it receives. Keeps no state (`nil`).
    """
    use GenStage

    def init(:ok), do: {:consumer, nil}

    # Only compiled when FILL_IN_FAKE_JOBS is set at compile time: a batch
    # whose first event is `:fake_job` is discarded without running anything.
    if System.get_env("FILL_IN_FAKE_JOBS") do
      def handle_events([:fake_job | _], _from, nil) do
        {:noreply, [], nil}
      end
    end

    # Runs each job (a 0-arity function) in order; emits no events downstream.
    def handle_events(jobs, _from, nil) do
      Logger.info("Handling #{length jobs} job events")
      Enum.each(jobs, &(&1.()))
      {:noreply, [], nil}
    end
  end
end

# Start a pool of 4 workers, each subscribed with `max_demand: 10`.
{:ok, pid} = JobWorkerPool.start_link(4, max_demand: 10)

# Build 100 jobs; each one is a 0-arity function that prints its own index.
jobs = Enum.map(1..100, fn i ->
  fn -> IO.puts "performed job #{i}" end
end)

JobWorkerPool.enqueue_jobs(pid, jobs)

(Ignore the System.get_env("FILL_IN_FAKE_JOBS") bit for the moment -- it's a workaround that I explain below).

When I run this with mix run worker_pool.exs, you can see that the workers send demand before any jobs have been enqueued (as you would expect), and then they apparently don't ever ask again, so things just sit there and nothing happens:

$ mix run worker_pool.exs
23:21:26.015 [info] Handling 10 demand with a queue of size 0
23:21:26.015 [info] Handling 10 demand with a queue of size 0
23:21:26.015 [info] Handling 10 demand with a queue of size 0
23:21:26.015 [info] Handling 10 demand with a queue of size 0
23:21:26.015 [info] Enqueued 100 jobs

However, if I fake it out and provide fake events just to satisfy the demand the consumers asked for (implemented conditionally using the FILL_IN_FAKE_JOBS env var), it works:

$ FILL_IN_FAKE_JOBS=1 mix run worker_pool.exs
23:22:53.963 [info] Handling 10 demand with a queue of size 0
23:22:53.963 [info] Handling 10 demand with a queue of size 0
23:22:53.963 [info] Handling 10 demand with a queue of size 0
23:22:53.963 [info] Handling 10 demand with a queue of size 0
23:22:53.963 [info] Enqueued 100 jobs
23:22:54.018 [info] Handling 5 demand with a queue of size 100
23:22:54.019 [info] Handling 5 demand with a queue of size 95
23:22:54.019 [info] Handling 5 demand with a queue of size 90
23:22:54.019 [info] Handling 5 demand with a queue of size 85
performed job 1
performed job 6
performed job 11
23:22:54.070 [info] Handling 5 job events
performed job 16
23:22:54.070 [info] Handling 5 job events
23:22:54.070 [info] Handling 5 job events
23:22:54.070 [info] Handling 5 job events
23:22:54.070 [info] Handling 5 demand with a queue of size 80
performed job 2
performed job 7
performed job 12
performed job 17
performed job 3
# ...

So, a few questions/comments:

I'm hoping we can figure out a solution to these problems, because I'm quite keen to use GenStage in production soon :).

josevalim commented 8 years ago

TL;DR - if consumers send demand and you can't serve it immediately, you should store it and do it the next time you have data.

When the producer returns an empty list of events from handle_demand, why do consumers stop sending demand? Apparently they give up and never ask again, which seems like a bug.

Because they ask only once. Demand is cumulative. If they asked again, you would need to serve d1 + d2 the next time. If you can't serve demand immediately, you should store that demand and serve it the next time jobs are enqueued. See this example: https://github.com/elixir-lang/gen_stage/blob/master/examples/gen_event.exs#L4

I read through the docs a couple times to see if I was missing something and couldn't find anything that suggested that consumers unsubscribe (or halt or whatever) if they don't get the asked for events

They don't. They are waiting for the events they asked for and you never served them. :)

Interestingly enough, the success of the fake job list hack depends on the size of the list. If I send back a list of 5-10 fake jobs (half the max_demand or more), the hack works. But if I return a list of 1-4 fake jobs, it doesn't do anything, and I get the same behavior of the consumers no longer requesting work

When you return only 1-4, they never reach min_demand, so they are still waiting for the jobs they asked for in the past that you haven't served.

josevalim commented 8 years ago

Btw, I will add the Broadcast example to the actual documentation. It should make cases such as these clearer.

josevalim commented 8 years ago

Fixed in master.

myronmarston commented 8 years ago

Thank you, @josevalim! The broadcast example is a huge help. When I initially read your response last night I understood that I could store the pending demand as you said, but wasn't sure how to dispatch events if handle_demand was never called since all consumers have already notified the producer of their demand. The example in the docs helped me understand that I can dispatch from handle_call.

For anyone else who runs into this issue, here's the corrected version of the worker pool I posted above:

alias Experimental.GenStage
require Logger

defmodule JobWorkerPool do
  @moduledoc """
  A minimal worker pool: one `JobProducer` stage holding a queue of jobs
  (0-arity functions) and `worker_count` `Worker` consumer stages that run them.

  The producer remembers demand it could not serve and dispatches the
  outstanding amount as soon as jobs are enqueued.
  """

  @doc """
  Starts the producer and `worker_count` workers, subscribing every worker to
  the producer with `subscribe_options` (e.g. `max_demand: 10`).

  Returns `{:ok, producer_pid}`.
  """
  def start_link(worker_count, subscribe_options) do
    {:ok, producer_pid} = GenStage.start_link(__MODULE__.JobProducer, :ok)
    subscribe_options = Keyword.put(subscribe_options, :to, producer_pid)

    Enum.each(1..worker_count, fn _ ->
      {:ok, consumer_pid} = GenStage.start_link(__MODULE__.Worker, :ok)
      GenStage.sync_subscribe(consumer_pid, subscribe_options)
    end)

    {:ok, producer_pid}
  end

  @doc """
  Synchronously appends `jobs` (a list of 0-arity functions) to the queue of
  the producer identified by `pid`. Crashes unless the producer replies `:ok`.
  """
  def enqueue_jobs(pid, jobs) do
    :ok = GenStage.call(pid, {:enqueue_jobs, jobs})
  end

  defmodule JobProducer do
    @moduledoc """
    Producer stage whose state is `{queue, pending_demand}`: an Erlang `:queue`
    of waiting jobs plus the amount of consumer demand not yet served.
    """
    use GenStage

    def init(:ok), do: {:producer, {:queue.new(), 0}}

    # Enqueues the jobs and immediately dispatches as many as the stored
    # pending demand allows, since consumers that already sent their demand
    # will not trigger `handle_demand/2` again.
    def handle_call({:enqueue_jobs, jobs}, _from, {queue, pending_demand}) do
      Logger.info("Enqueued #{length jobs} jobs")
      queue = Enum.reduce(jobs, queue, &:queue.in(&1, &2))
      {reversed_jobs, state} = take_jobs(queue, pending_demand, [])
      {:reply, :ok, Enum.reverse(reversed_jobs), state}
    end

    # Demand is cumulative: the new demand is added to whatever is still
    # pending, and whatever cannot be served now is kept in the state.
    def handle_demand(demand, {queue, pending_demand}) do
      Logger.info("Handling #{demand} demand with a queue of size #{:queue.len(queue)}")
      {reversed_jobs, state} = take_jobs(queue, pending_demand + demand, [])
      {:noreply, Enum.reverse(reversed_jobs), state}
    end

    # Pops at most `n` jobs off `queue`, accumulating them in reverse order.
    # Returns `{reversed_jobs, {remaining_queue, unserved_demand}}`.
    defp take_jobs(queue, 0, jobs), do: {jobs, {queue, 0}}
    defp take_jobs(queue, n, jobs) when n > 0 do
      case :queue.out(queue) do
        {:empty, ^queue} -> {jobs, {queue, n}}
        {{:value, job}, queue} -> take_jobs(queue, n - 1, [job | jobs])
      end
    end
  end

  defmodule Worker do
    @moduledoc """
    Consumer stage that executes every job it receives. Keeps no state (`nil`).
    """
    use GenStage

    def init(:ok), do: {:consumer, nil}

    # Runs each job (a 0-arity function) in order; emits no events downstream.
    def handle_events(jobs, _from, nil) do
      Logger.info("Handling #{length jobs} job events")
      Enum.each(jobs, &(&1.()))
      {:noreply, [], nil}
    end
  end
end

# Start a pool of 4 workers, each subscribed with `max_demand: 10`.
{:ok, pid} = JobWorkerPool.start_link(4, max_demand: 10)

# Build 100 jobs; each one is a 0-arity function that prints its own index.
jobs = Enum.map(1..100, fn i ->
  fn -> IO.puts "performed job #{i}" end
end)

JobWorkerPool.enqueue_jobs(pid, jobs)