Polyconseil / aioamqp

AMQP implementation using asyncio
Other
280 stars 88 forks source link

What is happening when I debug my code? #191

Closed alairock closed 5 years ago

alairock commented 5 years ago

So, I noticed this really interesting behavior. The below code is what I was testing. Really straightforward. I have a breakpoint set on the callback. I have 5 messages per second building in my queue, (Another service is populating my queue)

When I run this script it slurps all the messages in the queue and then breaks on the callback. Everything working just as expected so far.

The really weird thing that happens is that while I am paused in execution of my code and after I slurped in all the events sitting in the queue, NEW events continue to get delivered. There is only one consumer, and that consumer is paused.

It's also good to know that I have all threads paused on my breakpoint as well, so it's not another thread.

Then after about 5 minutes, the consumer no receives new messages and they start building up in the queue again. No error messages happen in the console of rabbit, or in my code.

So my question is, how are the messages being delivered after my breakpoint? Are the messages after the "slurp and pause" lost or do they sit somewhere in transit waiting for the connection to open up again.

I know the question is confusing, and I'm sure it works as expected, but I am trying to understand how it works at a more fundamental level. Thanks ahead for any help.

import aioamqp
import asyncio

async def callback(channel, body, envelope, properties):
    print(body, " [x] Done")  # breakpoint goes here

async def connect():
    transport, protocol = await aioamqp.connect()
    channel = await protocol.channel()
    await channel.basic_consume(callback, queue_name='analytics_delete_me', no_ack=True)

loop = asyncio.get_event_loop()
loop.run_until_complete(connect())
dzen commented 5 years ago

Hello @alairock,

Can you elaborate on

NEW events continue to get delivered.

Do you mean that the callback is executed ? or messages are unqueued in RabbitMQ ?

alairock commented 5 years ago

*Messages continue to get delivered. Unqueued in rabbit.

dzen commented 5 years ago

@alairock I've tested with the examples/receive_logs.py and examples/emit_logs.py. This is because of RabbitMQ push you all messages that need to be delivered.

If you configure your connection with some prefetch, RabbitMQ won't flood you and will only deliver the prefetch_count messages you want until you ack them:

    channel = await protocol.channel()
    await channel.basic_qos(
        prefetch_size=0, prefetch_count=1, connection_global=False
    )
    exchange_name = 'logs'
alairock commented 5 years ago

@dzen So my question is about when I am debugging without prefetch. My code is completely paused, threads included, and I am still seeing my events go to that consumer. But if my code is paused in execution, how is anything able to be delivered?

dzen commented 5 years ago

My best bet is that messages are buffered in the kernel: the socket is connected, the server sent all the messages, your nic/kernel received them but they’re not received by the code until you call recv

alairock commented 5 years ago

That's the most plausible answer I've found so far. Thanks.