mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0
1.18k stars 186 forks

Exceptions in async callbacks in Connection.close_callbacks are ignored #629

Open OriHizkiaho opened 1 month ago

OriHizkiaho commented 1 month ago

I have the following code:

import aio_pika
import asyncio

async def close_callback(sender, exc: Exception):
    raise Exception("Error")

async def create_consumer():
    connection = await aio_pika.connect_robust("amqp://user:bitnami@localhost/")
    connection.close_callbacks.add(close_callback)
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)
    queue = await channel.declare_queue(name="queue-name")
    exchange = await channel.declare_exchange(name="exchange-name")
    await queue.bind(exchange, routing_key="#")
    return queue

async def handle_message(message: aio_pika.abc.AbstractIncomingMessage):
    async with message.process():
        print("Received a message")

async def start():
    queue = await create_consumer()
    await queue.consume(handle_message)
    await asyncio.sleep(1000)

if __name__ == "__main__":
    asyncio.run(start())

When I stop my RabbitMQ container, I expect the code to exit with an exception, because my close_callback raises one. Instead, the exception is silenced, and the code recovers when I start the container again.

I think this happens because CallbackCollection.__call__ collects all the async callbacks into a futures list and then returns asyncio.gather(*futures, return_exceptions=True).

From the asyncio docs: If *return_exceptions* is True, exceptions in the tasks are treated the same as successful results, and gathered in the result list; otherwise, the first raised exception will be immediately propagated to the returned future.
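To illustrate the quoted behaviour on its own, here is a minimal standard-library sketch that does not use aio-pika. The coroutine names `ok` and `failing` are made up for this example.

```python
import asyncio


async def ok():
    return "ok"


async def failing():
    raise RuntimeError("Error")


async def gather_collecting():
    # With return_exceptions=True the exception is returned as a normal
    # result, so awaiting gather() itself does not raise.
    return await asyncio.gather(ok(), failing(), return_exceptions=True)


async def gather_propagating():
    # With the default (return_exceptions=False) the first exception is
    # propagated to whoever awaits gather().
    return await asyncio.gather(ok(), failing())


if __name__ == "__main__":
    results = asyncio.run(gather_collecting())
    print(results)  # a list holding "ok" and the RuntimeError instance
    try:
        asyncio.run(gather_propagating())
    except RuntimeError as exc:
        print("propagated:", exc)
```

In the first variant the error is only visible if the caller inspects the returned list.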

Connection._on_connection_close calls CallbackCollection.__call__, but does nothing with the return value, so the exceptions are ignored.
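The suspected mechanism can be modelled with a small stand-in. This is a sketch under assumptions, not aio-pika's actual code: `MiniCallbackCollection`, `on_close_ignoring` and `on_close_checking` are hypothetical names, and the "ignoring" variant awaits the gathered future only to keep the demo deterministic, whereas the issue describes the return value being dropped entirely.

```python
import asyncio


class MiniCallbackCollection:
    """Hypothetical stand-in for a callback collection (not aio-pika's class)."""

    def __init__(self):
        self.callbacks = []

    def add(self, callback):
        self.callbacks.append(callback)

    def __call__(self, *args):
        # Schedule every async callback and gather the results,
        # returning exceptions as values instead of raising them.
        futures = [asyncio.ensure_future(cb(*args)) for cb in self.callbacks]
        return asyncio.gather(*futures, return_exceptions=True)


async def close_callback(sender, exc):
    raise Exception("Error")


async def on_close_ignoring(callbacks):
    # The result list is discarded, so the callback's exception is lost.
    await callbacks(None, None)
    return "no error seen"


async def on_close_checking(callbacks):
    # One possible alternative (an assumption, not a proposed patch):
    # inspect the results and re-raise the first exception found.
    results = await callbacks(None, None)
    for result in results:
        if isinstance(result, BaseException):
            raise result
    return "no error seen"


def make_collection():
    collection = MiniCallbackCollection()
    collection.add(close_callback)
    return collection


if __name__ == "__main__":
    print(asyncio.run(on_close_ignoring(make_collection())))
    try:
        asyncio.run(on_close_checking(make_collection()))
    except Exception as exc:
        print("surfaced:", exc)
```

Whether re-raising, logging, or something else is the right behaviour for the library is a design decision for the maintainers. The sketch only shows that the exception is recoverable from the gathered results.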