mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0

ConnectionAbortedError for long running operations #572

Open drajvver opened 10 months ago

drajvver commented 10 months ago

Hello

I have a problem with long-running operations and RabbitMQ. When the message handler takes less than ~90 s, everything behaves as it should: the incoming message is acked and the next one is taken in order. When the handler takes longer, it fails with this error: Server connection error: ConnectionAbortedError(10053, 'An established connection was aborted by the software in your host machine', None, 10053, None)

My code is fairly simple:

import asyncio
import json
import os

import aio_pika


async def main(loop: asyncio.AbstractEventLoop):
    rabbit_host = os.getenv("RABBIT_HOST", "localhost")
    rabbit_login = os.getenv("RABBIT_LOGIN", "guest")
    rabbit_pass = os.getenv("RABBIT_PASSWORD", "guest")

    # 1. init rabbitmq connections
    connection = await aio_pika.connect_robust(
        f"amqp://{rabbit_login}:{rabbit_pass}@{rabbit_host}/", loop=loop
    )

    print("Waiting for messages. To exit press CTRL+C")

    async with connection:
        receive_channel: aio_pika.abc.AbstractChannel = await connection.channel()
        await receive_channel.set_qos(1)

        receive_queue: aio_pika.abc.AbstractQueue = await receive_channel.declare_queue(
            "queue_request",
            durable=True,
            arguments={"x-queue-type": "quorum", "x-delivery-limit": 5},
        )

        async with receive_queue.iterator() as queue_iter:
            # Cancel consuming after __aexit__
            async for message in queue_iter:
                print('Getting message from queue')
                if connection.is_closed:
                    print("Connection is closed, need to reconnect, reconnecting")
                    await connection.reconnect()

                async with message.process():
                    message_dict = json.loads(message.body.decode())
                    message_to_publish = handleMessage(message=message_dict) # this handler can take more than 90s, which fails the message
                    pikamsg = aio_pika.Message(body=message_to_publish.encode())

                    if connection.is_closed:
                        print("Connection is closed, need to reconnect, reconnecting")
                        await connection.reconnect()

                    publish_channel: aio_pika.abc.AbstractChannel = (
                        await connection.channel()
                    )
                    await publish_channel.default_exchange.publish(
                        message=pikamsg, routing_key="queue_response"
                    )

                    await message.ack()                    

                    print("Acked source message and published response message successfully")
                    await publish_channel.close()

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    loop.close()

Handling fails after the first iteration, at the line async with message.process():, and the first message is never acked.

As for acking: the documentation is inconsistent on this and seems to lack any explanation of how the iterator behaves and whether the no_ack flag should be set anywhere, and if so, where and to what value.

I thought adding the if connection.is_closed: checks with reconnect() might help, but execution never reaches them, even when connection.is_closed is True.

I would be super grateful for any explanation of what might be wrong.
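
One possible explanation, stated as an assumption rather than a confirmed diagnosis: handleMessage is called without await, so it runs synchronously and the event loop cannot do anything else until it returns. If the client sends AMQP heartbeats from a background task on that same loop, a long handler would starve it, which would be consistent with the connection being dropped. The sketch below uses only the standard library to illustrate the effect; handle_message_blocking and heartbeat are hypothetical stand-ins, not aio-pika APIs, and asyncio.to_thread requires Python 3.9 or later.

```python
import asyncio
import time


def handle_message_blocking(seconds: float) -> str:
    """Hypothetical stand-in for a slow, synchronous message handler."""
    time.sleep(seconds)
    return "done"


async def heartbeat(ticks: list, interval: float) -> None:
    """Hypothetical stand-in for a client's background heartbeat task."""
    while True:
        await asyncio.sleep(interval)
        ticks.append(time.monotonic())


async def run(offload: bool, work_seconds: float = 0.5, interval: float = 0.05) -> int:
    """Run the handler once and return how many heartbeats fired meanwhile."""
    ticks: list = []
    hb = asyncio.create_task(heartbeat(ticks, interval))
    await asyncio.sleep(0)  # give the heartbeat task a chance to start

    if offload:
        # The handler runs in a worker thread; the event loop stays free.
        await asyncio.to_thread(handle_message_blocking, work_seconds)
    else:
        # The handler runs on the event loop thread and blocks it.
        handle_message_blocking(work_seconds)

    hb.cancel()
    try:
        await hb
    except asyncio.CancelledError:
        pass
    return len(ticks)


blocked_ticks = asyncio.run(run(offload=False))
offloaded_ticks = asyncio.run(run(offload=True))
print("heartbeats while blocking the loop:", blocked_ticks)
print("heartbeats while offloading to a thread:", offloaded_ticks)
```

If that assumption holds, the equivalent change in the consumer above would be to replace the direct call with await asyncio.to_thread(handleMessage, message=message_dict), so the loop stays responsive while the handler runs.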