taskiq-python / taskiq

Distributed task queue with full async support
MIT License

Add messages rejection #322

Open vasilbekk opened 2 months ago

vasilbekk commented 2 months ago

Thank you so much for creating such a cool library, but I found a problem.

RabbitMQ supports dead-lettering of messages, and taskiq-aio-pika even creates a dead-letter queue for them.

But the Receiver implementation in taskiq only calls message.ack(); there is no option to call message.reject().

And in Receiver.callback(), ack() is always called, even if the task failed with an error.

I rewrote the class a little: I added an optional reject() callable field to AckableMessage (I called the result RejectableMessage), changed the broker so that listen() returns RejectableMessage instead of AckableMessage, and rewrote Receiver.callback() so that it calls reject() instead of ack() when result.is_err is set.
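
A minimal, self-contained sketch of the idea described above. It is not the actual taskiq code and imports nothing from taskiq: the `RejectableMessage` dataclass, the `settle` helper, and the `demo` function are hypothetical names used for illustration, and the `is_err` flag stands in for `result.is_err`. It assumes that `ack` and `reject` may be either sync or async callables.

```python
import asyncio
import inspect
from dataclasses import dataclass
from typing import Awaitable, Callable, List, Optional, Tuple, Union

MaybeAwaitable = Union[None, Awaitable[None]]


@dataclass
class RejectableMessage:
    """Hypothetical wrapper: raw payload plus ack and optional reject callbacks."""

    data: bytes
    ack: Callable[[], MaybeAwaitable]
    # Optional, so brokers that cannot reject keep working unchanged.
    reject: Optional[Callable[[], MaybeAwaitable]] = None


async def _call(func: Callable[[], MaybeAwaitable]) -> None:
    """Call a callback and await the result if it is awaitable."""
    result = func()
    if inspect.isawaitable(result):
        await result


async def settle(message: RejectableMessage, is_err: bool) -> str:
    """Reject the message on failure if the broker supports it, else ack it."""
    if is_err and message.reject is not None:
        await _call(message.reject)
        return "rejected"
    await _call(message.ack)
    return "acked"


def demo() -> Tuple[List[str], List[str]]:
    """Run three cases: success, failure with reject, failure without reject."""
    events: List[str] = []

    async def ack() -> None:
        events.append("ack")

    def reject() -> None:
        events.append("reject")

    async def run() -> List[str]:
        msg = RejectableMessage(b"payload", ack=ack, reject=reject)
        outcomes = [
            await settle(msg, is_err=False),
            await settle(msg, is_err=True),
        ]
        # A message without a reject callback falls back to ack.
        legacy = RejectableMessage(b"payload", ack=ack)
        outcomes.append(await settle(legacy, is_err=True))
        return outcomes

    return asyncio.run(run()), events
```

Keeping `reject` optional means a broker without a rejection concept would behave exactly as before, while a RabbitMQ-backed broker could pass a callback that rejects the message without requeueing so that it is routed to the dead-letter queue.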

I could clean up a bit and send a PR with message rejection implementation (especially useful in rabbitmq) if needed.

It would also be interesting to get answers to two questions:

  1. Why does taskiq-aio-pika create a queue for dead messages if it is essentially always empty? Was it added for future use, or did something break?

  2. Perhaps there is already another way to redeliver failed messages that I am not seeing? I would be glad to hear suggestions.

a1d4r commented 2 days ago

Hello, Sergey! I've encountered the same problem. I would expect failing tasks to be sent to the dead-letter queue rather than acked. Could you please share a draft PR or a gist? I'm pretty sure it would help many people, including me.