taskiq-python / taskiq

Distributed task queue with full async support
MIT License
689 stars, 44 forks

Run missed task from persistent storage #328

Open Cheesy008 opened 1 month ago

Cheesy008 commented 1 month ago

Hello. Is it possible to add functionality so that I can run missed tasks if the scheduler was restarted? For example, a task runs every 10 hours. When there are 10 minutes left before the task starts, the scheduler is restarted due to some technical issue. The task is then missed, and the scheduler has to start the cycle from scratch. What I want to do is store the scheduler's tasks in Redis and check on restart whether a task was missed. I went through the source code but couldn't find a proper way to solve this. Could you give some recommendations, please?

s3rius commented 1 month ago

Sure. The easiest way is to create your own schedule source that checks in a database which tasks have run and executes the required tasks accordingly.

Cheesy008 commented 1 month ago

@s3rius I was thinking about this idea. However, if I put the tasks in a database, then after a restart the scheduler can't find out when to start a missed task, because the task doesn't have a creation date. A task has a crontab string, which means that it has to run every N minutes/hours. And when this task passes through the function get_task_delay, it isn't run immediately.

s3rius commented 1 month ago

You can remember when the task last ran and decide whether to run it based on this value. It might be a bit tricky with cron, but it should be possible.
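The last-run check suggested here can be sketched roughly as follows. This is only an illustration, not taskiq code: `NextFire`, `every`, and `is_missed` are hypothetical names, and a fixed interval stands in for the crontab string. With a real cron expression, the `next_fire` callable would presumably be backed by a cron library instead.

```python
from datetime import datetime, timedelta, timezone
from typing import Callable, Optional

# Hypothetical type: returns the first scheduled fire time after `moment`.
NextFire = Callable[[datetime], datetime]


def every(interval: timedelta) -> NextFire:
    """Stand-in for a cron expression: fire once per `interval`."""

    def next_fire(moment: datetime) -> datetime:
        return moment + interval

    return next_fire


def is_missed(
    last_run: Optional[datetime],
    next_fire: NextFire,
    now: datetime,
) -> bool:
    """Return True when a fire time fell between the last recorded run and now."""
    if last_run is None:
        # No record yet, so nothing can be proven missed.
        return False
    return next_fire(last_run) <= now


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
ten_hours = every(timedelta(hours=10))

# Last ran 11 hours ago: the 10-hour mark passed while the scheduler was down.
print(is_missed(now - timedelta(hours=11), ten_hours, now))  # True
# Last ran 9h50m ago: ten minutes remain, so nothing was missed yet.
print(is_missed(now - timedelta(hours=9, minutes=50), ten_hours, now))  # False
```

The comparison only needs the stored last-run timestamp and a way to compute the next fire time after it, so no creation date is required.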

Cheesy008 commented 1 month ago

@s3rius Where should I store this date value? I would like to put it in ScheduledTask, but I don't think that extending this model is a good idea. I've come up with the following logic:

  1. In the schedule source's startup method I get all the tasks and put them in Redis. I add a default datetime value to each task.
  2. In get_schedules I pull all the tasks from Redis. For each task I calculate its run time from the creation date, the crontab, and the current time, and decide whether the task must run immediately by changing its time field so that it passes through get_task_delay. If the time hasn't passed yet, the task works as usual.
  3. In the post_send method I delete the task from Redis and set a new one with the current timestamp.

Maybe I overcomplicated some things or came up with the wrong idea. I would be very grateful to see your comment.
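The three steps listed above could look roughly like the sketch below. It rests on several assumptions: `Job` and `LastRunSource` are hypothetical stand-ins that only mirror the startup / get_schedules / post_send method names and do not subclass anything from taskiq, a plain dict stands in for Redis, a fixed interval stands in for the crontab string, and the extra `now` parameters exist only to make the example deterministic.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional


@dataclass
class Job:
    """Hypothetical stand-in for a scheduled task (not taskiq's ScheduledTask)."""

    name: str
    interval: timedelta  # stand-in for a crontab string
    run_at: Optional[datetime] = None  # set when the job must fire immediately


class LastRunSource:
    """Mirrors the three steps: startup, get_schedules, post_send."""

    def __init__(self, jobs: List[Job], store: Dict[str, str]) -> None:
        self.jobs = jobs
        self.store = store  # stand-in for Redis: job name -> ISO timestamp

    async def startup(self) -> None:
        # Step 1: seed a timestamp only for jobs that have none, so that a
        # restart does not overwrite the record of the previous run.
        now = datetime.now(timezone.utc).isoformat()
        for job in self.jobs:
            self.store.setdefault(job.name, now)

    async def get_schedules(self, now: Optional[datetime] = None) -> List[Job]:
        # Step 2: mark overdue jobs to run immediately; leave the rest alone.
        now = now or datetime.now(timezone.utc)
        for job in self.jobs:
            last_run = datetime.fromisoformat(self.store[job.name])
            job.run_at = now if last_run + job.interval <= now else None
        return self.jobs

    async def post_send(self, job: Job, now: Optional[datetime] = None) -> None:
        # Step 3: record the send time as the new last run.
        now = now or datetime.now(timezone.utc)
        self.store[job.name] = now.isoformat()
        job.run_at = None


async def demo() -> List[str]:
    start = datetime(2024, 1, 1, tzinfo=timezone.utc)
    store = {"report": start.isoformat()}  # survives the simulated restart
    source = LastRunSource([Job("report", timedelta(hours=10))], store)
    await source.startup()
    due = await source.get_schedules(now=start + timedelta(hours=11))
    return [job.name for job in due if job.run_at is not None]


print(asyncio.run(demo()))  # ['report']
```

One design choice worth noting: step 1 uses `setdefault` rather than an unconditional write. If startup reset every timestamp to the current time, the record of the previous run would be lost on each restart and a missed task could never be detected.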