riverqueue / river

Fast and reliable background jobs in Go
https://riverqueue.com
Mozilla Public License 2.0

[FEATURE REQUEST] Jobs sequential run #551

Open krhubert opened 3 weeks ago

krhubert commented 3 weeks ago

I haven't seen this in the api/docs, so please let me know if this is currently possible or not.

Description

Imagine there's a shopping mall with only one unloading point. Trucks (jobs) are queued to unload their cargo one by one. This can't be done in parallel because the current infrastructure does not allow it.

Tech description

Allow adding jobs that are guaranteed to run sequentially. The jobs' "sequentiality" might be configured similarly to job uniqueness - ByArgs, ByPeriod, ByQueue, ByState or ByKey (see below). The difference between sequential and unique is that a sequential insert is guaranteed to add the job to the queue, but the job is not executed until all jobs from the same "pool" have completed.

API proposal

There are two approaches to adding a ByKey API. One keeps it simple and only allows a string as the key. The other uses generics, but then the insert API becomes more complex.

// Option 1: keep it simple and only allow a string key.
type SequentialOpts struct {
    ByKey string
}

// Option 2: use generics for the key type, at the cost of a more complex insert API.
type SequentialOpts[K comparable] struct {
    ByKey K
}

type OrganizationArgs struct {
    OrganizationId int
    // other args relevant to this task 
}

func (OrganizationArgs) Kind() string { return "organization_task" }

func (a OrganizationArgs) InsertOpts() river.InsertOpts {
    return river.InsertOpts{
        SequentialOpts: river.SequentialOpts{
            ByKey: strconv.Itoa(a.OrganizationId),
        },
    }
}
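
For illustration, an insert-time override under this proposal might look like the sketch below. SequentialOpts is the hypothetical option being proposed here, not an existing River option; Client.Insert and InsertOpts themselves already exist.

// Hypothetical: SequentialOpts is part of this proposal, not River's current API.
_, err := client.Insert(ctx, OrganizationArgs{OrganizationId: 42}, &river.InsertOpts{
    SequentialOpts: river.SequentialOpts{
        ByKey: "organization-42",
    },
})
if err != nil {
    // handle the insert error
}

All jobs inserted with the key "organization-42" would be enqueued immediately but worked one at a time.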

Open questions

  1. Should the order of job execution be controllable or not? It could be FIFO, LIFO, random, or custom-defined.
  2. Maybe InsertOpts.Tags could handle this functionality. The docs say tags are only used for grouping; maybe this field could be "promoted" (current usage sketched below).
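
For reference, this is roughly how tags look today with River's existing API: InsertOpts.Tags is a plain []string used for grouping and search only, and it has no effect on execution order or concurrency.

func (a OrganizationArgs) InsertOpts() river.InsertOpts {
    return river.InsertOpts{
        // Existing API: tags only label the job; they don't serialize execution.
        Tags: []string{"organization-" + strconv.Itoa(a.OrganizationId)},
    }
}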

Workaround

I can use a distributed lock (Redis- or PostgreSQL-based). When a job with a given arg starts to execute, I try to obtain the lock, and if that's not possible, I snooze the job for a specific duration.
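
A rough sketch of that workaround, assuming the worker holds a pgx connection pool and keys a Postgres advisory lock on the organization ID (OrganizationWorker and the snooze duration are illustrative; pg_try_advisory_lock and river.JobSnooze are existing Postgres/River features):

import (
    "context"
    "time"

    "github.com/jackc/pgx/v5/pgxpool"
    "github.com/riverqueue/river"
)

type OrganizationWorker struct {
    river.WorkerDefaults[OrganizationArgs]
    pool *pgxpool.Pool
}

func (w *OrganizationWorker) Work(ctx context.Context, job *river.Job[OrganizationArgs]) error {
    // Advisory locks are scoped to a connection, so hold a dedicated one for the job's duration.
    conn, err := w.pool.Acquire(ctx)
    if err != nil {
        return err
    }
    defer conn.Release()

    var locked bool
    if err := conn.QueryRow(ctx, "SELECT pg_try_advisory_lock($1)", job.Args.OrganizationId).Scan(&locked); err != nil {
        return err
    }
    if !locked {
        // Another job for this organization is already running: come back later.
        return river.JobSnooze(30 * time.Second)
    }
    defer conn.Exec(ctx, "SELECT pg_advisory_unlock($1)", job.Args.OrganizationId)

    // ... the actual per-organization work goes here ...
    return nil
}

Note this only limits concurrency per key; snoozed jobs can be retried out of order, so it doesn't guarantee insertion-order execution.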

brandur commented 3 weeks ago

Yeah, +1. I think we're going to implement this, but it's a little trickier because it requires some more invasive changes to the locking queries. It's on my list of next ~2-3 features to do.

Similar to:

https://github.com/riverqueue/river/discussions/431

krhubert commented 3 weeks ago

Do you have a rough ETA for this one?

brandur commented 3 weeks ago

We've been pretty bad historically at delivering precise estimates on that sort of thing. I'd say a major feature usually takes ~2 weeks including reviews and everything, and this is 2-3 away, so maybe a month optimistically or two less optimistically?

That said, I'm going to look a little closer at it, and if I can spot a way to do it relatively easily, I might take it as the next feature after batch processing, so maybeee sooner.

krhubert commented 3 weeks ago

Thanks, this is great!

mauza commented 3 days ago

+1 to this feature. We're loving River here at my job, but this is holding up our adoption a little. We want to architect our jobs to be very small and allow for serial execution based on an ID. Thanks for accepting this, and we look forward to any progress.

bgentry commented 2 days ago

@mauza can you give any more info about how you’d want to use this? There are sort of two features conflated together here and I’d like to understand which one you’re more interested in or if you need to use them together for some reason:

  1. Ability to ensure that for a given partition key, the jobs execute one at a time and in the order they were inserted.
  2. Ensuring that only one job (or some larger number of jobs) with a given partition key can run at once globally, with no particular consideration given to a precise sequence order.