mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0
1.23k stars 187 forks source link

Does aio_pika support multithreading? #482

Closed zhenzi0322 closed 2 years ago

zhenzi0322 commented 2 years ago

If supported, how is the multi-threaded consumer implemented

mosquito commented 2 years ago

You can process the messages in the thread pool:


def in_thread_processor(body: bytes):
    """ This code will be executed in a thread pool """
    pass

async def on_message(msg: aio_pika.IncomingMessage):
    async with msg.process():
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, in_thread_processor, msg.body)
zhenzi0322 commented 2 years ago

Hello, this is my sample code:

from config import mq_config
from aio_pika import IncomingMessage
import aio_pika
import asyncio

def in_thread_processor(body: bytes):
    """ This code will be executed in a thread pool """
    print("message content: ", body)

class Task:

    def __init__(self, host, username, password, queue_name, exchange, port=5672):
        self.exchange = exchange
        self.queue_name = queue_name
        self.host = host
        self.username = username
        self.password = password
        self.port = port

    async def run(self, queue_type):
        full_queue_name = self.queue_name + "_" + queue_type
        full_queue_key = self.queue_name + "." + queue_type
        self.connection = await aio_pika.connect(host=self.host, port=self.port, login=self.username,
                                                 password=self.password)
        async with self.connection:
            # Creating channel
            self.channel = await self.connection.channel()

            # Will take no more than 10 messages in advance
            await self.channel.set_qos(prefetch_count=5)
            # Declaring queue
            self.exchange1 = await self.channel.declare_exchange(name=self.exchange, durable=True)

            queue = await self.channel.declare_queue(name=full_queue_name, durable=True)
            await queue.bind(exchange=self.exchange1, routing_key=full_queue_key)
            await queue.consume(callback=self.on_message)

    async def on_message(self, msg: IncomingMessage):
        async with msg.process():
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, in_thread_processor, msg.body)

if __name__ == '__main__':
    task = Task(
        host=mq_config.get("host"),
        username=mq_config.get("username"),
        password=mq_config.get("password"),
        queue_name='idcard',
        exchange=mq_config.get("exchange")
    )
    asyncio.run(task.run(queue_type="create"))

Why does the program end as soon as I start it?

mosquito commented 2 years ago

I recommend you pay attention to aiomisc, judging by your code example you are want to organize your tasks in a similar way.

The main problem in this example is that you exit the program, you should replace the call asyncio.run with:

...

if __name__ == '__main__':
    ...
    loop = asyncio.get_event_loop()
    loop.run_until_complete(task.run(queue_type="create"))
    loop.run_forever()
zhenzi0322 commented 2 years ago

I recommend you pay attention to aiomisc, judging by your code example you are want to organize your tasks in a similar way.

The main problem in this example is that you exit the program, you should replace the call asyncio.run with:

...

if __name__ == '__main__':
    ...
    loop = asyncio.get_event_loop()
    loop.run_until_complete(task.run(queue_type="create"))
    loop.run_forever()

Ok, thanks, I was trying to execute multiple tasks in the queue at a time, it would be too slow to execute one task at a time