mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0

Messages are NACK'ed multiple times around queue deletion #630

Open lfse-slafleur opened 1 month ago

lfse-slafleur commented 1 month ago

Hello!

Thank you for aio-pika, it is a great library.

I have been running into some issues when using aio-pika in the orchestrator component of another open-source project called OMOTES (https://github.com/Project-OMOTES/orchestrator/). We use aio-pika to create quite a few queues, receive one or more messages on each queue, and delete the queue afterwards. We have found that RabbitMQ (3.12.14-management) occasionally raises a PRECONDITION_FAILED - unknown delivery tag exception to the subscribers of these queues.

Using Wireshark, I have managed to analyze the issue. Attached: rabbitmq_unknown_delivery_tag.zip

Context:

This leads to the issue as analyzed in the attached Wireshark pcap:

This is also documented here: https://www.rabbitmq.com/amqp-0-9-1-reference#basic.nack.multiple
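To make the `multiple` semantics concrete, here is a minimal, broker-free sketch. It is a hypothetical model (`ChannelModel` and `PreconditionFailed` are illustrative names, not aio-pika or aiormq APIs) of how one channel tracks unacknowledged deliveries: delivery tags come from a single counter per channel shared by all of its consumers, an ACK or NACK with `multiple` set settles every outstanding tag up to and including the given one, and settling a tag that is no longer outstanding is the channel error RabbitMQ reports as PRECONDITION_FAILED - unknown delivery tag. The special case `delivery_tag=0` is left out.

```python
class PreconditionFailed(Exception):
    """Stands in for RabbitMQ's PRECONDITION_FAILED channel error (illustrative)."""


class ChannelModel:
    """Hypothetical model of one AMQP 0-9-1 channel's ack bookkeeping.

    Delivery tags come from one counter per channel, shared by all
    consumers on that channel.
    """

    def __init__(self) -> None:
        self._next_tag = 1
        self.unacked: dict[int, str] = {}  # delivery tag -> consumer name

    def deliver(self, consumer: str) -> int:
        tag = self._next_tag
        self._next_tag += 1
        self.unacked[tag] = consumer
        return tag

    def _settle(self, delivery_tag: int, multiple: bool) -> list[int]:
        if delivery_tag not in self.unacked:
            raise PreconditionFailed(f"unknown delivery tag {delivery_tag}")
        if multiple:
            # Every outstanding tag up to and including delivery_tag,
            # regardless of which consumer received it.
            settled = [t for t in self.unacked if t <= delivery_tag]
        else:
            settled = [delivery_tag]
        for t in settled:
            del self.unacked[t]
        return settled

    def ack(self, delivery_tag: int, multiple: bool = False) -> list[int]:
        return self._settle(delivery_tag, multiple)

    def nack(self, delivery_tag: int, multiple: bool = False) -> list[int]:
        # requeue does not affect the bookkeeping modelled here
        return self._settle(delivery_tag, multiple)


channel = ChannelModel()
channel.ack(channel.deliver("consumer A"))    # tag 1: A processes its message
in_flight_b = channel.deliver("consumer B")   # tag 2: B is still processing it
buffered_a = channel.deliver("consumer A")    # tag 3: prefetched, never consumed by A

# A shuts down and hands back its buffer with multiple=True ...
print(channel.nack(buffered_a, multiple=True))  # -> [2, 3]
# ... so B's own ACK now refers to a tag the channel no longer knows.
try:
    channel.ack(in_flight_b)
except PreconditionFailed as exc:
    print(exc)  # -> unknown delivery tag 2
```

With `multiple=False`, the same NACK would have settled only tag 3 and left B's delivery intact.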

A couple more comments regarding this issue:

To make this issue easier to reproduce, I have prepared the following publisher and subscriber scripts:

```python
"""Publisher"""
import asyncio

import aiormq


async def main() -> None:
    body = b"Hello World!"

    # Perform connection
    connection = await aiormq.connect("amqp://guest:guest@localhost//")

    # Create a channel and fetch the subscriber's "ready" message.
    # basic.get does not wait for a message, so start this script only
    # after the subscriber has printed "Send ready to publisher ...".
    channel = await connection.channel()
    await channel.queue_declare("ready")
    await channel.basic_get("ready", timeout=None)

    # Publish five messages to each of the queues test_0 .. test_99
    for i in range(100):
        queue_name = f"test_{i}"
        for _ in range(5):
            await channel.basic_publish(body, routing_key=queue_name)

    await connection.close()


asyncio.run(main())
```

```python
"""Subscriber"""
import asyncio

import aio_pika
from aio_pika.abc import AbstractChannel, AbstractRobustQueue


class MessageHandler:
    channel: AbstractChannel
    queue: AbstractRobustQueue

    def __init__(self, channel: AbstractChannel, queue: AbstractRobustQueue) -> None:
        self.channel = channel
        self.queue = queue

    async def run(self) -> None:
        # Process a single message, then leave the iterator; leaving the
        # `async with` block closes the iterator and cancels the consumer.
        async with self.queue.iterator() as queue_iter:
            async for message in queue_iter:
                async with message.process(requeue=True):
                    print(f"Received {self.queue.name}: {message.body!r}")
                    break
        await self.queue.delete(if_empty=False)


async def main() -> None:
    connection = await aio_pika.connect_robust("amqp://guest:guest@127.0.0.1/")
    async with connection:
        # All 100 consumers share this single channel.
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=1)

        all_tasks = []
        for i in range(100):
            queue_name = f"test_{i}"
            queue = await channel.declare_queue(queue_name, auto_delete=False)
            handler = MessageHandler(channel, queue)
            all_tasks.append(asyncio.create_task(handler.run()))
        print("All handlers are waiting")

        # Signal the publisher that it can start sending.
        await channel.declare_queue("ready")
        await channel.default_exchange.publish(
            aio_pika.Message(body=b"ready!"), routing_key="ready"
        )
        print("Send ready to publisher to start sending")
        await asyncio.gather(*all_tasks)


asyncio.run(main())
```

To start the necessary RabbitMQ broker: `docker run --rm -ti -p "5672:5672" -p "15672:15672" rabbitmq:3.12-management`

Tested with aio-pika 9.3.1 and RabbitMQ 3.12.14-management.

lfse-slafleur commented 1 month ago

It appears the culprit is here: https://github.com/mosquito/aio-pika/blob/master/aio_pika/queue.py#L469. The message is NACK'ed with `multiple` set indiscriminately, even when the delivery tags of other outstanding messages belong to other subscriptions and queues.

lfse-slafleur commented 1 month ago

Workaround: keep each queue and subscription in a separate channel. This prevents the `multiple=True` part of the NACK from referencing deliveries of other queues and subscriptions.
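The workaround relies on delivery tags being scoped to a channel. The sketch below is a hypothetical, broker-free model (the `TagSpaces` class and the channel names are illustrative, not part of aio-pika): each channel gets its own tag counter, so a `multiple=True` NACK on one queue's channel can only sweep up that channel's deliveries. In aio-pika terms this roughly corresponds to calling `connection.channel()` once per queue instead of sharing one channel, at the cost of one extra channel per queue.

```python
from collections import defaultdict
from itertools import count


class TagSpaces:
    """Hypothetical model: every channel owns an independent delivery-tag counter."""

    def __init__(self) -> None:
        # Each channel's counter starts at 1, as AMQP delivery tags do.
        self._counters = defaultdict(lambda: count(1))
        self.unacked: dict[str, list[int]] = defaultdict(list)

    def deliver(self, channel: str) -> int:
        tag = next(self._counters[channel])
        self.unacked[channel].append(tag)
        return tag

    def settle(self, channel: str, delivery_tag: int, multiple: bool = False) -> None:
        """ACK or NACK (same bookkeeping) a tag on the given channel only."""
        pending = self.unacked[channel]
        if delivery_tag not in pending:
            raise LookupError(f"unknown delivery tag {delivery_tag} on {channel}")
        if multiple:
            # Sweeps outstanding tags <= delivery_tag, but only on this channel.
            self.unacked[channel] = [t for t in pending if t > delivery_tag]
        else:
            pending.remove(delivery_tag)


tags = TagSpaces()
b_tag = tags.deliver("ch-test_1")  # B's message, still being processed
a_tag = tags.deliver("ch-test_0")  # A's prefetched message (also tag 1)
tags.settle("ch-test_0", a_tag, multiple=True)  # A shuts down
tags.settle("ch-test_1", b_tag)  # B's ACK still refers to a known tag
print(tags.unacked["ch-test_0"], tags.unacked["ch-test_1"])  # -> [] []
```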

lfse-slafleur commented 1 month ago

I am working on a test case to reproduce the issue, as well as a fix. It should be relatively easy to fix by NACK'ing only the messages that have actually been received instead of NACK'ing multiple messages. RabbitMQ should reschedule such a message anyway the moment the subscription is removed, even if it was never NACK'ed or ACK'ed.
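The difference between the two NACK strategies can be checked with a small, broker-free sketch. The helper `settled_by` is hypothetical (it is not aio-pika's code): given the delivery tags outstanding on a shared channel, it reports which of them a list of `(delivery_tag, multiple)` NACK calls would settle. A single `multiple=True` NACK on the closing consumer's last tag is compared with one `multiple=False` NACK per message that consumer actually holds.

```python
def settled_by(calls: list[tuple[int, bool]], outstanding: set[int]) -> set[int]:
    """Return which outstanding channel tags the (delivery_tag, multiple) calls settle.

    Hypothetical model of AMQP 0-9-1 ack bookkeeping for one shared channel.
    """
    remaining = set(outstanding)
    settled: set[int] = set()
    for tag, multiple in calls:
        if tag not in remaining:
            raise LookupError(f"unknown delivery tag {tag}")
        hit = {t for t in remaining if t <= tag} if multiple else {tag}
        settled |= hit
        remaining -= hit
    return settled


# Shared channel: consumer B holds tag 2, consumer A holds tags 3 and 5.
outstanding = {2, 3, 5}
held_by_a = [3, 5]

# One NACK with multiple=True on A's last tag also sweeps up B's tag 2.
print(sorted(settled_by([(max(held_by_a), True)], outstanding)))  # -> [2, 3, 5]

# One NACK per message A actually holds leaves B's delivery alone.
print(sorted(settled_by([(tag, False) for tag in held_by_a], outstanding)))  # -> [3, 5]
```

In aio-pika, `IncomingMessage.nack()` accepts `multiple` and `requeue` keyword arguments, so the per-message variant amounts to calling it with `multiple=False` for each buffered message.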