oban-bg / oban

💎 Robust job processing in Elixir, backed by modern PostgreSQL and SQLite3
https://oban.pro
Apache License 2.0

Stream passed to `Oban.insert_all` is materialized twice? #1189

Closed michallepicki closed 2 days ago

michallepicki commented 2 days ago

Environment

Current Behavior

Hi! When optimizing some issues we encountered with passing a big list of jobs to Oban.insert_all, we thought of using a Stream instead. Oban documentation has this example:

(fn -> MyApp.Worker.new(%{}) end)
|> Stream.repeatedly()
|> Stream.take(100)
|> Oban.insert_all()

Stream.take returns a lazy Stream, which suggests that Oban should work nicely with all streams here.

But when reading the oban_pro code I noticed a pattern that suggests the stream would get executed twice. Simplifying a bit, here is roughly the pattern I saw, with some inspect logging added:

Repo.transaction(fn ->
  s = Repo.stream(User) |> Stream.map(fn x -> IO.inspect(x.id); x end)

  # Oban.insert_all(s) start

  Enum.reduce(s, 0, fn _u, a -> a + 1 end)
  Enum.map(s, fn u -> u.id end)

  # Oban.insert_all(s) end
end)
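
The effect described above can be reproduced without Ecto or Oban. The following is a minimal sketch using only the standard library: a counter (an illustrative stand-in for the IO.inspect logging) records how often the stream's mapping function runs when the same stream is passed to two Enum calls, and then again when the stream is first materialized into a list.

```elixir
# Count how many times the stream's mapping function runs.
counter = :counters.new(1, [])

stream =
  Stream.map(1..3, fn x ->
    :counters.add(counter, 1, 1)
    x
  end)

# Two separate Enum passes over the same lazy stream,
# mirroring the reduce + map pattern above.
count = Enum.reduce(stream, 0, fn _item, acc -> acc + 1 end)
ids = Enum.map(stream, fn item -> item end)

runs_for_stream = :counters.get(counter, 1)

# Materializing once and reusing the list evaluates each element only once.
list = Enum.to_list(stream)
_count = length(list)
_ids = Enum.map(list, fn item -> item end)

runs_after_list = :counters.get(counter, 1) - runs_for_stream

IO.inspect({count, ids, runs_for_stream, runs_after_list})
# prints {3, [1, 2, 3], 6, 3}
```

Each Enum call restarts the stream from the beginning, so the mapping function runs six times for three elements. With a stream backed by a database query or another side effect, that work would presumably be repeated in the same way.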

Expected Behavior

Ideally Oban.insert_all wouldn't run the stream passed to it twice. If that's not possible, I would expect this to be documented as a warning.

And I'm sorry if I mis-read the code! 🙏🏻

sorentwo commented 2 days ago

Job insertion is largely rewritten in Pro v1.5 and no longer iterates through the stream multiple times. However, Stream support is provided as a convenience more than an optimization (internally it uses Enum rather than Stream to iterate, and the full stream is materialized).
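
If holding the whole stream in memory is the concern, one possible workaround is to chunk the stream on the caller's side and insert each chunk separately. The sketch below is not part of Oban: `ChunkedInsert` and `insert_in_chunks/3` are hypothetical names, and `insert_fun` is a stand-in for the real insertion call (for example `&Oban.insert_all/1`, assuming it returns the list of inserted jobs).

```elixir
defmodule ChunkedInsert do
  @moduledoc """
  Hypothetical helper (not part of Oban): consumes a stream in fixed-size
  chunks so that only one chunk is held as a list at a time.
  """

  # `insert_fun` receives a plain list and is assumed to return
  # the list of inserted items.
  def insert_in_chunks(stream, chunk_size, insert_fun) when chunk_size > 0 do
    stream
    |> Stream.chunk_every(chunk_size)
    |> Enum.reduce(0, fn chunk, total ->
      inserted = insert_fun.(chunk)
      total + length(inserted)
    end)
  end
end

# Usage with a fake insert function that records the size of each chunk.
{:ok, sizes} = Agent.start_link(fn -> [] end)

fake_insert = fn chunk ->
  Agent.update(sizes, fn acc -> [length(chunk) | acc] end)
  chunk
end

total = ChunkedInsert.insert_in_chunks(Stream.map(1..250, & &1), 100, fake_insert)
chunk_sizes = sizes |> Agent.get(fn acc -> acc end) |> Enum.reverse()

IO.inspect({total, chunk_sizes})
# prints {250, [100, 100, 50]}
```

The stream is enumerated exactly once, and each chunk becomes a separate insert call. The trade-off is that the inserts are no longer a single atomic operation unless the caller wraps them in a transaction.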