Miksus / rocketry

Modern scheduling library for Python
https://rocketry.readthedocs.io
MIT License
3.27k stars 108 forks source link

ENH: Add async support #28

Closed sorasful closed 2 years ago

sorasful commented 2 years ago

Hi there ! Thanks for your work !

Is your feature request related to a problem? Please describe. It'll be great to be able to run tasks that use async code.

Describe the solution you'd like A transparent asyncio support to be able to run async code.

gaby commented 2 years ago

👍🏻 for this feature!

Miksus commented 2 years ago

Thanks! Indeed I think this would be great. We could have a new execution type that works like:

@app.task("daily", execution="async")
def do_things():
    ....

Should not be too hard to implement either. We need it here: https://github.com/Miksus/red-engine/blob/ba26ec20992a6b3e774d7d8c92e67bc7beea437d/redengine/core/task.py#L349-L365

And then we could do something similar as with the thread execution but just with async loop: https://github.com/Miksus/red-engine/blob/ba26ec20992a6b3e774d7d8c92e67bc7beea437d/redengine/core/task.py#L489-L501

I'm on a bit of baby steps on Asyncio but sounds to be as easy as the thread support.

After that, I think we could start thinking of turning the scheduling system itself as async.

ddanier commented 2 years ago

Would love to see this, too. This would enable using Rocketry for my FastAPI projects. 👍

Miksus commented 2 years ago

Making progress with this and I think I managed to make the core parts of the scheduler as async.

I'm a bit unsure whether this should be async task by default:

@app.task("daily", execution="main")
async def do_things():
    ....

or if it always needs async as execution type:

@app.task("daily", execution="async")
async def do_things():
    ....

If it doesn't have the async execution type, it violates the idea of conveniently switching between the execution types by simply changing the argument in the task. However, it would be nice if you could just add the async def my_task() and the system uses async by default and for def my_task(), main is used instead. That would be convenient.


By the way @ddanier, if you wish to use FastAPI with Rocketry, it's already possible. I made a working proto today that successfully integrates such with current version of Rocketry. The trick is to wrap the FastAPI with a threaded task:

from rocketry import Rocketry
from rocketry.args import TerminationFlag
import asyncio

app = Rocketry()

@app.task("true", execution="thread")
def run_rocketry_api(flag=TerminationFlag()):
    asyncio.run(run_server(flag))

Then to the run_server:

from fastapi import FastAPI
import uvicorn

app_fastapi = FastAPI()

async def run_server(flag, tick=0.5):
    server = uvicorn.Server(config=uvicorn.Config(app_fastapi, workers=1, loop="asyncio"))

    run = asyncio.create_task(server.serve())

    while not flag.is_set():
        await asyncio.sleep(tick)
    server.should_exit = True

I'm making some sort of a tutorial about this hopefully soon. The flag is to ensure the thread (and the task) terminates when the scheduler is finished (ie. makes ctrl + c to work).

Then just add some routes and modify the runtime:

@app_fastapi.get("/tasks/{task_name}")
async def get_task(task_name):
    task = app.session[task_name]
    return {
        "name": task.name,
        "last_run": task.last_run,
        "last_success": task.last_success,
        "start_cond": str(task.start_cond),
        "end_cond": str(task.end_cond),
        "disabled": task.disabled,
        "force_run": task.force_run
    }

@app_fastapi.patch("/tasks/{task_name}")
async def update_task(task_name, values:dict):
    "Update Rocketry's task attributes on runtime"
    task = app.session[task_name]
    for key, val in values.items():
        setattr(task, key, val)

App here is ROcketry's app. There is a slight bug in JSON encoding of tasks thus the get_task needed to return a dict.

gaby commented 2 years ago

@Miksus Should be able to close this one, right?

Miksus commented 2 years ago

Oh, that's indeed true. This is already part of the framework.