Miksus / rocketry

Modern scheduling library for Python
https://rocketry.readthedocs.io
MIT License
3.25k stars 105 forks

DOCS: Example Rocketry with FastAPI where uvicorn is started by Docker container #151

Open writeson opened 1 year ago

writeson commented 1 year ago

I'm working on a FastAPI project that I want to add scheduling to, and Rocketry looks like a good way to do this. I understand the documentation about how to start Rocketry and FastAPI as separate asyncio tasks, but I also see that the examples modify uvicorn so Rocketry can hook into the exit code.

Right now, I'm using docker_compose.yml to start uvicorn rather than starting uvicorn in my code. Is there a way to get Rocketry connected to the asyncio loop as a task after the asyncio loop is already running (started by FastAPI)? And is there a way to connect the Rocketry exit handling to uvicorn after it's running? Or can I connect the Rocketry exit handling to the FastAPI "shutdown" event?

Thanks! Doug

Miksus commented 1 year ago

Do you mean whether you could launch Rocketry without creating the loop (i.e. without the sync method app.run())?

There is app.serve(), which is an async function. It is basically the same as app.run() except that it does not create the async loop. Therefore, if you manage to call it (await app.serve()) inside FastAPI, you may be able to connect to FastAPI's loop.

As for the exit, you need to signal Rocketry to quit. This can be done simply by creating a shut condition that is true when FastAPI is exiting. I cannot recall exactly how FastAPI exposes its exit, but something like this:

@app.cond()
def is_fastapi_dead():
    # Get the info whether FastAPI is exiting somewhere
    return True or False

...
app.session.config.shut_cond = is_fastapi_dead
...
await app.serve()
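
Untested, but a fuller version of that sketch could look something like this (the api/app names and the flag are just illustrative):

import asyncio

from fastapi import FastAPI
from rocketry import Rocketry

app = Rocketry(config={"task_execution": "async"})
api = FastAPI()

# Illustrative flag that the FastAPI shutdown handler flips
fastapi_exiting = False

@app.cond()
def is_fastapi_dead():
    return fastapi_exiting

app.session.config.shut_cond = is_fastapi_dead

@api.on_event("startup")
async def start_rocketry():
    # Attach Rocketry to the loop FastAPI is already running on
    asyncio.get_running_loop().create_task(app.serve())

@api.on_event("shutdown")
async def flag_rocketry_shutdown():
    global fastapi_exiting
    fastapi_exiting = True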

Or did I understand incorrectly?

writeson commented 1 year ago

Hi Miksus, you've pretty much described what I ended up doing, and it seems to work. Here it is:

scheduler.py:

import asyncio

from rocketry import Rocketry
from rocketry.conds import every
from structlog import get_logger

logger = get_logger(__name__)

app = Rocketry(config={"task_execution": "async"})

async def start():
    """Start the scheduler"""
    logger.debug("Starting up Recurring Payment Scheduler")

    @app.task(every("5 minutes", based="finish"))
    def run_scheduler():
        logger.debug("Running scheduler")

    # hook the rocketry scheduler into the FastAPI event loop
    fastapi_event_loop = asyncio.get_running_loop()
    fastapi_event_loop.create_task(app.serve())

async def stop():
    """Stop the scheduler"""
    logger.debug("Stopping Recurring Payment Scheduler")
    app.session.shut_down()

Then in my server startup code I have this to start and stop Rocketry when FastAPI starts and stops:

    from . import scheduler

    @fastapi_app.on_event("startup")
    async def startup_event():
        """Event handler for the startup event"""
        logger.info("Starting up Recurring Payment API")
        await scheduler.start()

    @fastapi_app.on_event("shutdown")
    async def shutdown_event():
        """Event handler for the shutdown event"""
        logger.info("Shutting down Recurring Payment API")
        await scheduler.stop()
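
Depending on the FastAPI version, I believe the same wiring could also be done with a lifespan context manager instead of the on_event hooks; a rough, untested sketch using the scheduler module above:

from contextlib import asynccontextmanager

from fastapi import FastAPI

from . import scheduler

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Runs before the app starts serving requests
    await scheduler.start()
    yield
    # Runs when the app is shutting down
    await scheduler.stop()

fastapi_app = FastAPI(lifespan=lifespan)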

Do you see anything I might be doing that's foolish? Thanks! Doug

gegnew commented 1 year ago

Hey @writeson, this seems like a really clever solution. Is the shutdown event working properly for you? For me it takes quite a long time and exits with an asyncio.exceptions.CancelledError.

writeson commented 1 year ago

Hi @gegnew, so far it seems to be working for me. However, I only have it logging "hello world" every 5 minutes; I'm not using it for my intended purpose just yet.

gegnew commented 1 year ago

I'll update you if I figure anything useful out. Anyway, nice work :wink:

writeson commented 1 year ago

Thanks, I'll be sure to post here if things pop up as I make more use of Rocketry in my app. ;)

gegnew commented 1 year ago

I've only had one issue with this so far: I'm running my app with Gunicorn, and it appears that Rocketry tasks are kicked off on all workers (which makes sense), but this means that the tasks are run n times (where n is the number of workers).

I haven't come up with a solution yet

writeson commented 1 year ago

That's a good point I hadn't considered. My app is run by uvicorn in a Kubernetes cluster, with one worker per pod, though that could still mean multiple instances of the app are running. I wonder if using something like RedLock (locking through Redis) could be useful to ensure only a single instance runs the task? Whoever gets the lock first runs it, and all the others bail.
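
Something like this rough sketch is what I have in mind (completely untested; it assumes a Redis instance reachable by every worker/pod and the redis-py client, and the key name and TTL are made up):

import redis

from rocketry import Rocketry
from rocketry.conds import every

app = Rocketry(config={"task_execution": "async"})

# Assumes a Redis that every worker/pod can reach
redis_client = redis.Redis(host="redis", port=6379)

@app.task(every("5 minutes", based="finish"))
def run_scheduler():
    # SET with nx=True only succeeds for the first caller; ex= lets the
    # lock expire on its own so a crashed holder cannot block future runs
    if not redis_client.set("recurring-payment-scheduler-lock", "1", nx=True, ex=240):
        return  # another worker/pod already grabbed this run
    # ... do the actual scheduled work here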

carloe commented 1 year ago

I wonder if using something like RedLock (locking through Redis) could be useful to ensure only a single instance runs the task?

Any reason you could not just turn this into a Kubernetes job (i.e. a CronJob)?

I was trying to get the same kind of setup going that you are talking about, and after messing with it for a while I ended up going the k8s job route. My codebase is still monolithic, and the job uses the same image, just with a different entry point. I am not sure if there are any downsides to this approach that I am overlooking, but it has been working pretty well for me so far.

writeson commented 1 year ago

I don't know too much about k8s jobs, as my DevOps folks handle most of the k8s work. But I will look into it, thanks!