Polyconseil / aioamqp

AMQP implementation using asyncio

How to efficiently drain events from a consumer without spiking the CPU #172

Closed vgoklani closed 6 years ago

vgoklani commented 6 years ago

I need to drain messages from multiple queues. I have a list of queues (self.queues) and I used the approach below:

    async def _start(self):
        while True:
            for queue_name in self.queue_names:
                await self.channel.basic_consume(self.callback, queue_name=queue_name, no_ack=True)
                await asyncio.sleep(0.01)

    def start(self):
        self.loop.run_until_complete(self._start())

The CPU spikes hard if I remove the asyncio.sleep(0.01) call...

Is there a more efficient way of continuously draining the queue?

dzen commented 6 years ago

Hi @vgoklani

You're registering a consumer callback in a loop, creating a new consumer in RabbitMQ on every iteration. You're not actually dequeuing messages.

vgoklani commented 6 years ago

Hi @dzen

Here is my full code:

class RabbitMQConsumerAsyncio(object):
    def __init__(self, username, password, host, port, external_callback):
        self.loop = asyncio.get_event_loop()
        self.transport, self.protocol, self.channel = None, None, None
        self.queue_names = []
        self.external_callback = external_callback
        self.loop.run_until_complete(self.connect(host, port, username, password))

    def on_error(self, message_str):
        log.error(message_str)

    async def connect(self, host, port, username, password):
        self.transport, self.protocol = await aioamqp.connect(host=host, port=int(port), login=username, password=password, on_error=self.on_error)
        self.channel = await self.protocol.channel()
        await self.channel.basic_qos(prefetch_size=0, prefetch_count=100, connection_global=False)

    async def _add_queue(self, queue_name, exchange_name, exchange_type, routing_key):
        assert exchange_type in ("direct", "fanout", "topic")

        await self.channel.queue_declare(queue_name=queue_name, durable=True, exclusive=True)
        await self.channel.exchange_declare(exchange_name=exchange_name, type_name=exchange_type, durable=True)
        await self.channel.queue_bind(queue_name, exchange_name, routing_key=routing_key)
        self.queue_names.append(queue_name)

    def add_queue(self, queue_name, exchange_name, exchange_type, routing_key):
        self.loop.run_until_complete(self._add_queue(queue_name, exchange_name, exchange_type, routing_key))

    async def callback(self, channel, body, envelope, properties):
        await self.external_callback(body)
        await channel.basic_client_ack(delivery_tag=envelope.delivery_tag)

    async def _start(self):
        while True:
            for queue_name in self.queue_names:
                await self.channel.basic_consume(self.callback, queue_name=queue_name)#, no_ack=True)
                await asyncio.sleep(0.1)

    def start(self):
        self.loop.run_until_complete(self._start())

I make an instance of RabbitMQConsumerAsyncio, define an external callback which processes each message, then call start() which is supposed to consume the messages continuously.

Is this the correct way of draining all the messages from a queue?

RemiCardona commented 6 years ago

You just need to call basic_consume exactly once per queue. Then your callback will be called for each incoming message. IOW, just drop the while True.
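The register-once pattern can be illustrated without a broker. This is a minimal pure-asyncio sketch (the `call_later` deliveries are a stand-in for RabbitMQ pushing messages to a registered consumer; none of the names come from aioamqp): the callback is registered a single time, yet every message still reaches it, with no `while True` around the registration.

```python
import asyncio

received = []

async def callback(body):
    # Stand-in for the per-message callback aioamqp would invoke.
    received.append(body)

async def main():
    loop = asyncio.get_running_loop()
    # "Register" the consumer once; afterwards the event loop delivers
    # every simulated message to the callback -- analogous to calling
    # basic_consume exactly once per queue.
    for delay, body in ((0.01, b"a"), (0.02, b"b"), (0.03, b"c")):
        loop.call_later(delay, lambda b=body: asyncio.ensure_future(callback(b)))
    # Keep the coroutine alive while messages arrive, instead of
    # re-registering in a loop.
    await asyncio.sleep(0.1)

asyncio.run(main())
print(received)  # [b'a', b'b', b'c']
```

The key point: registration is a one-time setup step; delivery is driven by the event loop afterwards.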

dzen commented 6 years ago

@vgoklani,

I think there is something wrong here: basic_consume does not dequeue one message. It tells RabbitMQ to deliver all the messages in the queue, and each one will be processed by the callback, as described in the documentation: http://aioamqp.readthedocs.io/en/latest/api.html#consuming-messages

If you want to fetch messages one by one, you should use basic_get, but it's not efficient.
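The pull-vs-push distinction can be sketched with a plain asyncio.Queue as an analogy (no broker, no aioamqp calls; the function names here are illustrative only): basic_get is a pull, one explicit fetch per message with polling when the queue is empty, while basic_consume is a push, where the loop wakes the consumer only when a message is actually available.

```python
import asyncio

async def produce(q):
    for i in range(3):
        await q.put(i)

async def pull_style(q):
    # basic_get analogy: one explicit fetch per message, with a sleep-based
    # polling loop when the queue is empty -- wasted wake-ups.
    out = []
    while len(out) < 3:
        try:
            out.append(q.get_nowait())
        except asyncio.QueueEmpty:
            await asyncio.sleep(0.01)
    return out

async def push_style(q):
    # basic_consume analogy: await each delivery; the event loop resumes
    # us only when a message is actually there.
    return [await q.get() for _ in range(3)]

async def main():
    q1, q2 = asyncio.Queue(), asyncio.Queue()
    await produce(q1)
    await produce(q2)
    return await pull_style(q1), await push_style(q2)

pulled, pushed = asyncio.run(main())
print(pulled, pushed)  # [0, 1, 2] [0, 1, 2]
```

Both drain the same messages, but the pull style burns CPU on empty polls, which is why basic_consume is the efficient choice here.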

vgoklani commented 6 years ago

Hi @dzen Thank you for the response.

Unfortunately, when I run the code without the while loop, it simply exits:

    async def consume(self):
        await self.channel.basic_consume(self.callback, queue_name=self.queue_names[0])

    def start(self):
        self.loop.run_until_complete(self.consume())

I would like the consumer to persist forever and catch all new messages. Here it simply consumes, then stops and exits.

dzen commented 6 years ago

@vgoklani it exits because the await returns immediately: basic_consume only registers the consumer, so run_until_complete finishes as soon as the registration completes.

Please take a look at the examples folder: https://github.com/Polyconseil/aioamqp/blob/master/examples/receive.py

vgoklani commented 6 years ago

Thanks @dzen, I see the example and it works. However, it's not clear to me why the loop continuously retrieves messages when using run_forever(), but stops when using run_until_complete(). Why does calling run_until_complete(receive) before run_forever() allow the code to run correctly?
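The difference can be demonstrated with plain asyncio, no broker required (this is a sketch of the event-loop behavior, not aioamqp itself): run_until_complete returns as soon as its coroutine returns, even if callbacks are still scheduled, whereas run_forever keeps processing scheduled events until loop.stop() is called. In the example, run_until_complete(receive()) does the one-time consumer registration, and run_forever() then keeps the loop alive so deliveries continue to be processed.

```python
import asyncio

fired = []

async def register():
    loop = asyncio.get_running_loop()
    # Schedule a "delivery" for later -- like basic_consume registering a
    # consumer; the coroutine itself returns immediately afterwards.
    loop.call_later(0.05, lambda: fired.append("message"))

loop = asyncio.new_event_loop()

# run_until_complete exits as soon as register() returns, before the
# scheduled callback has had a chance to fire.
loop.run_until_complete(register())
print(fired)  # []

# run_forever keeps the loop processing events until loop.stop().
loop.call_later(0.1, loop.stop)
loop.run_forever()
print(fired)  # ['message']
loop.close()
```

So run_until_complete performs the setup, and run_forever provides the long-lived loop in which message callbacks actually run.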