tobymao / saq

Simple Async Queues
https://saq-py.readthedocs.io/en/latest/
MIT License
532 stars 37 forks source link

Example usage in an ASGI framework? #71

Closed ThinksFast closed 1 year ago

ThinksFast commented 1 year ago

Hi, I am trying to use saq in an ASGI framework called starlite. I'm struggling to get a basic SAQ implementation working smoothly.

Below is a minimal example where the worker gets started and the job gets scheduled, but during shutdown, the code hangs. It seems like the jobs don't get aborted or cleaned up.

Can you help suggest a working configuration?

Python Requirements:

starlite
uvicorn
saq
python-dotenv

Save file as main.py and run using: uvicorn main:app

import asyncio
import logging
import os

import uvicorn
from dotenv import load_dotenv
from saq import CronJob, Queue, Worker
from starlite import LoggingConfig, Starlite, get

########################################################
#################### ENV & LOGGING CONFIG  ################

load_dotenv()

REDIS_URL = os.getenv("REDIS_URL")

logging_config = LoggingConfig(
    loggers={
        "app": {
            "level": "DEBUG",
            "handlers": ["queue_listener"],
            "propagate": False,
        }
    }
)

logger = logging.getLogger("app")

########################################################
################  SCHEDULED JOB ########################

async def scheduled_job(ctx) -> None:
    logger.info("I'm scheduled to run by SAQ")

#########################################################
###### SAQ CONFIG: SCHEDULES REQUEST EVERY 10 SECONDS ###### 

queue = Queue.from_url(REDIS_URL)

tb = CronJob(scheduled_job, cron="* * * * * */10")  # Every 10 seconds
worker = Worker(queue=queue, functions=[], cron_jobs=[tb])

async def tasks_on_startup() -> None:
    logger.info("Starting SAQ worker")

    asyncio.create_task(worker.start())
    asyncio.create_task(worker.schedule())

    logger.info("SAQ started and tasks scheduled")

async def tasks_on_shutdown() -> None:
    logger.info("Stopping SAQ worker")

    asyncio.create_task(worker.abort())
    asyncio.create_task(worker.stop())

    logger.info("SAQ worker should be dead")

##################################
######## BASIC STARLITE APP #########

@get("/")
def hello_world() -> dict[str, str]:
    """Handler function that returns a greeting dictionary."""
    return {"hello": "world"}

app = Starlite(
    route_handlers=[hello_world],
    on_startup=[tasks_on_startup],
    on_shutdown=[tasks_on_shutdown],
    logging_config=logging_config,
    debug=True,
)

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
tobymao commented 1 year ago

saq is designed to run it's own process. you can do saq.worker.start

i've never run saq directly inside an asgi worker, i know there's a library that does this

check out the usage here

https://github.com/litestar-org/starlite-pg-redis-docker/blob/0c3622f6c483117ed6638e49e82c3545e111573e/app/main.py#L46

peterschutt commented 1 year ago

OP issue is that the signals are handled by the worker and not propagated, so the worker does shut down, but the process managing the worker doesn't get the shutdown signal.

Subclassing the worker to prevent registration of the signal handlers does the trick:

class Worker(saq.Worker):
    SIGNALS: list[Signals] = []