mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0
1.18k stars 186 forks source link

Publish errors while consuming don't get raised #574

Open TBoshoven opened 10 months ago

TBoshoven commented 10 months ago

RabbitMQ 3.12 Python 3.11.3 aio-pika 9.1.5 aiormq 6.7.7

When I try to publish a message using an exchange I don't have access to from a consume callback, the error is not bubbled up to me like it would under normal conditions. The channel also enters a bad state and the consumer ends stuck in a loop because the message cannot be acknowledged.

Reproduction:

async with await aio_pika.connect_robust(
        host='localhost',
        virtualhost=vhost_name,
        login=user_name,
        password=password,
) as conn:
    async with conn.channel(on_return_raises=True) as channel:
        queue = await channel.get_queue('queue')

        async def consume(m: aio_pika.abc.AbstractIncomingMessage):
            try:
                await channel.default_exchange.publish(aio_pika.Message(b'message'), 'some_queue')
            finally:
                await m.ack()

        await queue.consume(consume)
        try:
            await asyncio.sleep(10)
        finally:
            await conn.close()

(publish a message to queue queue to trigger) Here, I have not given the user access to the default exchange, so I'd expect some kind of exception. I get no exception, but I do get errors on the m.ack(), causing the whole thing to loop.

If I run the publish in a separate channel, I get a proper aiormq.exceptions.ChannelAccessRefused error when awaiting the publish operation (and the ack() runs as expected).

If desired, I can provide a more complete example, including permissions. Please let me know if I'm doing something wrong (other than publishing to an exchange I don't have permissions for :grin: ).