mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://docs.aio-pika.com/
Apache License 2.0
1.26k stars 190 forks source link

Messages are not sent as expected at versions past 7.0.0 #455

Open PrometheusTheHarbinger opened 2 years ago

PrometheusTheHarbinger commented 2 years ago

I was testing performance of a side project using aio-pika and encountered this bug. Previously when I sent a lot of messages one after another it worked fine. However, after switching to 7.1.2 messages either were sent very slowly or only part of them reached RabbitMQ. I wrote a script to exclude influence of that project's code and the problem persists. It is enough just to switch versions of aio-pika to reproduce this behavior. Script works just fine on 6.8.2, and fails on 7.0.0b2, 7.0.0, 7.1.2. Did not check on other releases.

import asyncio
import json
import threading
import time

import aio_pika

def separate_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def send_message(msg, ex, routing_key):
    await ex.publish(msg, routing_key=routing_key)

async def main(loop_for_subroutines) -> None:
    config = json.loads(open('config.json').read())
    connection = await aio_pika.connect_robust(**config['connect_credentials'])
    msgs = []
    for i in range(200):
        msgs.append(aio_pika.Message(body=f'Hello world{i}!'.encode()))
    channel = await connection.channel()
    ex = await channel.get_exchange(config['exchange'], ensure=False)
    for msg in msgs:
        asyncio.run_coroutine_threadsafe(send_message(msg, ex, config['routing_key']), loop=loop_for_subroutines)

if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    athread = threading.Thread(target=separate_loop, args=(loop,), daemon=True)
    athread.start()
    asyncio.run(main(loop))
    time.sleep(5)  # One's gotta finish sending.
mosquito commented 2 years ago

Could you please prepare two cProfile files, on good version and broken?

Like this python -m cProfile -o profile-$variant.cprof script.py

PrometheusTheHarbinger commented 2 years ago

I would gladly do so, but broken one never finishes. I have to kill it specifically with SIGINT. In some cases, no messages are sent and it exits with code zero. But this is something I can't reproduce, it just happens sometimes.

PrometheusTheHarbinger commented 2 years ago

I can attach profiling of 6.8.2, which works alright, if that can be of any help. Or I can also profile 7.1.2, but I'll have to kill it mid-way.

PrometheusTheHarbinger commented 2 years ago

I got lucky and catched the case where no messages were sent on profiling. Link, I also noticed that in the case where I kill the script, some messages are delivered pretty fast, usually around eleven of them, and then they just stop.