film42 / sidekiq-rs

A port of Sidekiq to Rust using Tokio
MIT License

Strategies for Repeating Execution of Periodic Tasks #47

Open mrxsisyphus opened 1 week ago

mrxsisyphus commented 1 week ago

Hello, I'd like to ask about the repeated execution of periodic tasks. Suppose I have a scheduled task that runs every 10 seconds. Under normal circumstances the task completes within the 10-second interval, which works well. However, when a run exceeds the 10-second limit and the next run is due to start, the current implementation appears to let the ongoing run continue while the next one starts. In such cases it is often desirable to simply skip the new run, or to apply some alternative strategy. I think the coalesce and misfire_grace_time parameters of Python's APScheduler may provide good insight here.

APScheduler

film42 commented 1 week ago

A few thoughts here:

  1. I like the idea of a misfire_grace_time, since we know the interval of the periodic job and its last target execution time.
  2. Should a job start on time but continue running longer than the interval (maybe it polls an API and the API is taking a long time to respond), we could offer a baked-in ExecutionTimeout middleware that obeys a job's configuration or something. It would then cancel the runtime execution of the job should it exceed that duration (see the sketch below).

This would provide building blocks where you could combine (1) and (2) if you need more fine-grained control over execution.
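
For illustration, here is a minimal sketch of what that cancellation could look like on top of tokio; run_with_timeout and its signature are hypothetical, not an API of this crate:

```rust
use std::time::Duration;
use tokio::time::timeout;

// Hypothetical wrapper: the job future is dropped (cancelled) if it
// runs longer than `max_runtime`.
async fn run_with_timeout<F>(job: F, max_runtime: Duration) -> Result<(), &'static str>
where
    F: std::future::Future<Output = ()>,
{
    match timeout(max_runtime, job).await {
        Ok(()) => Ok(()),
        Err(_elapsed) => Err("job exceeded its configured execution timeout"),
    }
}
```

Dropping the future on timeout is how tokio cancels it, so this needs no cooperation from the job body, as long as the job yields at .await points rather than blocking the thread.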


I could maybe even make a smaller cut by providing the timestamp the job was scheduled for, letting you decide whether to run from the worker code itself. I'd need to pull the run timestamp from Redis to implement (1), which I'm not doing yet, but it's simple to add. If I'm doing that, it's probably easiest to just pass it down on the Job struct.

Either way, these are good suggestions!

mrxsisyphus commented 1 week ago

Thank you for your quick reply. I still have some questions about the answers above:

  1. In (2) it says "cancel execution." Does this mean cancelling the instance of the task that has exceeded the interval, or the next instance waiting to be executed?
  2. Looking at the APScheduler library, the execution of the next task instance is determined by the completion of the previous one, so it's quite easy to implement a strategy like misfire_grace_time:

https://github.com/agronholm/apscheduler/blob/d5266c8e670f0817d36b3d02caf61d24fa4ac7ce/src/apscheduler/_schedulers/async_.py#L915
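
In that spirit, the misfire check boils down to comparing a run's scheduled fire time against a grace window; a rough sketch (these names are illustrative, not APScheduler's or this crate's API):

```rust
use std::time::Duration;

// Run only if we're within `grace` of the scheduled fire time;
// otherwise treat the run as misfired and skip it.
fn should_run(now_unix: u64, scheduled_for_unix: u64, grace: Duration) -> bool {
    now_unix <= scheduled_for_unix + grace.as_secs()
}
```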

The current approach is that the executions of the previous and next task instances are not related: after the periodic tasks that meet the conditions are fetched from Redis, they are pushed directly into the task queue. Here are some approaches I can think of:

  1. Just like APScheduler, determine the execution time of the next instance only after the previous one finishes (sketched below).
  2. Keep the current approach, but make a check before the task is pushed into the queue: if instances of the task are still running, just update the score and skip the push.
  3. Enhance the current middleware system. All task instances are still pushed into the queue, but the middleware can detect whether previous instances of the task are still running and decide whether or not to execute this one.
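
As a minimal sketch of option (1) on top of tokio (run_serially and its shape are hypothetical, assuming the job is a closure returning a future):

```rust
use std::time::Duration;
use tokio::time::{sleep, Instant};

// The next run is computed only after the previous one finishes,
// so two instances can never overlap.
async fn run_serially<F, Fut>(interval: Duration, mut job: F)
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = ()>,
{
    loop {
        let started = Instant::now();
        job().await;
        // Sleep only for whatever is left of the interval, if anything.
        let elapsed = started.elapsed();
        if elapsed < interval {
            sleep(interval - elapsed).await;
        }
    }
}
```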

film42 commented 1 week ago

> In (2) it says "cancel execution." Does this mean cancelling the instance of the task that has exceeded the interval, or the next instance waiting to be executed?

I was just wondering whether a generic middleware to prevent unbounded job duration would be helpful. This is something anyone can write for themselves today, but if there's a need to bundle one with the crate, we can. Let's say that's out of scope for this ticket.

> All task instances are still pushed into the queue, but the middleware can detect whether previous instances of the task are still running and decide whether or not to execute this one.

With the current Job struct you cannot see the expected execution time (or the schedule itself, like you can with APScheduler). However, I can push this information through so you can see it and write something for your use case.

Btw, this sounds like what I've used unique jobs for (https://github.com/sidekiq/sidekiq/wiki/Ent-Unique-Jobs) in the past. You can have a cron job run once a minute and then set a unique_for: 59.seconds (something like that) so only one job runs within that interval. It's not quite the same as APScheduler, but I'm thinking in terms of the minimal changes needed to enable the most flexibility, if that makes sense.

For example, you could build a poor man's uniqueness lock as middleware, even having it target only jobs where job.class == MyWorker::class_name().

1. -- Server Middleware Start
2. acquired = SET <lock_key> 1 NX EX <unique_for_seconds>
3. execute_job() if acquired else skip
4. DEL <lock_key> (only if we acquired the lock)
5. -- Server Middleware End

NOTE: <lock_key> could be format!("unique_lock::{}", job.class) and <unique_for_seconds> could be 59 seconds
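
As a rough Rust sketch of that middleware body, talking to Redis directly via the redis crate (run_unique and its parameters are illustrative, not this crate's API):

```rust
use redis::AsyncCommands;

// Acquire a per-job-class lock with SET NX EX; run the job only if we
// got the lock, and release it afterwards.
async fn run_unique<F, Fut>(
    conn: &mut redis::aio::MultiplexedConnection,
    lock_key: &str,
    unique_for_secs: u64,
    run_job: F,
) -> redis::RedisResult<()>
where
    F: FnOnce() -> Fut,
    Fut: std::future::Future<Output = ()>,
{
    // Returns Some("OK") if the key was set, None if it already existed.
    let acquired: Option<String> = redis::cmd("SET")
        .arg(lock_key)
        .arg(1)
        .arg("NX")
        .arg("EX")
        .arg(unique_for_secs)
        .query_async(conn)
        .await?;

    if acquired.is_some() {
        run_job().await;
        // Release early; the EX expiry remains as a safety net if we crash.
        let _: () = conn.del(lock_key).await?;
    } // else: another instance holds the lock within the interval; skip.

    Ok(())
}
```

Whether to DEL at the end is a design choice: deleting gives "no concurrent runs", while letting the key expire on its own gives "at most one run per unique_for window".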

That won't prevent the jobs from being inserted in the first place. Sidekiq (ruby) has a client middleware that makes doing stuff like this a little easier, but this will work if you're only looking to limit periodic jobs when another instance is already running.

mrxsisyphus commented 1 week ago

Thank you for your quick response. Using unique_for plus middleware is a good method. I think the real key to this approach is how to set unique_for_seconds. We can estimate the expected execution window for the current task by obtaining its next execution time from the Job object (or by other means) and subtracting the current time from it. But this value alone is not enough: under normal circumstances, the gap between consecutive instances of a periodic task equals exactly this value, so a lock of that length leaves no margin. We therefore need to add a time offset. For example, if the lock duration is precisely twice the expected window, the next periodic instance is prevented from running within twice that time.
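
To make that concrete, here's a small sketch of the TTL calculation (unique_for_seconds and next_run_unix are hypothetical names; the 2x padding is the offset from the example above):

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// Derive a lock TTL from the job's next scheduled run (a unix
// timestamp), padding by a factor of two as described above.
fn unique_for_seconds(next_run_unix: u64) -> u64 {
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before unix epoch")
        .as_secs();
    let expected_window = next_run_unix.saturating_sub(now);
    expected_window * 2
}
```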