mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0

Consumer doesn't resume after Connection was stuck error #563

Closed: felipecwb closed this 9 months ago

felipecwb commented 11 months ago

I have code similar to this example:

import asyncio
import logging

import aio_pika
from aio_pika.abc import AbstractIncomingMessage

RABBITMQ_URI = "amqp://guest:guest@rabbitmq.development.lan:5672/?heartbeat=5"
EXCHANGE_NAME = "shared_messages"

async def consume(queue_name) -> None:
    async def on_close(*args, **kwargs):
        print("\n>>> CLOSED")

    async def on_reconnect(*args, **kwargs):
        print("\n>>> RECONNECTED")

    async def process_message(message: AbstractIncomingMessage):
        async with message.process():
            print(">>> RECEIVED: ", message.body)

    connection = await aio_pika.connect_robust(RABBITMQ_URI, timeout=10)
    connection.close_callbacks.add(on_close)
    connection.reconnect_callbacks.add(on_reconnect)

    async with connection:
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=100)

        # setup queue
        queue = await channel.declare_queue(queue_name, durable=True, auto_delete=False)
        await queue.bind(EXCHANGE_NAME)

        # start consuming
        await queue.consume(process_message)

        try:
            return await asyncio.Future()
        finally:
            await connection.close()

async def main():
    logging.basicConfig(level=logging.DEBUG)

    try:
        await asyncio.gather(
            asyncio.create_task(consume("testing.queue.a")),
            asyncio.create_task(consume("testing.queue.b")),
        )
    except Exception:
        logging.exception("exception")

if __name__ == "__main__":
    asyncio.run(main())

It works well when the RabbitMQ server is restarted: messages are processed again. But when I turn Wi-Fi off and back on after connection.__heartbeat_grace_timeout has elapsed, the reconnect "works" but I don't receive any new messages.

logs:

DEBUG:aiormq.connection:Prepare to send ChannelFrame(payload=b'\x08\x00\x00\x00\x00\x00\x00\xce', should_close=False, drain_future=None)
WARNING:aiormq.connection:Server connection <Connection: "amqp://guest:*******@rabbitmq.development.lan:5672/?heartbeat=5" at 0x7eff8efd8090> was stuck. No frames were received in 18 seconds.
DEBUG:aiormq.connection:Writer exited for <Connection: "amqp://guest:*******@rabbitmq.development.lan:5672/?heartbeat=5" at 0x7eff8efd8090>
DEBUG:aiormq.connection:Reader exited for <Connection: "amqp://guest:*******@rabbitmq.development.lan:5672/?heartbeat=5" at 0x7eff8efd8090>
DEBUG:aiormq.connection:Closing connection <Connection: "amqp://guest:*******@rabbitmq.development.lan:5672/?heartbeat=5" at 0x7eff8efd8090> cause: CancelledError()

...
>>> CLOSED
INFO:aio_pika.robust_connection:Connection to amqp://guest:*******@rabbitmq.development.lan:5672/?heartbeat=5 closed. Reconnecting after 5 seconds.
WARNING:aio_pika.robust_connection:Connection attempt to "amqp://guest:*******@rabbitmq.development.lan:5672/?heartbeat=5" failed: [Errno -2] Name or service not known. Reconnecting after 5 seconds.
DEBUG:aio_pika.robust_connection:Waiting for connection close event for <RobustConnection: "amqp://guest:*******@rabbitmq.development.lan:5672/?heartbeat=5" 1 channels>
DEBUG:aio_pika.robust_connection:Connection attempt for <RobustConnection: "amqp://guest:*******@rabbitmq.development.lan:5672/?heartbeat=5" 1 channels>
DEBUG:aiormq.connection:Connecting to: amqp://guest:*******@rabbitmq.development.lan:5672/?heartbeat=5

>>> RECONNECTED
DEBUG:aio_pika.robust_connection:Connection made on <RobustConnection: "amqp://guest:*******@rabbitmq.development.lan:5672/?heartbeat=5" 1 channels>
... heartbeat frames ...
...but no messages are processed anymore.
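The "stuck" timeouts reported in this thread (18 seconds with `heartbeat=5` above, and 183 seconds in a later comment, assuming RabbitMQ's default 60-second heartbeat there) are both consistent with a grace period of `(heartbeat + 1) * 3` seconds. The sketch below only reproduces that arithmetic; the formula is inferred from the logged numbers, not quoted from aiormq's source, and `stuck_timeout` is a hypothetical helper.

```python
def stuck_timeout(heartbeat: int) -> int:
    """Seconds without any frame before the connection is reported as stuck.

    Assumption: (heartbeat + 1) * 3, inferred from the logs in this thread,
    not taken from aiormq's code.
    """
    return (heartbeat + 1) * 3


for hb in (5, 60):
    print(f"heartbeat={hb}s -> stuck after {stuck_timeout(hb)}s")
```

Under that assumption, with `?heartbeat=5` a Wi-Fi outage of roughly 18 seconds is already enough to send the client down the reconnect path.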

Is there any way to make the asyncio.Future raise an exception in this case?
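One way to get that behaviour is to replace the bare `asyncio.Future()` with a future that a close callback fails. This is a stdlib-only sketch: `ChannelClosed` and `make_close_waiter` are hypothetical names, and the `(sender, exc)` callback signature mirrors the `lambda channel, exception: ...` used in the workaround later in this thread. In real code `on_close` would be registered with something like `channel.close_callbacks.add(on_close)`; here a timer stands in for the channel closing.

```python
import asyncio
from typing import Any, Callable, Optional, Tuple


class ChannelClosed(Exception):
    """Hypothetical error raised when the watched channel closes."""


def make_close_waiter() -> Tuple[asyncio.Future, Callable[..., None]]:
    """Hypothetical helper: a future plus a close callback that fails it."""
    future = asyncio.get_running_loop().create_future()

    def on_close(sender: Any, exc: Optional[BaseException] = None) -> None:
        # Fail the future only once; later close notifications are ignored.
        if not future.done():
            future.set_exception(ChannelClosed(f"{sender!r} closed: {exc!r}"))

    return future, on_close


async def demo() -> str:
    future, on_close = make_close_waiter()
    # Stand-in for channel.close_callbacks.add(on_close): fire the callback
    # 0.05 s later, as if the channel had died after missed heartbeats.
    asyncio.get_running_loop().call_later(0.05, on_close, "channel-1", TimeoutError())
    try:
        await future  # instead of: await asyncio.Future()
    except ChannelClosed as err:
        return str(err)
    return "never closed"


print(asyncio.run(demo()))
```

The consumer coroutine then ends with an exception instead of hanging forever, so whatever awaits it (the `asyncio.gather` in `main()`) can react. Whether failing loudly is desirable with `connect_robust`, which is meant to restore channels itself, depends on the aio-pika version in use.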

versions:

felipecwb commented 11 months ago

I fixed it with:

async def consume(connection, queue_name) -> None:
    # ... queue creating and consume, e.g.:
    # consumer_tag = await queue.consume(process_message)

    # release this function when the channel closes
    channel_closed_event = asyncio.Event()
    queue.channel.close_callbacks.add(
        lambda channel, exception: channel_closed_event.set()
    )

    try:
        await channel_closed_event.wait()
    finally:
        await queue.cancel(consumer_tag)

async def main():
    # ... await connection ...
    while True:
        try:
            await asyncio.gather(
                consume(connection, "testing.queue.a"),
                consume(connection, "testing.queue.b"),
            )
        except Exception:
            logging.exception("exception")

        # give the connection time to recover before restarting the consumers
        await asyncio.sleep(5)

I just loop: after starting the consumers, I wait for the channel to close and then start them again.
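The loop-and-restart idea can be sketched without a broker. `supervise` and `fake_consume` are hypothetical; `fake_consume` stands in for the `consume(connection, queue_name)` coroutine above and simply returns, as if its channel had closed. `max_rounds` and the short `retry_delay` exist only so the demo terminates; the workaround itself loops forever with a 5-second sleep.

```python
import asyncio
import logging
from typing import Awaitable, Callable, List


async def supervise(
    consume_once: Callable[[str], Awaitable[None]],
    queue_names: List[str],
    retry_delay: float = 5.0,
    max_rounds: int = -1,  # -1 means "forever", as in the workaround
) -> int:
    """Hypothetical helper: restart all consumers whenever they return or raise."""
    rounds = 0
    while max_rounds < 0 or rounds < max_rounds:
        rounds += 1
        try:
            await asyncio.gather(*(consume_once(name) for name in queue_names))
        except Exception:
            logging.exception("consumer failed, restarting")
        await asyncio.sleep(retry_delay)
    return rounds


async def fake_consume(queue_name: str) -> None:
    # Hypothetical consumer: pretend the channel closes after a short while.
    await asyncio.sleep(0.01)


rounds = asyncio.run(
    supervise(fake_consume, ["testing.queue.a", "testing.queue.b"],
              retry_delay=0.01, max_rounds=3)
)
print(rounds)  # 3
```

Note that `asyncio.gather` does not cancel the sibling consumers when one of them raises, so a real implementation may want to cancel any leftovers before starting the next round.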

sylvain1811 commented 10 months ago

I'm also facing this issue.

It seems the connection is correctly recreated after a "Connection was stuck" warning, but the channels are not.
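To picture what "the connection is recreated, but not the channels" means, here is a toy model of a robust connection. It is not aio-pika's implementation: `ToyRobustConnection`, `ToyChannel` and their methods are hypothetical. The point is that reconnecting involves two steps, opening a new transport and then restoring every registered channel (re-declaring its queues and re-registering its consumers); the symptom described here is what you observe when only the first step happens.

```python
from typing import List


class ToyChannel:
    """Hypothetical toy channel that remembers which queues it consumes from."""

    def __init__(self, number: int, queues: List[str]) -> None:
        self.number = number
        self.queues = queues
        self.is_open = True

    def restore(self) -> None:
        # A real client would re-declare queues and re-register consumers here.
        self.is_open = True


class ToyRobustConnection:
    """Hypothetical toy connection that keeps track of its channels."""

    def __init__(self) -> None:
        self.channels: List[ToyChannel] = []
        self.connected = True

    def channel(self, queues: List[str]) -> ToyChannel:
        ch = ToyChannel(len(self.channels) + 1, queues)
        self.channels.append(ch)
        return ch

    def connection_lost(self) -> None:
        # e.g. "Server connection ... was stuck": every channel dies with it.
        self.connected = False
        for ch in self.channels:
            ch.is_open = False

    def reconnect(self, restore_channels: bool = True) -> None:
        self.connected = True  # step 1: a new transport
        if restore_channels:  # step 2: the part that was skipped in the bug
            for ch in self.channels:
                ch.restore()


conn = ToyRobustConnection()
ch = conn.channel(["testing.queue.a"])
conn.connection_lost()
conn.reconnect(restore_channels=False)
print(conn.connected, ch.is_open)  # True False: connected, but nothing consumes
conn.reconnect()
print(conn.connected, ch.is_open)  # True True
```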

Steps to reproduce, using two Docker containers (consumer and rabbitmq):

  1. Start both containers, publish messages, and validate the correct processing of these messages
  2. Disconnect the consumer container from the Docker network using the command docker network disconnect <network> <container>
  3. Wait 60+ seconds and make sure RabbitMQ reports closing the connection because of missing client heartbeats
  4. Wait 120+ seconds and make sure the consumer logs its reconnection attempts after the "Connection was stuck" message
  5. Reconnect the consumer container to the Docker network using the command docker network connect <network> <container>
  6. Ensure that the reconnection has taken place (check RabbitMQ logs, RabbitMQ management)
  7. Publish new messages and validate their proper processing.
  8. Check that the connection has no associated channels (RabbitMQ management)
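The network part of these steps can be scripted. This is a sketch, assuming the Docker CLI is on `PATH`; `repro_network_drop` is a hypothetical helper, `network` and `container` are placeholders for your own names, and the default wait simply adds up the 60+ and 120+ second figures above. Publishing and checking messages stays manual. With `dry_run=True` it only returns the commands it would run.

```python
import subprocess
import time
from typing import List


def repro_network_drop(
    network: str,
    container: str,
    wait_seconds: float = 180.0,
    dry_run: bool = False,
) -> List[List[str]]:
    """Hypothetical helper: disconnect a container, wait, then reconnect it."""
    disconnect = ["docker", "network", "disconnect", network, container]
    connect = ["docker", "network", "connect", network, container]
    if not dry_run:
        subprocess.run(disconnect, check=True)
        # Long enough for RabbitMQ to drop the connection (missed heartbeats)
        # and for the consumer to log "Connection was stuck" and start retrying.
        time.sleep(wait_seconds)
        subprocess.run(connect, check=True)
    return [disconnect, connect]


if __name__ == "__main__":
    for cmd in repro_network_drop("<network>", "consumer", dry_run=True):
        print(" ".join(cmd))
```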

Here is my consumer:

import asyncio
import logging
import os

import aio_pika
from aio_pika import ExchangeType


async def main() -> None:
    connection = await aio_pika.connect_robust(os.environ["RABBITMQ_CONN_STRING"], timeout=5)

    async with connection:
        # Creating channel
        channel = await connection.channel()

        # Declaring exchange
        exchange = await channel.declare_exchange(
            "my-exchange", type=ExchangeType.TOPIC, durable=True, auto_delete=False
        )

        # Declaring queue
        queue = await channel.declare_queue("my-queue", durable=True, auto_delete=False)

        # Binding queue to exchange
        await queue.bind(exchange, "my-routing-key")

        # Iterate over messages
        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                try:
                    async with message.process():
                        logging.info(message.body.decode())
                except Exception:
                    # log the failure and keep consuming
                    logging.exception("Failed to process message")


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main())

mosquito commented 10 months ago

@sylvain1811 please provide your aio-pika and aiormq versions.

sylvain1811 commented 10 months ago

@mosquito Sure! Here it is:

aio-pika==9.1.4
aiormq==6.7.6

Python: 3.9

mosquito commented 10 months ago

@sylvain1811 Please double-check with the latest release, 9.2.2; this should be fixed in #571.

sylvain1811 commented 10 months ago

@mosquito It's working fine with the latest release of aio-pika (9.2.2), thank you!

pboehm commented 10 months ago

@mosquito thank you for this amazing library!

I'm sorry to report that for me, even with aio-pika==9.2.2 and aiormq==6.7.7, the channels of a robust connection are not restored when a connection gets stuck. When I close the connection via RabbitMQ Management, everything is restored correctly and the restoration of all resources is logged.

I can reproduce it using the two Docker containers and the docker network disconnect|connect ...... procedure described by @sylvain1811.

aozupek commented 10 months ago

I'm having the exact same issue (aio-pika:9.2.2, aiormq:6.7.7). After the "Server connection <Connection: "amqp://guest:******@localhost:5672//" at 0x1048d6a30> was stuck. No frames were received in 183 seconds." log message, the connection is re-established, but the channels are not restored. When I try to consume or publish a message after the connection is re-established, I get ChannelInvalidStateError exceptions:

aiormq.exceptions.ChannelInvalidStateError: <Channel: "1" at 0x104c76da0> closed
aiormq.exceptions.ChannelInvalidStateError: Channel closed by RPC timeout
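On an affected version, a caller could treat this error as "open a fresh channel and try once more". The sketch below is broker-free: `ChannelInvalidStateError` here is a local stand-in for `aiormq.exceptions.ChannelInvalidStateError`, and `FakeChannel`, `open_channel` and `publish_with_reopen` are hypothetical. With aio-pika, `open_channel` would be something like `connection.channel()`.

```python
import asyncio
from typing import Awaitable, Callable, List


class ChannelInvalidStateError(Exception):
    """Local stand-in for aiormq.exceptions.ChannelInvalidStateError."""


class FakeChannel:
    """Hypothetical channel; a closed one fails like the error above."""

    def __init__(self, is_closed: bool = False) -> None:
        self.is_closed = is_closed
        self.sent: List[bytes] = []

    async def publish(self, body: bytes) -> None:
        if self.is_closed:
            raise ChannelInvalidStateError("Channel closed by RPC timeout")
        self.sent.append(body)


async def publish_with_reopen(
    channel: FakeChannel,
    open_channel: Callable[[], Awaitable[FakeChannel]],
    body: bytes,
) -> FakeChannel:
    """Hypothetical helper: publish; on a dead channel, reopen and retry once."""
    try:
        await channel.publish(body)
        return channel
    except ChannelInvalidStateError:
        fresh = await open_channel()
        await fresh.publish(body)
        return fresh


async def demo() -> FakeChannel:
    async def open_channel() -> FakeChannel:
        return FakeChannel()

    stale = FakeChannel(is_closed=True)  # the channel that was not restored
    return await publish_with_reopen(stale, open_channel, b"hello")


used = asyncio.run(demo())
print(used.sent)  # [b'hello']
```

Retrying a publish can produce a duplicate if the first attempt actually reached the broker, so this pattern suits messages where duplicates are tolerable.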

Has anyone found a solution yet?

mosquito commented 9 months ago

Thanks to @aozupek, the problem is resolved and a fixed version has been released as aio-pika==9.2.3.
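Since the fix landed in aio-pika 9.2.3, a service could refuse to start on an older release. This is a sketch using only `importlib.metadata` (Python 3.8+); `MIN_FIXED`, `parse_version` and `check_aio_pika` are hypothetical names, and the simple parser assumes a plain `X.Y.Z` version string.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Tuple

MIN_FIXED = (9, 2, 3)  # hypothetical constant: first release reported fixed here


def parse_version(text: str) -> Tuple[int, ...]:
    # Plain "X.Y.Z" only; a part such as "0rc1" would raise ValueError.
    return tuple(int(part) for part in text.split(".")[:3])


def check_aio_pika() -> str:
    try:
        installed = version("aio-pika")
    except PackageNotFoundError:
        return "aio-pika is not installed"
    if parse_version(installed) < MIN_FIXED:
        return f"aio-pika {installed} predates the fix; upgrade to >= 9.2.3"
    return f"aio-pika {installed} includes the fix"


print(check_aio_pika())
```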

pboehm commented 9 months ago

@aozupek @mosquito with 9.2.3 it works like a charm. Thanks for your work!