mosquito / aiormq

Pure python AMQP 0.9.1 asynchronous client library

Basic.Cancel sent from the broker is unactionable on user end #116

Open hpointu opened 2 years ago

hpointu commented 2 years ago

When the server sends a Basic.Cancel notification, aiormq simply removes the consumer tag from its consumer list.

This leaves the end user of the library with no way of knowing that they will no longer receive messages, even though the server explicitly said so.

Shouldn't we expose this notification to the end user? That way the end user can react to this and stop listening.

martin-schulze-e2m commented 10 months ago

Just fell into this trap. My code pretty much follows aio-pika's asynchronous consumer example. When it gets a Basic.Cancel frame for the queue it consumes from, it stops receiving messages without any warning. Ideally, I'd like to restart the service, but I have no way of telling when this happens.

Maybe we can simply forward the "message" to the consume callback by changing https://github.com/mosquito/aiormq/blob/eb1ebe5d1381f551a10cb99d07a95395e59c7cc3/aiormq/channel.py#L425 to should_add_to_rpc=True? However, that will likely be a breaking change.

I could also see an optional, specific on_cancelled callback that can be provided to basic_consume.
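
To make the callback idea concrete, here is a rough sketch using a toy model. It is not aiormq's code or API: `ToyChannel`, the `on_cancelled` parameter, and `handle_server_cancel` are hypothetical names invented for illustration. It only assumes what this issue describes, namely that a server-sent Basic.Cancel causes the consumer tag to be dropped from the channel's consumer list.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List, Optional

MessageCallback = Callable[[bytes], Awaitable[None]]
CancelCallback = Callable[[str], Awaitable[None]]


class ToyChannel:
    """Hypothetical stand-in for a channel; NOT aiormq's real class."""

    def __init__(self) -> None:
        self.consumers: Dict[str, MessageCallback] = {}
        self._cancel_callbacks: Dict[str, CancelCallback] = {}

    def basic_consume(
        self,
        consumer_tag: str,
        callback: MessageCallback,
        on_cancelled: Optional[CancelCallback] = None,  # hypothetical parameter
    ) -> None:
        self.consumers[consumer_tag] = callback
        if on_cancelled is not None:
            self._cancel_callbacks[consumer_tag] = on_cancelled

    async def handle_server_cancel(self, consumer_tag: str) -> None:
        # Behaviour described in this issue: the tag is silently dropped.
        self.consumers.pop(consumer_tag, None)
        # Proposed addition: notify the user if they asked to be notified.
        on_cancelled = self._cancel_callbacks.pop(consumer_tag, None)
        if on_cancelled is not None:
            await on_cancelled(consumer_tag)


async def demo() -> List[str]:
    cancelled: List[str] = []

    async def process_message(body: bytes) -> None:
        pass  # normal message handling would go here

    async def on_cancelled(tag: str) -> None:
        cancelled.append(tag)  # a real service might trigger a restart here

    channel = ToyChannel()
    channel.basic_consume("ctag-1", process_message, on_cancelled=on_cancelled)
    await channel.handle_server_cancel("ctag-1")  # simulate Basic.Cancel from the broker
    return cancelled
```

Because `on_cancelled` is optional and defaults to `None`, existing callers would keep today's behaviour, which is why this shape looks less likely to break anything than forwarding the frame to the consume callback.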

Update

For now I work around that with async polling on the channel's consumer list:

    import asyncio

    import aio_pika
    import aiormq

    # connection, config, logger and process_message come from the surrounding service code
    async with connection:
        channel: aio_pika.abc.AbstractChannel = await connection.channel()
        queue: aio_pika.abc.AbstractQueue = await channel.declare_queue(config.rmq.queue)
        consumer_tag = await queue.consume(process_message, timeout=5)

        async def wait_for_cancel(polling_interval=5):
            """
            Check every polling_interval seconds if our consumer is still scheduled for receiving messages.

            Work around https://github.com/mosquito/aiormq/issues/116
            """
            if isinstance(channel.channel, aiormq.Channel):
                while True:
                    await asyncio.sleep(polling_interval)
                    if consumer_tag not in channel.channel.consumers:
                        logger.warning(
                            f"Consumer tag {consumer_tag} not found in channel.consumers."
                            " Probably was cancelled due to queue deletion."
                        )
                        break
            else:
                raise ValueError(
                    f"Can only work with aiormq.Channel channels, not {channel.channel.__class__}!"
                )

        try:
            # wait until cancelled from outside
            await wait_for_cancel()
        finally:
            logger.info("queue.consume() stopping")
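
The polling loop above can also be factored into a small helper that only needs the consumer mapping, which makes it possible to try it out without a broker. This is a minimal sketch, not part of aio-pika or aiormq: `wait_until_consumer_gone` is a made-up name, and it assumes only that the `consumers` object supports `in`, as `channel.channel.consumers` does in the workaround. Unlike the loop above, it checks before the first sleep.

```python
import asyncio
from typing import Container


async def wait_until_consumer_gone(
    consumers: Container[str],
    consumer_tag: str,
    polling_interval: float = 5.0,
) -> None:
    """Return once consumer_tag is no longer present in consumers."""
    while consumer_tag in consumers:
        await asyncio.sleep(polling_interval)


async def demo() -> bool:
    # A plain dict stands in for the channel's consumer list.
    consumers = {"ctag-1": object()}

    async def simulate_server_cancel() -> None:
        # Pretend the broker cancels the consumer shortly after start.
        await asyncio.sleep(0.05)
        consumers.pop("ctag-1")

    cancel_task = asyncio.create_task(simulate_server_cancel())
    # wait_for guards the demo against hanging if the tag never disappears.
    await asyncio.wait_for(
        wait_until_consumer_gone(consumers, "ctag-1", polling_interval=0.01),
        timeout=1.0,
    )
    await cancel_task
    return "ctag-1" not in consumers
```

In a service, the helper's return is the signal to re-declare the queue and consume again, or to exit so a supervisor restarts the process. Detection is delayed by up to one `polling_interval`.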