eandersson / amqpstorm

Thread-safe Python RabbitMQ Client & Management library
https://www.amqpstorm.io/
MIT License
186 stars 36 forks source link

How does amqpstorm handle Rabbitmq memory alarms #118

Open mehdigmira opened 2 years ago

mehdigmira commented 2 years ago

Hello,

Thanks for your work on this library 👍 I have a question regarding Rabbitmq memory watermarks and the way amqpstorm handles them.

Imagine the following: 1) I have a process that keeps publishing messages (basically an infinite loop for this example) 2) At a certain point memory alarm is triggered on RMQ, and further publishes are blocked (Writes on a blocked connection will time out or fail with an I/O write exception.), also the publisher get notified that a connexion is blocked (Compatible AMQP 0-9-1 clients will be [notified] when they are blocked and unblocked.). 3) Then Rabbitmq flushes out some of the data to disk, and unblocks publisher

What happens on AMQPstorm side ? Does it raise ? do we lose some messages ? does the block/unblock logic trigger some logic in AMQPStorm ?

Thanks for the help

Links: RMQ doc

eandersson commented 2 years ago

The state is exposed on the connection level using the is_blocked property.

e.g.

while connection.is_blocked:
    time.sleep(1)
channel.publish(...)

I suspect that if you enable confirm_deliveries on the channel level it would raise an exception as well, or at the very least return False, since the message wasn't delivered successfully.

jhogg commented 2 years ago

Mehdi,

The blocked notification support has to be requested when the connection is created, and it is by default. However, if you provide a "client properties override" that includes the 'capabilities' definition, you must define all required capabilities or you risk losing some functionality.

See this for reference: https://github.com/eandersson/amqpstorm/blob/c32999f5f9abe9c751c25250bd813ab330a46795/amqpstorm/channel0.py#L227

Jay

On Thu, Apr 21, 2022 at 10:43 PM Erik Olof Gunnar Andersson < @.***> wrote:

The state is exposed on the connection level using the is_blocked property.

e.g.

while connection.is_blocked: time.sleep(1) channel.publish(...)

I suspect that if you enable confirm_deliveries on the channel level it would raise an exception as well, or at the very least return False, since the message wasn't delivered successfully.

— Reply to this email directly, view it on GitHub https://github.com/eandersson/amqpstorm/issues/118#issuecomment-1105972429, or unsubscribe https://github.com/notifications/unsubscribe-auth/AABJGJRFC2K5IN73CLPBNNDVGIN5BANCNFSM5T7UNAIA . You are receiving this because you are subscribed to this thread.Message ID: @.***>

mehdigmira commented 2 years ago

Thanks for the swift replies. The is_blocked property is asynchronous, since it relies on a notification sent by Rabbitmq, so we might be trying to publish on a blocked connection, even if we use is_blocked. So my question is: what does amqpstorm do when i try to write on a blocked connection ? Rabbitmq doc states Writes on a blocked connection will time out or fail with an I/O write exception.. So how does that transpire ? Will channel.publish(...) raise ?

jhogg commented 2 years ago

Mehdi,

As you have noted, there is a significant amount of async communications involved and the exact handling will depend on how you are configuring channels and publishing.

Errors/exceptions received async are only generated when a call is made to [channel].check_for_exceptions(), which most (all?) calls that transmit/receive on the channel call as part of their handling BEFORE sending a new message. What needs to be noted here is this exception can be a channel level issue or it is an issue with a PRIOR message. I believe you have access to a message id to identify a failed message, but can't be 100% positive. You would need to implement ack/nack tracking per-message and manage the list of unconfirmed messages, and note that the exception is BEFORE the current message is sent, so you have a "prior failed" and the current one failed.

If you don't want to deal with all the tracking, then (1) the channel needs delivery confirmation and (2) you need to publish with mandatory=True. This is MUCH slower, but it significantly simplifies the code. On a publish(mandatory=True) you will (1) receive an exception if there is a channel issue and the current message is NOT published or on completion you will receive a boolean response - True = success / False = failure.

Jay

On Fri, Apr 22, 2022 at 1:32 AM Mehdi GMIRA @.***> wrote:

Thanks for the swift replies. The is_blocked property is asynchronous, since it relies on a notification sent by Rabbitmq, so we might be trying to publish on a blocked connection, even if we use is_blocked. So my question is: what does amqpstorm do when i try to write on a blocked connection ? Rabbitmq doc states Writes on a blocked connection will time out or fail with an I/O write exception.. So how does that transpire ? Will channel.publish(...) raise ?

— Reply to this email directly, view it on GitHub https://github.com/eandersson/amqpstorm/issues/118#issuecomment-1106059558, or unsubscribe https://github.com/notifications/unsubscribe-auth/AABJGJS7HGMW7SY7H7ECT3LVGJBYPANCNFSM5T7UNAIA . You are receiving this because you commented.Message ID: @.***>