gftea / amqprs

Async & Lock-free RabbitMQ Rust Client, Easy-to-use API
MIT License
214 stars 27 forks source link

No heartbeats sent with long-running consume functions? #138

Closed dtebbs closed 3 weeks ago

dtebbs commented 4 months ago

I have a client which accepts "requests", executes a long running computation (several minutes) and then writes the result to a response queue before ack-ing the original request. I have set the tokio runtime to multi-threaded, 2 worker threads, and the prefetch for the request queue is 1.

Since the consume function is async, I assumed that heartbeats would automatically be sent while I am processing requests. I also tried spawning in the handler to do the processing. However if the processing lasts long enough then the rabbitmq server closes my connection due to lack of heartbeat.

I'm not very familiar with the protocol or the amqprs implementation, so I'm wondering if this is expected and I must set the heartbeat timeout much higher, or should heartbeats be sent by other tasks while processing is going on and I'm doing something wrong?

Thanks in advance for any advice.

jmrgibson commented 3 months ago

I was under the impression that you had to manually send something for the heartbeat (in 1.5 at least). I've been doing

                tokio::task::spawn_local({
                    async move {
                        let interval: u64 = (connection.heartbeat() / 2).into();
                        loop {
                            time::sleep(time::Duration::from_secs(interval - 1)).await;
                            if let Err(error) = channel.flow(true).await {
                                error!("Error while sending heartbeat: {}", error);
                                break;
                            }
                        }
                    }
                });

and then handling the task exit as one of my failure conditions.

Another problem might be that your processing might be blocking the thread that the heartbeat task is on (since in my case, I use spawn_local rather than spawn. In that case, I would either increase the number of threads, or try using tokio::task::spawn_blocking to ensure your work done on a different thread than your local_set

dtebbs commented 3 months ago

Thanks @jmrgibson that is extremely helpful. Will give this a shot.

gftea commented 3 months ago

if you are using tokio and your task is doing long-running blocking computation, it will block whole runtime to shedule other tasks, you should run it in blocking context instead of async context. the heartbeat does send automatically by the lib.

dtebbs commented 2 months ago

Thanks for your help @gftea .
Will ensure the tokio threads don't block and remove the manual heartbeats.