tobymao / saq

Simple Async Queues
https://saq-py.readthedocs.io/en/latest/
MIT License

Getting ERROR:saq:Upkeep task failed unexpectedly #159

Closed: kdenz closed this issue 5 days ago

kdenz commented 6 days ago

Hi! Looking forward to using this library instead of RQ / ARQ.

Been getting the following error after running with the Docker CMD ["python", "-m", "saq", "app.worker.worker_settings"]:

Error

worker-1        | ======== Running on http://0.0.0.0:8080 ========
worker-1        | (Press CTRL+C to quit)
worker-1        | ERROR:saq:Upkeep task failed unexpectedly
worker-1        | Traceback (most recent call last):
worker-1        |   File "/usr/local/lib/python3.12/site-packages/saq/worker.py", line 189, in poll
worker-1        |     await func(arg or sleep)
worker-1        |   File "/usr/local/lib/python3.12/site-packages/saq/queue/redis.py", line 243, in sweep
worker-1        |     for job_id, job_bytes in id_jobs:
worker-1        |                              ^^^^^^^
worker-1        | TypeError: 'NoneType' object is not iterable
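
The crash itself is mechanical: id_jobs comes back as None instead of a list of (job_id, job_bytes) pairs, so the for loop has nothing to iterate. A minimal sketch of the failure mode and the usual defensive guard (illustration only, not necessarily how saq fixes it upstream):

id_jobs = None  # e.g. an empty result when several workers sweep at once

# This reproduces the traceback above:
for job_id, job_bytes in id_jobs:  # TypeError: 'NoneType' object is not iterable
    ...

# Guarding against None sidesteps the crash:
for job_id, job_bytes in id_jobs or []:
    ...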

worker.py

import logging

from saq import CronJob, Queue
from saq.types import Context, SettingsDict

from app.core.config import settings
from app.core.db import db

logging.basicConfig(
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

async def startup(ctx: Context):
    logger.info(f"Starting: {ctx}")
    await db.connect()

async def shutdown(ctx: Context):
    logger.info(f"Shutting down: {ctx}")
    await db.disconnect()

async def before_process(ctx: Context):
    logger.info(f"Before processing job: {ctx['job']}")

async def after_process(ctx: Context):
    logger.info(f"After processing job: {ctx['job']}")

async def test_cron(ctx: Context):
    logger.info(f"Test cron: {ctx}")

redis_url = f"redis://:{settings.WORKER_REDIS_PASSWORD}@{settings.WORKER_REDIS_ENDPOINT}:{settings.WORKER_REDIS_PORT}"

job_queue = Queue.from_url(redis_url)

worker_settings: SettingsDict = {
    "queue": job_queue,
    "concurrency": 10,
    "startup": startup,
    "shutdown": shutdown,
    "before_process": before_process,
    "after_process": after_process,
    "cron_jobs": [CronJob(cron="* * * * * */5", function=test_cron)],
    "functions": [test_cron],
}
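
For completeness, jobs get enqueued against the same queue from the producer side by function name. A minimal sketch, assuming the same Redis instance and that the target function is listed in the worker's "functions" (file name and URL here are placeholders):

enqueue_example.py

import asyncio

from saq import Queue

# Placeholder URL; in the app above this would be built from settings as in worker.py.
queue = Queue.from_url("redis://localhost:6379")

async def main():
    # Enqueue by name; "test_cron" is registered in worker_settings["functions"].
    job = await queue.enqueue("test_cron")
    print(job)

asyncio.run(main())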

Also, logs are not showing up in the Docker output even with PYTHONUNBUFFERED=1 set.

Any clue what's going on?

tobymao commented 5 days ago

what version are you on? you may need to run with -vv to get your info logging

i can't reproduce this issue, what version are you on?

figured it out, if there are multiple workers this can happen
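
e.g. the Docker CMD from above becomes:

CMD ["python", "-m", "saq", "app.worker.worker_settings", "-vv"]

(each -v raises saq's log verbosity one step, so with -vv the INFO logs from worker.py show up, which matches the follow-up below)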

kdenz commented 5 days ago

@tobymao dope! logging works now, thanks