mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0

Should I keep aio_pika connection always open? #537

Open jalvespinto opened 1 year ago

jalvespinto commented 1 year ago

In some examples the consumer's main function ends with await asyncio.Future() so the process does not exit. What is the best practice when you have multiple consumers spread across multiple modules? My idea is to have a dataclass holding the data needed to create each consumer, and a single process for all workers. Something like this:

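# consumer_registry.py
from typing import Callable

from attrs import define

# Every Consumer instance registers itself here on creation.
consumers = []

@define
class Consumer:
    set_oqs: int  # prefetch count passed to channel.set_qos()
    queue: dict  # keyword arguments for channel.declare_queue()
    handler: Callable

    def __attrs_post_init__(self):
        consumers.append(self)

# some_consumer.py
from consumer_registry import Consumer

first_consumer = Consumer(...)
second_consumer = Consumer(...)

# main.py
import anyio
import aio_pika
from consumer_registry import consumers
import some_consumer  # imported for its side effect: registers the consumers

async def main():
    connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
    async with connection:
        for c in consumers:
            channel = await connection.channel()
            await channel.set_qos(prefetch_count=c.set_oqs)
            queue = await channel.declare_queue(**c.queue)
            await queue.consume(c.handler, no_ack=True)
        # Keep the connection open until the process is stopped.
        await anyio.sleep_forever()

if __name__ == "__main__":
    # anyio.run() takes the coroutine function itself, not a coroutine object.
    anyio.run(main)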

My idea is to run this as a pod on k8s with a periodic health check, and otherwise leave the process running as above. Is it fine to do it that way?

mosquito commented 1 year ago

The AMQP protocol is designed in such a way that the connection should be open for as long as possible. However, if your task is to periodically process accumulated messages, then the connection should be open only for the duration of message processing.
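
For the second case (periodically processing accumulated messages), a minimal sketch might look like the following. It assumes a durable queue named "tasks" and the local broker URL from the question. The helpers drain, run_once and print_body are hypothetical names for illustration, not part of aio-pika.

```python
import asyncio


async def drain(queue, handler):
    """Handle messages until the queue is empty; return how many were handled."""
    handled = 0
    while True:
        # fail=False makes get() return None on an empty queue instead of raising.
        message = await queue.get(fail=False)
        if message is None:
            return handled
        # process() acks on normal exit and rejects the message if the handler raises.
        async with message.process():
            await handler(message)
        handled += 1


async def print_body(message):
    # Placeholder handler (assumption): real code would do actual work here.
    print(message.body)


async def run_once(url="amqp://guest:guest@localhost/", queue_name="tasks"):
    # Imported here so drain() can be exercised without the library or a broker.
    import aio_pika

    connection = await aio_pika.connect_robust(url)
    async with connection:  # the connection is closed when this block exits
        channel = await connection.channel()
        # Queue name and durable=True are assumptions; match your real queue settings.
        queue = await channel.declare_queue(queue_name, durable=True)
        return await drain(queue, print_body)


# Typical entry point for a scheduled job:
#     asyncio.run(run_once())
```

Here the connection lives only as long as there is a backlog to work through, which suits a cron-style job. A long-running consumer like the one in the question would keep the connection open instead.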

jalvespinto commented 1 year ago

Thanks @mosquito. That is what I understood, and that is why I thought I was missing something: a main app that runs indefinitely and that I could add consumers to, maybe using a decorator, like we usually have for routes in web frameworks. Sorry if I am saying something silly (I don't have much experience).
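
A decorator-based registry of that kind could be sketched roughly as follows. This is only an illustration under assumptions: ConsumerApp, ConsumerSpec, handle_order and the "orders" queue are hypothetical names, not something aio-pika provides. Only the calls inside run() (connect_robust, channel, set_qos, declare_queue, consume) are the library's API.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable


@dataclass
class ConsumerSpec:
    queue: dict  # keyword arguments for channel.declare_queue()
    handler: Callable[..., Awaitable[None]]
    prefetch_count: int = 10


class ConsumerApp:
    """Hypothetical registry that collects consumers the way a web framework collects routes."""

    def __init__(self, url):
        self.url = url
        self.consumers = []

    def consumer(self, prefetch_count=10, **queue):
        def register(handler):
            self.consumers.append(ConsumerSpec(queue, handler, prefetch_count))
            return handler  # the decorated function stays usable as-is

        return register

    async def run(self):
        # Imported lazily so handlers can be registered without the library installed.
        import aio_pika

        connection = await aio_pika.connect_robust(self.url)
        async with connection:
            for spec in self.consumers:
                channel = await connection.channel()
                await channel.set_qos(prefetch_count=spec.prefetch_count)
                queue = await channel.declare_queue(**spec.queue)
                await queue.consume(spec.handler)
            await asyncio.Future()  # keep the connection open until cancelled


app = ConsumerApp("amqp://guest:guest@localhost/")


@app.consumer(name="orders", durable=True, prefetch_count=5)
async def handle_order(message):
    # Without no_ack, the handler must ack; process() does that on normal exit.
    async with message.process():
        print(message.body)


# Entry point for the long-running worker process:
#     asyncio.run(app.run())
```

Modules that define handlers only need to import app and decorate their functions. The entry point then imports those modules and calls app.run(), so the single long-lived connection is shared by every registered consumer.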