patillacode closed this issue 5 years ago
All the storage logic for the Redis broker is defined in https://github.com/Bogdanp/dramatiq/blob/master/dramatiq/brokers/redis/dispatch.lua so that may help.
The answer is, essentially, "it depends". Once the message has been pulled by the worker, you can no longer prevent it from running w/o adding some custom middleware specifically for this.
I would implement this as a custom Lua script like this:
-- LREM takes a count; 1 removes the first matching entry and returns how many were removed.
local n = redis.call("lrem", "dramatiq:default", 1, "some-message-id")
-- If at least one message was removed from the queue, then we're guaranteed that no worker has pulled it yet.
if n > 0 then
    redis.call("hdel", "dramatiq:default.msgs", "some-message-id")
end
Replace dramatiq: with your namespace and default with your queue name.
The .DQ suffix is used by delay queues. Every queue in dramatiq is actually stored as two separate queues: one for normal messages and one for messages with a delay (i.e. ones that should run some number of seconds in the future).
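For later readers, here is a minimal sketch of the key naming described above, pieced together only from the key names that appear in this thread. The helper name redis_queue_keys and its parameters are hypothetical (not part of dramatiq), and the layout should be checked against dispatch.lua for your dramatiq version.

```python
def redis_queue_keys(queue_name="default", namespace="dramatiq", delayed=False):
    """Return (queue_key, messages_key) for a dramatiq Redis queue.

    Hypothetical helper: the naming is inferred from the keys quoted in this
    thread, not taken from dramatiq's public API.
    """
    if delayed:
        # Delayed messages live in a sibling queue with a ".DQ" suffix.
        queue_name = f"{queue_name}.DQ"
    queue_key = f"{namespace}:{queue_name}"
    # Message payloads are kept in a hash named after the queue plus ".msgs".
    return queue_key, f"{queue_key}.msgs"


# Normal queue:  ("dramatiq:default", "dramatiq:default.msgs")
# Delay queue:   ("dramatiq:default.DQ", "dramatiq:default.DQ.msgs")
```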
Hope that helps!
Thanks for your answer @Bogdanp
I ended up manually removing it in a couple of lines:
import redis
redis_client = redis.Redis(host='localhost', port=6379, db=0)
redis_client.hdel("dramatiq:default.DQ.msgs", "message_id")
It feels a bit overkill to have a middleware just for this. Have you thought about adding this as a built-in function?
Thanks again.
The way you're doing it there could cause a crash. You should only remove the message data (i.e. the stuff in *.msgs) if you were able to successfully remove the message from the queue (i.e. the lrem call in my code above).
> Have you thought about adding this as a built-in function?
Dramatiq's primary broker is RabbitMQ (what I personally use it with) and it doesn't support message removal so even though I agree that this might sometimes be useful, it's just not something that can be supported with RMQ w/o jumping through some crazy hoops.
I know this is an old issue, but I am just now having to deal with it. @Bogdanp, @patillacode based on your replies on here, I created a cancel_task() definition by doing the following:
import redis

def cancel_task(task_id):
    # cancel a dramatiq task
    redis_client = redis.Redis(host=settings.REDIS_LOCAL, port=6379, db=0)
    n = redis_client.lrem("dramatiq:default.DQ", 1, task_id)
    if n > 0:
        redis_client.hdel("dramatiq:default.DQ.msgs", task_id)
Would this cause a crash?
@cchacholiades You should use Lua to perform the two operations atomically; otherwise you risk some other process modifying the data in between.
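A rough sketch of what that could look like from Python, assuming redis-py: the two steps are sent as one Lua script through the client's eval(script, numkeys, *keys_and_args) call, so Redis runs them without another command interleaving. The function name cancel_message and the CANCEL_SCRIPT constant are made up for this example, and the key names are whatever your namespace and queue resolve to.

```python
# Lua run server-side by Redis: remove the id from the queue list and, only if
# it was still there, delete its payload from the messages hash.
CANCEL_SCRIPT = """
local removed = redis.call("lrem", KEYS[1], 1, ARGV[1])
if removed > 0 then
    redis.call("hdel", KEYS[2], ARGV[1])
end
return removed
"""


def cancel_message(client, queue_key, msgs_key, message_id):
    """Try to cancel a message that no worker has pulled yet.

    `client` is expected to behave like redis.Redis, i.e. to expose
    eval(script, numkeys, *keys_and_args). Returns True if the message was
    still queued and has been removed, False otherwise.
    """
    removed = client.eval(CANCEL_SCRIPT, 2, queue_key, msgs_key, message_id)
    return removed > 0


# Usage sketch (assumes a local Redis and the key names from this thread):
#   import redis
#   client = redis.Redis(host="localhost", port=6379, db=0)
#   cancel_message(client, "dramatiq:default.DQ", "dramatiq:default.DQ.msgs", task_id)
```

A False result means the id was not in that queue, for example because a worker already pulled it, so the message data is left alone.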
I have checked other issues and removing all tasks could be done as you mentioned here
But what if we need to cancel/delete just one task? From what I have read in the docs and in other issues, I guess there is no built-in way to do this.
But I am not sure how to even do it manually via redis-cli, since I don't quite understand how dramatiq stores/handles this. I can get all hashes via HGETALL "dramatiq:default.DQ.msgs".
But I also see data in the dramatiq:__acks__.95ea30c4-96c6-4b32-9f77-add0fc3ff722.default.DQ key that points to the hashed values.
Do you have any input on how to delete one task before it triggers? Should I remove it from both queues?
Thanks for this great project.