procrastinate-org / procrastinate

PostgreSQL-based Task Queue for Python
https://procrastinate.readthedocs.io/
MIT License
800 stars 52 forks source link

Worker schedules #1105

Closed medihack closed 1 week ago

medihack commented 1 week ago

Another feature that would be nice to have in Procrastinate is to allow workers to process jobs only in specific time slots or on specific days (e.g., only on the weekend).

Here's some background on why this feature is quite important for us: We transfer large radiological image data in Procrastinate jobs for research purposes in parallel. As we don't want to stress the clinical infrastructure, we want to run some workers only at night or on the weekends.

I thought about maybe a command-line option like --schedule with a Cron-like argument, e.g. procrastinate --app=dotted.path.to.app worker --schedule "* 1-5 * * *" to process jobs at night only between 1 and 5 o'clock (croniter can be used to check that with the match method).

One thing to keep in mind in such a scenario is how to handle periodic tasks by such a scheduled worker (especially when those have other schedules than the worker itself).

It's nothing that I would be working on right away, but in the longer term (and also after #933 is done).

ewjoachim commented 1 week ago

I wonder if this should be part of procrastinate itself. I could see multiple ways of achieving this with what's avaliable already, I'll try to take the time to expand on this later, but if it appears this is already doable without too much hacking, it's worth discussing whether we want to add it or not.

This is not a "no", it's really just a "I need to be convinced we can't easily do it yet"

medihack commented 1 week ago

Sure, no hurry. I also thought about handling this in the user space with task scheduling. But it's not guaranteed that the task is picked up in some specific time slot. So, there must also be a custom retry strategy. This alone sounds more complicated to me. I also dislike that this way, a job (in a worst-case scenario) may be picked up several times just to be delayed. Then there is still the scenario of how to ensure that, for example, only three jobs of that task are processed in parallel in the daytime and eight at nighttime. I don't see a solution for that right now (even with locking). On the other hand, this would be easily doable with workers that support some kind of scheduling, e.g., with Docker Swarm in our case (an unscheduled worker service with 3 replicas and a scheduled worker service with 8 replicas). It's also not that hard to implement on the worker side (especially as we already have croniter as a dependency).

ewjoachim commented 1 week ago

This is blind code (not on my phone this time, but still didn't execute it)

from __future__ import annotations

import asyncio
import dataclasses
import datetime

import procrastinate

@dataclasses.dataclass
class ScheduleItem:
    start_time: datetime.time
    queues: list[str]

def get_queues_at_time(
    schedule: list[ScheduleItem], dt: datetime.datetime
) -> list[str]:
    time = dt.time()
    for item in schedule:
        if item.start_time <= time:
            return item.queues
    return []

def get_next_schedule_item(
    schedule: list[ScheduleItem], dt: datetime.datetime
) -> tuple[datetime.datetime, list[str]]:
    time = dt.time()
    for item in schedule:
        if item.start_time > time:
            return datetime.datetime.combine(dt.date(), item.start_time), item.queues

    item = schedule[0]
    return datetime.datetime.combine(
        dt.date() + datetime.timedelta(days=1), item.start_time
    ), item.queues

async def run_worker_with_schedule(
    app: procrastinate.App,
    schedule: list[ScheduleItem],
):
    current_queues = get_queues_at_time(schedule=schedule, dt=datetime.datetime.now())
    while True:
        next_run, next_queues = get_next_schedule_item(
            schedule=schedule, dt=datetime.datetime.now()
        )
        worker = app.run_worker_async(queues=current_queues)
        try:
            await asyncio.wait_for(
                worker,
                timeout=(next_run - datetime.datetime.now()).total_seconds(),
            )
        except asyncio.TimeoutError:
            # gracefully wait for the worker to end current tasks
            await worker
        current_queues = next_queues

if __name__ == "__main__":
    import logging

    logging.basicConfig(level=logging.DEBUG)

    app = procrastinate.App()
    app.add_queue("default")
    app.add_queue("low")

    schedule = [
        ScheduleItem(start_time=datetime.time(8, 0), queues=["default"]),
        ScheduleItem(start_time=datetime.time(12, 0), queues=["low"]),
        ScheduleItem(start_time=datetime.time(16, 0), queues=["default"]),
    ]

    asyncio.run(run_worker_with_schedule(app=app, schedule=schedule))
ewjoachim commented 1 week ago

Another possibility would be to have the specific jobs you want use queues listened on by dedicated workers with autoscaling rules that switch those workers on and off at specific time. No worker, no task. https://docs.aws.amazon.com/autoscaling/ec2/userguide/ec2-auto-scaling-scheduled-scaling.html for AWS for example, I guess you'll find similar concepts everywhere.

medihack commented 1 week ago

Scheduled scaling can be problematic in my situation, as I don't want a long-running task to be terminated. I just want to ensure that a task is only started in a specific time slot (and then it should run as long as it takes). I guess there seems to be a similar problem with the above code (unfortunately, I don't fully get the scheduling there 😞), as there is some explicit timeout, too. What I am looking for is to tell a worker an explicit date/time range when it should fetch the next job and outside this date/time range should just wait. But maybe this is too application specific πŸ˜ͺ.

ewjoachim commented 1 week ago

No, the timeout in the code above doesn't actually stop the worker. After the "timeout", we tell the worker to stop, and it will stop gracefully (don't accept new task, wait until remaining tasks end, stop). As long as we don't cancel again while it's cancelling, I'm pretty sure it won't stop a running task.

Regarding the code note being clear: sorry about that. I'll explicit it here.

I don't know how familiar you are with asyncio, so I'll detail things here. Sorry if this too much of an asyncio 101, please don't take it as an assumption that I know better πŸ˜…. It did take me a long time to form a correct mental model of the things here, and I'm still learning a lot. Everything I'm saying should match what you can find in the docs.

Asyncio tasks

So you can create a coroutine with async def, and await it with await, this part is usually the one that is the most visible and the most well understood. But if we want to achieve the promise of asyncio, namely concurrency, we're missing an important primitive: the ability to create asyncio tasks. asyncio tasks are a way to declare you want a coroutine to run in the event loop but you don't want the calling code to be stuck on the same line while waiting for this coroutine to end (like what would happen if you awaited the coroutine).

This means you might create multiple tasks that will do things concurrently, and any time any of the tasks or your main code would await something, you'd give back control to the event loop that would run a part of one of the tasks (or the running code).

Note that you may await a task: in this case, it mean you really want to wait until the underlying coroutine finishes.

asyncio provides synchronization primitives such as asyncio.gather() when you need to wait for a bunch of tasks to end.

Timeouts and cancellation

One very useful thing you can do with coroutines and tasks is asyncio.wait_for: this lets you wait for a coroutine, but if it lasts more than a certain time, it's cancelled.

Cancelling a coroutine means injecting a CancelledError: the next time it awakes from await, instead of executing the following statement, it will raise. The idea of cancellation is that it's supposed to be something you're you're able to handle gracefully. Having an exception be raised from anywhere in your code would be hard to handle: you'd need to try/except every single await statement, and it would be very tedious, or you'd need to put a single try/except at the top of your coroutine, but by the time you've raised up that point, you've completely lost your context, so it defeats the purpose of gracefully doing anything.

Fortunately, there's a mechanism for handling this: shielding. By calling asyncio.shield() on a coroutine, you're wrapping your coroutine in another coroutine that will receive CancelledError when cancelled but not pass it below. This way, the code that calls shield may handle the CancelledError by signalling to the underlying coroutine (e.g. using synchronisation mechanisms such as Events) that it needs to stop ASAP

Procrastinate

That's all nice, what does it have to do with anything?

It turns out that 1/ the worker has a builtin way of signalling it's expected to stop gracefully asap: worker.stop. Also, app.run_worker_async() has code in place so that cancelling it once calls .stop() and then waits for the worker to stop.

Those 2 elements means that if you call coro = app.run_worker_async(); asyncio.wait_for(coro, timeout), you'll run the worker for timeout, then request it to stop, and if you then await coro, you'll wait for it to actually stop (if you wanted to use a timeout to avoid waiting forever for the worker to stop, you'd need a second asyncio.wait_for)

The script

Last element of the puzzle: what does the rest of the script do?

I'm pretty sure there should be a smarter way to express the schedule, but the important part of the script is the idea of running the worker for a given time, stopping it gracefully, and rerunning with a different list of queues. that part is achieved just a few lines, and doesn't scream hackish to me.

medihack commented 1 week ago

Oh, that makes so much more sense now. Thanks so much. You are the best πŸ™‚. Especially, I wasn't aware of shielding. Of course, the scheduling could also be used with date/time ranges and without any extra queues. Also, this example nicely demonstrates the strengths of asyncio. I will close the issue as I am convinced now that this is the more flexible option. I will post my implementation here sometime when it's done.