amqp-rs / lapin

AMQP client library in Rust, with a clean, futures based API
MIT License

Reconnect on shutting down AMQP service #389

Closed mjanssen closed 2 months ago

mjanssen commented 1 year ago

Hi,

We have experienced issues with one of our providers killing their services every once in a while, which causes Lapin to stop consuming messages.

We have an implementation based on the reconnect example. It works when, for example, Lapin can't find the queue name, but nothing happens when the service is killed.

I have a test setup that connects to a local instance of RabbitMQ, which I can terminate. I can connect to a queue and receive messages when I publish a message on the queue. However, when I kill the container, nothing happens in the Lapin connection and it still waits for messages (line 52 of the example).

When I boot RabbitMQ again and publish a message on the same queue, the message is not received in the Lapin message loop.

Any clue how to fix this issue?

mjanssen commented 1 year ago

Verifying the consumer state in the while loop allowed me to check whether the consumer was still active.

while let Some(delivery) = consumer.next().await {
    // If the state is not active anymore, make sure we try to reconnect
    if consumer.state() != lapin::ConsumerState::Active {
        error!("Connection broke");

        retry_connection(addr, channel_name);
        break;
    }

    ...
}

Not sure if this is the correct way of handling this issue, but it seems to work :)
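For reference, the retry step can be separated from the consumer loop. The following is a minimal sketch using only the standard library, not Lapin's API: `session` is a hypothetical closure standing in for "connect, consume until the connection breaks, then return the error", and `run_with_reconnect` and `backoff_delay` are illustrative names that do not appear in the reconnect example.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Delay before retry number `attempt` (1-based): base, 2*base, 4*base, ...
/// capped at `max`.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    let factor = 2u32.saturating_pow(attempt.saturating_sub(1));
    base.checked_mul(factor).map_or(max, |d| d.min(max))
}

/// Calls `session` until it returns `Ok` or has failed `max_attempts` times.
/// `session` is a hypothetical stand-in for one connect-and-consume cycle.
fn run_with_reconnect<E, F>(
    mut session: F,
    max_attempts: u32,
    base: Duration,
    max: Duration,
) -> Result<(), E>
where
    F: FnMut() -> Result<(), E>,
{
    let mut attempt = 0;
    loop {
        match session() {
            Ok(()) => return Ok(()),
            Err(err) => {
                attempt += 1;
                if attempt >= max_attempts {
                    // Give up and hand the last error back to the caller.
                    return Err(err);
                }
                // Wait a little longer after each consecutive failure.
                sleep(backoff_delay(attempt, base, max));
            }
        }
    }
}
```

Keeping the retry policy outside the message loop means the loop only has to report that the connection broke; how often and how quickly to reconnect is decided in one place.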

Keruspe commented 1 year ago

Hmm, you shouldn't need to do that; the next().await should give you an error. Can you reopen this?
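The expected behaviour described here can be illustrated without a broker. This is a rough sketch under stated assumptions: a plain iterator of `Result` items stands in for the async consumer stream, and `Delivery`, `ConsumeError`, and `consume` are hypothetical names defined only for this example, not Lapin types.

```rust
/// Hypothetical stand-in for a received message.
#[derive(Debug, PartialEq)]
struct Delivery(Vec<u8>);

/// Hypothetical stand-in for a connection-level error.
#[derive(Debug, PartialEq)]
enum ConsumeError {
    ConnectionLost(String),
}

/// Processes deliveries until the source ends or yields an error.
/// Returns the number of deliveries handled before that point.
fn consume<I>(
    deliveries: I,
    mut handle: impl FnMut(Delivery),
) -> Result<usize, ConsumeError>
where
    I: IntoIterator<Item = Result<Delivery, ConsumeError>>,
{
    let mut handled = 0;
    for item in deliveries {
        // `?` returns the error to the caller as soon as it appears,
        // so the caller can decide to reconnect.
        handle(item?);
        handled += 1;
    }
    Ok(handled)
}
```

With this shape, no explicit state check is needed inside the loop: a broken connection shows up as an `Err` item, the function returns, and the caller sees the failure.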

mjanssen commented 1 year ago

Definitely!

I've set up a local RabbitMQ environment using Docker

docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmqcontainer rabbitmq:3.9.5-management

and connected to it using the following URL: amqp://localhost:5672?heartbeat=5&connection_timeout=10000.

I have set up a custom queue in the RabbitMQ UI (http://localhost:15672/#/queues) and connected to it using Lapin. With the connection established, I initially expected that stopping the container (docker stop rabbitmqcontainer) would automatically trigger the error handling at if let Err(err) = connect(addr.clone(), channel.clone()).await.

Keruspe commented 3 months ago

Is it still valid with the latest versions, where we detect missing heartbeats from the server?
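As an illustration of what detecting missing heartbeats can mean, here is a minimal sketch. It assumes the common rule that a peer is considered gone after two heartbeat intervals without incoming traffic; `HeartbeatWatchdog` is a hypothetical type written for this example, and nothing here claims to be how Lapin implements the check.

```rust
use std::time::{Duration, Instant};

/// Tracks when traffic was last received from the peer.
/// Hypothetical helper, not part of any library.
struct HeartbeatWatchdog {
    interval: Duration,
    last_seen: Instant,
}

impl HeartbeatWatchdog {
    fn new(interval: Duration, now: Instant) -> Self {
        Self { interval, last_seen: now }
    }

    /// Call whenever any frame (heartbeat or otherwise) arrives.
    fn record_traffic(&mut self, now: Instant) {
        self.last_seen = now;
    }

    /// True once two full intervals have passed without any traffic
    /// (assumed threshold, see the note above).
    fn is_expired(&self, now: Instant) -> bool {
        now.saturating_duration_since(self.last_seen) >= self.interval * 2
    }
}
```

With the `heartbeat=5` setting from the URL in this thread, such a watchdog would report the connection as dead 10 seconds after the last frame from the server, which is the point at which the consumer loop would be expected to receive an error.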