Polyconseil / aioamqp

AMQP implementation using asyncio
Other
280 stars 88 forks source link

"error on dispatch" CancelledError when using protocol.close() #133

Open eLvErDe opened 7 years ago

eLvErDe commented 7 years ago

Hello,

I might be doing something wrong but I think there might be a problem with dispatch_frame.

Here is what I receive when I my wrapping coroutine gets cancelled it tries to close/wait the AMQ connection by doing:

try:
    forever_running_code()
except asyncio.CancelledError:
    self.logger.info('Closing connection')
    yield self.connection.close(no_wait=False)

I receive the following error:

2017-03-02 17:14:57,184 ERROR    [aioamqp.protocol] error on dispatch
Traceback (most recent call last):
  File "/usr/lib/python3.4/site-packages/aioamqp/protocol.py", line 284, in run
    yield from self.dispatch_frame()
  File "/usr/lib/python3.4/site-packages/aioamqp/protocol.py", line 231, in dispatch_frame
    frame = yield from self.get_frame()
  File "/usr/lib/python3.4/site-packages/aioamqp/protocol.py", line 215, in get_frame
    yield from frame.read_frame()
  File "/usr/lib/python3.4/site-packages/aioamqp/frame.py", line 412, in read_frame
    data = yield from self.reader.readexactly(7)
  File "/usr/lib64/python3.4/asyncio/streams.py", line 659, in readexactly
    block = yield from self.read(n)
  File "/usr/lib64/python3.4/asyncio/streams.py", line 617, in read
    yield from self._wait_for_data('read')
  File "/usr/lib64/python3.4/asyncio/streams.py", line 451, in _wait_for_data
    yield from self._waiter
  File "/usr/lib64/python3.4/asyncio/futures.py", line 358, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/usr/lib64/python3.4/asyncio/tasks.py", line 297, in _wakeup
    future.result()
  File "/usr/lib64/python3.4/asyncio/futures.py", line 266, in result
    raise CancelledError
concurrent.futures._base.CancelledError

Thanks in advance for your help!

Best regards, Adam.

RemiCardona commented 7 years ago

ATM we don't really guard against being cancelled, but maybe we should. I haven't really hit that particular problem. And I'm not quite sure what it would look like on aioamqp's side. I'm not sure I want to force a particular semantic for cancellation on anyone (ie, "shut everything down right now" or "try to close the connection nicely").

What I do in a production code is to run the amqp bits in a separate task, and coordinate cancellation through asyncio.Event objects. This way, the core amqp code never gets cancelled directly.

I'll leave this open so that we can think about this issue a bit more once we tackle the redesign project from #118 and other PRs.

eLvErDe commented 7 years ago

Thanks for the quick answer.

Any hope to get an example or syncing coroutines with events? Its way beyong m'y poor asyncio skills ;-) My wrapping class has an homemade close method that kills the coroutine dealing with aiompq connection with à cancel signal. I guess I can fix that the way yo do ?

Regards, Adam

EliRibble commented 7 years ago

I hit the same error and ended up just subclassing the Protocol to dodge the problem. The major issue for us is that we have sentry in place which emits errors any time the code posts a log message to an error log. Here's the dodge I used:

import aioamqp
import aioamqp.exceptions
import aioamqp.protocol

LOGGER = logging.getLogger(__name__)

class ProtocolSafe(aioamqp.protocol.AmqpProtocol):
    """
    This class exists to sidestep an issue
    https://github.com/Polyconseil/aioamqp/issues/115
    which we are plagued with in production
    """
    @asyncio.coroutine
    def read_frame(self):
        try:
            super().read_frame(self)
        except AttributeError:
            LOGGER.warning("Translating '%s' into a closed connection")
            raise aioamqp.exceptions.AmqpClosedConnection('Inferred from AttributeError')

...
yield from aioamqp.connect(..., protocol_factory=ProtocolSafe)
RemiCardona commented 7 years ago

@EliRibble AttributeError??? that sounds like a whole other issue. Got a stack trace by any chance?

Thanks

EliRibble commented 7 years ago

You're right, my mistake. The exception handler is the same but the root cause is different. I'll start a separate issue with details on it

remort commented 5 years ago

Reopen it please, 133 is closed but the problem is still here. I got:

ERROR:aioamqp.protocol:error on dispatch
Traceback (most recent call last):
  File "/home/remort/.local/share/virtualenvs/whiteboard-server-RT9QNbYR/lib/python3.7/site-packages/aioamqp/protocol.py", line 333, in run
    yield from self.dispatch_frame()
  File "/home/remort/.local/share/virtualenvs/whiteboard-server-RT9QNbYR/lib/python3.7/site-packages/aioamqp/protocol.py", line 280, in dispatch_frame
    frame = yield from self.get_frame()
  File "/home/remort/.local/share/virtualenvs/whiteboard-server-RT9QNbYR/lib/python3.7/site-packages/aioamqp/protocol.py", line 264, in get_frame
    yield from frame.read_frame()
  File "/home/remort/.local/share/virtualenvs/whiteboard-server-RT9QNbYR/lib/python3.7/site-packages/aioamqp/frame.py", line 453, in read_frame
    data = yield from self.reader.readexactly(7)
  File "/usr/lib/python3.7/asyncio/streams.py", line 679, in readexactly
    await self._wait_for_data('readexactly')
  File "/usr/lib/python3.7/asyncio/streams.py", line 473, in _wait_for_data
    await self._waiter
concurrent.futures._base.CancelledError

While trying to await protocol.close() or await channel.basic_cancel().