taskforcesh / bullmq

BullMQ - Message Queue and Batch processing for NodeJS and Python based on Redis
https://bullmq.io
MIT License

Ensuring atomicity with horizontally scaling #2239

Open kraikov opened 9 months ago

kraikov commented 9 months ago

In documentation (https://docs.bullmq.io/guide/jobs/repeatable) it is mentioned that:

> For instance, let's say that you have a job that is repeated every second, but the process of the job itself takes 5 seconds. As explained above, repeatable jobs are just delayed jobs, so this means that the next repeatable job will be added as soon as the next job is starting to be processed.

However, I am seeking guidance on ensuring that the next execution isn't triggered until the previous one has completed, especially when we have horizontally scaled instances.

Here's the context: Currently, we manage task scheduling using cron jobs and distributed locks by locking a database table. I am exploring the possibility of migrating to using repeatable jobs in BullMQ. The challenge we face is that many tasks run at 5-second intervals, but the runtime of each task varies unpredictably. Some tasks may take 20 seconds, while others may take 1 minute.

To address this challenge, I want to guarantee that the next execution of a task is never started while the previous execution is still running, even across multiple instances.

I would greatly appreciate any guidance or recommendations on how to achieve this level of control and atomicity when working with repeatable jobs in a horizontally scaled environment. Thanks in advance.

manast commented 9 months ago

If you can check if the previous job for a given repeatable job has not done its work yet, then you could just return from that job doing nothing. I think that would be the easiest way to accomplish what you are trying to do, and it should scale without problems.

kraikov commented 9 months ago

@manast thank you for the suggestion. I was also considering a similar solution; however, I did not find a way to check whether a repeatable job has completed or not.

AFAIK, the only way to get the repeatable jobs is through `getRepeatableJobs`, and the returned object is:

```typescript
{
    endDate: number;
    id: string;
    key: string;
    name: string;
    next: number;
    pattern: string;
    tz: string;
}
```

manast commented 9 months ago

I was thinking more along the lines of checking in your own service whether the previous job is still being processed, for instance if you have some database field you can use for that purpose.

kraikov commented 9 months ago

I'm currently looking for an alternative to our existing implementation, which relies on a database field. I want to explore better options.

Since the next repeatable job is created at the start of the current one (according to the docs), I'm wondering if it's possible to include a reference to the current job in the next one (job id should be sufficient?). This way, one can easily check if the previous job has finished.

manast commented 9 months ago

If we were going to make a change like that in BullMQ I would prefer to have a different repeat setting that creates the next repetition at the current job's completion or failure.

manast commented 9 months ago

There are probably some edge cases in such an approach though, I have to think more about it.

kraikov commented 9 months ago

I'd love to try and contribute if you believe such a feature would be feasible and beneficial.

manast commented 9 months ago

With such an approach there is also the case where the job could get stalled (for example, if the worker died in the middle of processing), but sure, if the job eventually completes or fails, the job for the next iteration would be created. One difference though is that it would have to be created in the moveToFinish Lua script, as that is the only way to guarantee that whenever a job finishes, the next iteration is generated.

manast commented 9 months ago

Or even the case where the job is manually removed, in that case, there would be no more iterations.

nullndr commented 9 months ago

Related to https://github.com/taskforcesh/bullmq/discussions/2053

kraikov commented 9 months ago

I've identified a potential temporary workaround that appears to be effective in most cases: