Miksus / rocketry

Modern scheduling library for Python
https://rocketry.readthedocs.io
MIT License
3.25k stars 105 forks

ENH Distributed execution #111

Open carlosporta opened 1 year ago

carlosporta commented 1 year ago

Hello,

Do you have any plans to add distributed execution?

Something like:

@app.task(daily, queue: RedbirdQueue | RabbitMQQueue | RedisQueue | InMemoryQueue | SQLAlchemyQueue)
def do_daily():
    ...

worker = RocketryWorker(queues=[my_queues])
worker.run()

Miksus commented 1 year ago

Actually, I do. Though I think the distributed space (at least in Rocketry's context) consists of two issues:

  1. Synced logs: Put task logs to the same database and spawn multiple Rocketry apps
  2. Task queues: Put tasks to a queue where multiple Rocketry instances fetch and run tasks one at a time

The first one is already implemented: you can put task logs in an SQL database, MongoDB, or a CSV file. For more options, we need to extend Red Bird. However, I'm not sure how well it would stay in sync if there were a lot of app instances running with a very short sleep. There could be a situation where one instance decides to run a task, but before it logs that the task is running, another app instance has also decided to run the same task. I'm not sure whether more locking mechanics need to be implemented. Perhaps this could be made customizable where necessary (by adding more hooks).

For the second option, the latest version of Rocketry was actually partly about laying the foundation for this. There is a method task.run() that sets a run batch on the task. These run batches can be parametrized as well (like task.run(arg="a value")). I have been prototyping a built-in metatask that would run in the background (on a separate thread or using async) and call the run method of the next task based on a given queue. Perhaps the queue could be a Red Bird repo. Perhaps Redis and RabbitMQ repos could be added to Red Bird.

At the moment, one could also create their own queue relatively easily, for example something like this:

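import asyncio
import datetime

queue = ...  # any object with a list-like API (truthiness and pop())

@app.task(on_startup=True, permanent_task=True, execution="async")
async def task_queue():
    "This task runs constantly and runs other tasks"
    while True:
        if not queue:
            # Yield to the event loop instead of spinning while the queue is empty
            await asyncio.sleep(1)
            continue
        now = datetime.datetime.now()
        batch = queue.pop()
        task = app.session[batch.task_name]
        task.run(**batch.parameters)
        # Wait until the task has started, then until it has finished
        while task.last_run is None or task.last_run < now or task.status == "run":
            await asyncio.sleep(1)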

This queue task assumes the queue has an API similar to a list's. It waits until the task starts (the first and second conditions in the second while loop) and until the task finishes (the third condition). Note that this queue disregards the conditions set on the other tasks.
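As a rough illustration of what "a similar API as a list" could mean here, the sketch below defines a queue object that supports the two operations the task above relies on: truthiness and pop(). The names Batch, FifoQueue, and do_report are hypothetical and not part of Rocketry or Red Bird. One design point worth noting: a plain list's pop() returns the newest item, so this sketch uses a deque to hand out the oldest item first.

```python
from collections import deque
from dataclasses import dataclass, field
from typing import Any, Deque, Dict


@dataclass
class Batch:
    """Hypothetical queue item: which task to run and with what parameters."""
    task_name: str
    parameters: Dict[str, Any] = field(default_factory=dict)


class FifoQueue:
    """List-like queue: truthy when non-empty, pop() returns the oldest item."""

    def __init__(self) -> None:
        self._items: Deque[Batch] = deque()

    def append(self, batch: Batch) -> None:
        self._items.append(batch)

    def pop(self) -> Batch:
        # popleft() gives first-in, first-out order;
        # a plain list's pop() would return the newest item instead.
        return self._items.popleft()

    def __len__(self) -> int:
        # Defining __len__ makes `if queue:` behave as it does for a list.
        return len(self._items)


queue = FifoQueue()
queue.append(Batch("do_daily"))
queue.append(Batch("do_report", {"arg": "a value"}))
```

With this object bound to queue, the loop above would run do_daily first and then do_report with arg="a value", assuming tasks with those names exist in the session.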

I'm thinking of shipping something similar as a prebuilt task, using Red Bird's repos as the queue. Did I understand your question correctly and was this the thing you were after?

carlosporta commented 1 year ago

I do not know how Rocketry works internally, but here are some comments.

> 1. Synced logs: Put task logs to the same database and spawn multiple Rocketry apps

> I'm not sure how well it would stay in sync if there were a lot of app instances running with very small sleep

What about using a new task status (enqueued) and a new task column (version)? When the scheduler gets a task to run, it changes the task status to enqueued, but the update only succeeds if the stored version still matches the version the scheduler read. Example:

scheduler --> get next task to run --> returns Task(name='my_task', status='run', version=1)
scheduler --> update task status to enqueued where name='my_task' and version=1 --> if ok task.run() else pass # task already taken by another scheduler
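The version check described in this comment is a form of optimistic locking. A minimal sketch of the idea follows, using an in-memory SQLite table as a stand-in for the log database. The table layout and the try_claim helper are assumptions for illustration and not part of Rocketry; incrementing the version on a successful claim is also an assumption, added so that a second scheduler holding the stale version is rejected.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE tasks ("
    "name TEXT PRIMARY KEY, status TEXT NOT NULL, version INTEGER NOT NULL)"
)
conn.execute("INSERT INTO tasks VALUES ('my_task', 'run', 1)")
conn.commit()


def try_claim(conn: sqlite3.Connection, name: str, seen_version: int) -> bool:
    """Mark the task as enqueued only if nobody changed it since we read it."""
    cur = conn.execute(
        "UPDATE tasks SET status = 'enqueued', version = version + 1 "
        "WHERE name = ? AND version = ?",
        (name, seen_version),
    )
    conn.commit()
    # rowcount is 1 when our version matched, 0 when another scheduler won.
    return cur.rowcount == 1


# Two schedulers both read version 1, then race to claim the task.
first = try_claim(conn, "my_task", seen_version=1)
second = try_claim(conn, "my_task", seen_version=1)
```

Only the first claim succeeds here; the second finds no row with version 1 and would skip running the task.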

> 2. Task queues: Put tasks to a queue where multiple Rocketry instances fetch and run tasks one at a time

I like this approach, but I did not understand how you plan to use the metatask (separate thread or async). Is it something similar to what I proposed?

# producer.py
rabbitmq_queue = RabbitMQQueue('localhost')
@rocketry.task('daily', name='remote_task', queue=rabbitmq_queue)
def remote_task():
    pass

# consumer_1.py
rabbitmq_queue = RabbitMQQueue('localhost')
worker = RocketryWorker(queues=[rabbitmq_queue])
worker.run()

# consumer_2.py
rabbitmq_queue = RabbitMQQueue('localhost')
worker = RocketryWorker(queues=[rabbitmq_queue])
worker.run()

> Did I understand your question correctly and was this the thing you were after?

Yes, that is what I was after. I will develop a queue and create a new file (consumer.py) with the code you provided.
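For readers who want to experiment with the producer/consumer split before any such API exists, the pattern can be approximated with the standard library alone. The sketch below is not Rocketry code: queue.Queue stands in for a broker such as RabbitMQ, two threads stand in for the consumer processes, and the body of remote_task and the worker function are made up for illustration.

```python
import queue
import threading

task_queue: "queue.Queue" = queue.Queue()  # stand-in for a real broker
results = []
results_lock = threading.Lock()


def remote_task(n: int) -> int:
    """Hypothetical task body."""
    return n * 2


def worker(worker_id: int) -> None:
    """Fetch and run tasks one at a time until a None sentinel arrives."""
    while True:
        item = task_queue.get()
        if item is None:
            task_queue.task_done()
            break
        func, kwargs = item
        value = func(**kwargs)
        with results_lock:
            results.append((worker_id, value))
        task_queue.task_done()


# Two consumers, mirroring consumer_1.py and consumer_2.py.
workers = [threading.Thread(target=worker, args=(i,)) for i in (1, 2)]
for thread in workers:
    thread.start()

# The producer enqueues work, then one sentinel per worker.
for n in range(5):
    task_queue.put((remote_task, {"n": n}))
for _ in workers:
    task_queue.put(None)

task_queue.join()
for thread in workers:
    thread.join()
```

Each queued item is taken by exactly one worker, which is the property the proposed RocketryWorker would need from its queue backend.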

Miksus commented 1 year ago

Actually, a bit of improvement is coming for this (I think). I'm exposing the lock so that one could supply a custom class for task locking (the lock that is acquired before inspecting whether a task should run). Next, I'm planning on making a lock abstraction in Red Bird so that one could, for example, make an item in a database act as the lock. So basically a repo from Red Bird could be used as a lock.
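To make the "item in a database acts as the lock" idea concrete, here is a minimal sketch that uses a row in an SQLite table as the lock. It does not use Red Bird or Rocketry; the locks table, the row_lock context manager, and the LockTaken exception are all hypothetical names chosen for this example.

```python
import sqlite3
from contextlib import contextmanager
from typing import Iterator


class LockTaken(Exception):
    """Raised when another holder already owns the lock."""


@contextmanager
def row_lock(conn: sqlite3.Connection, name: str, owner: str) -> Iterator[None]:
    try:
        # The PRIMARY KEY on name makes the insert fail if the lock row exists.
        conn.execute("INSERT INTO locks (name, owner) VALUES (?, ?)", (name, owner))
        conn.commit()
    except sqlite3.IntegrityError:
        conn.rollback()
        raise LockTaken(name) from None
    try:
        yield
    finally:
        # Release only the row this owner created.
        conn.execute("DELETE FROM locks WHERE name = ? AND owner = ?", (name, owner))
        conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE locks (name TEXT PRIMARY KEY, owner TEXT NOT NULL)")

outcomes = []
with row_lock(conn, "scheduler", owner="app-1"):
    outcomes.append("app-1 acquired")
    try:
        with row_lock(conn, "scheduler", owner="app-2"):
            outcomes.append("app-2 acquired")
    except LockTaken:
        outcomes.append("app-2 blocked")

# Once app-1 has released the lock, app-2 can take it.
with row_lock(conn, "scheduler", owner="app-2"):
    outcomes.append("app-2 acquired")
```

A production lock would also need an expiry or heartbeat so that a crashed holder does not block the others forever; that is left out of this sketch.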

As for the task queues, I planned on building some sort of event stream, which could be a solution. Alternatively, there could be a separate queue mechanism (working via conditions) to ensure the events stay as closely in sync with the repo as possible.

carlosporta commented 1 year ago

What do you think about this architecture?

[image: architecture diagram]

  1. A python client creates the Rocketry app
  2. The client can call the Rocketry app to run a task or the task is run as scheduled
  3. Rocketry app creates a new log in the database - task run or maybe task scheduled to run
  4. Rocketry sends the task to a task executor and waits for its result
  5. The executor sends the task to a free worker (the task executor has N workers running on Z machines)
  6. The executor waits for the task completion
  7. The executor returns the result to the Rocketry app
  8. The Rocketry app saves the finished log to the database

The main idea behind it is to have a single point of database control. Only the Rocketry app is responsible for writing to the database, and the executor is responsible for determining which worker will run the task. What do you think about it?
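The flow in steps 3 to 8 can be sketched with the standard library. This is only an illustration of the single-writer idea, not Rocketry code: a ThreadPoolExecutor stands in for the task executor and its workers, a plain list stands in for the database, and run_via_executor, add, and the status strings are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, List, Tuple

# Stand-in for the database; only the app side ever writes to it.
log: List[Tuple[str, str]] = []


def run_via_executor(
    executor: ThreadPoolExecutor, name: str, func: Callable[..., Any], **params: Any
) -> Any:
    log.append((name, "run"))                 # step 3: log that the task runs
    future = executor.submit(func, **params)  # steps 4-5: hand over to a free worker
    try:
        result = future.result()              # steps 6-7: wait for the result
    except Exception:
        log.append((name, "fail"))
        raise
    log.append((name, "success"))             # step 8: save the finished log
    return result


def add(a: int, b: int) -> int:
    """Hypothetical task body."""
    return a + b


with ThreadPoolExecutor(max_workers=2) as executor:
    total = run_via_executor(executor, "add", add, a=1, b=2)
```

Because the workers never touch the log, there is no write contention on the database; the trade-off is that the app becomes the single point of failure for record keeping.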

Udit107710 commented 1 year ago

Any update on this? I would like to actively contribute to this. Let me know if there has been any progress or if there are any pointers beyond the ones mentioned above. I will also invest some more time in looking at the current architecture to see how we can proceed.