benjamin-hodgson / asynqp

An AMQP library for asyncio
MIT License

It's possible to lock up queue.consume() so that it never emits any callback. #91

Open CompPhy opened 7 years ago

CompPhy commented 7 years ago

First off, I would like to say nice work. I've been playing around with several of the asyncio/amqp options and so far I like asynqp the best.

However, I found an issue with the 5.1 release and I'm not sure what's going on. I've built a test system that produces messages into RabbitMQ much faster than I can consume them. Ideally the consumer should keep up as best it can and still work properly. However, queue.consume() does not seem to emit any callbacks while it is still receiving messages from the queue.

This behavior seems to occur regardless of the no_ack setting.

With no_ack=True, RabbitMQ reports the messages as delivered, and they are presumably held in memory on the consumer. However, the callback never fires until the queue is completely empty. In my test case, this means my consumer is essentially stuck waiting for a callback. If I stop the producer, the consumer finally unblocks, but only after RabbitMQ reports all the messages as delivered.

With no_ack=False, the callbacks behave essentially the same way and are not emitted as long as new messages keep arriving in the queue. The only difference is that RabbitMQ shows all the messages as unacknowledged instead of delivered.

I would expect that, while it is certainly possible to receive and buffer multiple messages, there should be some limit to this before the library breaks out of the receive loop and starts emitting callbacks. My queue was up to around 500k messages and climbing, and the consumer was still blocked. Maybe there needs to be some sort of configurable limit here, or a timeout?
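To make the "configurable limit" idea concrete, here is a minimal sketch in plain asyncio. It does not use asynqp at all: `run_demo`, `reader`, `dispatcher`, and the `limit` parameter are hypothetical names for illustration, and `asyncio.Queue(maxsize=...)` simply stands in for a bounded receive buffer. Once the buffer is full, the reader is suspended until a callback has been handled. At the protocol level, AMQP's basic.qos prefetch count plays a similar role for unacknowledged deliveries; how asynqp exposes it should be checked against its documentation.

```python
import asyncio


async def run_demo(total=50, limit=5):
    """Toy model: a reader fills a bounded buffer, a dispatcher drains it."""
    buffer = asyncio.Queue(maxsize=limit)  # hypothetical bounded receive buffer
    handled = []
    max_depth = 0

    async def reader():
        nonlocal max_depth
        for i in range(total):
            # put() suspends here whenever the buffer already holds `limit` items,
            # which gives the dispatcher a chance to run.
            await buffer.put(i)
            max_depth = max(max_depth, buffer.qsize())
        await buffer.put(None)  # sentinel: no more messages

    async def dispatcher():
        while True:
            msg = await buffer.get()
            if msg is None:
                break
            handled.append(msg)      # stand-in for the user's consume callback
            await asyncio.sleep(0)   # yield, as a real callback would while doing I/O

    await asyncio.gather(reader(), dispatcher())
    return handled, max_depth


handled, max_depth = asyncio.run(run_demo())
print(len(handled), max_depth)
```

With a bound in place, the buffered backlog can never exceed `limit`, no matter how fast messages arrive.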

Or is there just something really broken in my implementation? I don't think so, since it's pretty simple at this point and follows the documentation as closely as possible, but maybe I'm wrong. Can anyone else reproduce this issue?

CompPhy commented 7 years ago

So I did a little more digging by breaking into the running application with my debugger. I do see that it is emitting the callback, but really slowly. I put a breakpoint on my callback so I could step through a single iteration. During one iteration, self.reader.pending_frames grew by about 5000 messages, and I can see that RabbitMQ moved all of these messages to the unacknowledged state, but the callback was only emitted a single time. For some reason it's pulling a large number of messages off the queue even though it hasn't finished handling the ones that were already downloaded into the application and are just waiting for the callback to be emitted.
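The pattern described here, many frames buffered for every callback emitted, can be reproduced in a toy model. The sketch below is plain asyncio and is not a diagnosis of asynqp's internals: `simulate`, `frames_per_tick`, and the `pending` deque are hypothetical stand-ins, with `pending` playing the role of something like pending_frames. The reader appends several frames each time it gets to run, the dispatcher handles only one, and the buffer is unbounded.

```python
import asyncio
from collections import deque


async def simulate(callbacks=10, frames_per_tick=5):
    """Toy model: the reader buffers frames faster than callbacks drain them."""
    pending = deque()  # unbounded stand-in for a pending-frame buffer
    backlog = []       # buffer size recorded after each callback

    async def reader():
        n = 0
        while True:
            for _ in range(frames_per_tick):
                pending.append(n)   # buffer a frame without any limit
                n += 1
            await asyncio.sleep(0)  # yield to other tasks

    async def dispatcher():
        for _ in range(callbacks):
            while not pending:
                await asyncio.sleep(0)
            pending.popleft()             # one callback handles one frame
            backlog.append(len(pending))
            await asyncio.sleep(0)

    task = asyncio.create_task(reader())
    await dispatcher()
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return backlog


backlog = asyncio.run(simulate())
print(backlog)
```

In this model the recorded backlog grows after every callback, which mirrors the symptom above: the consumer is making progress, but the buffer grows faster than it drains.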