riverqueue / river

Fast and reliable background jobs in Go
https://riverqueue.com
Mozilla Public License 2.0

Batch processing #453

Closed nexovec closed 3 months ago

nexovec commented 3 months ago

Hi, how do you deal with batching jobs on the side of the worker, if that's at all possible?

bgentry commented 3 months ago

@nexovec can you clarify what you mean here with an example? Are you talking about a case where you have 100 different jobs to do the same thing, and instead of executing 100 separate jobs you process them as one job with 100 IDs?

nexovec commented 3 months ago

Let's say I have a job worker like this running:

type RoomJobWorker struct {
    river.WorkerDefaults[RoomJobWorkerJobArgs]
}

func (w *RoomJobWorker) Work(ctx context.Context, job *river.Job[RoomJobWorkerJobArgs]) error {
    return storeRoomInfo(job)
}

But it takes a lot of work to do this processing atomically, so I'd like something like

func (w *RoomJobWorker) WorkMany(ctx context.Context, jobs []*river.Job[RoomJobWorkerJobArgs]) error {
    return storeRoomInfoMany(jobs)
}

This is deliberately oversimplified, not least because it doesn't specify how often, or how many, jobs would be processed at once.

My ideas on how to do this are all very ugly, ranging from manually inspecting River's SQL tables to aggregating the events (i.e. Room) on the node that runs the client processing the jobs. What would you recommend?

elee1766 commented 3 months ago

Not sure if we have the best approach, but we have this exact problem and "solve" it as follows:

  1. Create another struct that does batch/asynchronous processing of jobs (i.e. move your worker logic out of your worker). This way you can preserve/share state across runs. Attach a reference to this struct to the worker.
  2. Process the jobs one at a time as normal (river can handle bursts of a few thousand qps with no issues in our usage, and you can already configure the number of jobs it polls for at a time).
  3. In the worker, send the work to your other struct and wait for asynchronous completion (either of the whole batch or of the individual item).

would love to hear what others have to say though, this felt very hacky as well.

nexovec commented 3 months ago

Thanks for the response @elee1766,

Postgres can't handle 10K inserts/sec., so I absolutely need to batch. That's quite a bit more plumbing than I'd like, especially because there are so many ways to implement it.

IMO this is a fairly common thing to do with a job queue. Should I throw together a feature request?

Do you think it's a bad idea to try to use river as an event bus?

brandur commented 3 months ago

IMO this is a fairly common thing to do with a job queue. Should I throw together a feature request?

I don't think I'd recommend this one as a first feature. Although it's trivial to come up with an API for what a worker handling a batch would look like, it's quite a bit more complicated when it comes to figuring out a scalable and stable way to dequeue and distribute work in bulk. Getting the code and all the testing right would be a big job.

Do you think it's a bad idea to try to use river as an event bus?

TBH, sending this all through your job queue might be more risk than it's worth. A more scalable alternative would be to store the events in a separate table, and then use the job queue to run a job periodically that consumes as much work as it can from the events table and processes it all at once.
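A minimal sketch of that drain pattern, with an in-memory store standing in for the events table (all names here are illustrative; a real `TakeBatch` would pull rows from Postgres, e.g. with `DELETE ... RETURNING` inside a transaction):

```go
package main

import "fmt"

// EventStore is a hypothetical interface over the events table.
type EventStore interface {
	TakeBatch(max int) []string
}

// memStore stands in for the events table in this sketch.
type memStore struct{ events []string }

func (m *memStore) TakeBatch(max int) []string {
	n := max
	if n > len(m.events) {
		n = len(m.events)
	}
	batch := m.events[:n]
	m.events = m.events[n:]
	return batch
}

// drainAll is what the periodic job's Work method could call: keep taking
// batches until the store is empty, processing each batch in one operation.
func drainAll(s EventStore, max int, process func([]string)) int {
	total := 0
	for {
		batch := s.TakeBatch(max)
		if len(batch) == 0 {
			return total
		}
		process(batch)
		total += len(batch)
	}
}

func main() {
	store := &memStore{events: []string{"a", "b", "c", "d", "e"}}
	n := drainAll(store, 2, func(b []string) {
		fmt.Println("processing batch:", b)
	})
	fmt.Println("processed", n, "events")
	// prints:
	// processing batch: [a b]
	// processing batch: [c d]
	// processing batch: [e]
	// processed 5 events
}
```

Scheduling the periodic job itself can then be a single recurring river job rather than one job per event.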

nexovec commented 3 months ago

run a job periodically that consumes as much work as it can from the events table and processes it all at once.

That is currently what I'm doing, but Postgres comes nowhere near River's throughput for single-row inserts, so that becomes a problem on the write side of things.

So what do I do, use pgmq?

it's quite a bit more complicated when it comes to figuring out a scalable and stable way to dequeue and distribute work in bulk.

I thought the default client already fetches multiple jobs each time it polls.

nexovec commented 3 months ago

@brandur

it's quite a bit more complicated

My admittedly naive and optimistic idea is that we could modify this method to sort jobs by Kind and pass them to an implementation of a multi-worker, which seems pretty straightforward. What am I missing?

elee1766 commented 3 months ago

Thanks for the response @elee1766,

Postgres can't handle 10K inserts/sec., so I absolutely need to batch. That's quite a bit more plumbing than I'd like, especially because there are so many ways to implement it.

Insert speed has nothing to do with processing speed; you're conflating two things. Using bulk insert you can insert much more than 10k/s, though of course with single-job inserts you may run into a DB bottleneck.

Batch processing won't resolve your issue of scheduling a large number of jobs one at a time. Again, river already fetches more than one job from Postgres at once. I don't see why bulk processing would somehow allow you to ingest 10k tx/s.

So you absolutely don't need to batch; I don't think it would even fix your problem if you added it.
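On the insert side, River does expose a bulk insert API (`Client.InsertMany`). The win comes from grouping rows per round trip/transaction; here's a stdlib-only sketch of the chunking idea (the `chunk` helper is illustrative, not river API):

```go
package main

import "fmt"

// chunk splits items into slices of at most size elements.
func chunk[T any](items []T, size int) [][]T {
	var out [][]T
	for len(items) > size {
		out = append(out, items[:size])
		items = items[size:]
	}
	if len(items) > 0 {
		out = append(out, items)
	}
	return out
}

func main() {
	// 10 jobs inserted in batches of 4: three statements instead of ten.
	jobs := make([]int, 10)
	for i := range jobs {
		jobs[i] = i
	}
	for _, batch := range chunk(jobs, 4) {
		// With river, each batch would be one client.InsertMany call.
		fmt.Printf("bulk insert of %d jobs\n", len(batch))
	}
	// prints:
	// bulk insert of 4 jobs
	// bulk insert of 4 jobs
	// bulk insert of 2 jobs
}
```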

nexovec commented 3 months ago

If I process 10K jobs at once, I can perhaps do one DB commit during processing instead of 10,000 (this is just an example; there may be other reasons to batch job processing). Are we not on the same page?

elee1766 commented 3 months ago

If I process 10K jobs at once, I can perhaps do one DB commit during processing instead of 10,000 (this is just an example; there may be other reasons to batch job processing). Are we not on the same page?

you can easily achieve this with

Create another struct that does batch/asynchronous processing of jobs (i.e. move your worker logic out of your worker). This way you can preserve/share state across runs. Attach a reference to this struct to the worker.

Your DB commit can live outside of your worker; you don't have to commit once per work completion.

You can implement the "batching" feature you want with an in-memory queue that processes its entries either when it grows too large or when a certain amount of time has passed. The river job simply adds to the queue and returns when the batch has been processed. This also lets you retry only the jobs in a batch that failed, which is more efficient.

As for simply completing the individual river jobs: Postgres has no problem doing thousands of rps there.

If you have proof you're bottlenecked by river job completion, that would surprise me (maybe this is why we're not on the same page), since we haven't run into it; from our tracing, completion is much faster than job inserts and not an issue. I was under the impression that by "inserts" you meant individual inserts of your job results, not the insert/update for job completion.

The only real way I can think of to truly batch "completion", if you really are hitting that bottleneck, is to use COPY FROM to send the records for completed jobs to a temporary table and then do the mutation between it and the jobs table, but that's not trivial. Otherwise, the only thing that could be done is to use pgx's batch feature with the existing query that processes job by job, which would save network round trips (not relevant at scale) but not the individual updates inside Postgres, so it changes nothing. Basically, I think you'd need a complete overhaul of job completion and an entirely new code path to actually get past the "I can't update completed job status fast enough" problem, because the existing one cannot avoid one update per completed job, even if you batch.

Both methods of batching also add complexity, because there's a higher chance a job in a batch will be seen as stuck when it isn't. Although that's user error, it would be very weird DX to have to raise the timeout/rescue thresholds because your batches get big, and those parameters would have to be tuned per database.

FWIW: we don't use river as an event bus; we have a NATS JetStream cluster for that, though we use it for notifications, not data processing. All our data processing goes through river, which constantly sustains 1k rps with bursts into the many thousands, the major bottleneck being job insert speed.

brandur commented 3 months ago

My admittedly naive and optimistic idea is that we could modify this method to sort jobs by Kind and pass them to an implementation of a multi-worker, which seems pretty straightforward. What am I missing?

Presumably you'd want to make other optimizations at the same time, e.g. if one client is going to process a batch job, it should try to select as many jobs of the same kind as possible (as opposed to other clients getting them) so that as many as possible can be processed in a single batch.

nexovec commented 3 months ago

Can't that optimization be done completely ad hoc? The feature seems doable, but you don't seem to like the idea at all... If the captain isn't on board, I can't be either.

This feature likely joins the 18 years of my backlog.

bgentry commented 3 months ago

To be clear, we do think this could be a useful addition to River at some point, but it won't be trivial to implement well. In fact, we've discussed it on numerous occasions over the last year.

nexovec commented 3 months ago

Please just make sure to mention it in the issues if any of you start working on this, so we avoid duplicating work.