elixir-lang / gen_stage

Producer and consumer actors with back-pressure for Elixir
http://hexdocs.pm/gen_stage

Need documentation for cancelling flows of finite producers #85

Status: Closed (aaronrenner closed 8 years ago)

aaronrenner commented 8 years ago

The original question was brought up on this ElixirForum post but here's a recap.

I am creating a finite GenStage producer that reads the body of a hackney HTTP request. Ideally, I'd like to hook this producer up to a Flow using Flow.from_stage/1. I've been able to create a basic producer that reads the request, but I don't know how to shut down the flow after the body has been read. Is this possible?

@josevalim mentioned that I need to call GenStage.async_notify(self(), {:producer, :done}) to shut down all of the consumers, and that this behavior would be good to document. It would also be good to mention how GenStage.Streamer tracks the subscriptions and cancellations of its consumers when the consumers: option is set to :permanent. It looks like once all of the consumers have been removed, it returns {:stop, :normal, state} to shut itself down. Without this tracking, it appears the producer won't shut down.

I'm also wondering where the documentation for this should go. Is GenStage.async_notify(self(), {:producer, :done}) specific to Flow, or does it work for GenStage as well? I noticed that the documentation for GenStage.from_enumerable/2 mentions you could also send GenStage.async_notify(self(), {:producer, :halted}). What does that do?

Thanks so much for your help!

josevalim commented 8 years ago

@aaronrenner I just noticed that almost all of this is documented in the GenStage.from_enumerable/2 function. I am also copying some of this documentation to the Flow module where we can tie it all up together.

aaronrenner commented 8 years ago

Those changes look great, @josevalim!

Would it be worth mentioning that if you want the producer to shut down after it calls GenStage.async_notify(self(), {:producer, :done}), you have to track subscriptions and cancellations, as is done in GenStage.Streamer? Or is it the responsibility of the code that started the producer with GenStage.start_link to manually stop it after the flow is complete?

josevalim commented 8 years ago

That's worth talking about too, yes. :)

josevalim commented 8 years ago

Pushed those too. But yes, it is the responsibility of the producer.
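
For readers who land here, the producer-side tracking discussed in this thread could look roughly like the sketch below. It is not taken from GenStage.Streamer or the published docs. It assumes a gen_stage release from the time of this issue, one that still provides GenStage.async_notify/2. The module name FiniteProducer, the state fields, and the in-memory list of chunks (standing in for the hackney response body) are all hypothetical.

```elixir
defmodule FiniteProducer do
  # Hypothetical example module. Assumes a gen_stage version that still
  # ships GenStage.async_notify/2, as referenced in this thread.
  use GenStage

  def start_link(chunks), do: GenStage.start_link(__MODULE__, chunks)

  def init(chunks) do
    # `consumers` holds the subscription refs of the consumers that are
    # currently subscribed; `done?` records whether we already notified.
    {:producer, %{pending: chunks, consumers: MapSet.new(), done?: false}}
  end

  # Remember every consumer that subscribes.
  def handle_subscribe(:consumer, _opts, {_pid, ref}, state) do
    {:automatic, %{state | consumers: MapSet.put(state.consumers, ref)}}
  end

  # Forget consumers as they cancel. Once the data is exhausted and the
  # last consumer is gone, stop the producer itself.
  def handle_cancel(_reason, {_pid, ref}, state) do
    state = %{state | consumers: MapSet.delete(state.consumers, ref)}

    if state.done? and MapSet.size(state.consumers) == 0 do
      {:stop, :normal, state}
    else
      {:noreply, [], state}
    end
  end

  def handle_demand(demand, state) when demand > 0 do
    {events, rest} = Enum.split(state.pending, demand)
    state = %{state | pending: rest}

    if rest == [] and not state.done? do
      # Tell consumers that no more events will arrive. The notification
      # is sent to self() so it is queued behind the events returned here.
      GenStage.async_notify(self(), {:producer, :done})
      {:noreply, events, %{state | done?: true}}
    else
      {:noreply, events, state}
    end
  end
end
```

Under those assumptions, a process started with FiniteProducer.start_link/1 and passed to Flow.from_stage/1 should emit its chunks, announce {:producer, :done}, and exit with reason :normal after its consumers have cancelled. Whether consumers cancel on that notification depends on the consumer (Flow does, per the discussion above), so a hand-written consumer may need to handle it explicitly.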