mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0
1.19k stars 187 forks source link

Once have a slowly process message, rabbitmq will stack unacked message #168

Open x007007007 opened 5 years ago

x007007007 commented 5 years ago

It doesn't make sense, I try to work on it

import asyncio
import aio_pika

async def main(loop):
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/", loop=loop
    )

    async with connection:
        queue_name = "test_queue"

        # Creating channel
        channel = await connection.channel()    # type: aio_pika.Channel

        # Declaring queue
        queue = await channel.declare_queue(
            queue_name,
            auto_delete=True
        )   # type: aio_pika.Queue

        async for message in queue:
            with message.process():
                print(message.body)
                await asyncio.sleep(0.1)    # ack 10 /s
                if queue.name in message.body.decode():
                    break

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    loop.close()

2018-12-05 19_51_36-rabbitmq management

mosquito commented 5 years ago

Just apply QoS with set_qos for the channel. May be should add it to documentation.