Polyconseil / aioamqp

AMQP implementation using asyncio
Other
280 stars 88 forks source link

Can't create channel in basic_consume callback #127

Closed chemiron closed 7 years ago

chemiron commented 7 years ago

Hi, I need to create a temporary channel in the consume callback but when I call protocol.channel() method, the execution hangs. The bellow is the test code you can use to repeat the issue:

import asyncio
import aioamqp

AMQP_SETTINGS = {
    'host': 'localhost',
    'port': 5672,
}

class Test:
    proto = None

    @asyncio.coroutine
    def callback(self, channel, body, envelope, properties):
        yield from self.proto.channel()  # the problem is here, comment the line and you will see output
        print(body)

    @asyncio.coroutine
    def connect(self):
        transport, self.proto = yield from aioamqp.connect(**AMQP_SETTINGS)
        channel = yield from self.proto.channel()
        yield from channel.queue_declare(queue_name='hello')
        yield from channel.basic_consume(self.callback, queue_name='hello', no_ack=True)

    @asyncio.coroutine
    def send(self):
        channel = yield from self.proto.channel()
        yield from channel.basic_publish(
            payload='Hello World!',
            exchange_name='',
            routing_key='hello'
        )

    @asyncio.coroutine
    def execute(self):
        yield from self.connect()
        yield from self.send()
        yield from asyncio.sleep(5)

test = Test()
asyncio.get_event_loop().run_until_complete(test.execute())

Is it a bug or amqp doesn't allow create a new chennel inside the callback ?

mwfrojdman commented 7 years ago

I think it's because the callback is executed from within the frame dispatcher task, and the dispatcher deadlocks. The dispatcher reads frames from the server in a loop and dispatches them to handlers, with the basic.deliver handled by calling your callback function. Opening a new channel is synchronous: It sends channel.open to the server, and then waits for channel.open-ok to be received. The deadlock happens because the dispatcher is executing callback, which is waiting for channel.open-ok, but the dispatcher is not reading said frame because it's stuck in the callback.

One way to work around the problem would probably be to create a task from the callback so the deadlock is avoided, along these lines:

@asyncio.coroutine
def handle_message(self, channel, body, envelope, properties):
    yield from self.proto.channel()
    print(body)

@asyncio.coroutine
def callback(self, channel, body, envelope, properties):
    loop.create_task(self.handle_message(channel, body, envelope, properties))

I bet the same deadlock happens for all other synchronous operations on the connection/channel, too.

EDIT: don't wait for task to finish in callback, that prevents the fix.

chemiron commented 7 years ago

@mwfrojdman Thank you for you answer, it works for me. But I have one more question: basic_publish has a flag 'mandatory'; according to the rabbitmq documentation if the flag is set and the queue doesn't exist, server will return an unroutable message with a Return method. How can I check it with aioamqp ?

mwfrojdman commented 7 years ago

I don't think you can. The basic.return method frame will end up in Channel.dispatch_frame, which raises a NotImplementedError because the method is not in the dictionary of supported methods. The exception is caught inAmqpProtocol.run, which logs it with the message "error on dispatch".

Basic.return is a bit problematic, because it's not possible to route it back to the publisher reliably. Ack and reject have delivery-tag which makes that easy, but return doesn't, so if there's been many messages published on the channel, it's difficult to know which one of them was returned. Comparing the message body/properties might be one way.

Another complication is that if not using RabbitMQ's publisher confirm extension, you simply can't know if the message was published successfully, or is still being processed and is going to be returned.

RemiCardona commented 7 years ago

https://github.com/Polyconseil/aioamqp/issues/127#issuecomment-266693670

This is an unfortunate side-effect of aioamqp's current design. This is being worked on as part of PR #118. Any help would be much appreciated.

Thanks