nats-io / nats.rs

Rust client for NATS, the cloud native messaging system.
Apache License 2.0

async_nats: double_ack() should return an error if the ack_wait period is over #1042

Open ghost opened 1 year ago

ghost commented 1 year ago

Use case

Currently, we are using NATS to run some long-running jobs. To avoid running too many jobs at once, some workers apply a rate limit and pause message processing so that they do not overwhelm themselves. When the service is ready to accept a message, we send the ack.

But in the current implementation, ack() doesn't return an error when the ack_wait period has been exceeded, so NATS redelivers the message even though it has already been processed.

This can be reproduced with the jetstream_pull example by applying the following patch, which uses the WorkQueue retention policy:

diff --git a/async-nats/examples/jetstream_pull.rs b/async-nats/examples/jetstream_pull.rs
index 8368c9a..358449b 100644
--- a/async-nats/examples/jetstream_pull.rs
+++ b/async-nats/examples/jetstream_pull.rs
@@ -23,12 +23,14 @@ async fn main() -> Result<(), async_nats::Error> {
         .create_stream(jetstream::stream::Config {
             name: stream_name,
             subjects: vec!["events.>".to_string()],
+            retention: async_nats::jetstream::stream::RetentionPolicy::WorkQueue,
             ..Default::default()
         })
         .await?
         // Then, on that `Stream` use method to create Consumer and bind to it too.
         .create_consumer(jetstream::consumer::pull::Config {
             durable_name: Some("consumer".to_string()),
+            ack_wait: std::time::Duration::from_secs(1),
             ..Default::default()
         })
         .await?;
@@ -52,14 +54,18 @@ async fn main() -> Result<(), async_nats::Error> {
     // Iterate over messages.
     while let Some(message) = messages.next().await {
         let message = message?;
+        // long running job here
+        tokio::time::sleep(std::time::Duration::from_millis(150)).await;
+
+        // acknowledge the message
+        message.ack().await?;
+
+        // message can now be processed
         println!(
             "got message on subject {} with payload {:?}",
             message.subject,
             from_utf8(&message.payload)?
         );
-
-        // acknowledge the message
-        message.ack().await?;
     }

     Ok(())

When running the code, you will see that all messages are processed, but the last one is still in the queue because it can't be acked in time.

Proposed change

ack() should return an error of kind WaitPeriodOver, or something similar, so that this specific case can be handled.
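To illustrate how a caller might use such an error, here is a minimal std-only sketch. Everything in it is hypothetical: `AckErrorKind`, its `WaitPeriodOver` variant, `Action`, and `after_ack` are names made up for illustration and are not part of async_nats. It assumes the pattern from the use case above, where the ack is sent before the job is processed.

```rust
// Hypothetical types for illustration only; none of these exist in async_nats.
#[derive(Debug, PartialEq)]
enum AckErrorKind {
    /// The server reports that the ack arrived after ack_wait expired.
    WaitPeriodOver,
    /// Any other failure (connection lost, timeout waiting for the reply, ...).
    Other,
}

/// What the worker decides to do once the ack attempt has completed.
#[derive(Debug, PartialEq)]
enum Action {
    /// The ack was accepted: run the job.
    Process,
    /// The ack was too late: skip the job, since the server will redeliver it.
    Skip,
    /// The outcome is unknown: try to ack again.
    RetryAck,
}

/// Maps the result of a (hypothetical) ack call to the worker's next step.
fn after_ack(result: Result<(), AckErrorKind>) -> Action {
    match result {
        Ok(()) => Action::Process,
        Err(AckErrorKind::WaitPeriodOver) => Action::Skip,
        Err(AckErrorKind::Other) => Action::RetryAck,
    }
}

fn main() {
    println!("{:?}", after_ack(Ok(())));
    println!("{:?}", after_ack(Err(AckErrorKind::WaitPeriodOver)));
    println!("{:?}", after_ack(Err(AckErrorKind::Other)));
}
```

With an error like this, the worker could skip a job whose ack was too late instead of processing it twice.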

Who benefits from the change(s)?

Any user who needs to handle many concurrent long-running jobs.

Alternative Approaches

No response

Jarema commented 1 year ago

Thanks for filing the issue!

Did you consider using double_ack? https://docs.rs/async-nats/latest/async_nats/jetstream/message/struct.Message.html#method.double_ack

The problem with a client-side timeout check is that it's racy by nature: from the client's perspective, there is still time to send the ack, but by the time it arrives at the server, the ack wait threshold has been reached and redelivery is already happening.
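The race can be made concrete with a small std-only sketch. It is a simplified model, not async_nats or server code: `client_check_passes` and `server_accepts` are hypothetical helpers, the latency figure is an assumed number, and the model ignores the delivery latency that would also count against the server's timer.

```rust
use std::time::Duration;

/// Client-side view: the ack looks safe if less than `ack_wait` has elapsed
/// since the message was delivered.
fn client_check_passes(elapsed_at_send: Duration, ack_wait: Duration) -> bool {
    elapsed_at_send < ack_wait
}

/// Server-side view (simplified): the ack only counts if it arrives before
/// `ack_wait` expires, so the one-way network latency is added on top.
fn server_accepts(
    elapsed_at_send: Duration,
    one_way_latency: Duration,
    ack_wait: Duration,
) -> bool {
    elapsed_at_send + one_way_latency < ack_wait
}

fn main() {
    let ack_wait = Duration::from_secs(1);
    // The client sends the ack 990 ms after delivery, just inside the window.
    let elapsed = Duration::from_millis(990);
    // Assumed latency between client and server.
    let latency = Duration::from_millis(20);

    // Prints "client check passes: true"
    println!("client check passes: {}", client_check_passes(elapsed, ack_wait));
    // Prints "server accepts: false" because 990 ms + 20 ms exceeds 1 s
    println!("server accepts: {}", server_accepts(elapsed, latency, ack_wait));
}
```

Since the client cannot know the latency of the ack it is about to send, only a response from the server can settle whether the ack was in time.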

ghost commented 1 year ago

Well, even with double_ack, I don't get any error back. For example, reusing the jetstream_pull example with this diff:

diff --git a/async-nats/examples/jetstream_pull.rs b/async-nats/examples/jetstream_pull.rs
index 8368c9a..216260e 100644
--- a/async-nats/examples/jetstream_pull.rs
+++ b/async-nats/examples/jetstream_pull.rs
@@ -23,12 +23,14 @@ async fn main() -> Result<(), async_nats::Error> {
         .create_stream(jetstream::stream::Config {
             name: stream_name,
             subjects: vec!["events.>".to_string()],
+            retention: async_nats::jetstream::stream::RetentionPolicy::WorkQueue,
             ..Default::default()
         })
         .await?
         // Then, on that `Stream` use method to create Consumer and bind to it too.
         .create_consumer(jetstream::consumer::pull::Config {
             durable_name: Some("consumer".to_string()),
+            ack_wait: std::time::Duration::from_secs(1),
             ..Default::default()
         })
         .await?;
@@ -47,19 +49,19 @@ async fn main() -> Result<(), async_nats::Error> {

     // Attach to the messages iterator for the Consumer.
     // The iterator does its best to optimize retrieval of messages from the server.
-    let mut messages = consumer.messages().await?.take(10);
+    let mut messages = consumer.messages().await?;

     // Iterate over messages.
     while let Some(message) = messages.next().await {
+        tokio::time::sleep(std::time::Duration::from_millis(150)).await;
         let message = message?;
+        // acknowledge the message
+        message.double_ack().await.unwrap();
         println!(
             "got message on subject {} with payload {:?}",
             message.subject,
             from_utf8(&message.payload)?
         );
-
-        // acknowledge the message
-        message.ack().await?;
     }

     Ok(())

I am getting this output when removing the take(10) on the messages stream:

got message on subject events.0 with payload "data"
got message on subject events.1 with payload "data"
got message on subject events.2 with payload "data"
got message on subject events.3 with payload "data"
got message on subject events.4 with payload "data"
got message on subject events.5 with payload "data"
got message on subject events.6 with payload "data"
got message on subject events.7 with payload "data"
got message on subject events.8 with payload "data"
got message on subject events.9 with payload "data"
got message on subject events.6 with payload "data"
got message on subject events.7 with payload "data"
got message on subject events.8 with payload "data"
got message on subject events.9 with payload "data"

I would expect double_ack to return an error from the server saying that the message wasn't acked in time.

Jarema commented 1 year ago

I'll take a look into double ack not returning the expected response.

ghost commented 1 year ago

Thanks!