mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0

Question: what is the best way to re-create a connection in a pool? #576

Open PuppyDay opened 10 months ago

PuppyDay commented 10 months ago

For example, I have this code:

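import logging
import pickle

import aio_pika
from aio_pika.pool import Pool

logger = logging.getLogger(__name__)


class RabbitMQ: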
    def __init__(self, config):
        self.config = config
        connection_pool = Pool(self.__get_connection, max_size=config['connection_pool_size'])
        self.queue_pool = Pool(self.__get_channel, connection_pool, max_size=config['channel_pool_size'])
        self.pickle_protocol = config['pickle_protocol']
        self.timeout = config['timeout']

    async def __get_connection(self):
        return await aio_pika.connect_robust(
            'amqp://{user}:{password}@{host}:{port}/'.format(**self.config)
        )

    async def __get_channel(self, connection_pool):
        async with connection_pool.acquire() as connection:
            channel = await connection.channel()
            await channel.set_qos(self.config['channel_pool_size'])

            exchanges = [
                await channel.declare_exchange(
                    exchange,
                    type=aio_pika.ExchangeType.DIRECT,
                    durable=True,
                    arguments=arguments,
                    passive=True,
                )
                for exchange, arguments in self.config['exchanges'].items()
            ]

            queues = [
                await channel.declare_queue(
                    queue,
                    durable=True,
                    arguments=arguments,
                    passive=True,
                )
                for queue, arguments in self.config['queues'].items()
            ]

            for exchange, queue in zip(exchanges, queues):
                await queue.bind(exchange)

            return list(zip(exchanges, queues))

    async def put(self, data):
        logger.debug('Will put next data into RabbitMQ: %s', data)
        async with self.queue_pool.acquire() as queue_data:
            exchange, queue = queue_data[0]
            await exchange.publish(
                aio_pika.Message(
                    body=pickle.dumps(data, protocol=self.pickle_protocol)
                ),
                queue.name,
            )

I get an error in exchange.publish(...). First it raises ConnectionResetError(104, 'Connection reset by peer'); after that, every call raises RuntimeError('<Channel: "4"> closed') (the channel number varies). My guess is that the closed connection stays in the pool and is never re-created. What is the best way to remove or re-create a connection/channel in the pool? Or could there be another cause for this error?
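
To make the guess concrete, here is a minimal sketch of the suspected failure mode and one possible way around it. It does not use aio-pika at all: FakeChannel and ValidatingPool are hypothetical stand-ins written for illustration, and the only assumption borrowed from the real objects is an is_closed flag. With validate=False the pool hands the same closed item back on every acquire, which would match the repeating RuntimeError; with validate=True closed items are dropped and the factory is called again. Pool size limits are left out for brevity.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeChannel:
    """Hypothetical stand-in for a channel; not part of aio-pika."""

    def __init__(self, number):
        self.number = number
        self.is_closed = False

    async def publish(self, body):
        if self.is_closed:
            raise RuntimeError(f'<Channel: "{self.number}"> closed')
        return body


class ValidatingPool:
    """Hypothetical pool that can drop closed items instead of reusing them."""

    def __init__(self, factory, validate=True):
        self._factory = factory
        self._validate = validate
        self._idle = []

    @asynccontextmanager
    async def acquire(self):
        item = None
        # Take idle items until a usable one is found; closed ones are
        # discarded when validation is enabled.
        while self._idle:
            candidate = self._idle.pop()
            if not (self._validate and candidate.is_closed):
                item = candidate
                break
        if item is None:
            item = await self._factory()
        try:
            yield item
        finally:
            # The item always goes back to the pool, closed or not.
            self._idle.append(item)


async def demo(validate):
    counter = 0

    async def make_channel():
        nonlocal counter
        counter += 1
        return FakeChannel(counter)

    pool = ValidatingPool(make_channel, validate=validate)
    async with pool.acquire() as channel:
        channel.is_closed = True  # simulate the connection reset
    try:
        async with pool.acquire() as channel:
            await channel.publish(b"data")
        return "ok"
    except RuntimeError as exc:
        return str(exc)


if __name__ == "__main__":
    print(asyncio.run(demo(validate=False)))
    print(asyncio.run(demo(validate=True)))
```

Whether aio_pika.pool.Pool itself can be told to do such a check, or whether the check belongs in the factory functions passed to it, is exactly what I am unsure about.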