Polyconseil / aioamqp

AMQP implementation using asyncio
Other
280 stars 88 forks source link

Clustered rabbitmq: having multiple hosts in connection parameters #186

Closed rrader closed 5 years ago

rrader commented 5 years ago

We have clustered rabbitmq with 3 instances running, so I'd like to pass all those instances to connection parameters of library. Is that possible? If no, how can I hook into the reconnection process to substitute next server? (if one of instances is down I don't want to keep trying to reconnect to it, I want to try another one)

Thank you!

dzen commented 5 years ago

Hello @rrader,

you may want to start digging in the connect function, or use this function in a wrapper: https://github.com/Polyconseil/aioamqp/blob/master/aioamqp/__init__.py#L15

rrader commented 5 years ago

Ended up using wrapper class like this (I removed business logic, so this code can't be used as is, it's just to show the idea)

Usage:

rmq = RMQConnectionService(app)
await rmq.connect()

# save rmq object for further usage
rmq.publish('channel1', '', 'queue1', b'some_data')

await rmq.disconnect()

Code:

class RMQConnectionService:
    def __init__(self, app):
        self._app = app

        # aioamqp objects
        self._transport = None
        self._protocol = None
        self._channels = {}

        self._is_connecting = False

    async def connect(self):
        if self._is_connecting:
            return

        try:
            self._is_connecting = True

            self._transport, self._protocol = await aioamqp.connect(
                host, port, login=user, password=password, virtualhost=vhost,
                on_error=self._on_error,
            )

            self._channels['channel1'] = await self._protocol.channel()
            # instantiate channels and save to self._channels dict

        except Exception:
            await asyncio.sleep(2)

            self._is_connecting = False
            await self.connect()
            return
        finally:
            self._is_connecting = False

    async def disconnect(self):
        assert self._protocol, self._transport

        for channel in self._channels.values():
            try:
                await channel.close()
            except aioamqp.ChannelClosed:
                pass

        try:
            await self._protocol.close()
        except aioamqp.AmqpClosedConnection:
            pass

        self._transport.close()

    async def reconnect(self):
        await self.disconnect()
        await self.connect()

    async def _on_error(self, exception):
        await self.reconnect()

    async def publish(
        self, channel_id, exchange_name, routing_key, message
    ):
        try:
            await (self._channels[channel_id]).basic_publish(
                message, exchange_name=exchange_name, routing_key=routing_key
            )
        except Exception as ex:
            await self.reconnect()
            await self.publish(
                channel_id, exchange_name, routing_key, message
            )