Polyconseil / aioamqp

AMQP implementation using asyncio

Publish hangs with 'publisher confirms' enabled #149

Open greatvovan opened 6 years ago

greatvovan commented 6 years ago

My service reads messages from one queue and publishes the processed messages to another exchange. I want to ack an incoming message only after the outgoing message has been published successfully. This worked well until I ran into lost messages and decided to enable the 'publisher confirms' feature. After that, my program started to hang forever on publish(). Here is simple code that illustrates the problem.

import asyncio
import aioamqp
from aioamqp.channel import Channel, Envelope

class RabbitClient:
    def __init__(self):
        self.consumer = None # type: Channel
        self.publisher = None  # type: Channel
        self.exchange_out = 'exchange_out'
        self.queue_out = 'queue_out'
        self.queue_in = 'queue_in'
        self.rabbit_url = 'amqp://rabbit'

    async def connect(self):
        # Connection
        _, protocol = await aioamqp.from_url(self.rabbit_url)
        self.consumer = await protocol.channel()
        self.publisher = await protocol.channel()

        # Enable publisher confirms
        await self.publisher.confirm_select()

        # Output
        await self.publisher.exchange(self.exchange_out, 'fanout', auto_delete=False,
                                      passive=False, durable=True)
        await self.publisher.queue(self.queue_out, durable=True, auto_delete=False)
        await self.publisher.queue_bind(exchange_name=self.exchange_out,
                                        queue_name=self.queue_out, routing_key='')

        # Input
        await self.consumer.queue(self.queue_in, durable=True, auto_delete=False)
        await self.consumer.basic_qos(prefetch_count=1, prefetch_size=0, connection_global=False)
        print('Connected')

    async def publish_message(self, msg: str):
        await self.consumer.publish(msg, '', self.queue_in)  # works here
        print('Message published: ' + msg)

    async def start_consuming(self):
        await self.consumer.basic_consume(queue_name=self.queue_in, callback=self.on_message)
        print('Consumer started')

    async def on_message(self, channel: Channel, payload: bytes, envelope: Envelope, _):
        await self.publish_and_ack(payload, envelope.delivery_tag)

    async def publish_and_ack(self, payload: bytes, delivery_tag: int):
        try:
            s = payload.decode('utf-8')
            print(f"Got message: '{s}', publishing to out exchange...")
            await self.publisher.publish(s + ' processed', self.exchange_out, '')   # HANGS HERE!
            print('Sending ack')
            await self.consumer.basic_client_ack(delivery_tag)
            print('Done')
        except Exception:  # avoid a bare except, which would also catch task cancellation
            await self.consumer.basic_client_nack(delivery_tag)

if __name__ == '__main__':
    client = RabbitClient()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(client.connect())
    loop.run_until_complete(client.publish_message('test1'))
    loop.run_until_complete(client.publish_message('test2'))
    loop.run_until_complete(client.start_consuming())
    loop.run_forever()

I tried to play with it and found the following:

This seems really weird. Am I doing everything correctly? What is the best practice for using channels with this library?

mwfrojdman commented 6 years ago

There's an asyncio task running the dispatcher that reads messages from the server and calls handler methods; Channel.basic_deliver is called to deliver messages to your consumer callback (https://github.com/Polyconseil/aioamqp/blob/cbb137920b000db3d99adfa986ad9a7dc9b58876/aioamqp/channel.py#L641). The problem is the callback (your on_message function) is executed in the same task as the dispatcher, so while the callback hasn't returned, no messages will be handled from the server.

Confirm select changes the messaging with the server so that for each basic.publish message you send (by calling Channel.publish), the server replies with an ack message. Publish does not return until it has received that ack. So what happens is that Channel.dispatch_frame is calling Channel.basic_deliver, which is stuck waiting for on_message to return, but on_message is calling publish_and_ack, which waits for an ack from the server, and Channel.dispatch_frame is stuck and cannot read the next message.
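To make the circular wait described here concrete, below is a minimal sketch in plain asyncio. It is not aioamqp code: the `frames` queue, the `confirm` future, and the `run_dispatcher` function are hypothetical stand-ins for the server connection, the pending publisher confirm, and the library's dispatcher, under the assumption that the dispatcher awaits the consumer callback inline.

```python
import asyncio


async def run_dispatcher(inline: bool) -> str:
    """Toy model of a dispatcher that reads frames and calls a consumer callback.

    All names here are hypothetical; this only models the behaviour
    described in the thread and does not use aioamqp.
    """
    frames = asyncio.Queue()  # stands in for frames arriving from the server
    confirm = asyncio.get_running_loop().create_future()  # pending publisher confirm
    background = []

    async def on_message():
        # "Publishing" makes the server send an ack frame ...
        frames.put_nowait('ack')
        # ... and publish() does not return until the dispatcher has processed it.
        await confirm

    async def dispatcher():
        while True:
            frame = await frames.get()
            if frame == 'deliver':
                if inline:
                    # Callback runs inside the dispatcher task: the loop below
                    # cannot read the 'ack' frame until the callback returns.
                    await on_message()
                else:
                    # Callback runs in its own task: the dispatcher keeps reading.
                    background.append(asyncio.ensure_future(on_message()))
            elif frame == 'ack':
                confirm.set_result(True)
                return

    frames.put_nowait('deliver')
    try:
        await asyncio.wait_for(dispatcher(), timeout=0.2)
        await asyncio.gather(*background)
        return 'completed'
    except asyncio.TimeoutError:
        return 'deadlocked'
```

With `inline=True` the callback waits for a confirm that only the blocked dispatcher could deliver, so the call times out; with `inline=False` the dispatcher stays free to process the ack.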

If I'm reading the code correctly, having another channel should not help if confirm_select() is used, because the connection's main dispatcher just delegates to the channels, and doesn't process the next message from the server until the channel method returns. Publisher confirms are channel-specific, so it might be that it works because no confirms are actually in use on that channel.

One workaround is to call loop.create_task for publish_and_ack(), which allows the library's dispatcher to process the next messages. But then you have to clean up those tasks somewhere else once they finish, to avoid warnings.
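One possible way to handle that cleanup is sketched below, using only the standard library. `TaskTracker` and its methods are hypothetical helper names, not part of aioamqp; the idea is to keep a reference to each spawned task and use a done callback to drop it and retrieve its exception, so that asyncio does not warn about exceptions that were never retrieved.

```python
import asyncio


class TaskTracker:
    """Hypothetical helper: tracks background tasks and collects their errors."""

    def __init__(self):
        self.tasks = set()   # tasks still running
        self.errors = []     # exceptions raised by finished tasks

    def spawn(self, coro):
        task = asyncio.ensure_future(coro)
        self.tasks.add(task)                   # keep a strong reference
        task.add_done_callback(self._on_done)  # clean up when it finishes
        return task

    def _on_done(self, task):
        self.tasks.discard(task)
        # Retrieving the exception marks it as handled, which avoids the
        # "Task exception was never retrieved" warning.
        if not task.cancelled() and task.exception() is not None:
            self.errors.append(task.exception())


async def demo():
    tracker = TaskTracker()

    async def ok():
        await asyncio.sleep(0)

    async def boom():
        raise RuntimeError('publish failed')

    tracker.spawn(ok())
    tracker.spawn(boom())
    await asyncio.sleep(0.01)  # give both tasks and their callbacks time to run
    return tracker
```

In the reporter's example, on_message would then call something like `self.tracker.spawn(self.publish_and_ack(payload, envelope.delivery_tag))` instead of awaiting publish_and_ack directly, assuming a `tracker` attribute created in `__init__`.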

greatvovan commented 6 years ago

If I'm reading the code correctly, having another channel should not help if confirm_select() is used, because the connection's main dispatcher just delegates to the channels

But it does help. I tried setting up yet another (third) channel, and it works with that one too. To sum up:

It looks like publishing fails only on the channel that was used to call confirm_select().

One workaround is to call loop.create_task for publish_and_ack()

Yes, I mentioned this in the second point of my initial post. I don't like it, and the other workaround (enabling confirms in a separate channel) works pretty well. Are there any specific advantages to using your workaround?