oban-bg / oban

💎 Robust job processing in Elixir, backed by modern PostgreSQL and SQLite3
https://oban.pro
Apache License 2.0
3.37k stars, 314 forks

Support Stream in `Oban.Pro.Workers.Batch.new_batch/2` #1090

Closed: up2jj closed this issue 5 months ago

up2jj commented 6 months ago

Currently, `Oban.Pro.Workers.Batch.new_batch/2` accepts a list of args or jobs, so it isn't possible to build a batch in a streaming fashion like this:

```elixir
csv
|> download()
|> stream_csv_records()
|> MyBatchJob.new_batch()
|> Oban.insert_all(timeout: 60_000)
```

As a workaround, I'm adding the `batch_id` to each job manually:

```elixir
csv
|> download()
|> stream_csv_records()
|> Stream.map(fn row ->
  company_id
  |> create_job(row, operation)
  |> put_batch_id(batch_id)
end)
|> Oban.insert_all(timeout: 60_000)
```

It seems to work, but I have mixed feelings about it because the approach relies on Oban internals (`meta`, `batch_id`, etc.). I know that `new_batch/1` can be overridden, but I believe streams should be supported natively.

sorentwo commented 6 months ago

Neither current implementation of `Batch.new` can gracefully handle streams. For this to work, `Batch.new` would need to return a stream rather than a list.
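To illustrate the list-versus-stream point, here is a minimal sketch in plain Elixir of a batch builder that returns a lazy stream. It is not the Oban Pro API: the module `StreamBatchSketch`, its `new_batch/2`, the `:batch_id` option, and the plain `%{args: ..., meta: ...}` maps are hypothetical stand-ins for real job changesets. The idea is that one batch id is chosen up front, and `Stream.map/2` stamps it onto each element only as the stream is consumed, so even an infinite input works.

```elixir
defmodule StreamBatchSketch do
  @moduledoc """
  Hypothetical sketch, not the Oban Pro API: builds a batch as a lazy
  stream of job-params maps instead of an eagerly built list.
  """

  def new_batch(enumerable, opts \\ []) do
    # One id is chosen when the stream is created and shared by every job.
    # A random hex string stands in for whatever id format Pro really uses.
    batch_id =
      Keyword.get_lazy(opts, :batch_id, fn ->
        :crypto.strong_rand_bytes(16) |> Base.encode16(case: :lower)
      end)

    # Stream.map/2 is lazy: nothing is built until the stream is enumerated.
    Stream.map(enumerable, fn args ->
      %{args: args, meta: %{batch_id: batch_id}}
    end)
  end
end

# Usage: an infinite input is fine because only two elements are ever built.
Stream.iterate(1, &(&1 + 1))
|> StreamBatchSketch.new_batch(batch_id: "demo")
|> Enum.take(2)
# => [%{args: 1, meta: %{batch_id: "demo"}}, %{args: 2, meta: %{batch_id: "demo"}}]
```

Because nothing is built until something enumerates the stream, memory use depends on the consumer; this sketch doesn't show whether a given `Oban.insert_all` call consumes its input incrementally.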

There are some other big changes coming for Batches, and we'll keep this in mind. In the meantime, you can safely keep injecting a batch_id into meta (it's private, but won't be changing).
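A minimal sketch of that injection, assuming job params are plain maps with an optional `:meta` map. `BatchIdSketch.put_batch_id/2` is hypothetical (it borrows the helper name from the workaround above, whose real definition isn't shown), and using the `:batch_id` key is an assumption about what Oban Pro reads. `Oban.Job.new/2` returns an `Ecto.Changeset`, so a real helper would need to update the changeset's `:meta` change rather than a bare map.

```elixir
defmodule BatchIdSketch do
  # Hypothetical helper, not part of Oban: puts a batch id into a job-params
  # map's :meta, creating :meta when the key is absent and keeping any keys
  # already there. Assumes :meta, if present, is a map (not nil).
  def put_batch_id(%{} = job, batch_id) do
    Map.update(job, :meta, %{batch_id: batch_id}, &Map.put(&1, :batch_id, batch_id))
  end
end

# Usage in a streaming pipeline shaped like the workaround (made-up rows):
[%{id: 1}, %{id: 2}]
|> Stream.map(&%{args: &1, meta: %{source: "csv"}})
|> Stream.map(&BatchIdSketch.put_batch_id(&1, "batch-123"))
|> Enum.to_list()
```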

up2jj commented 6 months ago

Thanks for the explanation 👍🏻

sorentwo commented 5 months ago

This is on main for oban_pro and will be out soon (🤞) with v1.5.