tobymao / saq

Simple Async Queues
https://saq-py.readthedocs.io/en/latest/
MIT License
576 stars 39 forks source link

Heartbeat-based task interruption not working as expected #126

Closed evgenii-moriakhin closed 4 months ago

evgenii-moriakhin commented 4 months ago

Heartbeat-based task interruption not working as expected

I've noticed that tasks with a heartbeat are not being interrupted properly in the saq library. Here's a minimal example demonstrating the issue:

import asyncio
from redis.asyncio import Redis
from saq import Queue, Worker

should_executed_once = 0

async def handler(ctx):
    global should_executed_once
    await asyncio.sleep(5)
    should_executed_once += 1

async def main():
    queue = Queue(Redis())
    worker = Worker(queue, [handler], timers={"sweep": 1})
    asyncio.create_task(worker.start())
    await asyncio.sleep(1)

    job1 = await queue.enqueue("handler", heartbeat=1)
    await job1.refresh(0)
    assert job1.status == "aborted"
    assert should_executed_once == 0

    job2 = await queue.enqueue("handler")
    await job2.refresh(0)

    assert should_executed_once == 2  # Should be 1

    await worker.stop()
    await worker.queue.disconnect()

asyncio.run(main())

The first job is set to abort due to a heartbeat timeout, but the handler still executes. I think the issue might be in the Queue.sweep function. Instead of just updating the status with await job.finish(Status.ABORTED, error="swept"), it should actually abort the task with await job.abort("swept") (or, possibly use a combination of these methods).

Why was the current implementation chosen? Are there any potential side effects of changing to job.abort()?

barakalon commented 4 months ago

job.abort() makes sense to me.

But also, looking at https://github.com/tobymao/saq/issues/127 - maybe instead of job.abort(), we should treat stuck jobs as failures and possibly retry?

tobymao commented 4 months ago

jobs may be blocking and sweeping needs to happen asyncronously.

i'm not exactly sure what the problem is here, perhaps you need to decrease your sweep timer

the sweeper may not actually control the process to actually stop a task. that can only happen by the actual worker. so you see how these things must be done in an async way and can take time to do. there's no guarantees.

barakalon commented 4 months ago

Yeah but you can abort and at least give the worker a chance to cancel the task, if it’s still running

tobymao commented 4 months ago

isn’t that what we do?

barakalon commented 4 months ago

@evgenii-moriakhin pointed out we only mark jobs as aborted, we don’t actually set the abort key

tobymao commented 4 months ago

ah i see, yea should be an easy fix

evgenii-moriakhin commented 4 months ago

Good fix that also addresses #127. Interrupting always before retry is what I would have suggested after seeing #127.

However, one aspect concerns me. In the case of "normal" job processing ABORTED set only if the last attempt was completed. If there are still attempts left, ABORTED is not set, and a retry occurs. This seems logical.

except asyncio.CancelledError:
    if job:
        aborted = self.job_task_contexts.get(job, {}).get("aborted")
        if aborted:
            await job.finish(Status.ABORTED, error=aborted)
        else:
            await job.retry("cancelled")

Code that relies on waiting for job completion through job.refresh(0) works correctly.

In the current enhancement in release 13.0, the sweep code first updates the status to ABORTED, then performs a retry. Clients relying on job.refresh(0) may consequently receive an unfinished job.

How correct is this behavior? It seems to slightly break the previous status change behavior and might affect some existing code (including mine). The code I provided in the issue will fail on assert job1.status == "aborted". (If we set multiple retries for the first job)

I would expect it to wait for the last retry to finish.

evgenii-moriakhin commented 4 months ago

I understand this isn't elegant, but I see a solution in allowing .abort to not update the status (in case the Job can still be retried). In fact, for sweep to work like the main Job processing logic, we need functionality that simply tries to cancel the asyncio.Task

evgenii-moriakhin commented 4 months ago
import asyncio

from redis.asyncio import Redis
from saq import Queue, Worker

job_executed = 0

async def handler(ctx):
    global job_executed
    await asyncio.sleep(3)
    job_executed += 1

async def main():
    queue = Queue(Redis())
    worker = Worker(queue, [handler], timers={"sweep": 1})
    asyncio.create_task(worker.start())
    await asyncio.sleep(1)

    await queue.enqueue("handler", heartbeat=1, retries=2)
    await asyncio.sleep(10)

    assert job_executed == 1  # It should be 0 due to heartbeat interruptions!

    await worker.stop()
    await worker.queue.disconnect()

asyncio.run(main())

I've found another place with an error

Although the cancellation occurs the first time, the second time the job completes to the end without being interrupted by the heartbeat

PS And very strangely, if we set the sleep inside the handler from 3 to 2, the counter equals 2 executions...

I think we need more tests for edge cases. The functionality has become more complex, and tests are necessary.

tobymao commented 4 months ago

I understand this isn't elegant, but I see a solution in allowing .abort to not update the status (in case the Job can still be retried). In fact, for sweep to work like the main Job processing logic, we need functionality that simply tries to cancel the asyncio.Task

i’m not sure i understand your suggestion. the sweeper doesn’t have the ability to cancel the task. the actual process may be running on a separate machine.

evgenii-moriakhin commented 4 months ago

’m not sure i understand your suggestion. the sweeper doesn’t have the ability to cancel the task. the actual process may be running on a separate machine.

I mean to pass a flag (such as only_cancel for example) along with the abort request, if more attempts are planned for the job. And in this case, not update its status to ABORTED.

tobymao commented 4 months ago

and what if the job is not actually running? it will never be set the aborted right?

evgenii-moriakhin commented 4 months ago

Yes, thanks

I mean something like this: Perhaps with the flag finish it will be clearer

if stuck:
    swept.append(job_id)
    if job.retryable:
        await self.abort(job, error="swept", finish=False)
    else:
        await self.abort(job, error="swept", finish=True)

In case the job is retryable, it will still go into QUEUED status afterwards. If not, the status is updated as before.

Please correct me if I'm mistaken.

barakalon commented 4 months ago

The sweeper sets the status, like it was doing before

tobymao commented 4 months ago

02fb13c

so we try to abort it, if it does, the status can change, and then it will retry. you can check the retryable flag