elixir-lang / gen_stage

Producer and consumer actors with back-pressure for Elixir
http://hexdocs.pm/gen_stage
1.51k stars 192 forks source link

GenStage.cancel kills the associated consumer instead of just the subscription #222

Closed vegabook closed 5 years ago

vegabook commented 5 years ago

I'm using GensStage.sync_subscribe to get a consumer to connect to a producer-consumer. Later I want to cancel the subscription, but I'd still like to use the consumer later. However using GenStage.cancel seems to kill both the subscription and the associated consumer.

Herewith some code, assuming GenStage already specified in mix.exs. This can be pasted wholesale into IEX, but you must first change @filename in GenstageTest.Consumer to something that will work on your computer. Then "touch filename", and "tail -F filename" in another terminal. This will avoid all the output swarming into iex.

defmodule GenstageTest.Producer do
  use GenStage

  def start_link(initial \\ 1) do
    GenStage.start_link(__MODULE__, initial, name: __MODULE__)
  end

  def init(counter), do: {:producer, counter}

  def handle_demand(demand, state) do
    events = Enum.to_list(state..(state + demand - 1))
    {:noreply, events, state + demand}
  end

  def handle_info({:doprint}, state) do
    IO.puts "yep"
    {:noreply, [], state}
  end

  def handle_info({:cancel, sublink}, state) do
    GenStage.cancel sublink, []
    {:noreply, [], state}
  end

end

defmodule GenstageTest.PcAddOne do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter, name: __MODULE__)
  end

  def init(state) do
    {:producer_consumer, state}
  end

  def handle_events(events, _from, state) do
    numbers =
      events
      |> Enum.map(&(&1 + 1))
    {:noreply, numbers, state}
  end
end

defmodule GenstageTest.Consumer do
  use GenStage

  # change the filename to something that works on your computer
  # create the file using "touch filename"
  # then use Linux "tail -f filename" in another terminal so that 
  # the output of this consumer doesn't overwhelm your iex session
  @filename "/home/nvidia/scratch/output.txt"

  def start_link do
    GenStage.start_link(__MODULE__, :state_doesnt_matter)
  end

  def init(state) do
    {:consumer, state}
  end

  def handle_events(events, _from, state) do
    for event <- events do
      File.write(@filename,
      Kernel.inspect(event) <> " ", [:append])
      :timer.sleep(100)
    end

    # As a consumer we never emit events
    {:noreply, [], state}
  end
end

{:ok, p} = GenstageTest.Producer.start_link(0)
{:ok, a1} = GenstageTest.PcAddOne.start_link()
{:ok, c} = GenstageTest.Consumer.start_link()
{:ok, link1} = GenStage.sync_subscribe(a1, to: p, min_demand: 0, max_demand: 1)
{:ok, link2} = GenStage.sync_subscribe(c, to: a1, min_demand: 0, max_demand: 1)

So now what you see a sequence of incrementing numbers being produced in the terminal where you ran tail -F.

Now whether I use GenStage.cancel({p, link1}, :noconnect) or GenStage.cancel({p, link1}, :nosuspend), it will stop the subscription, but also kills my producer-consumer (GenstageTest.pcAddOne).

image

Surely it's supposed to cancel the subscription, but not kill the consumer at the same time? Is this a bug? Or is there another technique I should use to reach the objective only of desubscribing without ending the consumer?

josevalim commented 5 years ago

The behavior is controlled by the :cancel option given to sync_subscribe. Note that cancel is not routed through the consumer, so the consumer doesn’t know it will be cancelled, that’s why it doesn’t treat it in any special way. From the perspective of the consumer, it is a cancellation as any other. --

José Valim www.plataformatec.com.br Skype: jv.ptec Founder and Director of R&D

josevalim commented 5 years ago

Also note the atom given to cancel is the cancellation reason. You likely want to set it to shutdown and pass cancel: :transient on sync_subscribe. --

José Valim www.plataformatec.com.br Skype: jv.ptec Founder and Director of R&D

vegabook commented 5 years ago

Okay I think I did that, but maybe I misunderstood because something still seems to be going wrong. My producer-consumer that consumes from producer still shuts down when I close the subscription between them.

image

josevalim commented 5 years ago

The second argument to cancel should be the atom :shutdown. I.e. it is the cancellation reason. --

José Valimwww.plataformatec.com.br http://www.plataformatec.com.br/Founder and Director of R&D

vegabook commented 5 years ago

aha! it works! apologies for disturbing. Not sure this is well documented? Docs seem only to give options :noconnect and :nosuspend.

josevalim commented 5 years ago

I will take a look at the docs! --

José Valimwww.plataformatec.com.br http://www.plataformatec.com.br/Founder and Director of R&D