tobymao / saq

Simple Async Queues
https://saq-py.readthedocs.io/en/latest/
MIT License
532 stars 37 forks source link

Random sweeping of jobs #124

Closed paul-finary closed 3 months ago

paul-finary commented 3 months ago

Hi,

I'm working on an application relying on SAQ for its job scheduling part. I have ~50k objects that need to be kept up to date with data from external providers. Each of those object have a different "update cycle", where a job runs every X hours and updates the object with data from the providers. Once the object update is finished, the job reschedules itself with the _after_process hook using arguments from the meta tag. Each object have only one job, hence the custom job id in the form saq:job:default:job(scheduled=True, object_id='XXXX').

Using the _after_process allow me to re-enqueue the job without triggering the unique constraint, because the job is removed from the active/queued queue just before.

I've been tracking a bug in my application for a while now, where some of my objects get thrown out of their update cycle (meaning, their job don't reschedule themselves and are not updated anymore), and I realised that the jobs linked to those objects were swept seemingly randomly. For each example I found, the reason for the sweep was that the job was in the QUEUED state, but was found in the active queue.

Here are my logs:

I, [2024-05-30T14:37:36.123 #7] INFO - Scheduled [b"saq:job:default:job(scheduled=True, object_id='61a8d5212c1653d2c4808b79')", b"saq:job:default:job(scheduled=True, id='61a9b4d175c7f28379b8f17c')"]

I, [2024-05-30T14:37:37.243 #7] INFO - Processing Job<function=job, kwargs={'object_id': '61a9b4d175c7f28379b8f17c'}, queue=default, id=saq:job:default:job(scheduled=True, object_id='61a9b4d175c7f28379b8f17c'), scheduled=1717079856, progress=0.0, start_ms=603675106, attempts=1, status=Status.ACTIVE, meta={}>

I, [2024-05-30T14:37:44.805 #7] INFO - Finished Job<function=job, kwargs={'object_id': '61a9b4d175c7f28379b8f17c'}, queue=default, id=saq:job:default:job(scheduled=True, object_id='61a9b4d175c7f28379b8f17c'), scheduled=1717079856, progress=1.0, process_ms=7694, start_ms=603675106, total_ms=603682800, attempts=1, result={'code': 200, 'asset': Object}, status=Status.COMPLETE, meta={'reschedule_kwargs': {'object_id': '61a9b4d175c7f28379b8f17c', 'deferred': 604338}}>

I, [2024-05-30T14:37:44.807 #7] INFO - Enqueuing Job<function=job, kwargs={'object_id': '61a9b4d175c7f28379b8f17c'}, queue=default, id=saq:job:default:job(scheduled=True, object_id='61a9b4d175c7f28379b8f17c'), scheduled=1717684202, progress=0.0, attempts=0, status=Status.QUEUED, meta={}>

I, [2024-06-06T14:30:02.652 #7] INFO - Scheduled [b"saq:job:default:job(scheduled=True, object_id='60dcbd77fc93fcd7e58dc214')", b"saq:job:default:job(scheduled=True, object_id='61a9b4d175c7f28379b8f17c')", b"saq:job:default:job(scheduled=True, object_id='6287ba5451ba86567cb348f0')"]

I, [2024-06-06T14:30:02.883 #7] INFO - Sweeping job Job<function=job, kwargs={'object_id': '61a9b4d175c7f28379b8f17c'}, queue=default, id=saq:job:default:job(scheduled=True, object_id='61a9b4d175c7f28379b8f17c'), scheduled=1717684202, progress=0.0, attempts=0, status=queued, meta={}>

I, [2024-06-06T14:30:02.906 #7] INFO - Finished Job<function=job, kwargs={'object_id': '61a9b4d175c7f28379b8f17c'}, queue=default, id=saq:job:default:job(scheduled=True, object_id='61a9b4d175c7f28379b8f17c'), scheduled=1717684202, progress=0.0, total_ms=604338078, attempts=0, error=swept, status=Status.ABORTED, meta={}>

I've been looking at the source code without getting somewhere, I can't seem to find any lead as to why random jobs get swept like that. Do you have any idea or pointers ? I'm assuming I do something wrong in my scheduling to lead in a state where a job is in the active queue but is in a QUEUED status, but I can't find what.

Thanks for the help

tobymao commented 3 months ago

how are you re-enqueing jobs in after_process? maybe it's better to have jobs be set on a cron rather than having jobs re-enqueue themselves.

sweeping happens automatically. by default it happens every 60 seconds. if finds all jobs that are in the active queue. if the job is not active or is stuck then it will get swept.

when a job is processing, it dequeues, atomically, it moves a job from queued to active.

the job status is set to active in memory, and then update is called. it is possible for the sweep to happen before the update gets called thus sweeping the job for being in the queued state.

paul-finary commented 3 months ago

Thanks for your response, the thing is that each job is rescheduled on its own cycle (with jitter), to avoid having 50k jobs being processed at the same time, and smooth them over a period of time.

The jobs a re-enqueued like so:

# Automatically reschedule jobs
async def _after_process(ctx: Context | None) -> None:
    if ctx is None:
        return

    job = ctx["job"]

    reschedule_kwargs = job.meta.get("reschedule_kwargs")
    if reschedule_kwargs:
        await job.queue.enqueue(job.function, **reschedule_kwargs)

I see, thanks for the explanation.

tobymao commented 3 months ago

can you let me know if my change fixes the issues?

tobymao commented 3 months ago

@paul-finary can you confirm if the latest release fixes the issue?

paul-finary commented 3 months ago

Hi,

Thanks for the quick fix, I tested and it appears that the issue is still present, albeit more rare. The reason is the same (job in QUEUED state in the active queue). I tried increasing the asyncio.sleep duration, but this led to another error popping: "Job XXX doesn't exist".

For now, I'll just try and only sweep jobs when they're stuck.

Let me know how I can help !

barakalon commented 3 months ago

I'm seeing this as well. Seems to happen when a job's args are really large, so downloading/deserializing take a relatively long time. I'm assuming the latest release will help a lot in my case.