mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0

QueueIterator#close rejects at most 1 message #497

Status: Closed (Darsstar closed 1 year ago)

Darsstar commented 1 year ago

QueueIterator#close ends with these lines: https://github.com/mosquito/aio-pika/blob/84f743a90854da1c7a8f2e95ff2be8590b4d283b/aio_pika/queue.py#L401-L412

My conclusion is that the QueueEmpty exception should be suppressed outside the local function. Let me explain why.

Scenario 1: the channel is closed. Early return. 0 messages rejected. OK

Scenario 2: empty queue. QueueEmpty is raised but suppressed, then we reach the unconditional return. 0 messages rejected. OK

Scenario 3: 1 message in the queue. One message is yielded and rejected, after which we reach the unconditional return. 1 message rejected. OK

Scenario 4: 2 or more messages in the queue. One message is yielded and rejected, after which we reach the unconditional return. Only 1 message is rejected. NOT WHAT WE WANT
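
The four scenarios can be reproduced without a broker. The sketch below is not the library's code: it only imitates the control flow described above (suppress inside the loop body, then an unconditional return) on a plain asyncio.Queue. FakeMessage, buggy_drain and count_rejected are hypothetical names made up for this illustration.

```python
import asyncio
import contextlib


class FakeMessage:
    """Hypothetical stand-in for an incoming message; records rejection."""

    def __init__(self) -> None:
        self.rejected = False

    async def reject(self, requeue: bool = False) -> None:
        self.rejected = True


async def buggy_drain(queue: asyncio.Queue, channel_closed: bool = False) -> int:
    """Imitate the described flow and return how many messages were rejected."""
    rejected = 0
    while not channel_closed:
        # QueueEmpty is swallowed inside the iteration...
        with contextlib.suppress(asyncio.QueueEmpty):
            msg = queue.get_nowait()
            await msg.reject(requeue=True)
            rejected += 1
        # ...so the only way out of the loop is this unconditional return.
        return rejected
    return rejected


def count_rejected(n_messages: int, channel_closed: bool = False) -> int:
    """Fill a queue with n_messages fake messages and run buggy_drain on it."""

    async def run() -> int:
        queue: asyncio.Queue = asyncio.Queue()
        for _ in range(n_messages):
            queue.put_nowait(FakeMessage())
        return await buggy_drain(queue, channel_closed)

    return asyncio.run(run())
```

Calling count_rejected with 0, 1 and 3 messages corresponds to scenarios 2, 3 and 4; passing channel_closed=True corresponds to scenario 1.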


Removing the unconditional return would turn it into an endless loop, which is also not what we want. QueueEmpty should be thought of as StopIteration in this case. It is the mechanism to exit the loop and should therefore not be suppressed within a loop iteration.


I believe the following should work correctly: the QueueEmpty exception exits the loop for us, and it is then suppressed outside the loop.

        # Reject all messages
        with contextlib.suppress(asyncio.QueueEmpty):
            while not self._amqp_queue.channel.is_closed:
                msg: IncomingMessage = self._queue.get_nowait()
                await msg.reject(requeue=True)
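
To check that shape in isolation, here is a minimal sketch that runs the same loop against a plain asyncio.Queue. FakeChannel, FakeMessage, drain and count_rejected are hypothetical stand-ins for this illustration, not aio-pika classes; only the placement of contextlib.suppress around the whole loop is taken from the snippet above.

```python
import asyncio
import contextlib


class FakeChannel:
    """Hypothetical channel exposing only the is_closed flag."""

    def __init__(self, is_closed: bool = False) -> None:
        self.is_closed = is_closed


class FakeMessage:
    """Hypothetical message that records whether it was rejected."""

    def __init__(self) -> None:
        self.rejected = False

    async def reject(self, requeue: bool = False) -> None:
        self.rejected = True


async def drain(queue: asyncio.Queue, channel: FakeChannel) -> int:
    """Reject every buffered message; QueueEmpty ends the loop."""
    rejected = 0
    with contextlib.suppress(asyncio.QueueEmpty):
        while not channel.is_closed:
            msg = queue.get_nowait()  # raises QueueEmpty once drained
            await msg.reject(requeue=True)
            rejected += 1
    return rejected


def count_rejected(n_messages: int, is_closed: bool = False) -> int:
    """Fill a queue with n_messages fake messages and drain it."""

    async def run() -> int:
        queue: asyncio.Queue = asyncio.Queue()
        for _ in range(n_messages):
            queue.put_nowait(FakeMessage())
        return await drain(queue, FakeChannel(is_closed))

    return asyncio.run(run())
```

With this structure the number of rejected messages equals the number of buffered messages, and a closed channel still rejects nothing.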

mosquito commented 1 year ago

Thank you for this complete explanation. Would you want to make a PR?

mosquito commented 1 year ago

BTW, it just occurred to me that we could take only the last message and call .nack(requeue=True, multiple=True) on it. That is better for performance, since it sends only one frame.
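
A rough sketch of that idea, assuming the buffered messages sit in a plain asyncio.Queue in delivery order. FakeMessage, nack_all_buffered and frames_sent are hypothetical names for this illustration; the fake nack only mirrors the keyword arguments named in the comment above and records each call so the number of frames can be counted.

```python
import asyncio
from typing import List, Optional, Tuple

Frame = Tuple[int, bool, bool]  # (delivery_tag, multiple, requeue)


class FakeMessage:
    """Hypothetical message; each nack call is logged as one frame."""

    def __init__(self, delivery_tag: int, sent: List[Frame]) -> None:
        self.delivery_tag = delivery_tag
        self._sent = sent

    async def nack(self, multiple: bool = False, requeue: bool = True) -> None:
        self._sent.append((self.delivery_tag, multiple, requeue))


async def nack_all_buffered(queue: asyncio.Queue) -> Optional[FakeMessage]:
    """Drain the local buffer, then nack only the last message taken."""
    last: Optional[FakeMessage] = None
    while True:
        try:
            last = queue.get_nowait()
        except asyncio.QueueEmpty:
            break
    if last is not None:
        # One frame covers every earlier unacknowledged delivery as well.
        await last.nack(multiple=True, requeue=True)
    return last


def frames_sent(n_messages: int) -> List[Frame]:
    """Buffer n_messages fake messages and return the frames that were sent."""
    sent: List[Frame] = []

    async def run() -> None:
        queue: asyncio.Queue = asyncio.Queue()
        for tag in range(1, n_messages + 1):
            queue.put_nowait(FakeMessage(tag, sent))
        await nack_all_buffered(queue)

    asyncio.run(run())
    return sent
```

One caveat with this approach: in AMQP, a nack with multiple=True applies to all unacknowledged deliveries on the channel up to and including that delivery tag, so it would also requeue messages that were already handed to the consumer but not yet acknowledged.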