mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0

RobustConnection and QueueIterator #312

Open ivonindza opened 4 years ago

ivonindza commented 4 years ago

This is about handling the scenario where RabbitMQ fails and is later restarted, specifically how to handle the messages which were prefetched with a QueueIterator before the connection broke.

Let's take a simple application which consumes from one queue and publishes to another. It uses the connect_robust call to make the connection. When I restart RabbitMQ while the consuming/publishing is happening, one of two things happens: 1) an aiormq.exceptions.ChannelInvalidStateError is raised because the channel is closed, or 2) if the application tries to do a message.ack() (or message.reject()), a RuntimeError with the message "Writer is None" is raised.

In the background, aio_pika will get a ConnectionResetError (Errno 104, Connection reset by peer) and will start reconnecting. Eventually, it will reconnect successfully and restore channels, exchanges, and queues. However, the messages that were prefetched before the connection broke end up in a weird state: they cannot be acked or rejected any more and instead fail with the above-mentioned RuntimeError (Writer is None). As a consequence, the queue iterator cannot be closed cleanly, because it will try to call message.reject() for the messages remaining in its internal asyncio.Queue data structure and fail on the first one.

There are two possibilities now. The first is to close the iterator and create a new one, in which case these prefetched messages will remain unacknowledged. So, your application won't process them and RabbitMQ won't be able to deliver them again (or to a different application). Example code for this:

import asyncio

import aio_pika
import aiormq

RABBITMQ_URL = 'amqp://guest:guest@localhost/'
QUEUE_IN = 'input'
QUEUE_OUT = 'output'

async def main():
    conn = await aio_pika.connect_robust(RABBITMQ_URL)
    async with conn:
        channel = await conn.channel()
        await channel.set_qos(prefetch_count=10)
        queue_in = await channel.declare_queue(QUEUE_IN, durable=True)
        queue_out = await channel.declare_queue(QUEUE_OUT, durable=True)

        while True:
            try:
                async with queue_in.iterator() as qiter:
                    async for message in qiter:
                        await channel.default_exchange.publish(
                            aio_pika.Message(
                                message.body,
                                delivery_mode = aio_pika.DeliveryMode.PERSISTENT
                            ),
                            routing_key = QUEUE_OUT
                        )
                        await message.ack()
                        await asyncio.sleep(1) # Slow down so as not to empty the queue too quickly
            except (aiormq.exceptions.ChannelInvalidStateError, RuntimeError) as e:
                await conn.connected.wait()

The second possibility is to continue consuming from the same queue iterator on reconnection. The prefetched messages will all fail with the RuntimeError (Writer is None) and also asyncio will issue a warning "socket.send() raised exception." This will release them back to RabbitMQ, so they will be consumed again later. However, this means that my application will still process them needlessly as they have no chance of being acked, but my application doesn't know about this (and can't differentiate these messages from the new ones that are consumed after reconnection). Example code for this scenario:

async def main():
    conn = await aio_pika.connect_robust(RABBITMQ_URL)
    async with conn:
        channel = await conn.channel()
        await channel.set_qos(prefetch_count=10)
        queue_in = await channel.declare_queue(QUEUE_IN, durable=True)
        queue_out = await channel.declare_queue(QUEUE_OUT, durable=True)

        async with queue_in.iterator() as qiter:
            while True:
                try:
                    async for message in qiter:
                        await channel.default_exchange.publish(
                            aio_pika.Message(
                                message.body,
                                delivery_mode = aio_pika.DeliveryMode.PERSISTENT
                            ),
                            routing_key = QUEUE_OUT
                        )
                        await message.ack()
                        await asyncio.sleep(1) # Slow down so as not to empty the queue too quickly
                except (aiormq.exceptions.ChannelInvalidStateError, RuntimeError) as e:
                    await conn.connected.wait()

I noticed that the prefetched messages keep a reference to the obsolete aiormq.Channel instance from before the connection broke. However, updating them with the new channel reference only breaks things further.

I would expect either to be able to use the prefetched messages with the new connection so that they can be acked, or alternatively that they are purged from the queue so they won't needlessly be processed by the application if it's not possible to ack them with the new connection.

I am using aio_pika version 6.4.1.

mosquito commented 4 years ago

Unfortunately, this is impossible. If the no_ack flag has been set, all messages are acked automatically on receipt. But in manual ack mode, RabbitMQ returns all unacked messages to the queue and passes them to another connection.

Failing as soon as possible is the best scenario IMHO. You might use the async with message.process(): construction. It's a little bit simpler.
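
A minimal sketch of that construction, for illustration only. It assumes queue_in is an aio_pika queue; forward_messages and the publish coroutine are hypothetical names supplied by the caller, not part of aio-pika:

```python
import asyncio


async def forward_messages(queue_in, publish):
    """Consume from queue_in and hand each message body to publish.

    queue_in is assumed to be an aio_pika queue; publish is a
    hypothetical coroutine function provided by the caller.
    """
    async with queue_in.iterator() as qiter:
        async for message in qiter:
            # process() acks the message when the block exits normally
            # and rejects it (requeueing here) when the block raises.
            async with message.process(requeue=True):
                await publish(message.body)
```

Note that the ack or reject issued by process() presumably still goes through the channel the message was received on, so this simplifies the loop but does not by itself help with messages prefetched before a reconnect.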

ivonindza commented 4 years ago

I don't want to fail the application. It's a service and it should survive the restart (e.g. failure or migration) of RabbitMQ.

In the second example above, when the prefetched messages raise an exception on the new connection, they stop being unacknowledged by RabbitMQ and go back to ready. If I can have this happen before I process them that would be enough (e.g. during the restore process after reconnecting).

mosquito commented 4 years ago

Sorry, I can't fully picture your case. I can offer some suggestions and recommendations.

  1. Do not prefetch too much. Set QoS for the channel.
  2. The network will go down sometimes. Just handle it (e.g. add exception handling).
  3. aio-pika is a representation of AMQP abstractions in Python. It just contains several helpers and useful scenarios for configuring RabbitMQ through AMQP 0.9. It doesn't do magic, and it doesn't change broker behavior.

ivonindza commented 4 years ago

The problem is that prefetched messages cannot be acked on the new connection, and they also cannot be rejected when closing the queue iterator, because QueueIterator.close will fail with RuntimeError (Writer is None).

This line is basically the problem: https://github.com/mosquito/aio-pika/blob/88c39fcc4bcf7db45bbd73d68a6b347d9ee83f22/aio_pika/queue.py#L368

If this was instead:

try:
    await msg.reject(requeue=True)
except RuntimeError:
    pass

then the QueueIterator would be able to close cleanly and release messages back to RabbitMQ.
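
To illustrate the effect of that change, here is a small self-contained simulation. It does not use aio-pika: StaleMessage, LiveMessage, and drain are hypothetical stand-ins for the prefetched messages and for the loop in QueueIterator.close, under the assumption that a stale message's reject() raises RuntimeError as described above.

```python
import asyncio


class StaleMessage:
    """Stand-in for a prefetched message whose channel is gone."""

    async def reject(self, requeue=False):
        raise RuntimeError("Writer is None")


class LiveMessage:
    """Stand-in for a message that can still be rejected."""

    def __init__(self):
        self.rejected = False

    async def reject(self, requeue=False):
        self.rejected = True


async def drain(buffer):
    """Reject every buffered message, tolerating failed rejects.

    Returns the number of messages whose reject() raised.
    """
    failed = 0
    while not buffer.empty():
        msg = buffer.get_nowait()
        try:
            await msg.reject(requeue=True)
        except RuntimeError:
            # The old channel is dead; skip and keep draining.
            failed += 1
    return failed


async def demo():
    buffer = asyncio.Queue()
    live = LiveMessage()
    for msg in (StaleMessage(), StaleMessage(), live):
        buffer.put_nowait(msg)
    failed = await drain(buffer)
    return failed, buffer.empty(), live.rejected
```

Without the try/except, the first StaleMessage would abort the loop and leave the remaining messages in the buffer. With it, the buffer is emptied and messages that can still be rejected are rejected.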

Darsstar commented 5 months ago

Does #615 fix it? (I believe it should)