mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0

Reconnect does not work with version 8.2.4 but works with version 8.2.3 (8.2.2, 8.2.1, 8.2.0 as well) #505

Closed smfx closed 1 year ago

smfx commented 1 year ago

Hello!

First of all, thank you for your great work! I think I found a bug in aio-pika 8.2.4. I'm using Windows, a Docker image with RabbitMQ, and Python 3.9.

Code example for reproducing:

import asyncio

import aio_pika
from aio_pika import ExchangeType
from aio_pika.abc import AbstractIncomingMessage
import aiorun

async def _consume(message: AbstractIncomingMessage) -> None:
    async with message.process(requeue=True, ignore_processed=True):
        print(
            f"[x] {message.routing_key}"
            f"[x] Decoded {message.body.decode()}"
        )

async def _check_tasks() -> None:
    while True:
        tasks = asyncio.all_tasks()
        print(f"Number of tasks: {len(tasks)}")
        for task in tasks:
            print(f"Task: {task.get_name()} {task.get_coro()}")
        print("================================")
        await asyncio.sleep(10)

async def _receive() -> None:
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/"
    )
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=20)
    exchange = await channel.declare_exchange(
        "broker_publish",
        ExchangeType.TOPIC,
    )
    queue = await channel.declare_queue(
        "broker_events",
        durable=False,
        exclusive=False,
        auto_delete=False,
    )
    await queue.bind(exchange, routing_key="#")
    await queue.consume(_consume)

async def main() -> None:
    asyncio.create_task(_receive())
    asyncio.create_task(_check_tasks())

if __name__ == "__main__":
    aiorun.run(
        main(),
        stop_on_unhandled_errors=False,
    )

Steps:

1) Run the script (with aio-pika 8.2.4). You will see 7 running tasks (I think on Linux it would be 6).
2) Publish a message to the queue. You will see this message in the terminal.
3) Restart the Docker container with RabbitMQ.
4) There are only 6 running tasks after the reconnect (Channel._reader is missing).
5) Try to publish another message. Nothing appears in the terminal, and the RMQ GUI shows no consumers for this queue.
6) Repeat steps 1-5 with aio-pika 8.2.0. The number of tasks stays the same after the reconnect and the script receives the message.

I can provide more information if needed. Thank you very much!

mosquito commented 1 year ago

Hi @smfx, thanks for your kind words. I tried to reproduce your issue on macOS and it worked for me with the code from the master branch:

Number of tasks: 4
Task: Task-3 <coroutine object _check_tasks at 0x101b97370>
Task: Task-2 <coroutine object _receive at 0x101b97300>
Task: Task-4 <coroutine object Connection.connect at 0x101b97760>
Task: Task-1 <coroutine object main at 0x1017cd310>
================================
Number of tasks: 7
Task: Task-3 <coroutine object _check_tasks at 0x101b97370>
Task: Task-73 <coroutine object StreamReader.readexactly at 0x101b97680>
Task: Task-1 <coroutine object main at 0x1017cd310>
Task: Task-21 <coroutine object Connection.__writer at 0x101b97990>
Task: Task-24 <coroutine object Channel._reader at 0x101b976f0>
Task: Task-22 <coroutine object Connection.__heartbeat at 0x101b97c30>
Task: Task-20 <coroutine object Connection.__reader at 0x101b97920>
================================
# <---- call `docker restart aio_pika_rabbitmq` here
Unexpected connection close from remote "amqp://guest:******@127.0.0.1:5672/", Connection.Close(reply_code=320, reply_text="CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'")
NoneType: None
Number of tasks: 3
Task: Task-3 <coroutine object _check_tasks at 0x101b97370>
Task: Task-1 <coroutine object main at 0x1017cd310>
Task: Task-84 <coroutine object OneShotCallback.__task_inner at 0x101c052a0>
================================
Connection attempt to "amqp://guest:******@127.0.0.1/" failed: Server connection unexpectedly closed. Reconnecting after 5 seconds.
Connection attempt to "amqp://guest:******@127.0.0.1/" failed: Server connection unexpectedly closed. Reconnecting after 5 seconds.
Number of tasks: 3
Task: Task-3 <coroutine object _check_tasks at 0x101b97370>
Task: Task-1 <coroutine object main at 0x1017cd310>
Task: Task-84 <coroutine object OneShotCallback.__task_inner at 0x101c052a0>
================================
Connection attempt to "amqp://guest:******@127.0.0.1/" failed: Server connection unexpectedly closed. Reconnecting after 5 seconds.
Connection attempt to "amqp://guest:******@127.0.0.1/" failed: Server connection unexpectedly closed. Reconnecting after 5 seconds.
Number of tasks: 3
Task: Task-3 <coroutine object _check_tasks at 0x101b97370>
Task: Task-1 <coroutine object main at 0x1017cd310>
Task: Task-84 <coroutine object OneShotCallback.__task_inner at 0x101c052a0>
================================
Number of tasks: 6
Task: Task-3 <coroutine object _check_tasks at 0x101b97370>
Task: Task-123 <coroutine object Connection.__heartbeat at 0x101c04580>
Task: Task-1 <coroutine object main at 0x1017cd310>
Task: Task-122 <coroutine object Connection.__writer at 0x101c060a0>
Task: Task-121 <coroutine object Connection.__reader at 0x101c06030>
Task: Task-125 <coroutine object StreamReader.readexactly at 0x101b97840>
================================
Number of tasks: 6
Task: Task-3 <coroutine object _check_tasks at 0x101b97370>
Task: Task-123 <coroutine object Connection.__heartbeat at 0x101c04580>
Task: Task-1 <coroutine object main at 0x1017cd310>
Task: Task-122 <coroutine object Connection.__writer at 0x101c060a0>
Task: Task-121 <coroutine object Connection.__reader at 0x101c06030>
Task: Task-125 <coroutine object StreamReader.readexactly at 0x101b97840>
================================

To be honest, I modified your code a bit to not install aiorun.

import asyncio

import aio_pika
from aio_pika import ExchangeType
from aio_pika.abc import AbstractIncomingMessage

async def _consume(message: AbstractIncomingMessage) -> None:
    async with message.process(requeue=True, ignore_processed=True):
        print(f"[x] {message.routing_key}"
              f"[x] Decoded {message.body.decode()}")

async def _check_tasks() -> None:
    while True:
        tasks = asyncio.all_tasks()
        print(f"Number of tasks: {len(tasks)}")
        for task in tasks:
            print(f"Task: {task.get_name()} {task.get_coro()}")
        print("================================")
        await asyncio.sleep(10)

async def _receive() -> None:
    connection = await aio_pika.connect_robust()
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=20)
    exchange = await channel.declare_exchange("broker_publish",
                                              ExchangeType.TOPIC)
    queue = await channel.declare_queue("broker_events", durable=False,
                                        exclusive=False, auto_delete=False)
    await queue.bind(exchange, routing_key="#")
    await queue.consume(_consume)

async def main() -> None:
    await asyncio.gather(_receive(), _check_tasks())

if __name__ == "__main__":
    asyncio.run(main())
mosquito commented 1 year ago

Maybe the problem occurs only on Windows. I have nowhere to check that right now; maybe I will be able to later. If you manage to find out the details, feel free to ping here.

smfx commented 1 year ago

Thank you for your assistance! But it seems the problem persists in your output too. You had the following tasks before the reconnect:

Number of tasks: 7
Task: Task-3 <coroutine object _check_tasks at 0x101b97370>
Task: Task-73 <coroutine object StreamReader.readexactly at 0x101b97680>
Task: Task-1 <coroutine object main at 0x1017cd310>
Task: Task-21 <coroutine object Connection.__writer at 0x101b97990>
Task: Task-24 <coroutine object **Channel._reader** at 0x101b976f0>
Task: Task-22 <coroutine object Connection.__heartbeat at 0x101b97c30>
Task: Task-20 <coroutine object Connection.__reader at 0x101b97920>

Pay attention to Channel._reader. After the `<---- call docker restart aio_pika_rabbitmq here` marker, this task is gone:

Number of tasks: 6
Task: Task-3 <coroutine object _check_tasks at 0x101b97370>
Task: Task-123 <coroutine object Connection.__heartbeat at 0x101c04580>
Task: Task-1 <coroutine object main at 0x1017cd310>
Task: Task-122 <coroutine object Connection.__writer at 0x101c060a0>
Task: Task-121 <coroutine object Connection.__reader at 0x101c06030>
Task: Task-125 <coroutine object StreamReader.readexactly at 0x101b97840>

So, if you publish a message to the exchange after the reconnect, it won't be processed (with aio-pika 8.2.0 it would be). Actually, I can see that the queue isn't restored after the Docker restart. Maybe this is intentional behavior, but my application just stops receiving messages after a Docker restart, with no errors :) I will try to find a Mac for testing :)
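The missing `Channel._reader` task described above can also be checked from inside the application rather than by reading the printed task list. The following is only a sketch using the standard `asyncio` API: the helper names `running_coroutine_names` and `has_task` are made up for this illustration, and the `Channel` class here is a local stand-in, not the aio-pika class.

```python
import asyncio
from typing import List, Tuple


def running_coroutine_names() -> List[str]:
    """Return the qualified coroutine names of all unfinished tasks."""
    names = []
    for task in asyncio.all_tasks():
        coro = task.get_coro()
        # Coroutine objects carry __qualname__, e.g. "Channel._reader".
        names.append(getattr(coro, "__qualname__", repr(coro)))
    return names


def has_task(qualname: str) -> bool:
    """Tell whether a task running the named coroutine is still alive."""
    return qualname in running_coroutine_names()


class Channel:
    """Local stand-in (hypothetical), only to make the demo self-contained."""

    async def _reader(self) -> None:
        await asyncio.sleep(3600)


async def demo() -> Tuple[bool, bool]:
    task = asyncio.create_task(Channel()._reader())
    await asyncio.sleep(0)  # give the new task a chance to start
    before = has_task("Channel._reader")
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    after = has_task("Channel._reader")
    return before, after


before, after = asyncio.run(demo())
print(before, after)
```

In a real consumer, a periodic check like `has_task("Channel._reader")` could log a warning when the reader disappears, which would at least make the silent failure visible.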

dvf commented 1 year ago

@smfx to reproduce, have you tried killing the RabbitMQ process in the Docker container?

It's likely that RabbitMQ is respecting SIGTERM which is triggered on docker restart.

mosquito commented 1 year ago

I already found the reason: the garbage collector removes the channel object, and sometimes the connection as well. In the above example, after the function ends, neither the channel nor the connection remains referenced.
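The mechanism can be illustrated without a broker. This is a minimal sketch that assumes the connection tracks its channels only through weak references; `FakeConnection` and `FakeChannel` are hypothetical stand-ins written for this illustration and are not aio-pika classes.

```python
import gc
import weakref


class FakeChannel:
    """Stand-in for a channel (hypothetical, not part of aio-pika)."""


class FakeConnection:
    """Stand-in for a connection that tracks its channels weakly."""

    def __init__(self) -> None:
        # A WeakSet does not keep its members alive on its own.
        self.channels = weakref.WeakSet()

    def channel(self) -> FakeChannel:
        ch = FakeChannel()
        self.channels.add(ch)
        return ch


def open_and_forget(connection: FakeConnection) -> None:
    # The local variable is the only strong reference to the channel,
    # and it disappears as soon as the function returns.
    channel = connection.channel()


connection = FakeConnection()
open_and_forget(connection)
gc.collect()
forgotten = len(connection.channels)  # channels left to restore

kept_channel = connection.channel()  # the caller holds a strong reference
gc.collect()
kept = len(connection.channels)

print(forgotten, kept)
```

Under that assumption, a reconnect has nothing to restore for the forgotten channel, which matches the missing `Channel._reader` task in the output above.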

smfx commented 1 year ago

Hi!

I looked a little bit more:

1) I reproduced this behavior on Mac.
2) The problem appears in version 8.2.4; everything is fine in 8.2.3. I can compare the branches this weekend and try to figure out how to fix it if needed.
3) I was thinking about how I could test such situations on the client side, so that a version update would not introduce bugs like this. I tried writing a test with the testcontainers library. @mosquito, maybe you have thoughts on this? I would really appreciate your advice on how to test such situations. Or is testing a third-party library pointless? On Windows, the test runs if you set the TC_HOST=localhost environment variable.

Code of the test with testcontainers

```python
import asyncio
from typing import Dict

from aio_pika.abc import AbstractIncomingMessage
from aio_pika import ExchangeType, connect_robust, Message
from testcontainers.rabbitmq import RabbitMqContainer
import pytest
import json

incoming_message_list = []


class TestConsumer:
    """Silly test consumer."""

    def __init__(
        self,
        rmq_connection_str: str,
        exchange_name: str,
        exchange_type: ExchangeType,
        queue_name: str,
    ):
        self._rmq_connection_str = rmq_connection_str
        self._exchange_name = exchange_name
        self._exchange_type = exchange_type
        self._queue_name = queue_name
        self._connection = None
        self._queue = None
        self._queue_consumer_tag = None

    async def start_consuming(self) -> None:
        """Connecting to queue and waiting for messages."""
        self._connection = await connect_robust(
            self._rmq_connection_str,
        )
        channel = await self._connection.channel()
        await channel.set_qos(1)
        exchange = await channel.declare_exchange(
            self._exchange_name,
            self._exchange_type,
        )
        self._queue = await channel.declare_queue(
            self._queue_name,
            durable=False,
            exclusive=False,
            auto_delete=False,
        )
        await self._queue.bind(exchange, routing_key="#")
        self._queue_consumer_tag = await self._queue.consume(self._consume)

    async def _consume(
        self,
        message: AbstractIncomingMessage
    ) -> None:
        """Message handler."""
        async with message.process():
            incoming_message_list.append(message.body.decode())


async def publish_message(
    message: Dict,
    connection_string: str,
    exchange_name: str,
    exchange_type: ExchangeType = ExchangeType.TOPIC,
) -> None:
    """Sends message to exchange."""
    _connection = await connect_robust(connection_string,)
    channel = await _connection.channel()
    exchange = await channel.declare_exchange(
        exchange_name,
        exchange_type,
    )
    await exchange.publish(
        Message(json.dumps(message).encode()),
        routing_key="test"
    )


@pytest.mark.asyncio
async def test_receive_message_after_reconnect():
    """Check that messages are still being delivered after reconnecting."""
    rmq_container = RabbitMqContainer()
    # Forward the ports so that they do not change after the restart
    rmq_container.ports = {5672: 5672}
    with rmq_container as rmq:
        # Arrange
        connection_string = f"amqp://guest:guest@" \
                            f"{rmq.get_container_host_ip()}:" \
                            f"{rmq.get_exposed_port(rmq.RABBITMQ_NODE_PORT)}/"
        exchange_name = "test_exchange"
        queue_name = "test_queue"
        consumer = TestConsumer(
            rmq_connection_str=connection_string,
            exchange_name=exchange_name,
            exchange_type=ExchangeType.TOPIC,
            queue_name=queue_name,
        )
        await consumer.start_consuming()
        # Waiting consumer connection
        await asyncio.sleep(5)
        rmq.get_wrapped_container().restart()
        # Waiting rmq restart
        await asyncio.sleep(15)

        # Act
        connection_string = f"amqp://guest:guest@" \
                            f"{rmq.get_container_host_ip()}:" \
                            f"{rmq.get_exposed_port(rmq.RABBITMQ_NODE_PORT)}/"
        await publish_message(
            {"test": "message111"},
            connection_string=connection_string,
            exchange_name=exchange_name,
        )
        await asyncio.sleep(0.1)

        # Assert
        assert len(incoming_message_list) == 1

        # Cleanup
        incoming_message_list.clear()
```

smfx commented 1 year ago

@dvf Hello! No, I did not try this. For me it is easier to restart the container :) But I tried connecting to RMQ on a remote server (K8S cluster) and simulating a network error (switching off Wi-Fi), and the problem remains.

mosquito commented 1 year ago

@smfx see this https://github.com/mosquito/aio-pika/issues/505#issuecomment-1312778682

olii commented 1 year ago

> I already found the reason: the garbage collector removes the channel object, and sometimes the connection as well. In the above example, after the function ends, neither the channel nor the connection remains referenced.

So the workaround for this is to keep a robust connection and robust channel in a variable in my application code? Shouldn't there be any other way instead of using weak references in the robust connection? https://github.com/mosquito/aio-pika/blob/master/aio_pika/robust_connection.py#L51

MartinWallgren commented 1 year ago

We are seeing the same thing in version 9.0.7.

If we don't hold a reference to the RobustChannel, we always fail to resume consuming after connection issues with RabbitMQ.

MartinWallgren commented 1 year ago

Why do we need weak references for channels in RobustConnection? Do we have a risk of stale channel instances lingering there?

mosquito commented 1 year ago

@MartinWallgren this was already reworked in #533