can't configure RescueStuckJobsAfter for an individual job #347

Open elee1766 opened 3 months ago

elee1766 commented 3 months ago

We have some jobs that take 1-2 minutes and other jobs that take at most 5 seconds, so we set worker timeouts accordingly.

However, we can't configure RescueStuckJobsAfter for an individual job. This means that when a worker running the 5-second job dies, we have to wait for the global RescueStuckJobsAfter instead of what should be 5-10 seconds. Updates that should happen around once a second end up pausing for a few minutes instead of a few seconds.

We schedule with a unique constraint because this job runs on every event, and events can arrive at up to 10 per second. We're okay with not scheduling the job if it's already running: it computes an aggregate, so the work will just be picked up by the next unique job. If we scheduled too many jobs, we'd not only schedule more than we can process, we'd also be doing extra work and would have to worry about race conditions where jobs that started earlier finish later.

One possible solution is to read the jobs table during InsertTx, use scheduled_time to check ourselves whether the job is stuck, and manually fix it, but I'm not sure if that's allowed. For now I'm checking scheduled_time in a loop in my Go application and manually rescuing the job, as this one specific job is critical to the progress of the rest of the application.
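
For concreteness, the loop looks roughly like this. A sketch only: it reaches into River's river_job table directly, which isn't a supported API, and the aggregate_update kind is a stand-in for our job.

```go
import (
    "context"
    "log"
    "time"

    "github.com/jackc/pgx/v5/pgxpool"
)

// rescueStuckJobs makes jobs of one kind available again if they've
// apparently been running long enough that their worker has died.
func rescueStuckJobs(ctx context.Context, pool *pgxpool.Pool) {
    ticker := time.NewTicker(time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            // The job should finish in ~5 seconds, so a row that was
            // scheduled more than 10 seconds ago and is still marked
            // running is treated as stuck.
            if _, err := pool.Exec(ctx, `
                UPDATE river_job
                SET state = 'available', scheduled_at = now()
                WHERE kind = 'aggregate_update'
                  AND state = 'running'
                  AND scheduled_at < now() - interval '10 seconds'
            `); err != nil {
                log.Printf("rescue loop: %v", err)
            }
        }
    }
}
```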

I think a deadline feature in InsertOpts would make me really happy, as it would also resolve other issues I'm having to work around. Workers should discard jobs they pick up that are past their deadline, and a maintenance job that "discards" jobs past their deadline could run on a regular interval.

The problem is adjacent to something @bgentry mentioned in a previous reply to me in https://github.com/riverqueue/river/issues/336#issuecomment-2093895895. This is actually a different summary job in a different app; we deal with a lot of data that needs to be updated in this manner.

> It also sounds like you're looking to ensure that only one of these summary jobs is running at a given moment. This is also something there's no official support for, though you can hack it in with your own locking mechanism. However, it's something we have thought about and hope to implement.

We have had success with some hacky locking mechanisms for our other jobs. In this case, though, we are trying to use the job's running state as our locking mechanism, and the issue is that it takes too long for a dying worker to "abandon" its lock.

For reference, the system we are migrating from used a user-defined hash as an optional uniqueness key, with support for a timeout and a deadline at the job level. The key would be locked when the job was scheduled and unlocked at the deadline or on job completion. We could bring that idea back and use it alongside river.

This calls back to what I was told to do here: https://github.com/riverqueue/river/discussions/346

It also very much relates to this problem: https://github.com/riverqueue/river/issues/165

> In your case, an alternative: drop the uniqueness checks and then implement your job such that it checks on startup the last time its data was updated. If the update was very recent, it falls through with a no-op. So you'd still be inserting lots of jobs, but most of them wouldn't be doing any work, and you wouldn't suffer the unique performance penalty.

That's essentially what I'm doing already: just checking whether the job is running using my own locks table, as sketched below.
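
For context, the locks-table hack amounts to roughly the following; the table name and schema are made up for illustration.

```go
// Assumes a table like:
//
//   CREATE TABLE job_locks (key text PRIMARY KEY, locked_at timestamptz NOT NULL);
//
// Insert-if-absent takes the lock; deleting the row releases it.

import (
    "context"

    "github.com/jackc/pgx/v5"
)

func tryLock(ctx context.Context, tx pgx.Tx, key string) (bool, error) {
    tag, err := tx.Exec(ctx, `
        INSERT INTO job_locks (key, locked_at)
        VALUES ($1, now())
        ON CONFLICT (key) DO NOTHING`, key)
    if err != nil {
        return false, err
    }
    return tag.RowsAffected() == 1, nil
}

func unlock(ctx context.Context, tx pgx.Tx, key string) error {
    _, err := tx.Exec(ctx, `DELETE FROM job_locks WHERE key = $1`, key)
    return err
}
```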

That said, these restrictions plus the incredibly slow insert speed with unique (100-200 inserts/sec) make me feel like the current "unique constraint" feature is missing a lot of power.

Given the current state of unique jobs, I feel like my best course of action for moving forward with river is to build a layer on top of it that deals a little better with these more complex scheduling constraints.

brandur commented 3 months ago

Hm, okay, quite a lot of content here, but speaking to the headline issue: I spoke to @bgentry a little about this, and it does feel like a per-job customizable RescueStuckJobsAfter is just a little too specific a feature for anything even close to a general case. As in, I'd expect 99.99% of users wouldn't need it.

One point of clarification: ideally, in a perfectly functioning system, RescueStuckJobsAfter is never used. Remember that it's basically only there to cover the case of an unexpected crash, like one that might occur during a panic. If jobs return an error like they're supposed to, they're rescheduled immediately and never need to be rescued.
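
(For anyone landing here: the knob itself is a single global setting on the client config. A minimal sketch, assuming dbPool and workers are set up elsewhere:)

```go
import (
    "time"

    "github.com/jackc/pgx/v5"
    "github.com/jackc/pgx/v5/pgxpool"
    "github.com/riverqueue/river"
    "github.com/riverqueue/river/riverdriver/riverpgxv5"
)

func newClient(dbPool *pgxpool.Pool, workers *river.Workers) (*river.Client[pgx.Tx], error) {
    return river.NewClient(riverpgxv5.New(dbPool), &river.Config{
        Queues: map[string]river.QueueConfig{
            river.QueueDefault: {MaxWorkers: 100},
        },
        // Jobs still marked `running` after this long are assumed to
        // have crashed without reporting back, and are rescued by the
        // maintenance service. Global only; there's no per-job knob.
        RescueStuckJobsAfter: 5 * time.Minute,
        Workers:              workers,
    })
}
```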

> I think a deadline feature in InsertOpts would make me really happy, as it would also resolve other issues I'm having to work around. Workers should discard jobs they pick up that are past their deadline, and a maintenance job that "discards" jobs past their deadline could run on a regular interval.

This is interesting, but my first reaction against it is somewhat similar to the above: it'd be such a specific feature that I think we'd want some very strong rationale for why it should be included in the default API.

It's also worth remembering that River can quite easily be augmented with something like a common job bootstrap, where project-specific functionality can be customized to your heart's desire, e.g.:

```go
import (
    "context"
    "errors"
    "fmt"
    "sort"
    "time"

    "github.com/riverqueue/river"
)

type MyProjectCommonArgs struct {
    Deadline time.Time `json:"deadline"`
}

func (a MyProjectCommonArgs) GetCommonArgs() MyProjectCommonArgs {
    return a
}

type WithCommonArgs interface {
    GetCommonArgs() MyProjectCommonArgs
}

type SortArgs struct {
    MyProjectCommonArgs

    // Strings is a slice of strings to sort.
    Strings []string `json:"strings"`
}

func (SortArgs) Kind() string { return "sort" }

// CommonJobBootstrap runs project-wide checks before handing off to the
// job-specific work function.
func CommonJobBootstrap[T WithCommonArgs](ctx context.Context, args T, workFunc func(args T) error) error {
    commonArgs := args.GetCommonArgs()

    if commonArgs.Deadline.Before(time.Now()) {
        return errors.New("deadline exceeded")
    }

    return workFunc(args)
}

type SortWorker struct {
    // An embedded WorkerDefaults sets up default methods to fulfill the rest of
    // the Worker interface:
    river.WorkerDefaults[SortArgs]
}

func (w *SortWorker) Work(ctx context.Context, job *river.Job[SortArgs]) error {
    return CommonJobBootstrap(ctx, job.Args, func(args SortArgs) error {
        sort.Strings(args.Strings)
        fmt.Printf("Sorted strings: %+v\n", args.Strings)
        return nil
    })
}
```
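
Inserting one of these jobs then just means filling in the embedded common args alongside the job's own (client setup elided):

```go
_, err := riverClient.Insert(ctx, SortArgs{
    MyProjectCommonArgs: MyProjectCommonArgs{Deadline: time.Now().Add(30 * time.Second)},
    Strings:             []string{"whale", "tiger", "bear"},
}, nil)
```
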
elee1766 commented 3 months ago

> One point of clarification: ideally, in a perfectly functioning system, RescueStuckJobsAfter is never used. Remember that it's basically only there to cover the case of an unexpected crash, like one that might occur during a panic. If jobs return an error like they're supposed to, they're rescheduled immediately and never need to be rescued.

We lose jobs when the program panics in a code path separate from the job's. I see this as an unavoidable problem: yes, you'd like it not to happen, but iterating fast means it will, and we need to be defensive when it does.

We also seem to be losing jobs when we lose the connection to the database, even if the worker itself doesn't die, for instance when the database restarts after a crash or when the Patroni cluster goes through re-election. I don't know whether this is intended or a bug/race in river.

For instance, if we are panicking every 30 minutes, the result should be a slight service interruption every 30 minutes, not the entire application coming to a halt because it can't process the latest job. The job queue is a tool that's meant to help us meet these ends. Scheduling jobs means workers can crash and jobs will still be processed by other workers; that is, in my opinion, one of the major features of a job queue.

That is, when I chose to add a job queue to my application, it was because I wanted this sort of protection (one bad worker doesn't stall everything), and features such as retries, uniqueness, etc. are there to help facilitate it.

Deadline might not be the solution, but I think stuck jobs are an unavoidable problem that can't be hand-waved away, and how a specific job queue system deals with stuck jobs is a key differentiating factor.

> It's also worth remembering that River can quite easily be augmented with something like a common job bootstrap, where project-specific functionality can be customized to your heart's desire, e.g.

This is exactly how the hacky locking mechanisms I mentioned are implemented. I was just trying to avoid writing another one for this job.

> ...it does feel like a per-job customizable RescueStuckJobsAfter is just a little too specific a feature for anything even close to a general case. As in, I'd expect 99.99% of users wouldn't need it.

Yeah, I would tend to agree, hence floating the idea of a deadline, which I've seen in other queues.

> This is interesting, but my first reaction against it is somewhat similar to the above: it'd be such a specific feature that I think we'd want some very strong rationale for why it should be included in the default API.

Would it break river if I ran my own CleanStuckJobs task that puts deadlines on each job and processes them myself? To me, adding this sort of "extension" on top of river would be preferable if possible; I'm just not too sure how well river plays with other things modifying its tables.

Ultimately I'm just looking to schedule my own maintenance task, which I believe I can do with a PeriodicJob, as sketched below.
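
Something like this, presumably. A sketch: CleanStuckJobsArgs and its worker are my own hypothetical types, registered on workers like any other job.

```go
import (
    "time"

    "github.com/jackc/pgx/v5"
    "github.com/jackc/pgx/v5/pgxpool"
    "github.com/riverqueue/river"
    "github.com/riverqueue/river/riverdriver/riverpgxv5"
)

func newClientWithMaintenance(dbPool *pgxpool.Pool, workers *river.Workers) (*river.Client[pgx.Tx], error) {
    return river.NewClient(riverpgxv5.New(dbPool), &river.Config{
        PeriodicJobs: []*river.PeriodicJob{
            river.NewPeriodicJob(
                // Insert a CleanStuckJobsArgs job every 30 seconds, and
                // once at startup.
                river.PeriodicInterval(30*time.Second),
                func() (river.JobArgs, *river.InsertOpts) {
                    return CleanStuckJobsArgs{}, nil
                },
                &river.PeriodicJobOpts{RunOnStart: true},
            ),
        },
        Queues: map[string]river.QueueConfig{
            river.QueueDefault: {MaxWorkers: 10},
        },
        Workers: workers,
    })
}
```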

brandur commented 3 months ago

Okay, I think I'd have to see this all in action to wrap my head around it, but I can't shake the feeling that rescuing specific types of jobs more quickly isn't the right answer.

I run a big Go app at work, and we treat every bug that results in a panic as priority 0 for triage. Our API runs on literally two Go processes, so if even one of them is panicking, that's 50% of our capacity gone right there. Our background worker is a single Go process; if it's panicking, that's 100% of our background work capacity gone. The acceptable number of expected panics at steady state is exactly zero. No other number is tolerable.

That might sound pie-in-the-sky, and in most languages I think it would be, but because everything in Go uses error-based returns, we find it's pretty doable. A panic comes up maybe 2-3 times a year (usually through a startup or configuration bug), and we have each one squashed inside of a day.

> Would it break river if I ran my own CleanStuckJobs task that puts deadlines on each job and processes them myself? To me, adding this sort of "extension" on top of river would be preferable if possible; I'm just not too sure how well river plays with other things modifying its tables.

You'd probably have to try it to know for sure, but conceptually I think it should be okay. River's maintenance tasks wrap all their operations in transactions, so no data-consistency bugs should be possible. The one thing to watch out for, I suppose, is putting job rows into impossible states that River makes assumptions about: e.g., if attempts > 0, it'd expect attempted_at to always have a non-null value, and something might break if it didn't. That sort of thing.

bgentry commented 3 months ago

> Would it break river if I ran my own CleanStuckJobs task that puts deadlines on each job and processes them myself? To me, adding this sort of "extension" on top of river would be preferable if possible; I'm just not too sure how well river plays with other things modifying its tables.

It should be okay to do this, but I would caution you to have the task operate in a manner similar to River's own stuck job rescuer, at least in terms of the updates it makes to jobs. I believe a lot of the footguns are prevented by proper database constraints on the river_job table, but there's still a chance something is missing there that would let you put a job into an invalid state.
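
For example, an update that stays close to the rescuer's own behavior might look something like this. A sketch only: details can differ between River versions, and my_fast_job is a stand-in kind.

```go
import (
    "context"

    "github.com/jackc/pgx/v5/pgxpool"
)

// cleanStuckJobs retries stuck jobs that still have attempts left and
// discards the rest, keeping finalized_at consistent with the state so
// River's assumptions still hold.
func cleanStuckJobs(ctx context.Context, pool *pgxpool.Pool) error {
    _, err := pool.Exec(ctx, `
        UPDATE river_job
        SET state = CASE WHEN attempt < max_attempts
                         THEN 'retryable'::river_job_state
                         ELSE 'discarded'::river_job_state END,
            finalized_at = CASE WHEN attempt < max_attempts
                                THEN finalized_at
                                ELSE now() END,
            scheduled_at = now()
        WHERE state = 'running'
          AND kind = 'my_fast_job'
          AND attempted_at < now() - interval '10 seconds'`)
    return err
}
```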