elixir-lang / gen_stage

Producer and consumer actors with back-pressure for Elixir
http://hexdocs.pm/gen_stage

:producer_consumer doesn't invoke handle_demand #214

Closed toddkazakov closed 6 years ago

toddkazakov commented 6 years ago

When we have a producer->producer_consumer->consumer pipeline, producer_consumers ought to have the ability to "intercept" demand sent by consumers by overriding handle_demand. This would allow those stages to buffer consumer demand and only send it upstream when certain conditions are met (using GenStage.ask and :manual). Currently handle_demand is not invoked, and with :manual subscriptions it's not clear when consumers are ready to consume events.

I was trying to implement a reusable, pluggable producer_consumer rate-limiting stage, but was surprised that it doesn't work as I expected (in the past I have used akka-streams). Is there maybe another way to achieve this functionality?
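For context, here is roughly what manual demand control looks like today on a plain :consumer (module name, batch size, and delay are hypothetical); the point of this issue is that :producer_consumer offers no equivalent hook:

```elixir
defmodule ThrottledConsumer do
  use GenStage

  def init(:ok), do: {:consumer, %{producer: nil}}

  # Switch this subscription to manual mode so demand is only sent
  # when we explicitly call GenStage.ask/2.
  def handle_subscribe(:producer, _opts, from, state) do
    GenStage.ask(from, 5)
    {:manual, %{state | producer: from}}
  end

  def handle_events(events, _from, state) do
    # ... process events, then ask again once the rate limit allows it
    Process.send_after(self(), :ask_more, 1_000)
    {:noreply, [], state}
  end

  def handle_info(:ask_more, state) do
    GenStage.ask(state.producer, 5)
    {:noreply, [], state}
  end
end
```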

josevalim commented 6 years ago

Hi @toddkazakov, a producer_consumer works more as a buffer. It makes sure that it always has max_demand events ready to consume, which are then sent downstream based on how the consumer asks. Therefore the handle_demand aspect works transparently.
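For illustration, a typical producer_consumer only transforms whatever arrives; it never sees demand explicitly (module name and the doubling logic are made up for the sketch):

```elixir
defmodule Doubler do
  use GenStage

  def init(:ok), do: {:producer_consumer, :ok}

  # Events flow through handle_events; demand handling between the
  # upstream producer and downstream consumers is transparent.
  def handle_events(events, _from, state) do
    {:noreply, Enum.map(events, &(&1 * 2)), state}
  end
end
```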

If that's not enough, then you can always implement your own process that implements the GenStage protocol, which is documented in the GenStage module itself.

Have fun!

toddkazakov commented 6 years ago

@josevalim thanks for the quick reply, but I'm not sure I understand why producer_consumers must be limited to being buffers, instead of being allowed to control demand.

Here's my use case: an ETL pipeline which fetches data from multiple services (multiple producers), then calls multiple APIs in succession to fetch metadata about those records (multiple producer_consumers), and then persists the results in batches. However, each of those APIs has different rate limits, and in this case producer_consumers have no way of controlling their own demand. Should I just assume GenStage is not the right tool for this use case?

Implementing a new process from scratch that re-implements most of the current logic seems unnecessary.

josevalim commented 6 years ago

There is no reason besides it was never implemented. Although I think that, in your particular case, those rates belong in the producers, because if a producer has multiple consumers, the rates need to apply across all of the consumers' demand.

I am working on a similar issue and that's where we placed it. :)

José Valim
www.plataformatec.com.br
Founder and Director of R&D

toddkazakov commented 6 years ago

Producers in my case (or even more generally) don't need to know the specifics of how the events they produce are consumed (i.e. details about third-party APIs and rate limits). They should be concerned only with satisfying the demand that's propagated by their consumers. Downstream stages ought to signal the demand they can handle.

I'll see if I can grok the code in a reasonable time and open a PR.

josevalim commented 6 years ago

@toddkazakov that's what I am saying though. Your reusable producer-consumer stage could be implemented as a producer module that calls the other producer module's code. It is preferable to compose functionality on the functional side of things rather than on the pipeline side, as adding processes to the pipeline implies runtime costs.

Imagine someone implemented SQSProducer and you want to provide your functionality on top of it. I would do it like GenStage.start_link(YourRateLimiterProducer, {SQSProducer, ...}) and then you wrap the producer callbacks. So when somebody calls your handle_demand, you choose whether to call the producer's handle_demand.

If you instead implement a producer_consumer as a separate process, then everyone talking to your producer has to go through the producer_consumer, which is most likely unnecessary overhead. And if your producer has multiple producer_consumers, each will have only a partial view of what the producer is doing, and you won't be able to achieve what you want.
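A minimal sketch of that wrapping approach (the module names, the one-second window, and the assumption that the inner module is a plain :producer are mine, not part of GenStage):

```elixir
defmodule RateLimiterProducer do
  use GenStage

  # Wraps another producer module and forwards at most `max` demand
  # per one-second window; excess demand is remembered and replayed.
  def init({inner_mod, inner_arg, max}) do
    {:producer, inner_state} = inner_mod.init(inner_arg)
    Process.send_after(self(), :refill, 1_000)
    {:producer, %{inner: {inner_mod, inner_state}, budget: max, max: max, pending: 0}}
  end

  def handle_demand(demand, state), do: forward(demand + state.pending, state)

  def handle_info(:refill, state) do
    Process.send_after(self(), :refill, 1_000)
    forward(state.pending, %{state | budget: state.max, pending: 0})
  end

  defp forward(0, state), do: {:noreply, [], %{state | pending: 0}}

  defp forward(demand, %{budget: 0} = state),
    do: {:noreply, [], %{state | pending: demand}}

  defp forward(demand, %{inner: {mod, inner_state}} = state) do
    allowed = min(demand, state.budget)
    {:noreply, events, new_inner} = mod.handle_demand(allowed, inner_state)
    {:noreply, events,
     %{state | inner: {mod, new_inner}, budget: state.budget - allowed,
       pending: demand - allowed}}
  end
end
```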

toddkazakov commented 6 years ago

I get your point - that's a good workaround, but when you have to wrap multiple producers, the rate limiter has to do more than one thing: rate limit and decide which producer to call (round-robin, randomly, etc.), so it also becomes a scheduler/dispatcher.

Yes, a producer_consumer process implies runtime costs, but at least in my mental model it's easier to reason about and makes the code cleaner, since each stage has strictly defined responsibilities.

I'll go in the direction you suggested because, at first sight, what I proposed doesn't look like an easy change. I still believe this is a valid issue, because there are different trade-offs in the two approaches outlined here.

Thanks for the good discussion!

josevalim commented 6 years ago

GenStage also has a dispatcher module, which probably allows you to do both the rate limiting and the dispatching control. So if you want both, then maybe you can start by implementing a new dispatcher.


toddkazakov commented 6 years ago

I fail to understand how a rate-limiting dispatcher doesn't also violate separation of concerns.

josevalim commented 6 years ago

The reason I mentioned a dispatcher is that, because it sits between producers and consumers, it can do both things you asked for: rate limiting AND control of dispatching. A dispatcher is a well-defined contract built on top of the stages' communication. It understands both the demand that is forwarded to producers and the events that are dispatched to consumers, all from the producer process. This is precisely what it was designed for.

I think the discrepancy in our lines of thought comes from here:

Yes, a producer_consumer process implies runtime costs, but at least in my mental model it's easier to reason about and makes the code cleaner, since each stage has strictly defined responsibilities.

We typically do not use processes to define software boundaries. We use modules and behaviours for that; processes should model runtime concerns. We also mention this in the GenStage docs. Sasa Juric has a very good article on the matter too.

So overall, I don't agree with how you are splitting the concerns, which is why I don't see this as a violation of concerns, while you do. :) But that's fine, we don't have to agree, and I think we both understood each other well. :+1:

toddkazakov commented 6 years ago

How can a dispatcher buffer demand? From what I understand, ask is called every time a consumer asks for events. Let's say we reach the rate limit and start buffering demand in the dispatcher's state, returning {:ok, 0, next_state}. How can a tick be scheduled after a period of time to replay the buffered demand so it's propagated to the producers?

josevalim commented 6 years ago

@toddkazakov that's a very good question. I believe you can use the info callback; the only limitation is that it cannot return demand, so we would have to amend that, but that should be a small change. If that doesn't work, then we can discuss extending the callback return, which I believe should be easier than either the producer wrapping (that I suggested) or the producer_consumer extension you had in mind. I am looking forward to seeing what you end up with!

toddkazakov commented 6 years ago

@josevalim If I can schedule an info message from within the dispatcher to the dispatcher after a certain period of time, there's no need to make the info callback return demand. One can just return an "unsuspended" dispatcher state and do:

    # replay each buffered ask as if the consumer had just sent it
    for {counter, pid, ref} <- buffered_demands do
      send(self(), {:"$gen_producer", {pid, ref}, {:ask, counter}})
    end

Which will then invoke ask for all buffered demands.

GenStage.cast(self(), {:"$info", :tick}) does the trick, but can only be scheduled with an async task, which seems overcomplicated. GenStage.async_info() also works.

Unfortunately, I'll need your help here, because I don't have a good intuition for why the info callback exists on the dispatcher, when it is called under normal circumstances, and why.

If a tick is scheduled with send(self(), :tick), it gets delivered to the producer, which just prints a message that a handle_info callback is not implemented for this message.

Looks like we can add a new handle_info clause matching {:"$gen_dispatcher", message}, which then invokes the info callback on the dispatcher.

What do you think?

josevalim commented 6 years ago

If I can schedule an info message from within the dispatcher to the dispatcher after a certain period of time, there's no need to make the info callback return demand.

I believe you shouldn't do that. Demand should never be lost or discarded, otherwise things can get out of sync. For example, if a consumer asks for 10 items and the producer does not have them available, then it is the responsibility of the producer to store that demand. Therefore, if you send an ask message to the producer, you are generating increased demand that will have to be answered.

The info callback is used for GenStage.async_info. It is a mechanism by which you can ensure a message is delivered to consumers only after all of the currently buffered events are processed. However, because the dispatcher is the one who delivers it, it is OK to intercept it in the dispatcher and do something else instead of delivering it.
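For example (the message atom and the pid variable are arbitrary):

```elixir
# Queues :rate_limit_reset behind the currently buffered events;
# the dispatcher's info/2 callback sees it before it is delivered
# to consumers.
GenStage.async_info(producer_pid, :rate_limit_reset)
```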

toddkazakov commented 6 years ago

What I described is a way to buffer the demand in the dispatcher until a rate limit is reset.

A consumer asks 10 times for 1 item, but we can process only 5 of them in a certain period of time. This will invoke the dispatcher's ask 10 times. For the first 5 items we return {:ok, 1, state}, but once we reach the rate limit we buffer the demand in the dispatcher's state (e.g. {:ok, 0, %{state | buffered: state.buffered + 1}}). The demand is not lost or discarded - it's buffered in the dispatcher and never delivered to the producer in the first place. Then the producer dispatches 5 items, which meets the current demand that's within the rate limit. This invokes the dispatcher's dispatch, and those events are delivered to consumers. The consumer then demands another 5 items, but the demand is buffered again, because it's still rate-limited. At this point, if there are no new subscriptions, cancellations, or invocations of GenStage.(a)sync_info, no callback of the dispatcher will be invoked. Hence the need to somehow schedule a tick to the dispatcher when the rate limit is reset. The broadcast dispatcher also sends :ask to the producer in its dispatch callback for discarded items.
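A sketch of the ask/3 clauses that behaviour implies (the budget and buffered fields in the dispatcher state are my assumption; replaying the buffered count later still needs a tick):

```elixir
# Grant demand while the budget lasts; hold the remainder in the
# dispatcher state so it is never forwarded to the producer.
def ask(demand, _from, %{budget: budget} = state) when demand <= budget do
  {:ok, demand, %{state | budget: budget - demand}}
end

def ask(demand, _from, %{budget: budget} = state) do
  {:ok, budget, %{state | budget: 0, buffered: state.buffered + (demand - budget)}}
end
```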

What other way is there to rate-limit in the dispatcher?

josevalim commented 6 years ago

Your description is correct! My comment was just that the dispatcher should not ask the producer itself again using send(self(), {:"$gen_producer", {pid, ref}, {:ask, counter}}) or similar from your previous comment, as that would lead to duplicated demand. The info route should be safe.

toddkazakov commented 6 years ago

So the correct way to do it is something like:

  def info(:tick, state) do
    buffered_demand = state.buffered_demand
    {:ok, buffered_demand, %{state | buffered_demand: 0}}
  end

and make sure dispatch_info is invoked with dispatcher_callback(:info, ...) so the returned demand is processed correctly.

Now back to my original question: how to schedule that tick from within the dispatcher? The only reasonable way I see is Process.send_after(self(), {:"$gen_dispatcher", msg}, interval). Would it be OK with you if I extend the protocol so those messages are relayed by producers (and producer_consumers) to the dispatcher's info callback?

josevalim commented 6 years ago

@toddkazakov I think we need two changes: we need to have a GenStage.async_info_after, which will do the scheduling from within the dispatcher, and we need to allow the info callback in the dispatcher to return a demand, as you showed.

I don't see why we would need to extend the message protocol, as all of this happens within the producer. Does it make sense? Or am I missing something?

toddkazakov commented 6 years ago

async_info uses cast() - what's a good (idiomatic) way to schedule this call after a given period?

josevalim commented 6 years ago

@toddkazakov we can change the implementation to use send. So it would do something like:

def async_info(stage, message) do
  send GenServer.whereis(stage), {:"$something_something", message}
end

And then we move the implementation to a handle_info callback. Then the other one will be:

def async_info_later(stage, message, time) do
  Process.send_after GenServer.whereis(stage), {:"$something_something", message}, time
end

toddkazakov commented 6 years ago

That works great. Thanks for the guidance. I'm facing other issues, which I described in WIP PR #215.