mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0

Use case: Robustly connecting to multiple RabbitMQ brokers with three queues each #579

Closed · dbartenstein closed this issue 9 months ago

dbartenstein commented 9 months ago

Hi aio-pika community! :wave:

We have one push queue listener running as a daemon, which connects to multiple (currently ~10) RabbitMQ brokers with three queues each. So ~30 queues at the moment, and growing ...

Our current implementation using aio-pika has been running for more than a year now and works nicely, but it does not seem to handle reconnects, e.g. when RabbitMQ brokers are restarted. Some recent messages after which no reconnection happened:

Unexpected connection close from remote "amqps://zemtu_xyz_mq:******@api.abc.com:5671/3n3c3b9rjy9g", Connection.Close(reply_code=320, reply_text='CONNECTION_FORCED - Closed via management plugin')
NoneType: None
Unexpected connection close from remote "amqps://zemtu_mq:******@api.abc.com:5671/8hea4wa2gjv7", Connection.Close(reply_code=320, reply_text='CONNECTION_FORCED - Closed via management plugin')
NoneType: None

I browsed through the documentation (e.g. https://aio-pika.readthedocs.io/en/latest/quick-start.html#asynchronous-message-processing) but could not find a matching use case.
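
For reference, this is the single-broker pattern I would expect to cover reconnects, distilled from the quick-start. A minimal sketch only; aio-pika 9.x is assumed, and the URL and queue name are placeholders:

import asyncio
import logging

import aio_pika
from aio_pika.abc import AbstractIncomingMessage

logger = logging.getLogger(__name__)


async def process_message(message: AbstractIncomingMessage) -> None:
    async with message.process():
        logger.info("Received %r", message.body)


async def main() -> None:
    # connect_robust() should transparently re-establish the connection
    # and restore channels and consumers after a drop.
    connection = await aio_pika.connect_robust("amqps://user:password@broker:5671/vhost")
    async with connection:
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=1)
        queue = await channel.declare_queue("events", passive=True)
        await queue.consume(process_message, no_ack=False)
        await asyncio.Future()  # run forever


asyncio.run(main())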

Core question

How can we get our listener to reconnect reliably after a broker restart or a forced connection close?

Let me share some code fragments. The main loop of the listener:

def main():
    consumer: Consumer = Consumer()
    # main() runs synchronously, so create a fresh event loop here;
    # asyncio.get_running_loop() would raise outside a running loop.
    loop: AbstractEventLoop = asyncio.new_event_loop()
    connections = loop.run_until_complete(consumer.start(loop))
    try:
        logger.info("Push queue listener up and running.")
        loop.run_forever()
    finally:
        logger.error("Halting push queue listener.")
        for connection in connections:
            logger.info(f"Close connection “{connection}”.")
            loop.run_until_complete(connection.close())
        loop.close()
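
For comparison, the same lifecycle can be written without manual loop management; a sketch using asyncio.run, with the Consumer class shown below:

async def amain() -> None:
    consumer = Consumer()
    connections = await consumer.start(asyncio.get_running_loop())
    try:
        logger.info("Push queue listener up and running.")
        await asyncio.Future()  # sleep until the task is cancelled
    finally:
        logger.error("Halting push queue listener.")
        for connection in connections:
            logger.info(f"Close connection “{connection}”.")
            await connection.close()


def main():
    asyncio.run(amain())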

The Consumer class, which connects to multiple RabbitMQ brokers with three queues each:

class Consumer:
    def __init__(self):
        self.handlers: dict[str, MessageHandler] = {
            "events": EventMessageHandler(),
            "heartbeats": HeartbeatMessageHandler(),
            "trackings": TrackingMessageHandler(),
        }
        self.push_queues: list[PushQueueData] = self.get_push_queues()
        self.connections: list[AbstractRobustConnection] = []

    async def start(self, loop: AbstractEventLoop) -> Iterable[AbstractRobustConnection]:
        """
        Start consuming all configured push queues.
        """
        self._setup_pika_params(loop)
        logger.info("Connect to push queues.")

        for push_queue_data in self.push_queues:
            # Initialize connection
            try:
                connection = await self.consume_queues_for_account(push_queue_data)
            except (AMQPError, OSError) as error:
                logger.warning(error)
            except Exception as error:
                # Catch-all so one failing broker does not prevent
                # connecting to the remaining push queues.
                logger.exception(error)
            else:
                if connection:
                    self.connections.append(connection)

        if not self.push_queues:
            logger.error("No push queues configured! Aborting!")
            sys.exit(1)

        return self.connections

    def _setup_pika_params(self, loop: AbstractEventLoop):
        self.PIKA_PARAMS: dict = {"ssl": True, "loop": loop}

    async def consume_queues_for_account(self, push_queue_data: PushQueueData) -> AbstractRobustConnection | None:
        """
        Consume push queues of given account.
        """
        logger.info(f"Account with label “{push_queue_data.label}”:")
        config = push_queue_data.connection_parameters | self.PIKA_PARAMS
        connection = await aio_pika.connect_robust(**config)

        # Create channel
        logger.info("[*] Create channel.", org=push_queue_data.org)
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=1)

        # Declare queues & start consuming
        for i, queue_name in enumerate(self.handlers):
            logger.info(f"[{i+1}] Declare queue “{queue_name}”.", org=push_queue_data.org)
            queue = await channel.declare_queue(queue_name, passive=True)
            handler: MessageHandler = self.handlers[queue_name]
            logger.info(
                f"[+] Start consuming from queue “{queue_name}” and assign message handler “{type(handler).__name__}”.",
                org=push_queue_data.org,
            )
            await queue.consume(handler.process_message, exclusive=True, no_ack=False)

        return connection
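
To see whether the robust connections are even attempting to reconnect, callbacks could be attached right after connect_robust(). This is my reading of the RobustConnection callback collections in recent aio-pika, so treat the exact callback signatures as an assumption:

        # Hypothetical addition to consume_queues_for_account(), after connect_robust():
        connection.close_callbacks.add(
            lambda *args: logger.warning(f"[!] Connection for “{push_queue_data.label}” closed: {args!r}")
        )
        connection.reconnect_callbacks.add(
            lambda *args: logger.info(f"[~] Reconnected to “{push_queue_data.label}”.")
        )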

Thanks a lot for any hints! :bow: If helpful, I am happy to create a simplified sample for the project’s documentation for the use case "Multiple RabbitMQ broker connections with multiple queues each" :page_facing_up:

dbartenstein commented 9 months ago

It might be the missing durable=True when declaring the queues:

queue = await channel.declare_queue(queue_name, passive=True, durable=True)

See https://aio-pika.readthedocs.io/en/latest/apidoc.html#aio_pika.Channel.declare_queue
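
For context: non-durable queues do not survive a broker restart, so after a restart a passive declare can fail because the queue is gone. In the loop above, only the declare call changes:

        for i, queue_name in enumerate(self.handlers):
            # durable=True: the queue survives a broker restart, so the
            # passive declare still succeeds when the connection is restored.
            queue = await channel.declare_queue(queue_name, passive=True, durable=True)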

dbartenstein commented 9 months ago

First tests on integration show that with durable=True the desired reconnection behavior is achieved! I will keep you posted.

dbartenstein commented 9 months ago

Deployed on production today. Looks good so far!