mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0
1.24k stars 189 forks source link

Auto Reconnect Error: Future exception was never retrieved #125

Open tilsche opened 6 years ago

tilsche commented 6 years ago

I am trying to setup a robust connection like this:

#!/usr/bin/env python3
import asyncio
import logging
from aio_pika import connect_robust

logging.basicConfig(level=logging.DEBUG)

connection = None
channel = None

def handle_exception(loop, context):
    logging.error('exception in event loop: {}'.format(context['message']))
    try:
        logging.error('Future: {}'.format(context['future']))
    except KeyError:
        pass
    try:
        logging.error('Handle: {}'.format(context['handle']))
    except KeyError:
        pass
    try:
        ex = context['exception']
        logging.error('Exception: {} ({})'.format(ex, type(ex).__qualname__))
    except KeyError:
        pass

async def rmq_connect():
    global connection, channel
    connection = await connect_robust('amqp://guest:guest@127.0.0.1/')
    connection.add_reconnect_callback(lambda c: logging.info('Reconnecting to {}'.format(c)))
    channel = await connection.channel()

loop = asyncio.get_event_loop()
loop.set_exception_handler(handle_exception)
loop.run_until_complete(rmq_connect())

try:
    loop.run_forever()
except KeyboardInterrupt:
    logging.info('KeyboardInterrupt - closing connection')
finally:
    loop.run_until_complete(connection.close())

loop.close()

Basically this works. During a reconnect, there are some reported Exceptions such as

ERROR:root:exception in event loop: Task exception was never retrieved ERROR:root:Future: <Task finished coro=<RobustConnection.connect() done, defined at aio_pika/robust_connection.py:109> exception=The AMQP connection was closed: ()> ERROR:root:Exception: (ConnectionClosed)

and

ERROR:root:exception in event loop: Future exception was never retrieved ERROR:root:Future: <Future finished exception=ConnectionError('Auto Reconnect Error',)> ERROR:root:Exception: Auto Reconnect Error (ConnectionError)

But if the connection resumes, and I stop the program, then at the very end I get a lot of those

ERROR:root:exception in event loop: Future exception was never retrieved ERROR:root:Future: <Future finished exception=ConnectionError('Auto Reconnect Error',)> ERROR:root:Exception: Auto Reconnect Error (ConnectionError)

This makes me worry that there remains something unclear from the reconnection that could gather over time. Is there something to be done differently in the code with respect to error handling and cleanup?

tilsche commented 6 years ago

Looking at it a bit more in detail, these seem to be the RobustConnection._closing futures. It seems that the error is delayed by the GC, which is why it happened for me in-between and at the end.

Would it make sense to cancel _closing before assigning a new future because we don't even know if someone will ever wait for this future?

romantolkachyov commented 6 years ago

Same here. I see such error every rabbitmq restart. And it's not delayed. I don't use set_exception_handler but handle errors in place (with monitoring and checking task.exception()).

skewty commented 6 years ago

https://github.com/mosquito/aio-pika/issues/112

Offers an intermediate solution until robust_connection() is more suitable to your requirements.