Polyconseil / aioamqp

AMQP implementation using asyncio

[question]: Reconnection aioamqp #65

Closed zloidemon closed 8 years ago

zloidemon commented 8 years ago

Hello,

How do I correctly handle events and reconnect to the server? (I don't see callbacks like on_connect/on_disconnect/etc.)

dzen commented 8 years ago

Hello @zloidemon.

There are two things:

The protocol.heartbeat() method will raise if the heartbeat timeout is reached. https://github.com/Polyconseil/aioamqp/blob/master/aioamqp/protocol.py#L265 https://www.rabbitmq.com/heartbeats.html

The on_error callback, passed to the Protocol() instance, is called when the connection is closed: https://github.com/Polyconseil/aioamqp/blob/master/aioamqp/protocol.py#L46

By the way, heartbeat() will be a quicker way to detect a TCP timeout.

dzen commented 8 years ago

@zloidemon I think we should provide some docs on this. Did you try any solution?

RemiCardona commented 8 years ago

As of today's master, the best way to monitor the connection is to watch one of the following attributes and methods on the AmqpProtocol:

Most of these are redundant with one another so they will probably be pruned at some point.

As for the heartbeat, as per issue #96, it is now fully handled by the protocol class; the heartbeat() coroutine is useless and deprecated.

Hope that clears things up a bit.

Cheers

kerma commented 7 years ago

Hello!

What happened to the idea of providing some docs on this? I've been looking into different ways to tackle it and so far have come up with nothing very elegant.

The problem is relevant, however. Take an example web app, which needs to: a) fail on startup if the connection to AMQP cannot be established; b) reconnect in the background if the connection closes after startup.

This seems to work:

import aioamqp
import asyncio
import logging
import sys

logging.basicConfig(level=logging.DEBUG)

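async def connect(reconnect: bool = False):

    async def on_error_callback(exception):
        # Called by aioamqp when an established connection goes down:
        # wait a little, then reconnect in the background.
        logging.debug('on_error_callback: %s', str(exception))
        await asyncio.sleep(2)
        await connect(reconnect=True)

    try:
        transport, conn = await aioamqp.connect(
            on_error=on_error_callback,
        )
    except aioamqp.AmqpClosedConnection:
        # Nothing to do here: on_error_callback takes care of reconnecting.
        logging.debug("AmqpClosedConnection, will call on_error")
    except OSError as e:
        if reconnect:
            # Background reconnection: log, wait, and try again.
            logging.warning(str(e))
            await asyncio.sleep(2)
            await connect(reconnect=True)
        else:
            # Initial connection at startup: let the failure propagate.
            raise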

loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(
        connect()
    )
except OSError as e:
    logging.error(e)
    sys.exit(1)

try:
    loop.run_forever()
except KeyboardInterrupt:
    pass
except Exception as e:
    logging.error(e)
finally:
    pass  # teardown

loop.close()

The on_error= and OSError mix is confusing and takes quite some time to figure out. Should I have a background task checking is_open instead, effectively creating my own "heartbeat"? Or is there a way to leverage the protocol heartbeat somehow?

RemiCardona commented 7 years ago

I wouldn't use the on_error callback. It's error-prone and confusing. With recent versions, it can be called twice for a single disconnection, and I honestly wouldn't be surprised if it could be called 3 times or more in some weird edge cases.
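For anyone who still has to rely on on_error, one way to tolerate repeated calls is to make the callback idempotent: it only sets a flag, and a single supervisor task does the actual reconnecting. The following is a minimal sketch of that idea using only asyncio; ReconnectTrigger and demo are hypothetical names made up for illustration, not part of aioamqp, and the reconnection itself is replaced by a counter.

```python
import asyncio


class ReconnectTrigger:
    """Collapse repeated on_error calls into one reconnect request.

    Hypothetical helper, not part of aioamqp.
    """

    def __init__(self):
        self._event = asyncio.Event()
        self.calls = 0  # how many times the callback fired

    async def on_error(self, exception):
        # May fire several times for one disconnection. Setting an
        # already-set Event is a no-op, so duplicates are harmless.
        self.calls += 1
        self._event.set()

    async def wait(self):
        # Block until at least one error was reported, then re-arm.
        await self._event.wait()
        self._event.clear()


async def demo():
    trigger = ReconnectTrigger()
    reconnects = 0

    async def supervisor():
        nonlocal reconnects
        while True:
            await trigger.wait()
            reconnects += 1  # a real program would reconnect here

    task = asyncio.ensure_future(supervisor())
    # Simulate the same disconnection being reported twice.
    await trigger.on_error(ConnectionError("lost"))
    await trigger.on_error(ConnectionError("lost"))
    await asyncio.sleep(0.01)  # give the supervisor time to run
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return trigger.calls, reconnects
```

In this sketch, trigger.on_error would be the value passed as on_error=, assuming the library accepts a coroutine function there, as the earlier example in this thread does.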

Here's a snippet from our own production code to handle reconnection:

    @asyncio.coroutine
    def main_loop(self):
        try:
            while self.keep_running():
                yield from self.connect()
                yield from self.amqp_protocol.wait_closed()
                yield from self.disconnect()
        finally:
            yield from self.disconnect()

The connect() and disconnect() methods mostly call aioamqp.connect() and amqp_protocol.close() (we still have some code to make sure the transport is closed too but it's mostly redundant with recent changes in aioamqp).

wait_closed() is the important bit here, and that's what this code relies on to get notified when the connection goes down. As long as you enable heartbeats (non-zero value), you should see connection losses through this.
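The same loop can be sketched as a standalone function with async/await syntax. This is only an illustration under a few assumptions: run_forever, connect, keep_running and retry_delay are hypothetical names, connect is any coroutine function returning a (transport, protocol) pair the way aioamqp.connect() does, and connection failures are assumed to surface as OSError, as in the earlier example in this thread. It also fails fast on the very first attempt and retries quietly afterwards.

```python
import asyncio


async def run_forever(connect, keep_running, retry_delay=2.0):
    """Connect, wait until the connection closes, then reconnect.

    `connect` is a hypothetical coroutine function returning a
    (transport, protocol) pair; `keep_running` is a plain callable.
    """
    first_attempt = True
    while keep_running():
        try:
            transport, protocol = await connect()
        except OSError:
            if first_attempt:
                raise  # startup: let the failure propagate
            await asyncio.sleep(retry_delay)
            continue  # later failures: wait and try again
        first_attempt = False
        try:
            # Returns once the connection is closed, whatever the cause.
            await protocol.wait_closed()
        finally:
            # Closing a transport more than once is harmless.
            transport.close()
```

With aioamqp, connect could be a small wrapper such as `lambda: aioamqp.connect(heartbeat=30)`, assuming the installed version accepts a heartbeat keyword; the value 30 is arbitrary.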

Hope this helps. Cheers.