mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0

Unable to ack message using QueueIterator after losing connection #501

Status: Open (opened by nffdiogosilva 1 year ago)

nffdiogosilva commented 1 year ago

Hello,

I have found what seem to be similar issues describing this error (#312, #379), but since they haven't been addressed, I'll leave my take here.

Basically, if the connection is lost while a message is being consumed, that message can no longer be acknowledged or rejected, even after a successful reconnect. The ack/reject call gets stuck, and the message stays unacked indefinitely for as long as the consumer is running.

Script to reproduce the issue:

import os
import asyncio
import logging
from time import sleep

import aio_pika
from aio_pika.queue import QueueIterator

AMQP_CONNECTION_STRING = os.environ.get("AMQP_CONNECTION_STRING", "<add default here>")
QUEUE_WAIT_MESSAGE_TIMEOUT = 0.05  # seconds

ROUTING_KEY = "queue-testing"
TOTAL_MESSAGES_TO_PUBLISH = 5

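async def process_message(
    message: aio_pika.abc.AbstractIncomingMessage,
) -> None:
    print(message.body)

    # Blocking (synchronous) sleep: it stalls the event loop, so heartbeats
    # are missed and the connection gets dropped.
    seconds = 250
    print(f"Sync sleep {seconds} seconds to force loosing connection...")
    sleep(seconds)

    # Hand control back to the event loop so the heartbeat failure is
    # detected and the robust connection starts reconnecting.
    seconds = 10
    print(f"Async sleep {seconds} to allow connection to fail in heartbeat...")
    await asyncio.sleep(seconds)

    print("Trying to ack message... (it will stay stuck here)")
    await message.ack()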

class QueueIteratorWithHighConsumeTimeout(QueueIterator):
    """QueueIterator that consumes with a large timeout.

    The timeout is meant to be longer than the sleeps performed in the
    `process_message` function.
    """

    async def consume(self) -> None:
        """Consumes from a queue."""
        self._consumer_tag = await self._amqp_queue.consume(
            self.on_message,
            timeout=3600,  # 1 Hour
        )

async def main() -> None:
    # Enable DEBUG logging to see aio-pika/aiormq logs (e.g. heartbeats)
    logging.basicConfig(level=logging.DEBUG)

    # Creates connection
    connection = await aio_pika.connect_robust(AMQP_CONNECTION_STRING)

    async with connection:
        # Creates channel
        channel = await connection.channel(publisher_confirms=True)

        # Set prefetch to 1
        await channel.set_qos(prefetch_count=1)

        # Declares (or gets) the queue
        queue = await channel.declare_queue(ROUTING_KEY, durable=True)

        # Publishes some messages to queue
        for __ in range(TOTAL_MESSAGES_TO_PUBLISH):
            await channel.default_exchange.publish(
                aio_pika.Message(body=f"Hello {ROUTING_KEY}".encode()),
                routing_key=ROUTING_KEY,
            )

        # Starts consuming messages from queue
        async with QueueIteratorWithHighConsumeTimeout(
            queue, timeout=QUEUE_WAIT_MESSAGE_TIMEOUT
        ) as queue_iter:
            while True:
                try:
                    async for message in queue_iter:
                        await process_message(message)
                except asyncio.TimeoutError:
                    # Catches the timeout error raised when no message arrives
                    # within QUEUE_WAIT_MESSAGE_TIMEOUT.
                    pass

if __name__ == "__main__":
    asyncio.run(main())
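The repro depends on the blocking time.sleep() call stalling the event loop, which keeps the client from servicing heartbeats until the connection is dropped. Below is a minimal, library-free sketch of that mechanism: the heartbeat() coroutine is a hypothetical stand-in for the client's heartbeat handling, not aiormq's actual code. It also shows asyncio.to_thread() (Python 3.9+) as one way to run blocking work without starving the loop.

```python
import asyncio
import time


async def heartbeat(ticks: list[float], interval: float) -> None:
    # Hypothetical stand-in for a client's heartbeat loop: it can only
    # run when the event loop regains control.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def run(offload: bool, work: float = 0.2, interval: float = 0.01) -> int:
    """Return how many heartbeat ticks happened during `work` seconds of blocking work."""
    ticks: list[float] = []
    task = asyncio.create_task(heartbeat(ticks, interval))
    await asyncio.sleep(0)  # let the heartbeat task start
    before = len(ticks)
    if offload:
        # Blocking call runs in a worker thread; the loop keeps scheduling tasks.
        await asyncio.to_thread(time.sleep, work)
    else:
        # Blocking call on the loop thread: no other task can run meanwhile.
        time.sleep(work)
    during = len(ticks) - before
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return during
```

Here run(offload=False) records no heartbeat ticks while the blocking call is running, whereas run(offload=True) keeps ticking roughly once per interval.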

Logs generated:

DEBUG:aio_pika.queue:Start to consuming queue: <RobustQueue(queue-testing): auto_delete=False, durable=True, exclusive=False, arguments=None
DEBUG:aiormq.connection:Prepare to send ChannelFrame(channel_number=1, frames=[<Basic.Consume object at 0x1036afba0>], drain_future=None)
DEBUG:aiormq.connection:Received frame <Basic.ConsumeOk object at 0x1036b2ac0> in channel #1 weight=51 on <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fa270>
DEBUG:aiormq.connection:Received frame <Basic.Deliver object at 0x103620dc0> in channel #1 weight=75 on <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fa270>
DEBUG:aiormq.connection:Received frame <pamqp.header.ContentHeader object at 0x1035fb760> in channel #1 weight=61 on <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fa270>
DEBUG:aiormq.connection:Received frame <pamqp.body.ContentBody object at 0x103648bb0> in channel #1 weight=27 on <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fa270>

b'Hello queue-testing'
Sync Sleep 250 seconds to force loosing connection...
Async sleep 10 to allow connection to fail in heartbeat...

DEBUG:aiormq.connection:Prepare to send ChannelFrame(channel_number=0, frames=[<pamqp.heartbeat.Heartbeat object at 0x10361c250>], drain_future=None)
DEBUG:aiormq.connection:Reader exited for <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fa270>
DEBUG:aiormq.connection:Cancelling cause reader exited abnormally
DEBUG:aiormq.connection:Sending <Connection.Close object at 0x1036aae50> to <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fa270>
DEBUG:aiormq.connection:Writer on connection amqp://guest:******@localhost:5672/vhost closed
DEBUG:aiormq.connection:Writer exited for <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fa270>
DEBUG:aiormq.connection:Closing connection <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fa270> cause: TimeoutError()
INFO:aio_pika.robust_connection:Connection to amqp://guest:******@localhost:5672/vhost closed. Reconnecting after 5 seconds.
DEBUG:aio_pika.queue:Cancelling queue iterator <QueueIteratorWithHighConsumeTimeout: queue='queue-testing' ctag='ctag1.5d7a6c279db64bb88d093e154c73222a'>
DEBUG:aio_pika.queue:Queue iterator <QueueIteratorWithHighConsumeTimeout: queue='queue-testing' ctag='ctag1.5d7a6c279db64bb88d093e154c73222a'> channel closed
DEBUG:aiormq.connection:Connecting to: amqp://guest:******@localhost:5672/vhost
DEBUG:aio_pika.channel:Start reopening channel <aio_pika.robust_channel.RobustChannel object at 0x1035fb670>
DEBUG:aiormq.connection:Prepare to send ChannelFrame(channel_number=1, frames=[<Channel.Open object at 0x1036c9900>], drain_future=None)
DEBUG:aiormq.connection:Received frame <Channel.OpenOk object at 0x103600740> in channel #1 weight=16 on <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fad10>
DEBUG:aiormq.connection:Prepare to send ChannelFrame(channel_number=1, frames=[<Confirm.Select object at 0x1036bd180>], drain_future=None)
DEBUG:aiormq.connection:Received frame <Confirm.SelectOk object at 0x103648370> in channel #1 weight=12 on <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fad10>
DEBUG:aiormq.connection:Prepare to send ChannelFrame(channel_number=1, frames=[<Basic.Qos object at 0x1036cc5e0>], drain_future=None)
DEBUG:aiormq.connection:Received frame <Basic.QosOk object at 0x1036cd670> in channel #1 weight=12 on <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fad10>
DEBUG:aio_pika.queue:Declaring queue: <RobustQueue(queue-testing): auto_delete=False, durable=True, exclusive=False, arguments=None
DEBUG:aiormq.connection:Prepare to send ChannelFrame(channel_number=1, frames=[<Queue.Declare object at 0x10361deb0>], drain_future=None)
DEBUG:aiormq.connection:Received frame <Queue.DeclareOk object at 0x1036cc4f0> in channel #1 weight=34 on <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fad10>
DEBUG:aio_pika.queue:Start to consuming queue: <RobustQueue(queue-testing): auto_delete=False, durable=True, exclusive=False, arguments=None
DEBUG:aiormq.connection:Prepare to send ChannelFrame(channel_number=1, frames=[<Basic.Consume object at 0x10361dcf0>], drain_future=None)
DEBUG:aiormq.connection:Received frame <Basic.ConsumeOk object at 0x103629cc0> in channel #1 weight=51 on <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fad10>
DEBUG:aiormq.connection:Received frame <Basic.Deliver object at 0x1035e5e20> in channel #1 weight=75 on <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fad10>
DEBUG:aiormq.connection:Received frame <pamqp.header.ContentHeader object at 0x10361cf70> in channel #1 weight=61 on <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fad10>
DEBUG:aiormq.connection:Received frame <pamqp.body.ContentBody object at 0x1035fb3a0> in channel #1 weight=27 on <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fad10>

Trying to ack message... (it will stay stuck here)

DEBUG:aiormq.connection:Received frame <pamqp.heartbeat.Heartbeat object at 0x103648df0> in channel #0 weight=8 on <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fad10>
DEBUG:aiormq.connection:Prepare to send ChannelFrame(channel_number=0, frames=[<pamqp.heartbeat.Heartbeat object at 0x103648e20>], drain_future=None)
DEBUG:aiormq.connection:Received frame <pamqp.heartbeat.Heartbeat object at 0x1035fb310> in channel #0 weight=8 on <Connection: "amqp://guest:******@localhost:5672/vhost" at 0x1035fad10>
DEBUG:aiormq.connection:Prepare to send ChannelFrame(channel_number=0, frames=[<pamqp.heartbeat.Heartbeat object at 0x103648e20>], drain_future=None)

Any ideas?

krasoffski commented 1 year ago

If you print the channel status, e.g. message.channel.is_closed, you will notice that it returns True:

    print(f"Trying to ack message... (it will stay stuck here), is_closed: {message.channel.is_closed}")
    await message.ack()

Compared with other clients (in other languages), writing to a closed channel should raise something like AlreadyClosedError rather than deadlock (hang).
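Until the library does that itself, one way to get fail-fast behavior on the consumer side is to check the delivery channel before acking and to bound the ack with a timeout. A minimal sketch, assuming only what this thread shows (a message exposing channel.is_closed and an awaitable ack()); ChannelAlreadyClosed and ack_or_fail are hypothetical names, not part of aio-pika's API.

```python
import asyncio
from typing import Any


class ChannelAlreadyClosed(Exception):
    """Hypothetical error, analogous to AlreadyClosedError in other clients."""


async def ack_or_fail(message: Any, timeout: float = 5.0) -> None:
    """Ack a message, failing fast instead of hanging (hypothetical helper).

    Assumes message.channel.is_closed reflects the state of the channel
    the message was delivered on.
    """
    if message.channel.is_closed:
        # Delivery tags are scoped to the channel that delivered the message,
        # so acking on a dead channel cannot succeed.
        raise ChannelAlreadyClosed("delivery channel is closed; cannot ack")
    # Bound the wait so a stuck ack surfaces as asyncio.TimeoutError
    # instead of blocking the consumer forever.
    await asyncio.wait_for(message.ack(), timeout=timeout)
```

A caller that catches ChannelAlreadyClosed can simply drop the message: under AMQP 0.9.1 semantics the broker requeues unacked deliveries when their channel closes, which matches the redelivery visible in the logs after the reconnect.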

There are a bunch of issues in the underlying aiormq layer that might be the root cause of this problem: