elixir-lang / gen_stage

Producer and consumer actors with back-pressure for Elixir
http://hexdocs.pm/gen_stage

Subscribing to BroadcastDispatcher is not idempotent #224

Closed. evnu closed this issue 5 years ago

evnu commented 5 years ago

It appears that BroadcastDispatcher allows a single consumer to subscribe multiple times. This can result in subtle errors. In the following example, I subscribe C multiple times to A. Replacing the default dispatcher with BroadcastDispatcher results in a timeout.

defmodule ProCon do
  use GenStage

  def init(tag) do
    # XXX Returning this tuple instead makes the script below time out:
    # {:producer, tag, dispatcher: GenStage.BroadcastDispatcher}
    {:producer, tag}
  end

  def handle_demand(_demand, tag) do
    {:noreply, [tag], tag}
  end
end

defmodule Con do
  use GenStage

  def init({tag, sink}) do
    {:consumer, %{tag: tag, sink: sink}}
  end

  def handle_info(:trigger, state) do
    :ok = GenStage.ask(state.subscription, 1)
    {:noreply, [], state}
  end

  def handle_subscribe(_, _options, from, state) do
    state = Map.put(state, :subscription, from)
    {:manual, state}
  end

  def handle_events(events, _from, state) do
    send(state.sink, [state.tag|events])
    {:noreply, [], state}
  end
end

defmodule Helper do
  def assert_receive(msg) do
    receive do
      ^msg -> :ok
      msg -> exit({:unexpected, msg})
    after 1000 -> exit(:timeout)
    end
  end

  def refute_receive do
    receive do
      msg -> exit({:unexpected, msg})
    after 1000 -> :ok
    end
  end
end

#
# Check that GenStage subscription is idempotent
#
{:ok, a} = GenStage.start_link(ProCon, A)
{:ok, c} = GenStage.start_link(Con, {C, self()})
GenStage.sync_subscribe(c, to: a)

send(c, :trigger)
Helper.assert_receive([C, A])
Helper.refute_receive()

GenStage.sync_subscribe(c, to: a)
send(c, :trigger)
Helper.assert_receive([C, A])
Helper.refute_receive()

Would it be possible to either allow a process to subscribe multiple times (make subscribing idempotent), or to otherwise throw an error?

josevalim commented 5 years ago

All of the dispatchers allow multiple subscriptions. It is something you have to handle in your logic yourself and not up to the dispatchers to enforce it. So the important question is: why are you subscribing twice when you don't want to? :)
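
As an illustration of handling this in your own logic, here is a minimal sketch that remembers which producers have already been subscribed to and skips repeats. The module `SubscribeOnce` and its return shapes are hypothetical and not part of GenStage; the real subscription call is passed in as a function, on the assumption that it returns `{:ok, tag}` on success.

```elixir
defmodule SubscribeOnce do
  # Hypothetical helper, not part of GenStage.
  # `seen` is a MapSet of producers that were already subscribed to.
  # `subscribe_fun` performs the real subscription and is only called
  # for producers that are not in `seen` yet.
  def subscribe(seen, producer, subscribe_fun) when is_function(subscribe_fun, 1) do
    if MapSet.member?(seen, producer) do
      # Duplicate request: do not subscribe again.
      {:already_subscribed, seen}
    else
      case subscribe_fun.(producer) do
        # Only remember the producer when the subscription succeeded.
        {:ok, _tag} = ok -> {ok, MapSet.put(seen, producer)}
        other -> {other, seen}
      end
    end
  end
end
```

With the script from the report, the function passed in could be `&GenStage.sync_subscribe(c, to: &1)`, with the `MapSet` kept wherever the subscribing code keeps its state.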

evnu commented 5 years ago

> why are you subscribing twice when you don't want to? :)

This came up while playing around and is of course a classic example of "stop hitting yourself". Nevertheless, I wonder in which cases allowing a process to subscribe multiple times to the same source is useful. I think this is something that does not generalize well across the different default dispatchers.

josevalim commented 5 years ago

That's a very good point. We can add a check for BroadcastDispatcher then. Could you please send a PR? Thanks!

evnu commented 5 years ago

> Could you please send a PR?

Will do!

evnu commented 5 years ago

@josevalim Should subscription then be idempotent, or result in an error?

josevalim commented 5 years ago

I don't know. I think an error is likely better because if you change dispatchers they won't be idempotent?

evnu commented 5 years ago

> I don't know. I think an error is likely better because if you change dispatchers they won't be idempotent?

I would also prefer an error. I am not yet sure though how to propagate that error, especially as subscription can happen asynchronously.

josevalim commented 5 years ago

Maybe the best we can do is log and discard it.

evnu commented 5 years ago

> I would also prefer an error. I am not yet sure though how to propagate that error, especially as subscription can happen asynchronously.

Another option would be to make the subscription idempotent for BroadcastDispatcher and simply ignore repeated subscriptions. With that, no change to the Dispatcher behaviour would be necessary, and other dispatchers would not be affected.

EDIT: Forgot to refresh. Logging and discarding seems reasonable.
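
A rough sketch of what "log and discard" could look like, assuming subscriptions are tracked as a list of `{pid, ref}` pairs. The module `DedupSubscriptions`, its state shape, and the `{:error, :already_subscribed}` return value are assumptions made for illustration; this is not the BroadcastDispatcher implementation.

```elixir
defmodule DedupSubscriptions do
  require Logger

  # Hypothetical stand-in for a dispatcher's subscription bookkeeping.
  # `subscriptions` is a list of {pid, ref} pairs, one per accepted subscription.
  def subscribe({pid, ref}, subscriptions) do
    if Enum.any?(subscriptions, fn {p, _ref} -> p == pid end) do
      # The process already holds a subscription: log and discard the new one.
      Logger.error("#{inspect(pid)} is already subscribed; discarding duplicate subscription")
      {:error, :already_subscribed}
    else
      {:ok, [{pid, ref} | subscriptions]}
    end
  end
end
```

The check is keyed on the pid rather than the reference, because every subscription attempt carries a fresh reference even when it comes from the same process.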