mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0

Async processing apparently not async #538

Closed: laithalissa closed this issue 1 year ago

laithalissa commented 1 year ago

We're trying to use aio-pika for the first time; we've adapted some code that previously read from GCP Pub/Sub to use RabbitMQ with this library.

The code attempts to read a batch of 12 messages from a RabbitMQ queue and process each message in parallel.

When running the code, the logs suggest the messages are being processed in sequence.

I've checked RabbitMQ's admin page, and the prefetch count for the queue is correctly reported as 12.

This codebase is stuck on an older version of Python, so we have to use a slightly outdated way of starting the asyncio event loop (see below).

Any help debugging this would be greatly appreciated.

Our installed versions are:

aio-pika==7.2.0
aiormq==6.2.3

Here's what our code looks like, with the irrelevant parts removed:

async def callback(message: aio_pika.IncomingMessage) -> None:
    # Do stuff
    await message.ack()

async def on_message(message: aio_pika.IncomingMessage):
    try:
        return await callback(message)
    except Exception:
        logger.exception("Unhandled exception")
        await message.nack(requeue=False)

async def pubsub_server() -> None:
    logger.info(
        "Establishing connection to AMQP server: %s:%s",
        config.AMQP_HOST, config.AMQP_PORT
    )
    connection = await aio_pika.connect(
        host=config.AMQP_HOST,
        port=config.AMQP_PORT,
        login=config.AMQP_USER,
        password=config.AMQP_PASS,
    )

    async with connection:
        channel = await connection.channel()
        logger.info(
            "Setting channel prefetch count to %d with max processing time %d",
            config.MAX_MESSAGES, config.MAX_PROCESSING_TIME_SECONDS
        )

        # MAX_MESSAGES is 12
        await channel.set_qos(prefetch_count=config.MAX_MESSAGES)

        logger.info("Establishing connection to queue: %s", config.TASK_QUEUE)
        queue = await channel.get_queue(config.TASK_QUEUE)
        await queue.consume(
            callback=on_message,
            timeout=config.MAX_PROCESSING_TIME_SECONDS
        )

        logger.info("Consumer registered; waiting for messages")
        await asyncio.Future()

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(pubsub_server())
mosquito commented 1 year ago

@laithalissa hello there, the fixed code is below.

import asyncio
import logging
from types import SimpleNamespace

import aio_pika

config = SimpleNamespace()
config.AMQP_HOST = "localhost"
config.AMQP_PORT = 5672
config.AMQP_USER = "guest"
config.AMQP_PASS = "guest"
config.TASK_QUEUE = "tasks"
config.MAX_MESSAGES = 12
config.MAX_PROCESSING_TIME_SECONDS = 10

async def on_message(message: aio_pika.abc.AbstractIncomingMessage):
    async with message.process():
        logging.info("New message: %s", message.message_id)
        await asyncio.sleep(5)

async def subscriber() -> None:
    logging.info(
        "Establishing connection to AMQP server: %s:%s",
        config.AMQP_HOST, config.AMQP_PORT
    )
    connection = await aio_pika.connect(
        host=config.AMQP_HOST,
        port=config.AMQP_PORT,
        login=config.AMQP_USER,
        password=config.AMQP_PASS,
    )

    async with connection:
        channel = await connection.channel()
        logging.info(
            "Setting channel prefetch count to %d with max processing time %d",
            config.MAX_MESSAGES, config.MAX_PROCESSING_TIME_SECONDS
        )

        # MAX_MESSAGES is 12
        await channel.set_qos(prefetch_count=config.MAX_MESSAGES)

        logging.info("Subscribing to the queue: %s", config.TASK_QUEUE)

        # The reason for your problem is that you are using the get_queue
        # method without understanding its behavior. It is worth noting that
        # it is not very well described in the documentation, so you're not
        # to blame. In short, my advice is to avoid using this method unless
        # you **really need it**.
        queue = await channel.get_queue(config.TASK_QUEUE, ensure=True)
        logging.critical("Consume done")

        await queue.consume(
            callback=on_message,
            timeout=config.MAX_PROCESSING_TIME_SECONDS
        )

        logging.info("Consumer registered; waiting for messages")
        await asyncio.Future()

async def publisher() -> None:
    connection = await aio_pika.connect(
        host=config.AMQP_HOST,
        port=config.AMQP_PORT,
        login=config.AMQP_USER,
        password=config.AMQP_PASS,
    )

    logging.info("Start publishing")

    async with connection:
        channel = await connection.channel()
        for i in range(24):
            await channel.default_exchange.publish(
                aio_pika.Message(str(i).encode()),
                routing_key=config.TASK_QUEUE
            )
        logging.info("Publish done")

async def main():
    logging.basicConfig(level=logging.INFO)
    await asyncio.gather(subscriber(), publisher())

if __name__ == "__main__":
    asyncio.run(main())

In my opinion, you should use queue = await channel.declare_queue(config.TASK_QUEUE) instead of channel.get_queue, or keep in mind what the ensure flag really does.
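
For illustration, a minimal sketch of that suggestion (reusing the config, channel and on_message names from the example above): declare_queue creates the queue if it does not exist yet and simply returns it if it does, so the consumer no longer depends on the queue having been created elsewhere.

# Sketch only: declare_queue instead of get_queue. Note that the settings
# passed here must match an existing queue's settings, otherwise the broker
# rejects the declaration with PRECONDITION_FAILED.
queue = await channel.declare_queue(config.TASK_QUEUE)
await queue.consume(callback=on_message)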

laithalissa commented 1 year ago

Thank you very much for your quick reply, it's much appreciated.

Your code sample led me to discover the root cause: a developer had introduced a blocking call in the callback coroutine, which was blocking the event loop and preventing consume from handling a second message.

I solved it by converting the offending line to await loop.run_in_executor(...).
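
For anyone hitting the same symptom, here is a minimal sketch of that fix; blocking_work is a hypothetical stand-in for the offending call:

import asyncio
import time

import aio_pika

def blocking_work(body: bytes) -> None:
    # Hypothetical stand-in for the blocking call (e.g. a synchronous HTTP
    # request): time.sleep would stall the whole event loop if called
    # directly inside the coroutine.
    time.sleep(5)

async def callback(message: aio_pika.IncomingMessage) -> None:
    loop = asyncio.get_event_loop()
    # Offload the blocking call to the default thread-pool executor, so the
    # event loop stays free to run other message callbacks concurrently.
    await loop.run_in_executor(None, blocking_work, message.body)
    await message.ack()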

Thank you also for your advice on declaring the queue. I wanted to ensure the core pipeline existed in RabbitMQ to make developers' lives easier. You're right that it's more standard to declare the queue in consumer code, but I didn't think there was really a difference between declaring the queue and getting an existing one, as long as it was configured with the same settings. I'll see about using declare_queue with passive=True instead.
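
For completeness, a minimal sketch of the passive variant: with passive=True, declare_queue only verifies that the queue already exists instead of creating it.

# Passive declaration creates nothing. If the queue is missing, the broker
# closes the channel with a 404 NOT_FOUND, which surfaces as an exception,
# so the consumer fails fast instead of silently creating a queue with the
# wrong settings.
queue = await channel.declare_queue(config.TASK_QUEUE, passive=True)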