taskiq-python / taskiq

Distributed task queue with full async support
MIT License

Scheduler can send task twice #296

Open sector119 opened 4 months ago

sector119 commented 4 months ago

Hello

In the run_scheduler_loop function, consider the next_minute calculation. If datetime.now() is 17:22:59.55555, replacing seconds and microseconds with 0 and adding timedelta(minutes=1) gives next_minute = 17:23:00. When the delay is then calculated as delay = next_minute - datetime.now(), the current time can already be past next_minute or only slightly before it, so the delay comes out slightly negative (for example -0.000299) or slightly positive (for example 0.000373).

Don't you think we should check the delay, like this:

delay = (next_minute - datetime.now()).total_seconds()

if int(delay) <= 0:
    delay = 60.0

await asyncio.sleep(delay)
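To see which values the proposed check would replace, here is a minimal sketch that wraps the same condition in a helper. The name guarded_delay is hypothetical and not part of taskiq. Because int() truncates toward zero, any delay below one second, negative or positive, is replaced by 60.0.

```python
def guarded_delay(delay: float) -> float:
    """Hypothetical helper applying the check proposed above."""
    # int() truncates toward zero, so every delay in (-1, 1) becomes 0.
    if int(delay) <= 0:
        return 60.0
    return delay


# Sample delays: the two values from the report, plus two for comparison.
for raw in (-0.000299, 0.000373, 0.9, 59.998002):
    print(raw, "->", guarded_delay(raw))
```

Note that a delay such as 0.9 seconds is also replaced by 60.0, not only the near-zero values described above.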

Logs:

Next minute: 2024-02-22 14:13:00 Now: 2024-02-22 14:13:00.000299 Delay: -1 day, 23:59:59.999701 Delay total seconds: -0.000299
[2024-02-22 14:13:00,000][INFO ][run:delayed_send:130] Sending task billing:sync-payments.

Next minute: 2024-02-22 14:14:00 Now: 2024-02-22 14:13:00.001998 Delay: 0:00:59.998002 Delay total seconds: 59.998002
[2024-02-22 14:13:00,002][INFO ][run:delayed_send:130] Sending task billing:sync-payments.
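The arithmetic in these logs can be reproduced with fixed timestamps instead of datetime.now(). This is only an illustration of how timedelta represents a slightly negative difference. The variable names are chosen for the example.

```python
from datetime import datetime

# First log entry: "now" is already 299 microseconds past next_minute.
first_next_minute = datetime(2024, 2, 22, 14, 13, 0)
first_now = datetime(2024, 2, 22, 14, 13, 0, 299)
first_delay = first_next_minute - first_now
print(first_delay)                  # -1 day, 23:59:59.999701
print(first_delay.total_seconds())  # -0.000299

# Second log entry: the loop ran again within the same minute.
second_next_minute = datetime(2024, 2, 22, 14, 14, 0)
second_now = datetime(2024, 2, 22, 14, 13, 0, 1998)
second_delay = second_next_minute - second_now
print(second_delay)                  # 0:00:59.998002
print(second_delay.total_seconds())  # 59.998002
```

A negative timedelta is normalized to a negative day count plus a positive remainder, which is why the first delay prints as "-1 day, 23:59:59.999701".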

s3rius commented 2 months ago

Actually you're right, but I think it can be resolved by calculating next_minute right after sending all tasks.

diff --git a/taskiq/cli/scheduler/run.py b/taskiq/cli/scheduler/run.py
index 6a17a11..73f06f8 100644
--- a/taskiq/cli/scheduler/run.py
+++ b/taskiq/cli/scheduler/run.py
@@ -144,9 +144,6 @@ async def run_scheduler_loop(scheduler: TaskiqScheduler) -> None:
     running_schedules = set()
     while True:
         # We use this method to correctly sleep for one minute.
-        next_minute = datetime.now().replace(second=0, microsecond=0) + timedelta(
-            minutes=1,
-        )
         scheduled_tasks = await get_all_schedules(scheduler)
         for source, task_list in scheduled_tasks.items():
             for task in task_list:
@@ -165,7 +162,9 @@ async def run_scheduler_loop(scheduler: TaskiqScheduler) -> None:
                     )
                     running_schedules.add(send_task)
                     send_task.add_done_callback(running_schedules.discard)
-
+        next_minute = datetime.now().replace(second=0, microsecond=0) + timedelta(
+            minutes=1,
+        )
         delay = next_minute - datetime.now()
         await asyncio.sleep(delay.total_seconds())

With this change, it would be impossible to end up with a negative number of seconds. What do you think?
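As a variation on the diff above, and not part of the proposed patch, the delay can be derived from a single clock reading. The patched code still calls datetime.now() twice, so in principle a minute boundary could fall between the two calls. Reading the clock once rules that out. The helper name seconds_until_next_minute is hypothetical.

```python
from datetime import datetime, timedelta


def seconds_until_next_minute(now: datetime) -> float:
    """Hypothetical helper: seconds from `now` to the start of the next minute."""
    # Both values come from the same timestamp, so the result is
    # always greater than 0 and at most 60.
    next_minute = now.replace(second=0, microsecond=0) + timedelta(minutes=1)
    return (next_minute - now).total_seconds()


# Timestamp taken from the first log entry in the report.
print(seconds_until_next_minute(datetime(2024, 2, 22, 14, 13, 0, 299)))
```

In the scheduler loop, this would correspond to calling the helper with datetime.now() once, after all tasks have been sent, and passing the result to asyncio.sleep.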