igorbenav / FastAPI-boilerplate

An extendable async API using FastAPI, Pydantic V2, SQLAlchemy 2.0, PostgreSQL and Redis.
MIT License
589 stars 69 forks

DB session from a worker function? #138

Closed: kaStoyanov closed this issue 4 months ago

kaStoyanov commented 4 months ago

Hey, I've been trying for two days now to make this work.
How do I pass the DB session from an endpoint to the background worker function?

My idea is to insert a record in the DB from my endpoint, then process it in the background function.

Edit

I made it work by creating another function decorated with asynccontextmanager, which the worker function can use to open its own session:

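from contextlib import asynccontextmanager
from typing import AsyncIterator
from sqlalchemy.ext.asyncio import AsyncSession

# local_session is the project's async session factory; import it from wherever it is defined.
@asynccontextmanager
async def async_get_context() -> AsyncIterator[AsyncSession]:
    async with local_session() as db:
        yield db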
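
For anyone finding this later, here is a minimal sketch of how a worker function might use that context manager. `FakeSession`, `process_record` and `record_id` are hypothetical names I made up so the snippet runs without a database; in the real project the context manager would wrap `local_session` as shown above. The assumption is that the endpoint only enqueues the record's id and the worker opens its own session.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator


class FakeSession:
    """Hypothetical stand-in for AsyncSession so the sketch runs without a database."""

    def __init__(self) -> None:
        self.closed = False
        self.rows: list[int] = []

    async def __aenter__(self) -> "FakeSession":
        return self

    async def __aexit__(self, *exc_info: Any) -> None:
        self.closed = True


@asynccontextmanager
async def async_get_context() -> AsyncIterator[FakeSession]:
    # In the real project this would be `async with local_session() as db:`.
    async with FakeSession() as db:
        yield db


async def process_record(ctx: dict[str, Any], record_id: int) -> FakeSession:
    # The worker opens its own session instead of receiving one from the endpoint.
    async with async_get_context() as db:
        db.rows.append(record_id)  # placeholder for the real query/update
        assert not db.closed  # the session is still open inside the block
    return db  # returned only so this demo can inspect it afterwards


session = asyncio.run(process_record({}, record_id=42))
print(session.closed, session.rows)  # True [42]
```

The session is opened and closed per job here, so concurrent jobs never share one.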
igorbenav commented 4 months ago

Nice one!

leonardovida commented 4 months ago

@kaStoyanov I solved it a different way: I call the async_get_db function in the worker's startup function and store the session in the ARQ context.

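import logging
from typing import Any

from ...core.db.database import async_get_db

# ARQ passes ctx to the hooks as a plain dict, so it is annotated as one here.
async def startup(ctx: dict[str, Any]) -> None:
    # anext() is a builtin from Python 3.10; it pulls the session out of the async generator.
    ctx["db"] = await anext(async_get_db())
    logging.info("Worker Started")

async def shutdown(ctx: dict[str, Any]) -> None:
    await ctx["db"].close()
    logging.info("Worker end")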

Then get the "db" object from the ARQ ctx inside the background function:

from typing import Any

async def your_background_function(
    ctx: dict[str, Any],  # the same dict that startup() filled in
    ...
) -> Any:
    db = ctx["db"]
    ...
igorbenav commented 4 months ago

This is the best way, @leonardovida! If you want, you can add it to the README and submit a PR; otherwise I'll do it later.

leonardovida commented 4 months ago

Let me do it @igorbenav ! I'll submit a PR to the docs tomorrow. Good excuse to start contributing more to this great project 😃