Polyconseil / aioamqp

AMQP implementation using asyncio

Close callback or on error handler for protocol #77

Closed mastak closed 8 years ago

mastak commented 8 years ago

When the server closed the connection unexpectedly, my consumer script kept running (run_forever) but did nothing.

The only workaround I can see is to start a function after 'basic_consume' that periodically checks the connection state. I think that is the wrong approach for async scripts. :)
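
For illustration, the polling workaround might look roughly like the sketch below. `FakeConnection`, its `is_open` flag, and `watch_connection` are hypothetical stand-ins invented for this example, not part of aioamqp's API. The point is only that the consumer has to keep waking up to notice the drop.

```python
import asyncio


class FakeConnection:
    """Hypothetical stand-in for a broker connection (not aioamqp's API)."""

    def __init__(self):
        self.is_open = True


async def watch_connection(conn, interval=0.01):
    """Poll the connection state until it is no longer open.

    Returns the number of checks performed before the drop was noticed.
    """
    checks = 0
    while conn.is_open:
        checks += 1
        await asyncio.sleep(interval)
    return checks


async def demo():
    conn = FakeConnection()
    # Started right after the (imaginary) basic_consume call.
    watcher = asyncio.ensure_future(watch_connection(conn))
    await asyncio.sleep(0.05)   # the consumer is "running"
    conn.is_open = False        # the server drops the connection
    return await watcher        # the watcher notices on its next wake-up


if __name__ == "__main__":
    print("checks before drop was noticed:", asyncio.run(demo()))
```

The drop is only detected on the next poll, so the reaction time depends on `interval`, which is why a callback feels like a better fit.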

Traceback (most recent call last):
  File "/usr/local/lib/python3.5/site-packages/aioamqp/protocol.py", line 255, in run
    yield from self.dispatch_frame()
  File "/usr/local/lib/python3.5/site-packages/aioamqp/protocol.py", line 210, in dispatch_frame
    yield from self.channels[frame.channel].dispatch_frame(frame)
  File "/usr/local/lib/python3.5/site-packages/aioamqp/channel.py", line 110, in dispatch_frame
    yield from methods[(frame.class_id, frame.method_id)](frame)
  File "/usr/local/lib/python3.5/site-packages/aioamqp/channel.py", line 718, in basic_deliver
    content_body_frame = yield from self.protocol.get_frame()
  File "/usr/local/lib/python3.5/site-packages/aioamqp/protocol.py", line 183, in get_frame
    yield from frame.read_frame()
  File "/usr/local/lib/python3.5/site-packages/aioamqp/frame.py", line 434, in read_frame
    payload_data = yield from self.reader.readexactly(self.frame_length)
  File "/usr/local/lib/python3.5/asyncio/streams.py", line 509, in readexactly
    block = yield from self.read(n)
  File "/usr/local/lib/python3.5/asyncio/streams.py", line 482, in read
    yield from self._wait_for_data('read')
  File "/usr/local/lib/python3.5/asyncio/streams.py", line 423, in _wait_for_data
    yield from self._waiter
  File "/usr/local/lib/python3.5/asyncio/futures.py", line 358, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/usr/local/lib/python3.5/asyncio/tasks.py", line 290, in _wakeup
    future.result()
  File "/usr/local/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/usr/local/lib/python3.5/asyncio/selector_events.py", line 702, in write
    n = self._sock.send(data)
ConnectionResetError: [Errno 104] Connection reset by peer
2016-02-12 19:07:02 71897a856041 aioamqp.protocol[1] INFO Close connection

As an option, we could add a callback/handler to the generic Exception branch in protocol.py:

    @asyncio.coroutine
    def run(self):
        while not self.stop_now.done():
            try:
                yield from self.dispatch_frame()
            except exceptions.AmqpClosedConnection as exc:
                logger.info("Close connection")
                self.stop_now.set_result(None)

                self._close_channels(exception=exc)
            except Exception:
                logger.exception('error on dispatch')
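
A minimal sketch of that idea, written with plain asyncio so it runs on its own. `SketchProtocol`, the `on_error` argument, and `frames_before_reset` are names assumed for illustration and are not aioamqp's real code. Stopping the loop after the handler fires is also an assumption of this sketch.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class SketchProtocol:
    """Hypothetical stand-in for the protocol class (not aioamqp's code)."""

    def __init__(self, frames_before_reset, on_error=None):
        self._frames_left = frames_before_reset
        self._on_error = on_error   # assumed name for the proposed handler
        self.stopped = False

    async def dispatch_frame(self):
        # Simulate reading frames, then an unexpected reset by the server.
        await asyncio.sleep(0)
        if self._frames_left == 0:
            raise ConnectionResetError(104, "Connection reset by peer")
        self._frames_left -= 1

    async def run(self):
        while not self.stopped:
            try:
                await self.dispatch_frame()
            except Exception as exc:
                logger.exception("error on dispatch")
                self.stopped = True            # assumption: stop the loop
                if self._on_error is not None:
                    await self._on_error(exc)  # let the caller react


async def demo():
    seen = []

    async def on_error(exc):
        # A real consumer might reconnect or stop the event loop here.
        seen.append(exc)

    protocol = SketchProtocol(frames_before_reset=3, on_error=on_error)
    await protocol.run()
    return seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With something like this, the consumer is told about the failure once instead of having to poll for it.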

mastak commented 8 years ago

_on_error_callback is executed in another place. Sorry, my mistake. :)