taskiq-python / taskiq

Distributed task queue with full async support
MIT License

Periodic tasks on top of the per-second main loop runner #324

Open gencurrent opened 2 months ago

gencurrent commented 2 months ago

Updates

1) The main event loop now cycles once per second; 2) a ScheduledTask.period field has been added; 3) periodic task execution has been added.

Design options

  1. The current approach: iterate over the scheduled tasks every second.
  2. The two-stage approach: a. Iterate once per minute, as in the as-is solution. b. Inside each minute, add a second, per-second cycle (60 iterations) that iterates over the tasks to be launched within that minute. The second approach looks like:
    while True:
        ...
        next_minute = datetime.now().replace(second=0, microsecond=0) + timedelta(minutes=1)
        while datetime.now().replace(second=0, microsecond=0) != next_minute:
            ...
            next_second = datetime.now().replace(microsecond=0) + timedelta(seconds=1)
            await asyncio.sleep((next_second - datetime.now()).total_seconds())
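The timing arithmetic in the two-stage approach can be pulled into small pure helpers, which makes it easy to check in isolation. This is only a sketch, not the code in this PR: the names `next_minute_after`, `seconds_until_next_second`, `run_two_stage_loop`, `on_minute` and `on_second` are hypothetical, and the inner loop uses `<` rather than `!=` so it still terminates if one iteration overruns a minute boundary.

```python
import asyncio
from datetime import datetime, timedelta
from typing import Awaitable, Callable

# Hypothetical callback type: receives the current time of the tick.
Tick = Callable[[datetime], Awaitable[None]]


def next_minute_after(now: datetime) -> datetime:
    """Return the start of the minute that follows `now`."""
    return now.replace(second=0, microsecond=0) + timedelta(minutes=1)


def seconds_until_next_second(now: datetime) -> float:
    """Return how long to sleep to wake up on the next whole second."""
    next_second = now.replace(microsecond=0) + timedelta(seconds=1)
    return (next_second - now).total_seconds()


async def run_two_stage_loop(on_minute: Tick, on_second: Tick, minutes: int = 1) -> None:
    """Run the outer per-minute stage `minutes` times (bounded for the sketch)."""
    for _ in range(minutes):
        # Stage a: work done once per minute (e.g. cron-style schedules).
        await on_minute(datetime.now())
        next_minute = next_minute_after(datetime.now())
        # Stage b: per-second ticks until the minute boundary is reached.
        while datetime.now() < next_minute:
            await on_second(datetime.now())
            await asyncio.sleep(seconds_until_next_second(datetime.now()))
```

With helpers like these, the outer stage could keep handling cron schedules while the inner stage handles second-granularity periods, assuming the two callbacks are wired to the scheduler accordingly.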

Notes:

An example to run the feature against is main_test.py in the root directory:

# broker.py
import asyncio
from taskiq.brokers.inmemory_broker import InMemoryBroker

from taskiq.schedule_sources import LabelScheduleSource
from taskiq import TaskiqScheduler

broker = InMemoryBroker()

scheduler = TaskiqScheduler(
    broker=broker,
    sources=[LabelScheduleSource(broker)],
)

@broker.task(schedule=[{"cron": "* * * * *", "args": [1]}])
async def each_minute_cron(value: int) -> int:
    print(f"The {each_minute_cron.__qualname__} task has been launched")
    await asyncio.sleep(0.5)
    return value + 1

@broker.task(schedule=[{"period": 2, "args": [1]}])
async def each_2_seconds_task(value: int) -> int:
    print(f"The {each_2_seconds_task.__qualname__} task has been launched")
    await asyncio.sleep(0.5)
    return value + 1
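
On each per-second tick, a `period` schedule like the one above has to be checked against the time of the task's last launch. The following is a minimal sketch of one way to do that, not the logic implemented in this PR: it assumes `period` is expressed in seconds (as the `each_2_seconds_task` name suggests), and `is_period_due` and `simulate_ticks` are hypothetical helper names.

```python
from datetime import datetime, timedelta
from typing import List, Optional


def is_period_due(period: float, last_run: Optional[datetime], now: datetime) -> bool:
    """Return True if a task with the given period (assumed seconds) should run now."""
    if last_run is None:
        # The task has never been launched, so launch it on the first tick.
        return True
    return now - last_run >= timedelta(seconds=period)


def simulate_ticks(period: float, ticks: int) -> List[int]:
    """Simulate `ticks` one-second ticks and return the tick indexes that launch."""
    start = datetime(2024, 1, 1)
    last_run: Optional[datetime] = None
    launched: List[int] = []
    for i in range(ticks):
        now = start + timedelta(seconds=i)
        if is_period_due(period, last_run, now):
            launched.append(i)
            last_run = now
    return launched
```

Under these assumptions, a task with a period of 2 is launched on every other tick of the per-second loop, starting with the first one.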