mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0

Potential memory leak - pamqp\base.py #628

Open ionutbaltariu opened 1 month ago

ionutbaltariu commented 1 month ago

I am investigating a memory leak in a service and I've managed to see the following with tracemalloc:

At the first run I get the following: [..]\venv\Lib\site-packages\pamqp\base.py:60: size=2616 B, count=44, average=59 B

After submitting some messages in the queue, it seems that the size only increases: [..]\venv\Lib\site-packages\pamqp\base.py:60: size=7746 B, count=129, average=60 B and eventually it reaches 14.3 KiB

I might be running the test incorrectly, so please let me know what information is needed to actually confirm whether there is a memory leak.
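
For example, I assume a diff between two snapshots taken a batch of messages apart would make the growth clearer; something along these lines (a simplified sketch, not the exact service code):

import tracemalloc

tracemalloc.start()
baseline = tracemalloc.take_snapshot()

# ... let the consumer process a batch of messages here ...

current = tracemalloc.take_snapshot()

# Print the allocations that grew the most since the baseline snapshot
for diff in current.compare_to(baseline, 'lineno')[:10]:
    print(diff)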

The aio-pika version in use is 9.4.1.

This is the code:

async def run_async(self) -> None:
        logger.info("Will start the worker..")
        tracemalloc.start()
        self.internal_events_session = aiohttp.ClientSession()
        self.google_req_session = aiohttp.ClientSession()

        asyncio.create_task([..])

        logger.info("Declared request sessions..")

        logger.info("Connecting to RabbitMQ..")
        connection = await aio_pika.connect_robust(
            host=os.getenv(f"RABBIT_MQ_HOST"),
            port=int(os.getenv(f"RABBIT_MQ_PORT")),
            virtualhost="/",
            login=os.getenv("RABBIT_MQ_USERNAME"),
            password=os.getenv("RABBIT_MQ_PASSWORD")
        )

        queue_name = os.getenv("QNAME")

        logger.info("Creating channel..")
        channel = await connection.channel()

        logger.info("Setting QOS..")
        await channel.set_qos(prefetch_count=10)

        logger.info("Declaring queue..")
        queue = await channel.get_queue(queue_name)

        logger.info("Will start consuming..")
        await queue.consume(self.process_message)

        try:
            # Wait until terminate
            await asyncio.Future()
        finally:
            await connection.close()

And self.process_message has the following structure:

async def process_message(self, message: aio_pika.abc.AbstractIncomingMessage) -> None:
        async with message.process():
            [.. code with async operations]
            snapshot = tracemalloc.take_snapshot()
            top_stats = snapshot.statistics('lineno')

            for stat in top_stats[:20]:
                print(stat)

The other reported stats show no significant difference in memory size after 100+ runs of self.process_message; pamqp\base.py:60 is the only one that keeps increasing.
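
If it helps narrow things down, the stats can also be filtered to just that file before printing them, e.g. (a simplified sketch using tracemalloc's standard filtering, not the exact service code):

import tracemalloc

snapshot = tracemalloc.take_snapshot()

# Keep only allocations whose traceback points at pamqp/base.py line 60
filtered = snapshot.filter_traces([
    tracemalloc.Filter(True, "*pamqp*base.py", lineno=60),
])

for stat in filtered.statistics('lineno'):
    print(stat)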

Darsstar commented 1 month ago

Since I apparently managed to resolve someone's memory leak with https://github.com/mosquito/aio-pika/pull/615, I wonder if you are hitting the same issue.

If their leak was caused by an async for that never finishes, preventing the Task from being gc-ed (which is what I'm assuming), then this would be a different leak. And as far as I can tell, the code you show shouldn't leak memory.

https://docs.python.org/3/library/gc.html has some info that might be useful in your debugging endeavor, gc.collect() and gc.DEBUG_LEAK in particular. If processing the messages involves floats, that could be a cause, as noted in the gc.collect() documentation.
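
Roughly what I mean is something like this (just a sketch; run it around your message processing, not necessarily in this exact spot):

import gc

# DEBUG_LEAK keeps unreachable objects in gc.garbage instead of freeing them
gc.set_debug(gc.DEBUG_LEAK)

# ... let the worker process a batch of messages here ...

unreachable = gc.collect()
print(f"gc.collect() found {unreachable} unreachable objects")

# Inspect what the collector found; uncollectable cycles show up here
for obj in gc.garbage[:20]:
    print(type(obj), repr(obj)[:120])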

ionutbaltariu commented 1 month ago

Hey! Thanks for reaching out! I've managed to do some prints of gc.get_count() and, as far as I can understand, they indicate that there are no hanging objects. It seems that the leak more or less saturates after consuming about 200 MB of RAM, but it still manages to grow afterwards (although slowly).

I'll try to come back with more info in order to pinpoint a potential leak.
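
For example, a periodic logger along these lines (a hypothetical helper, not code from the service) could show whether the growth really levels off:

import asyncio
import gc
import tracemalloc

async def log_memory(interval: float = 60.0) -> None:
    # Assumes tracemalloc.start() was called at worker startup
    while True:
        current, peak = tracemalloc.get_traced_memory()
        print(
            f"gc counts={gc.get_count()} "
            f"traced current={current / 1024:.1f} KiB peak={peak / 1024:.1f} KiB"
        )
        await asyncio.sleep(interval)

# e.g. started next to the consumer: asyncio.create_task(log_memory())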

LockedThread commented 4 weeks ago

Did you find the memory leak?