Polyconseil / aioamqp

AMQP implementation using asyncio

Exception swallowing in AmqpProtocol.run #90

Open shturman opened 8 years ago

shturman commented 8 years ago

aioamqp 0.7.0, RabbitMQ 3.6.1

Got an exception:

    Traceback (most recent call last):
      File "/usr/local/lib/python3.5/dist-packages/aioamqp/protocol.py", line 262, in run
        yield from self.dispatch_frame()
      File "/usr/local/lib/python3.5/dist-packages/aioamqp/protocol.py", line 217, in dispatch_frame
        yield from channel.dispatch_frame(frame)
      File "/usr/local/lib/python3.5/dist-packages/aioamqp/channel.py", line 111, in dispatch_frame
        yield from methods[(frame.class_id, frame.method_id)](frame)
      File "/usr/lib/python3.5/asyncio/coroutines.py", line 200, in coro
        res = func(*args, **kw)
      File "/usr/local/lib/python3.5/dist-packages/aioamqp/channel.py", line 665, in server_basic_cancel
        consumer_tag = frame.arguments['consumer_tag']
    KeyError: 'consumer_tag'

After this, the client stopped consuming messages. I don't see any way to handle this situation, because the exception is swallowed in AmqpProtocol.run.
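To illustrate the pattern being described, here is a minimal sketch that uses only asyncio. It is not aioamqp's code: `FakeProtocol`, `run_swallowing`, `run_propagating` and `demo` are hypothetical names made up for this example. It contrasts a dispatch loop with a catch-all, where the error is only logged, against a loop that lets the error reach whoever awaits it.

```python
import asyncio
import logging

logger = logging.getLogger("sketch")


class FakeProtocol:
    """Hypothetical stand-in for a protocol with a frame-dispatch loop."""

    def __init__(self, frames):
        self.frames = list(frames)
        self.handled = []

    async def dispatch_frame(self):
        frame = self.frames.pop(0)
        # Mimics the KeyError in the traceback: a frame missing an expected key.
        self.handled.append(frame["consumer_tag"])

    async def run_swallowing(self):
        # Catch-all: the error is only logged, so the caller never sees it.
        while self.frames:
            try:
                await self.dispatch_frame()
            except Exception:
                logger.exception("error on dispatch")

    async def run_propagating(self):
        # No catch-all: the first error ends the loop and reaches the awaiter.
        while self.frames:
            await self.dispatch_frame()


def demo():
    frames = [{"consumer_tag": "a"}, {}, {"consumer_tag": "b"}]

    swallowing = FakeProtocol(frames)
    asyncio.run(swallowing.run_swallowing())  # returns normally despite the error

    propagating = FakeProtocol(frames)
    raised = None
    try:
        asyncio.run(propagating.run_propagating())
    except KeyError as exc:
        raised = exc
    return swallowing.handled, propagating.handled, raised
```

With the catch-all, the calling code has no hook to react to the failure; without it, the caller can catch the `KeyError` and decide whether to reconnect or shut down.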

dzen commented 8 years ago

Hello,

We clearly want to tackle the problem of exceptions raised inside the AmqpProtocol.run task and provide a clean way to find out what happened.

Could you please let us know how you initialized your queues and consumers?

shturman commented 8 years ago

    transport, protocol = await aioamqp.connect(
        host='host here',
        port=5672,
        login='login',
        password='password',
        virtualhost='vhost',
        on_error=self.disconnect,
        heartbeat=15)

    channel = await protocol.channel()
    await channel.exchange('exchange', 'direct', durable=True)
    await channel.queue(
        queue_name='messages',
        durable=True,
        arguments={
            'x-dead-letter-exchange': 'dead-exchange',
            'x-message-ttl': 1200000}
    )
    await channel.basic_qos(prefetch_count=300, connection_global=False)
    await channel.queue_bind('messages', 'exchange', 'consume.messages')

    await channel.basic_consume(message_processor, queue_name='messages')

dzen commented 8 years ago

I think I will look at this later this week. We don't really have Python 3.5 available at the moment. Do you see the same behavior when running the examples?

shturman commented 8 years ago

Everything works well until an exception like the one above occurs. I would say it is an exceptional situation.

opedge commented 8 years ago

I have the same issue (sometimes) in our production. Do you have any workaround?

RemiCardona commented 8 years ago

There is no workaround for now, and this is very unfortunate. I've run into this issue myself and I'll see if I can't improve things somewhat. I'll report back if/when I find something.

Cheers

adamhooper commented 3 years ago

My workaround is to subclass and pass this as the protocol_factory:

import aioamqp
import aioamqp.protocol
from aioamqp.exceptions import AmqpClosedConnection


class AioamqpProtocolWithIssue90Solved(aioamqp.protocol.AmqpProtocol):
    async def run(self):
        # rewrite of aioamqp.AmqpProtocol.run() to nix the exception catch-all
        while not self.stop_now.done():
            try:
                await self.dispatch_frame()
            except AmqpClosedConnection as exc:
                aioamqp.protocol.logger.info("Close connection")
                self.stop_now.set_result(None)

                self._close_channels(exception=exc)
            # The original catch-all is intentionally left out:
            # except Exception:
            #     logger.exception('error on dispatch')

Why does this catch-all exception handler exist? What errors does it catch that it should catch?
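For reference, once the catch-all is gone the exception ends up stored on the task, and something still has to look at it. The following is a rough sketch of one way to do that with plain asyncio, using a done callback. `worker`, `main` and `on_done` are hypothetical names for illustration, and nothing here is part of aioamqp's API.

```python
import asyncio


async def worker():
    # Hypothetical background loop that fails on a malformed frame.
    await asyncio.sleep(0)
    raise KeyError("consumer_tag")


async def main():
    errors = []

    def on_done(task):
        # Task.exception() raises CancelledError for a cancelled task,
        # so check cancelled() first; otherwise it returns the exception or None.
        if not task.cancelled() and task.exception() is not None:
            errors.append(task.exception())

    task = asyncio.ensure_future(worker())
    task.add_done_callback(on_done)
    # Wait for the task to finish without re-raising its exception here.
    await asyncio.wait([task])
    return errors
```

In a real client, the callback would be the place to trigger a reconnect or to notify the application instead of appending to a list.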