Polyconseil / aioamqp

AMQP implementation using asyncio

Isolate KeyErrors raised during frame dispatch #79

Closed foolswood closed 8 years ago

foolswood commented 8 years ago

Should dispatch_frame (which may delegate to user callbacks) raise a KeyError, it isn't an unknown channel, so the log is misleading, and the exception should not be swallowed.

dzen commented 8 years ago

Hello,

When/How did you get this error ?

foolswood commented 8 years ago

Had a bug in a consume callback that happened to raise a KeyError, which caused the "unknown channel" log to occur.
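To illustrate the report above, here is a minimal sketch of the failure mode, not the actual aioamqp code. The `Channel` class and the `dispatch_broad` / `dispatch_isolated` functions are hypothetical names invented for this example; only the idea (a channel lookup and `dispatch_frame` sharing one `except KeyError`) comes from the discussion.

```python
import logging

logger = logging.getLogger("dispatch_sketch")


class Channel:
    """Hypothetical stand-in for a channel that forwards frames to a user callback."""

    def __init__(self, callback):
        self.callback = callback

    def dispatch_frame(self, frame):
        self.callback(frame)


def dispatch_broad(channels, channel_id, frame):
    """Lookup and dispatch share one try block.

    A KeyError raised inside the user callback is reported as an unknown
    channel and then swallowed. Returns True if no KeyError was seen.
    """
    try:
        channels[channel_id].dispatch_frame(frame)
    except KeyError:
        logger.info("Unknown channel %s", channel_id)
        return False
    return True


def dispatch_isolated(channels, channel_id, frame):
    """Only the lookup is guarded, so callback errors propagate unchanged."""
    try:
        channel = channels[channel_id]
    except KeyError:
        logger.info("Unknown channel %s", channel_id)
        return False
    channel.dispatch_frame(frame)
    return True
```

With a callback that raises `KeyError`, `dispatch_broad` logs "Unknown channel" for a channel that exists, while `dispatch_isolated` lets the `KeyError` reach the caller.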

dzen commented 8 years ago

Would you please add a test with a callback that raises a KeyError?

foolswood commented 8 years ago

I can, but without reworking the test system it will pass on both branches, given that it trades a log for an internal error (which becomes a log in the tests at the moment).

dzen commented 8 years ago

The problem is that we're in an async context when consuming messages from RabbitMQ, as we are when receiving anything from it. Notifying the calling code of an error in the callback is hard.

Maybe the error_callback should be fired? I'm really not sure this is a good solution.

Note: the task here ( https://github.com/Polyconseil/aioamqp/blob/master/aioamqp/protocol.py ) is not supervised, so if it stops we will be stuck. The exception caught here ( https://github.com/Polyconseil/aioamqp/blob/master/aioamqp/protocol.py#L261 ) should stop everything, shouldn't it?

foolswood commented 8 years ago

If the user callback fails to handle a message, that is bad, but I wouldn't have thought it was terminal for the whole server (or even necessarily subsequent consumes from the same queue).

foolswood commented 8 years ago

This makes it consistent with the behaviour for all errors that aren't KeyError. Deciding what the behaviour should be is a slightly bigger task.

foolswood commented 8 years ago

I am happy to follow up with a pull request addressing this issue once consensus is reached.

dzen commented 8 years ago

Hello @foolswood,

There are two ideas that came up from this PR:

I still have two questions:

dzen commented 8 years ago

@foolswood I edited this comment twice. @mastak any idea ?

mastak commented 8 years ago

Hello,

As for me, the idea of a Future as the result of basic_consume is not quite good. What would happen to consumer_tag (basic_consume generates and returns consumer_tag)? Also, the main purpose of basic_consume is to set the callback for new messages. If basic_consume raises an exception, it should come from the basic_consume process itself (setting the callback), not from the user callback.

So, in my opinion, error callback/exception handler would be better.

PS asyncio has exception_handler too :) https://docs.python.org/3/library/asyncio-eventloop.html#error-handling-api
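As a rough sketch of the exception-handler idea, the snippet below routes a failing callback through asyncio's own error-handling API (`loop.set_exception_handler` and `loop.call_exception_handler`, both part of the standard library). The `run_callback` and `demo` helpers are hypothetical and are not part of aioamqp; this only shows one way such reporting could look.

```python
import asyncio


def run_callback(loop, callback, message):
    """Call a user callback and report any failure to the loop's exception handler."""
    try:
        callback(message)
    except Exception as exc:
        loop.call_exception_handler({
            "message": "Unhandled exception in consumer callback",
            "exception": exc,
        })


def demo():
    """Install a handler that records contexts, then run a failing callback."""
    seen = []
    loop = asyncio.new_event_loop()
    loop.set_exception_handler(lambda loop, context: seen.append(context))
    try:
        # The callback raises KeyError because the message has no "missing" key.
        run_callback(loop, lambda message: message["missing"], {})
    finally:
        loop.close()
    return seen
```

The design choice here is that a failing callback does not stop the dispatcher; the application decides what to do with the error in its handler.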

dzen commented 8 years ago

Yes, but the exception will be raised in the callback and then in the basic_deliver process, not when registering the callback to the queue using basic_consume.

dzen commented 8 years ago

I'm still not convinced by the callback parameter.

One of the first issues I addressed was the consumption API: https://github.com/Polyconseil/aioamqp/issues/2

In my opinion, callbacks are not really a good design; asyncio is built around futures and tasks so that programs can be written in a sequential style and avoid callbacks.

In its first versions, I tried to use an asyncio.Queue to address the issue. ( https://github.com/Polyconseil/aioamqp/blob/32d60abebd5c232595d4eb2827434cb6e10a2f16/aioamqp/channel.py#L359 )

What is the API we WANT to use in the client code? We should probably start there.

Do we really want to have a callback, or an API like:


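    import asyncio

    import aioamqp

    class Message:
        """Proposed container for body, envelope and properties."""

    @asyncio.coroutine
    def unqueue(consumer):
        # A consumer should be an iterable
        print(consumer.consumer_tag)
        for message in consumer:
            try:
                # process and WhatEver are placeholders for client code
                process(message)
            except WhatEver:
                yield from consumer.channel.basic_cancel(consumer.consumer_tag)

    @asyncio.coroutine
    def initialize():
        transport, protocol = yield from aioamqp.connect()
        channel = yield from protocol.channel()

        yield from channel.queue_declare(queue_name='hello')

        # Proposed: basic_consume returns a consumer object instead of taking a callback
        consumer = yield from channel.basic_consume(queue_name='hello')

        # asyncio.async() was later renamed to asyncio.ensure_future()
        asyncio.ensure_future(unqueue(consumer))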

The problem with this kind of API is that it increases the logic/complexity in the lib, which will probably have to use asyncio synchronization primitives so that it does not read all messages from the current channel, and instead waits until the client has dequeued the current message.
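One way to picture the synchronization mentioned above is a bounded `asyncio.Queue` between the part that reads deliveries and the client code. This is only a sketch under assumptions: `deliver`, `consume` and `main` are hypothetical names, the messages are plain strings rather than AMQP deliveries, and it uses the later `async`/`await` syntax so that it runs on current Python versions.

```python
import asyncio


async def deliver(queue, messages):
    """Stand-in for the library side: push deliveries into a bounded queue."""
    for message in messages:
        # put() waits while the queue is full, so the reader cannot run
        # arbitrarily far ahead of the consumer.
        await queue.put(message)
    await queue.put(None)  # sentinel marking the end of the stream


async def consume(queue, handled):
    """Stand-in for client code: take messages one at a time."""
    while True:
        message = await queue.get()
        if message is None:
            break
        handled.append(message)


async def main():
    queue = asyncio.Queue(maxsize=2)
    handled = []
    await asyncio.gather(
        deliver(queue, ["a", "b", "c"]),
        consume(queue, handled),
    )
    return handled
```

Running `asyncio.run(main())` hands the messages to the consumer in order, with at most `maxsize` of them buffered at any time.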

foolswood commented 8 years ago

I fiddled with a similar scheme in one of my toy projects; it ended up being quite painful, as asyncio is already using the generator mechanisms and you end up with a generator that yields coroutines.

It also increases the complexity in the lib user's code: consumes end up looking (from their perspective) like get(), only more complex to set up.

foolswood commented 8 years ago

Given that this doesn't change the more general error handling situation, could we fix this and come back to the wider point?

dzen commented 8 years ago

Yes we have to!

On Fri, Mar 4, 2016 at 10:21 AM, David Honour notifications@github.com wrote:

Given that this doesn't change the more general error handling situation, could we fix this and come back to the wider point?

dzen commented 8 years ago

Merged ! with https://github.com/Polyconseil/aioamqp/commit/bae422c2e77fbfab43520b61ff0bb39decb3e2d9