stinovlas opened 10 months ago
I totally agree that for some systems that might be the case. Here's what I think: we have middlewares and `post_send` events on each scheduler, so you can create a simple storage wrapper that can be attached to any schedule source.
```python
import datetime
from typing import Any, Coroutine, List

from redis.asyncio import Redis
from taskiq import ScheduleSource
from taskiq.scheduler.scheduled_task import ScheduledTask
from taskiq.utils import maybe_awaitable


class ScheduleProtector(ScheduleSource):
    def __init__(self, redis_url: str, source: ScheduleSource) -> None:
        self.redis = Redis.from_url(redis_url)
        self.source = source

    def get_schedules(self) -> Coroutine[Any, Any, List[ScheduledTask]]:
        return self.source.get_schedules()

    def add_schedule(self, schedule: ScheduledTask) -> Coroutine[Any, Any, None]:
        return self.source.add_schedule(schedule)

    def delete_schedule(self, schedule_id: str) -> Coroutine[Any, Any, None]:
        return self.source.delete_schedule(schedule_id)

    async def pre_send(self, task: ScheduledTask) -> None:
        res = await self.redis.get(task.schedule_id)
        if res is not None:
            try:
                ts = int(res)
                if ts > int(datetime.datetime.utcnow().timestamp()):
                    raise RuntimeError("The task has already been sent.")
            except ValueError:
                pass
        return await maybe_awaitable(self.source.pre_send(task))

    async def post_send(self, task: ScheduledTask) -> None:
        ts = datetime.datetime.utcnow().timestamp()
        await self.redis.set(task.schedule_id, int(ts), ex=59)
        return await maybe_awaitable(self.source.post_send(task))
```
It might be used like this:
```python
labeled_source = ScheduleProtector("redis://localhost/0", LabeledSource(broker))
```
I've created a small example of such a thing. Please try it out. If it works for you, we might merge it into taskiq-redis.
Also, about the scheduler logic: it was updated a little bit, and I will present those changes a bit later. Now we simply ask all schedule sources once a minute and wait until the time when a task should be sent, so it goes out right on time. The code in this part of the project is now much cleaner and more comprehensible.
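To make the described loop concrete, here is a toy sketch of that "once a minute, then wait until send time" behaviour. This is an illustration, not taskiq's actual implementation: the `scheduler_tick` name, the dict-based tasks, and the callable sources are all assumptions for the demo.

```python
import asyncio


async def scheduler_tick(sources, send, now):
    """One minute-tick of the loop described above: gather schedules from
    every source, then sleep until each task's send time before dispatching.
    Toy sketch only; taskiq's real scheduler differs in details."""
    schedules = []
    for get_schedules in sources:  # each source is an async callable here
        schedules.extend(await get_schedules())
    waited = 0.0
    for task in sorted(schedules, key=lambda t: t["send_at"]):
        # Sleep only the remaining time until this task's send moment.
        delay = max(0.0, task["send_at"] - now - waited)
        await asyncio.sleep(delay)
        waited += delay
        await send(task)


async def demo():
    sent = []

    async def source():
        # "send_at" is an absolute toy timestamp; `now` below is 0.0.
        return [{"name": "b", "send_at": 0.02}, {"name": "a", "send_at": 0.01}]

    async def send(task):
        sent.append(task["name"])

    await scheduler_tick([source], send, now=0.0)
    return sent


print(asyncio.run(demo()))  # ['a', 'b']
```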
Thank you for the prompt response :-).
This example indeed mostly works (with some minor changes and corrections). What's left is catching the exception in `TaskiqScheduler.on_ready`. Perhaps we could create an `AbortError` to signify the intention of aborting the send in `pre_send`. This is not strictly necessary, but otherwise it produces an uncaught exception.

I don't think that `ts > int(datetime.datetime.utcnow().timestamp())` can ever happen on scheduler restart. The stored value always has to be lower than the current time, since we never store a value higher than the current time. This might work:
```python
    async def pre_send(self, task: ScheduledTask) -> None:
        res = await self.redis.get(task.schedule_id)
        if res is not None:
            try:
                ts = int(res)
                minute_start = datetime.datetime.now(
                    tz=datetime.timezone.utc,
                ).replace(second=0, microsecond=0)
                if ts > minute_start.timestamp():
                    raise RuntimeError("The task has already been sent.")
            except ValueError:
                pass
        return await maybe_awaitable(self.source.pre_send(task))
```
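The `AbortError` proposed above could be sketched like this. Both the exception class and the `deliver` wrapper are hypothetical names for illustration; taskiq does not currently define them, and where exactly the scheduler would catch the exception is an open design question.

```python
import asyncio


class AbortError(Exception):
    """Hypothetical exception raised from pre_send to cancel a send."""


async def deliver(task, pre_send, send):
    # Sketch of how the scheduler's send path could treat AbortError
    # as "skip this task" instead of an unhandled crash.
    try:
        await pre_send(task)
    except AbortError:
        return False
    await send(task)
    return True


async def demo():
    sent = []

    async def pre_send(task):
        if task.get("already_sent"):
            raise AbortError(task["id"])

    async def send(task):
        sent.append(task["id"])

    first = await deliver({"id": "t1"}, pre_send, send)
    dup = await deliver({"id": "t1", "already_sent": True}, pre_send, send)
    return first, dup, sent


print(asyncio.run(demo()))  # (True, False, ['t1'])
```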
Also, some kind of key prefix would be in order if this is added to `taskiq_redis`, so we don't populate Redis with schedule ids in the global namespace. I'd also advise using `datetime.datetime.now(tz=datetime.timezone.utc)` instead of `datetime.datetime.utcnow()`, because the latter doesn't do what most people expect on a server whose timezone differs from UTC.
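To illustrate that pitfall: `utcnow()` returns a *naive* datetime, and `.timestamp()` interprets a naive datetime as local time. The demo below forces a non-UTC timezone to simulate such a server (Unix-only, because of `time.tzset`; the chosen zone is arbitrary).

```python
import datetime
import os
import time

# Simulate a server whose local timezone is not UTC (Unix-only).
os.environ["TZ"] = "America/New_York"
time.tzset()

naive = datetime.datetime.utcnow()                       # naive UTC wall-clock
aware = datetime.datetime.now(tz=datetime.timezone.utc)  # tz-aware, correct

# .timestamp() treats the naive value as *local* time, so its epoch value
# is shifted by the server's UTC offset (4 or 5 hours for New York,
# depending on DST), while the aware value is always correct.
offset_hours = round((naive.timestamp() - aware.timestamp()) / 3600)
print(offset_hours)  # 4 or 5
```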
If the implementation in `taskiq_redis` is built over `taskiq_redis.RedisScheduleSource`, we might actually reuse the `connection_pool` from the source instead of creating a new one in the protector.

I'm available for the implementation in `taskiq_redis` and/or code review, if you want :-).
I totally agree with your statements. It was just a draft implementation. Please feel free to add this to the taskiq-redis project; I will review your PR.
As far as I know, the current design of the scheduler works like this: once a minute it asks all schedule sources for their scheduled tasks, then waits until each task's scheduled time to send it.

This creates a possible duplication of tasks. If I restart the scheduler during a minute in which a cron task is scheduled, that task is sent twice (once before the restart and once after). On the other hand, it allows the scheduler to be restarted without missing a task, assuming the restart takes less than one minute to perform (which is a quite reasonable assumption for a scheduler).
Ideally, tasks should be idempotent, but that's not always possible. It would be great if taskiq kept track of whether a particular task has already been sent to the worker queue within a given minute and, if so, didn't send it again after a restart. This is obviously not possible with `LabelScheduleSource`, since it doesn't have any permanent storage, but it might be possible with `taskiq_redis.RedisScheduleSource`.

What do you think about this? Should I propose an enhancement in `taskiq_redis`? Does something need to be done in `taskiq` itself?
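The "was it already sent this minute" bookkeeping could be sketched as follows. This is an in-memory stand-in for the Redis key used by the `ScheduleProtector` above; the class and method names are made up for the illustration (in `taskiq_redis` the state would live in Redis with an expiry, not in a dict).

```python
import datetime


class SentThisMinute:
    """In-memory sketch of the per-minute dedup: remember the minute
    bucket in which each schedule_id was last sent."""

    def __init__(self) -> None:
        self._last: dict[str, int] = {}

    @staticmethod
    def _bucket(now: datetime.datetime) -> int:
        # Truncate to the start of the minute, like the corrected pre_send.
        return int(now.replace(second=0, microsecond=0).timestamp())

    def should_send(self, schedule_id: str, now: datetime.datetime) -> bool:
        bucket = self._bucket(now)
        if self._last.get(schedule_id) == bucket:
            return False  # already sent in this minute: skip after restart
        self._last[schedule_id] = bucket
        return True


now = datetime.datetime(2024, 1, 1, 12, 0, 30, tzinfo=datetime.timezone.utc)
guard = SentThisMinute()
print(guard.should_send("sched-1", now))                                   # True
print(guard.should_send("sched-1", now + datetime.timedelta(seconds=20)))  # False: same minute (restart case)
print(guard.should_send("sched-1", now + datetime.timedelta(seconds=40)))  # True: next minute
```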