mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0
1.18k stars 186 forks source link

How to consume msg by sub coroutine? #624

Open Cherrymelon opened 3 months ago

Cherrymelon commented 3 months ago

I use queue.consume but it seems work in sequence,I want that main thread create a new sub coroutine for handle msg when new msg coming here is my code consumer.py

import asyncio
import os

import aio_pika
import django

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'HelloWorld.HelloWorld.settings')
django.setup()
from django.conf import settings as s
from HelloWorld.HelloWorld.log import logger

async def on_message(message):
    # print(f" [x] Received message {message!r}")
    print(f"     Message body is: {message.body!r}")
    if message.body == b'2':
        print('finish')
        await asyncio.sleep(1)
    await message.ack()

async def main():
    conn = await aio_pika.connect(s.RABBIT_MQ)

    channel = await conn.channel()
    await channel.set_qos(prefetch_count=1)

    queue = await channel.declare_queue(name=s.MQ_QUEUE, durable=True)
    exchange = await channel.declare_exchange('direct')
    await queue.bind(exchange, routing_key=s.MQ_ROUTE_KEY)

    await queue.consume(on_message)

if __name__ == '__main__':
    logger.info('start consumer')
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.run_forever()

producer.py

import asyncio
import os

import aio_pika
import django

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'HelloWorld.HelloWorld.settings')
django.setup()
from django.conf import settings as s
from HelloWorld.HelloWorld.log import logger

async def pub(mq_queue, mq_route_key, loop):
    conn = await aio_pika.connect_robust(s.RABBIT_MQ, loop=loop)
    channel = await conn.channel()
    # Declaring exchange
    exchange = await channel.declare_exchange("direct")

    queue = await channel.declare_queue(mq_queue, durable=True)
    await queue.bind(exchange, mq_route_key)
    for i in range(10):
        payload = str(i).encode()
        msg = aio_pika.Message(payload, delivery_mode=2)
        await exchange.publish(msg, routing_key=mq_route_key)
        logger.info('send success {}'.format(payload))
    logger.info('loop over')
    loop.stop()

if __name__ == '__main__':
    # django.setup()
    logger.info('asdfasdf')
    loop = asyncio.get_event_loop()
    loop.create_task(pub(mq_queue=s.MQ_QUEUE, mq_route_key=s.MQ_ROUTE_KEY, loop=loop))
    loop.run_forever()

actual output:

     Message body is: b'0'
     Message body is: b'1'
     Message body is: b'2'
finish
     Message body is: b'3'
     Message body is: b'4'
     Message body is: b'5'
     Message body is: b'6'
     Message body is: b'7'
     Message body is: b'8'
     Message body is: b'9'

expect output:

     Message body is: b'0'
     Message body is: b'1'

     Message body is: b'3'
     Message body is: b'4'
     Message body is: b'5'
     Message body is: b'6'
     Message body is: b'7'
     Message body is: b'8'
     Message body is: b'9'
     Message body is: b'2'
finish

whatever,2 should be lastest

Darsstar commented 2 months ago

You set prefetch_count to 1. I suggest to read https://www.rabbitmq.com/docs/confirms#channel-qos-prefetch.