Flared / dramatiq-abort

Dramatiq extension to abort message
http://flared.github.io/dramatiq-abort
GNU Lesser General Public License v3.0

dramatiq-abort does not abort DQ jobs #3

Closed cchacholiades closed 3 years ago

cchacholiades commented 3 years ago

Hi, I am trying to use this package as I need to be able to cancel / abort tasks in my dramatiq delayed queue.

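import redis
from django.conf import settings  # project settings; REDIS_LOCAL holds the Redis host
from dramatiq import get_broker
from dramatiq_abort import Abortable, backends, abort

def cancel_task(self, task_id):
    """
    Cancels a REDIS task given a task_id
    """
    redis_client = redis.Redis(host=settings.REDIS_LOCAL, port=6379, db=0)

    # Register the abort middleware on the current broker
    abortable = Abortable(backend=backends.RedisBackend(client=redis_client))
    get_broker().add_middleware(abortable)

    # Flag the message with this id for abort
    abort(task_id)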

I am not sure if this is the correct way to use it. The above code doesn't produce any errors, nor does it abort the scheduled task. Any ideas?

Update:

isra17 commented 3 years ago

Hmm, can you by any chance create a test case that replicates the issue?

There are a few existing test cases here: https://github.com/Flared/dramatiq-abort/blob/master/tests/test_abortable.py

cchacholiades commented 3 years ago

@isra17 Thank you for replying. I think I have just resolved the issue. I am using Django, and it seems I had to instantiate dramatiq-abort before importing dramatiq in my tasks.py file. After doing that, I can now see the jobs being skipped.

isra17 commented 3 years ago

I see you closed again, found a fix/not a bug?

cchacholiades commented 3 years ago

@isra17 I am having trouble identifying the root cause of the problem. At this point I think I have exhausted all possible scenarios. At some point I thought the issue had to do with the way I deploy my app, but that is not the case. Question: is there a limit on how many tasks can be aborted consecutively?

Or are there any cases in which abort is expected not to work?

isra17 commented 3 years ago

There shouldn't be. The way it works is that it stores the task id in Redis, and the worker then checks every now and then whether any incoming or running task has been flagged for abort by looking up its task id in Redis. Kind of like:

is_aborted = Redis.pop(task_id, False)

One way to debug could be to check in Redis whether the keys are there, and/or add some logging to the code to see what is being polled in Redis and what the results are.
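To make the flag-and-poll idea concrete, here is a minimal in-memory sketch. It is not the dramatiq-abort implementation: `AbortFlags` and `run_tasks` are hypothetical names, a Python set stands in for the Redis keys, and the log line only shows the kind of output worth adding when debugging.

```python
import logging

logger = logging.getLogger("abort_sketch")


class AbortFlags:
    """In-memory stand-in for the Redis keys (hypothetical helper)."""

    def __init__(self):
        self._flagged = set()

    def abort(self, task_id):
        # Producer side: record that this task should not run.
        self._flagged.add(task_id)

    def pop(self, task_id):
        # Worker side: report whether the task was flagged, clearing the flag.
        was_flagged = task_id in self._flagged
        self._flagged.discard(task_id)
        logger.debug("polled %s -> %s", task_id, was_flagged)
        return was_flagged


def run_tasks(task_ids, flags):
    """Return (executed, skipped) lists for the given task ids."""
    executed, skipped = [], []
    for task_id in task_ids:
        if flags.pop(task_id):
            skipped.append(task_id)
        else:
            executed.append(task_id)
    return executed, skipped


flags = AbortFlags()
flags.abort("task-2")
executed, skipped = run_tasks(["task-1", "task-2", "task-3"], flags)
print("executed:", executed, "skipped:", skipped)
```

If a task that should have been skipped shows up as executed, the poll log shows whether the flag was missing at the moment the worker looked for it.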

isra17 commented 3 years ago

The code is somewhat simple: https://github.com/Flared/dramatiq-abort/blob/master/src/dramatiq_abort/backends/redis.py

You should be able to add some logging easily in poll / wait_many.

cchacholiades commented 3 years ago

It is very strange: sometimes I can see the jobs being aborted and other times not. There is no consistency, which makes it difficult to debug. We actually rely on the fact that tasks which have been scheduled multiple times cancel each other, so that only the job scheduled last executes. Most of the jobs are scheduled for some time in the future, which could be days or minutes ahead. I use dramatiq's send_with_options to set them up.

cchacholiades commented 3 years ago

@isra17 I created a sample django project with how I've set things up and instructions to reproduce. Can you please have a look? https://github.com/cchacholiades/djdabortbug

I've noticed that if the scheduled job is 60000 ms (1 min) from now, it is skipped with no problem. In my example I have it at 271000 ms (about 4.5 minutes) and it isn't.

isra17 commented 3 years ago

You mean, it stops working after 90000 ms?

isra17 commented 3 years ago

I would probably add a kwarg abort_ttl to the abort and/or __init__ function to allow setting a higher value for your use case. Do you want to create a PR for that?
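The arithmetic behind the two delays reported above can be sketched as follows. This assumes the abort flag lives for 90000 ms and that the abort is requested right when the task is scheduled (the worst case). `abort_flag_survives`, `ttl_for_delay`, and the one-minute safety margin are hypothetical names and values chosen for illustration, not part of dramatiq-abort.

```python
# Assumed flag lifetime, taken from the discussion rather than read from the library.
ASSUMED_ABORT_TTL_MS = 90_000


def abort_flag_survives(delay_ms, abort_ttl_ms=ASSUMED_ABORT_TTL_MS):
    """True if a flag set at scheduling time still exists when the task is due."""
    return delay_ms < abort_ttl_ms


def ttl_for_delay(delay_ms, margin_ms=60_000):
    """Pick a TTL that outlives the task delay by a safety margin."""
    return delay_ms + margin_ms


for delay_ms in (60_000, 271_000):
    print(delay_ms, "ms delay, flag survives:", abort_flag_survives(delay_ms))

# With a TTL derived from the delay, the longer case is covered as well.
print(abort_flag_survives(271_000, ttl_for_delay(271_000)))
```

Under these assumptions the 60000 ms job is skipped because its flag is still present, while the 271000 ms job runs because the flag has already expired by the time the worker checks.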

cchacholiades commented 3 years ago

@isra17 Awesome stuff, thank you! And yes, I have already created a PR. Let me know if this does it.

cchacholiades commented 3 years ago

@isra17 maybe can you link the PR to this issue and close it?

isra17 commented 3 years ago

Fixed by #8

Kastakin commented 2 years ago

Is there a limit on the TTL accepted by Redis?

In my application I have a queue where each job can take several hours to complete and multiple jobs can be scheduled, so I needed a TTL that was as big as possible. At first I tried using sys.maxsize as the abort_ttl value, but it didn't work and jobs were not aborted correctly. Setting abort_ttl manually solved the issue, but I wanted to understand whether there is a limit in Redis worth documenting.

isra17 commented 2 years ago

Seems like Redis accepts a max int value (2**31-1). I will accept a PR that documents the limit or raises an exception if it is exceeded (in the Redis backend).
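A guard along those lines might look like the sketch below. It takes the 2**31-1 figure from this comment at face value rather than from the Redis documentation, and `check_abort_ttl` is a hypothetical helper, not an existing function in the Redis backend.

```python
# Limit quoted in the thread; an assumption, not verified against Redis itself.
ASSUMED_REDIS_MAX_TTL = 2**31 - 1


def check_abort_ttl(abort_ttl):
    """Return abort_ttl unchanged, or raise ValueError if it is out of range."""
    if not isinstance(abort_ttl, int) or abort_ttl <= 0:
        raise ValueError(f"abort_ttl must be a positive integer, got {abort_ttl!r}")
    if abort_ttl > ASSUMED_REDIS_MAX_TTL:
        raise ValueError(
            f"abort_ttl {abort_ttl} exceeds the assumed maximum {ASSUMED_REDIS_MAX_TTL}"
        )
    return abort_ttl


print(check_abort_ttl(24 * 60 * 60 * 1000))  # one day in milliseconds passes the check

try:
    check_abort_ttl(2**63 - 1)  # the value of sys.maxsize on a typical 64-bit build
except ValueError as exc:
    print("rejected:", exc)
```

Failing loudly at this point would surface an oversized TTL immediately, instead of leaving jobs that silently never get aborted.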

Kastakin commented 2 years ago

I will try to contribute in the coming days. Thanks for checking out the issue.