dashbitco / flow

Computational parallel flows on top of GenStage
https://hexdocs.pm/flow

Need help understanding back pressure #113

Closed · narrowtux closed 2 years ago

narrowtux commented 2 years ago

When I write this code:

1..20
|> Flow.from_enumerable(min_demand: 0, max_demand: 5, stages: 1)
|> Flow.map(fn item ->
  IO.puts "Flow.map #{item}"
  item
end)
|> Enum.each(fn item ->
  Process.sleep(10)
  IO.puts "Enum.each #{item}"
end)

I expect it to print

Flow.map 1
Flow.map 2
Flow.map 3
Flow.map 4
Flow.map 5
Enum.each 1
Flow.map 6
Enum.each 2

and so on.

But when I run it, it prints this:

Flow.map 1
Flow.map 2
Flow.map 3
Flow.map 4
Flow.map 5
Flow.map 6
Flow.map 7
Flow.map 8
Flow.map 9
Flow.map 10
Flow.map 11
Flow.map 12
Flow.map 13
Flow.map 14
Flow.map 15
Flow.map 16
Flow.map 17
Flow.map 18
Flow.map 19
Flow.map 20
Enum.each 1
Enum.each 2
Enum.each 3
Enum.each 4
Enum.each 5
Enum.each 6
Enum.each 7
Enum.each 8
Enum.each 9
Enum.each 10
Enum.each 11
Enum.each 12
Enum.each 13
Enum.each 14
Enum.each 15
Enum.each 16
Enum.each 17
Enum.each 18
Enum.each 19
Enum.each 20

I think this means that the first call into the flow makes the mapper stage immediately emit everything the producer has to offer, and the demand from the Enum.each side, which is slowed down by the sleep, doesn't properly back-pressure it.

In production, we are running code similar to this, except that 1..20 is a database query exposed as a GenStage producer, and the Enum.each is |> Stream.chunk_every(250) |> Enum.each(&Repo.insert_all(Schema, &1)).

What we're seeing is that the query stage immediately loads every row of the query as fast as it can, and the node then crashes because memory is exhausted.
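Roughly, our pipeline is shaped like this (QueryProducer, query, and transform/1 are stand-ins for the real producer module and per-row mapping; the shape is what matters):

# QueryProducer, query, and transform/1 are placeholders.
{:ok, producer} = GenStage.start_link(QueryProducer, query)

[producer]
|> Flow.from_stages()
|> Flow.map(&transform/1)
|> Stream.chunk_every(250)
|> Enum.each(&Repo.insert_all(Schema, &1))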

josevalim commented 2 years ago

Between the mapper and the enumerable there is another GenStage connection (producer-consumer), and I am not sure there is a way to customize the demand for it right now. Maybe we should add a Flow.to_stream so you can customize that step, but I also think you should move everything inside Flow instead, even the repo operations, and call Flow.run. If you want to send a PR for to_stream, it is welcome.
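A minimal sketch of that suggestion, assuming the query results can be exposed as a stream (query_stream, transform/1, Repo, and Schema below stand in for the real modules): chunk first, hand the chunks to Flow, and let each stage insert its own batches.

# query_stream, transform/1, Repo, and Schema are placeholders. Chunks are
# built before entering Flow, so each stage pulls at most max_demand chunks
# at a time and inserts its own batch; Flow.run/1 blocks until the flow is done.
query_stream
|> Stream.chunk_every(250)
|> Flow.from_enumerable(max_demand: 1, stages: 4)
|> Flow.map(fn chunk -> Enum.map(chunk, &transform/1) end)
|> Flow.each(fn rows -> Repo.insert_all(Schema, rows) end)
|> Flow.run()

Because demand now flows from the inserting stages all the way back to the producer, the in-flight data stays roughly bounded by the stage count times the chunk size.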