Open huwylphimet opened 7 months ago
When I implemented it, I expected users to bring their own colons, as you can see here. I guess it would be better for the library to give you one for free.
Both of the cases you described should work with namespaces, since the backend applies the namespace in the _encode_key function, but you are welcome to test it.
Ah yes, I see, thank you for pointing that out. So in that case the user is free to add any separator, such as a colon, or none at all, when defining the namespace. I think this is fine too. It just differs from other middleware like ResultMiddleware, where the colon separator seems to be hardcoded between the namespace and the (rest of the) key, see here.
Now I have one additional question: I saw that the requests (e.g. abort) are identified by threads, see here and here.
I guess this technique would not survive a reboot of the worker handling the actor, since the thread would be restarted and get a new thread identifier.
Is it somehow possible to override that get_current_thread() method to return some persistent worker/actor identifier, so that an actor would still abort any message for which a pending abort key exists, even after a reboot of the worker?
The thread identifier is not stored in Redis, it's in process memory. If the worker reboots, it's going to rebuild this mapping with the new threads. In short, as a worker processes messages, it keeps a mapping of message => thread, so that it knows which thread to kill. If the worker reboots, it will pick up the messages again and rebuild the mapping as it does so.
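To illustrate the point above, here is a minimal sketch of such an in-process message => thread mapping. The class and method names are hypothetical and are not taken from the middleware's source; the only thing it is meant to show is that the mapping lives in memory, so a restarted worker starts with an empty one and refills it as it picks messages up again.

```python
import threading


class InProcessAbortRegistry:
    """Hypothetical stand-in for the middleware's message => thread mapping."""

    def __init__(self):
        self._lock = threading.Lock()
        self._threads = {}  # message_id -> ident of the thread processing it

    def register(self, message_id):
        # Called when a worker thread starts processing a message.
        with self._lock:
            self._threads[message_id] = threading.get_ident()

    def unregister(self, message_id):
        # Called when processing finishes, whatever the outcome.
        with self._lock:
            self._threads.pop(message_id, None)

    def thread_for(self, message_id):
        # Returns the thread ident to interrupt, or None if unknown.
        with self._lock:
            return self._threads.get(message_id)


# Before a restart: the message is mapped to the current thread.
registry = InProcessAbortRegistry()
registry.register("task123")

# After a restart: a fresh process has a fresh, empty mapping, and the
# entry reappears only once the message is picked up again.
restarted = InProcessAbortRegistry()
restarted.register("task123")
```

Nothing here needs to be persisted: the abort key itself stays in Redis, and the thread identifier is only needed while a message is actually being processed.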
I guess that makes sense, consistency is good. PRs are welcome.
Actually, if I understand it correctly, calling abort() does not delete the dramatiq message from the Redis hash queue but "only" signals actors to abort / cancel the current task.
If I want the worker not to retry that message, I should (try to) end the task gracefully within the actor, either by catching the Abort exception that is raised or by checking whether abort_requested() returns a positive value, so that the message gets "marked" as "handled" (meaning it will be popped off the queue) by dramatiq.
But the worker might never see the abort / cancel request, for example because it is down. In that case, if I want to delete the aborted dramatiq message that remains once the abort timeout has expired, I would have to do it manually on the message emitter side, for example like this (since I only found a way in dramatiq to flush a whole queue, not a single message):
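As a rough sketch of the "catch the exception and end gracefully" idea described above: the Abort class below is a local stand-in defined only so the example runs on its own, not the middleware's real exception, and run_task is a hypothetical helper. It assumes, as described in this comment, that a task which returns normally is treated as handled rather than retried.

```python
class Abort(Exception):
    """Local stand-in for the exception raised on abort (assumption)."""


def run_task(steps):
    """Run steps in order; stop cleanly if an abort interrupts one of them."""
    completed = 0
    try:
        for step in steps:
            step()
            completed += 1
    except Abort:
        # Clean up here, then return normally instead of re-raising, so the
        # task ends without an error.
        return {"status": "aborted", "completed": completed}
    return {"status": "finished", "completed": completed}


def interrupted_step():
    # Simulates the abort arriving while this step is running.
    raise Abort()


result = run_task([lambda: None, interrupted_step, lambda: None])
```

In a real actor the same try/except would wrap the actor body, with the exception imported from the abort middleware package instead of defined locally.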
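# `broker` is the RedisBroker and `message` the dramatiq message that was aborted.
redis_msg_id = message.options.get("redis_message_id", None)
redis_queue = f"{broker.namespace}:{message.queue_name}"
# Remove every occurrence of the id from the queue list...
broker.client.lrem(redis_queue, 0, redis_msg_id)
# ...and drop the message payload from the companion ".msgs" hash.
broker.client.hdel(f"{redis_queue}.msgs", redis_msg_id)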
I guess this feature of "force-deleting" the dramatiq message after the abort timeout has been reached would be out of scope for the abort middleware, right?
As for the colon suffix after the Redis namespace, I'm no longer sure it needs to be changed. I guess that colon is only a commonly seen convention rather than a standard rule, so maybe it should instead be un-hardcoded in the dramatiq lib, I don't know...
When abort() or cancel() is called, a key is pushed onto Redis. That key seems to be named: redis namespace + dramatiq message_id, but the namespace is not properly separated from the message_id part by a colon (':'), see here.
For example, if my Redis namespace = "myapp" and the dramatiq message_id = "task123", then aborting that message will push a key "myappabort:task123" instead of "myapp:abort:task123" (notice the colon just after the namespace).
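To make the difference concrete, here is a small sketch of the two naming schemes. Both functions are hypothetical helpers written for this example, not the middleware's actual code; the first reproduces the key I observe, the second the key I expected.

```python
def abort_key_as_observed(namespace, message_id):
    # Plain concatenation: the namespace must bring its own separator.
    return f"{namespace}abort:{message_id}"


def abort_key_with_separator(namespace, message_id, sep=":"):
    # Adds the separator only when the namespace does not already end with
    # it, so "myapp" and "myapp:" produce the same key.
    prefix = namespace if namespace.endswith(sep) else namespace + sep
    return f"{prefix}abort:{message_id}"


observed = abort_key_as_observed("myapp", "task123")
expected = abort_key_with_separator("myapp", "task123")
```

With the first scheme, defining the namespace as "myapp:" instead of "myapp" also yields "myapp:abort:task123".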
Are the abort / cancel key names expected to be like that?
Also in the middleware, before processing the message, the abort / cancel key will be checked, see here.
Is the redis namespace taken into account here?
Does aborting / cancelling a dramatiq message work when a custom Redis namespace has been defined?