mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0
1.18k stars 186 forks source link

Impossible to handle Unexpected connection close #599

Open tayp1n opened 7 months ago

tayp1n commented 7 months ago

How to handle Unexpected connection close and raise an exception?

I've got the logic below:

class AMQPHandler:
    def __init__(self) -> None:
        self.connection: AbstractRobustConnection | None = None
        self.channel: AbstractChannel | None = None

    async def init(self) -> None:
        import settings

        logger.info("Initializing AMQP handler")

        config = settings.CorePublisherSettings

        connection = await aio_pika.connect_robust(
            config.get_dsn(),
            loop=asyncio.get_event_loop(),
            timeout=config.CONNECTION_TIMEOUT,
        )
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=10)

        exchange = await channel.declare_exchange(
            config.EXCHANGE_NAME,
            config.EXCHANGE_TYPE,
            auto_delete=config.EXCHANGE_AUTO_DELETE,
            durable=True,
        )
        for key in config.BINDING_KEYS:
            q_name = (
                f"{key}.{config.PREFIX_BINDING_KEYS}"
                if config.PREFIX_BINDING_KEYS
                else key
            )
            queue = await channel.declare_queue(name=q_name)
            await queue.bind(exchange, q_name)
            await queue.consume(self.handle_message)
            logger.info("Queue declared", extra={"queue": q_name})

        self.connection = connection
        self.channel = channel

        logger.info("AMQP handler initialized")

Now, when Rabbit dropes, connection reconnects every 5 seconds. How to make it raise an exception which I could handle and stop a micro-service?

Alviner commented 7 months ago

You can use a regular connection instead of robust one.

Also event connection.connected can be checked in healthcheck handler

tayp1n commented 7 months ago

What I also did is

 def on_connection_closed(self, *args, **kwargs):
        sys.exit(1)

    async def init(self) -> None:
        import settings

        logger.info("Initializing AMQP handler")

        config = settings.BaseMessageBrokerSettings

        conn = await aiormq.connect(
            config.get_dsn(),
            loop=asyncio.get_event_loop(),
        )
        channel = await conn.channel()

        conn.closing.add_done_callback(self.on_connection_closed)
        channel.closing.add_done_callback(self.on_connection_closed)

        await channel.exchange_declare(
            exchange=config.EXCHANGE_NAME,
            exchange_type=config.EXCHANGE_TYPE,
            auto_delete=config.EXCHANGE_AUTO_DELETE,
            durable=True,
        )
        for key in config.BINDING_KEYS:
            q_name = (
                f"{key}.{config.PREFIX_BINDING_KEYS}"
                if config.PREFIX_BINDING_KEYS
                else key
            )
            queue = await channel.queue_declare(queue=q_name, durable=True)
            self.queues.append(queue)

            await channel.queue_bind(exchange=config.EXCHANGE_NAME, queue=q_name)
            await channel.basic_consume(queue.queue, self.handle_message, no_ack=True)
            logger.info("Queue declared", extra={"queue": q_name})

        self.conn = conn
        self.channel = channel

        logger.info("AMQP handler initialized")
Dreamsorcerer commented 7 months ago

Your original code uses connect_robust(), i.e. the function used to create a connection that automatically reconnects... Sounds like you don't want to use that.

gaby commented 3 months ago

I have the same issue. With connect_robust if the connection to RabbitMQ is lost, the consumer stops getting messages after re-established.

The logs show the consumer reconnecting every 5secs, Rabbit becomes reachable again, the logs stop and no new messages show up. In the UI RabbitMQ has 0 consumer for that queue.

I'm using the exact code from here (Master/Worker) but with a dict instead of task_id: https://aio-pika.readthedocs.io/en/latest/patterns.html

@mosquito Any idea?

I'm using the aio-pika 9.4.0 with Python 3.10.

Darsstar commented 3 months ago

This might be fixed by #622 in 9.4.1. It sounds similar enough to what I encountered debugging #615.