mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0

RobustExchange raises exception if you try to publish something when connection is broken #508

Open uthunderbird opened 1 year ago

uthunderbird commented 1 year ago

I am not sure whether this is the correct behavior.

I am using aio-pika==8.2.0, but I am fairly sure the latest versions behave the same way.

import asyncio
import logging

import aio_pika

logging.getLogger().setLevel('INFO')

async def main() -> None:
    connection = await aio_pika.connect_robust(
        "amqp://admin:admin@127.0.0.1/",
    )

    async with connection:
        routing_key = "botmanager-q"

        channel = await connection.channel()

        while True:
            await asyncio.sleep(3)
            logging.info(f"channel.is_initialized: {channel.is_initialized}")
            logging.info(f"channel.is_closed: {channel.is_closed}")
            # The script fails if the code below is commented out
            # while channel.is_closed:
            #     logging.info("Waiting channel will be restored")
            #     await asyncio.sleep(1)
            await channel.default_exchange.publish(
                aio_pika.Message(body=f"Hello {routing_key}".encode()),
                routing_key=routing_key,
            )
            logging.info("Published")

if __name__ == "__main__":
    asyncio.run(main())

This script reproduces the problem. Note that it uses the same channel for every publish.

Now go to the RabbitMQ management panel and force-close the connection. You will see something like ERROR:aiormq.connection:Unexpected connection close from remote "amqp://admin:******@127.0.0.1:5672/", Connection.Close(reply_code=320, reply_text='CONNECTION_FORCED - Closed via management plugin') in your log.

Soon after that, the script crashes with this exception: aiormq.exceptions.ChannelInvalidStateError: <Channel: "1" at 0x7fafa2967240> closed

This looks like correct behavior for aiormq, which is a low-level library, but not for aio-pika.

I think RobustChannel should actually be robust. If this should be fixed, I can try to fix it myself and open a PR soon.

mosquito commented 1 year ago

The point is that the library does not take responsibility for guaranteed publishing away from the user. Only the application's business logic can decide what to do at this point: catch the exception, wait on the .ready() method, and retry; log the broken state and skip the message; or do something else.

Robust means that state is restored after a reconnect; it does not mean that delivery is guaranteed in this case.

In my opinion, the most explicit thing a library can do when the connection fails is to report the failure to the user and attempt recovery in the background.
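
As an illustration of the "catch the exception, wait, and retry" option, here is a minimal sketch of an application-level helper. It is not part of aio-pika: publish_with_retry and all of its parameters are hypothetical names, and the helper deliberately depends only on asyncio, so the publish call, the readiness wait, and the exception types to retry on are passed in by the caller. When the attempts run out it falls back to the "log and skip" policy and returns False.

```python
import asyncio
import logging
from typing import Awaitable, Callable, Tuple, Type

log = logging.getLogger(__name__)


async def publish_with_retry(
    publish: Callable[[], Awaitable[object]],
    wait_ready: Callable[[], Awaitable[object]],
    retry_on: Tuple[Type[BaseException], ...],
    attempts: int = 3,
    ready_timeout: float = 10.0,
) -> bool:
    """Hypothetical helper: publish, and on a state error wait and retry.

    Returns True once publish() succeeds, or False if the message was
    skipped after `attempts` failed tries.
    """
    for attempt in range(1, attempts + 1):
        try:
            await publish()
            return True
        except retry_on as exc:
            log.warning("publish attempt %d/%d failed: %r", attempt, attempts, exc)
            if attempt == attempts:
                break
            try:
                # Give the background recovery a bounded amount of time.
                await asyncio.wait_for(wait_ready(), timeout=ready_timeout)
            except asyncio.TimeoutError:
                log.warning("not ready after %.1fs, retrying anyway", ready_timeout)
    log.error("giving up after %d attempts, message skipped", attempts)
    return False
```

With the script from the report, publish could be a lambda that calls channel.default_exchange.publish(...), and retry_on could be (aiormq.exceptions.ChannelInvalidStateError,), the exception shown above. For wait_ready, the .ready() method mentioned above is the natural candidate; which object exposes it depends on the aio-pika version, so treat that part as an assumption to check against the installed release. The polling loop on channel.is_closed from the original script would also work if wrapped in a coroutine.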