Polyconseil / aioamqp

AMQP implementation using asyncio

Message consumption is totally NOT async #150

Closed Skorpyon closed 7 years ago

Skorpyon commented 7 years ago

Today I was surprised to find that aioamqp does not run message callbacks concurrently on the event loop.

A short explanation:

protocol.py

@asyncio.coroutine
def run(self):
    while not self.stop_now.done():
        try:
            yield from self.dispatch_frame()   # <-- HERE WE WAIT FOR THE NEXT FRAME
        except exceptions.AmqpClosedConnection as exc:
            ...

async def dispatch_frame(self, frame=None):
    """Dispatch the received frame to the corresponding handler"""
    ...
    if frame.channel is not 0:
        channel = self.channels.get(frame.channel)
        if channel is not None:
            await channel.dispatch_frame(frame)  # <-- HERE WE WAIT UNTIL THE CHANNEL HAS DISPATCHED THE FRAME
        else:
            logger.info("Unknown channel %s", frame.channel)
        return
    ...

channel.py

@asyncio.coroutine
def basic_deliver(self, frame):
    ...
    yield from callback(self, body, envelope, properties)  # <-- HERE WE FINALLY WAIT UNTIL THE CALLBACK IS DONE

So every time a frame arrives, aioamqp runs the callback and waits until it returns. If your callback has complex logic that works with a database and takes a long time, as mine does, you will be surprised to find the whole system stuck in one callback, not even receiving new messages from the AMQP broker.

You can easily check this with the library's basic example code; just add asyncio.sleep():

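import asyncio

async def callback(channel, body, envelope, properties):
    print(body)
    # Simulate slow work; nothing else is consumed during these 30 seconds.
    await asyncio.sleep(30)

# Inside the coroutine that opened the connection and channel:
await channel.basic_consume(callback, queue_name='hello', no_ack=True)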

Now publish a hundred messages to the hello queue. You will see it print the first body and then go to sleep for 30 seconds. All subsequent messages will be lost or delayed for an unpredictable time.
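The same stall can be reproduced without a broker. What follows is a minimal sketch, not aioamqp code: `dispatch_serially` is a hypothetical stand-in for a read loop that awaits each callback before taking the next message, and the 0.05 second sleep stands in for slow work.

```python
import asyncio
import time


async def slow_callback(body, handled):
    # Stand-in for a consumer callback doing slow work (DB access, etc.).
    await asyncio.sleep(0.05)
    handled.append(body)


async def dispatch_serially(bodies, handled):
    # Hypothetical model of the read loop: each callback is awaited
    # before the next message is even looked at.
    for body in bodies:
        await slow_callback(body, handled)


def run_demo(count=5):
    handled = []
    started = time.monotonic()
    asyncio.run(dispatch_serially(range(count), handled))
    elapsed = time.monotonic() - started
    return handled, elapsed


if __name__ == "__main__":
    handled, elapsed = run_demo()
    # Total time grows linearly with the number of messages.
    print(f"handled {len(handled)} messages in {elapsed:.2f}s")
```

With a 30 second callback and a hundred messages, the same arithmetic gives roughly 50 minutes before the last message is handled.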

For now I have worked around it like this, in channel.py:

@asyncio.coroutine
def basic_deliver(self, frame):
    ...
    self._loop.create_task(callback(self, body, envelope, properties))

This just creates a new task on the loop for the callback, does not wait for it to finish, and frees the loop to handle the next message.
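A minimal sketch of the difference this makes, using a toy loop rather than aioamqp itself. `dispatch_with_tasks` is a hypothetical name; it schedules each callback with `create_task` and keeps a reference to every task so that none is garbage-collected while still running.

```python
import asyncio
import time


async def slow_callback(body, handled):
    # Stand-in for a consumer callback doing slow work.
    await asyncio.sleep(0.05)
    handled.append(body)


async def dispatch_with_tasks(bodies, handled):
    loop = asyncio.get_running_loop()
    tasks = []
    for body in bodies:
        # Schedule the callback and move on to the next message at once.
        tasks.append(loop.create_task(slow_callback(body, handled)))
    # Only for the demo: wait for all callbacks so the result can be inspected.
    await asyncio.gather(*tasks)


def run_demo(count=20):
    handled = []
    started = time.monotonic()
    asyncio.run(dispatch_with_tasks(range(count), handled))
    return handled, time.monotonic() - started


if __name__ == "__main__":
    handled, elapsed = run_demo()
    print(f"handled {len(handled)} messages in {elapsed:.2f}s")
```

The trade-off is that callbacks now overlap, so completion order is no longer guaranteed and the number of callbacks in flight is unbounded unless the caller limits it.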

I'm not an expert in async, so maybe a more elegant way exists. But right now aioamqp is not usable with this issue.

It really needs a thorough check for similar problems in message sending and in the internal logic. I hope you can fix it ASAP, because a few of my projects depend entirely on your excellent library. Regards.

RemiCardona commented 7 years ago

Hi @Skorpyon,

You've hit one of the major API limitations of aioamqp as it stands today: callbacks.

Here are a few things you should know:

Until we manage to get there, creating tasks is a user responsibility to avoid blocking aioamqp's processing loop. So you'll have to call self._loop.create_task(...) from your own callback. Yes, I understand it doesn't feel right, but it is the best solution.

Hope that helps,

Cheers