mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0

Add connection blocking event handling #506

Closed teplinsky-maxim closed 1 year ago

teplinsky-maxim commented 1 year ago

Hello! Is there any way to handle the connection being blocked by the RabbitMQ server? I get events such as

Connection <Connection: "amqp://guest:******@127.0.0.1:5672/vhost" at 0x7f2551f1c540> was blocked by: 'low on memory'
Connection <Connection: "amqp://guest:******@127.0.0.1:5672/vhost" at 0x7f2551f1c540> was unblocked

and I want to stop waiting on the connection until it is unblocked. Is it possible to add an event handler like the one the Java client has? Or could I attach a handler to this event myself somehow, for example by overriding the Connection class? I know that the logging code belongs to aiormq/connection.py, but I think the question is more related to this repo.

mosquito commented 1 year ago

The server sends the blocking-related frame, and the sending of all frames to the broker is paused until the unblock-related frame is received (see this PR).
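
To make the pause described above concrete, here is a minimal toy model of the idea, not the actual aiormq implementation. The class name FrameGate and its methods are hypothetical; the only assumption is that blocked/unblocked notifications toggle a gate that every outgoing frame waits on.

```python
import asyncio
from typing import List


class FrameGate:
    """Toy model (hypothetical): writes wait while the broker has blocked us."""

    def __init__(self) -> None:
        self._unblocked = asyncio.Event()
        self._unblocked.set()  # a fresh connection starts unblocked
        self.sent: List[str] = []

    def on_blocked(self) -> None:
        # Broker reported that the connection is blocked: hold outgoing frames.
        self._unblocked.clear()

    def on_unblocked(self) -> None:
        # Broker reported that the connection is unblocked: release writers.
        self._unblocked.set()

    async def ready(self) -> None:
        # Returns immediately when unblocked, otherwise waits for the unblock.
        await self._unblocked.wait()

    async def send(self, frame: str) -> None:
        await self.ready()
        self.sent.append(frame)


async def demo() -> List[str]:
    gate = FrameGate()
    await gate.send("frame-1")
    gate.on_blocked()
    pending = asyncio.create_task(gate.send("frame-2"))
    await asyncio.sleep(0.01)
    assert gate.sent == ["frame-1"]  # frame-2 is parked behind the gate
    gate.on_unblocked()
    await pending
    return gate.sent
```

Running asyncio.run(demo()) shows that the second frame is only recorded after the gate is reopened, which is the behaviour a caller observes as "publish hangs while blocked".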

Maybe I misunderstood your question, and you want to be notified that the connection is unblocked?

The fact is that blocking itself is an emergency situation, if I understand the documentation correctly. The purpose of blocking is to keep clients from harming the broker even more.

teplinsky-maxim commented 1 year ago

I have high-load services that can overload the broker. In that case I want to discard unnecessary data instead of sending it over RabbitMQ. Imagine this pseudocode:

if connection.is_blocked:
    calculate_info_locally_but_slow()
else:
    send_data_over_rmq_to_calculate_fast()

For now I have come up with creating a class BlockingDiscardConnection(Connection), where Connection is from aiormq. I'm not sure this is the right class to inherit from, because I also want the connection to be robust, so I should probably include RobustConnection too.

mosquito commented 1 year ago

An event or a callback won't help you in any way. In your case, you check before you "hurt" the broker, but the blocking might happen while you are sending, in the first branch of the condition for example.

As I mentioned earlier, if the broker is in such a bad state that it has asked the client to stop, then there is no point in checking it.

If you have a program running on two or more computers, or as two instances on one computer, these checks are not synchronised, so you can "hurt" the broker twice.

What complicates this further is that, even with a callback, you won't be able to send any frames, including the channel close frame, until the connection is unblocked.

Imagine this situation. You subscribe to a blocking event, and the blocking occurs in the middle of publishing a message (which means sending three frames). To cancel the publish you would have to close the channel, but that requires sending a channel close frame, and frame sending is paused. All you can do is close the connection. In that case, it is not clear why you would need a RobustConnection.

It is not difficult to make a blocking callback, but this library aims to remove callback hell so that the user can work with RabbitMQ with a minimum of callbacks. aiormq is a lower-level library and has an event for the suggested behaviour (the Connection.ready() method).

If you ask me, I would solve this problem by tuning the broker: a lazy-mode policy for queues, or sharding queues across different nodes of the cluster, should help here.
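
As a rough illustration of the lazy-mode suggestion, the sketch below declares a queue with the "x-queue-mode": "lazy" argument through a channel's declare_queue() call. This is an alternative to a broker-side policy, not the same thing. The helper name declare_lazy_queue is hypothetical, the channel is assumed to be an aio-pika channel, and newer RabbitMQ releases may ignore this argument.

```python
from typing import Any, Dict

# RabbitMQ queue argument that asks the broker to keep messages on disk.
LAZY_QUEUE_ARGUMENTS: Dict[str, Any] = {"x-queue-mode": "lazy"}


async def declare_lazy_queue(channel: Any, name: str) -> Any:
    """Hypothetical helper: declare a durable queue in lazy mode.

    `channel` is assumed to expose an aio-pika style
    `declare_queue(name, durable=..., arguments=...)` coroutine.
    """
    return await channel.declare_queue(
        name,
        durable=True,
        arguments=dict(LAZY_QUEUE_ARGUMENTS),  # copy so callers cannot mutate the constant
    )
```

A policy applied on the broker has the advantage that it can be changed without redeclaring queues, so treat the per-queue argument as a sketch of the idea only.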

UPD: Feel free to use the aio_pika.Connection.transport.ready() method in aio-pika with a timeout, like this:

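try:
    # ready() returns once the connection is not blocked
    await asyncio.wait_for(connection.transport.ready(), timeout=5)
    send_data_over_rmq_to_calculate_fast()
except asyncio.TimeoutError:
    # still blocked after 5 seconds: fall back to the local path
    calculate_info_locally_but_slow()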

teplinsky-maxim commented 1 year ago

Yep, it is exactly what I was looking for!

try:
    await asyncio.wait_for(connection.transport.ready(), timeout=5)
    send_data_over_rmq_to_calculate_fast()    
except asyncio.TimeoutError:
    calculate_info_locally_but_slow()

Thanks!
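
The pattern from this thread can be wrapped in a small helper. This is a minimal sketch, assuming only that the transport object exposes an awaitable ready() as mentioned above. The name send_or_fallback and its parameters are hypothetical and not part of aio-pika.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def send_or_fallback(
    transport: Any,
    send: Callable[[], Awaitable[Any]],
    fallback: Callable[[], Any],
    timeout: float = 5.0,
) -> Any:
    """Hypothetical helper: publish if the connection unblocks in time.

    `transport` is assumed to expose an awaitable `ready()`, as
    `connection.transport` does in the thread above.
    """
    try:
        # Only the readiness wait is guarded, so a timeout raised inside
        # `send` itself is not mistaken for a blocked connection.
        await asyncio.wait_for(transport.ready(), timeout=timeout)
    except asyncio.TimeoutError:
        # Still blocked after `timeout` seconds: take the slow local path.
        return fallback()
    return await send()
```

Usage would look like await send_or_fallback(connection.transport, publish_coro_factory, calculate_info_locally_but_slow), where publish_coro_factory is whatever coroutine function performs the publish. As noted earlier in the thread, the connection can still become blocked right after ready() returns, so this reduces wasted waiting but does not remove the race.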