mnetship / ratsio

NATS & NATS Streaming Server client library for Rust
MIT License
50 stars 22 forks source link

Stan Manual acknowledgment #11

Closed glueball closed 4 years ago

glueball commented 4 years ago

I could find no way of sending manual message acknowledgments (sorry if it exists and I missed it). So I'd like to suggest this feature.

I found the manual_acks(true) method in the StanSubscribeBuilder, which suppresses the automatic acknowledgment. But then found no way of manually sending the acknowledgment message. Looking at the code, It'd need the ack_inbox of the subscription and the underlying NATS client, but neither are public.

IMHO, the ideal API would be something like stan_message.acknowledge(), or at least stan_client.acknowledge(stan_message). I guess that acknowledging the message should consume it, since if you are doing it manually it means that you're only acknowledging after fully processing the message.

mnetship commented 4 years ago

Hi @glueball , i've added a method StanClient::acknowledge(&self, message) to acknowledge messages. Only messages that came through a subscription can be acknowledged in this way, which is always the case. Let me know if this is sufficient. ratsio-0.2.4 has this addition

glueball commented 4 years ago

It does look sufficient, but I guess the method should be "pub".

mnetship commented 4 years ago

Oops, it's pub now. 0.2.5

glueball commented 4 years ago

It works! ;)

I wrote the following (mostly stupid) example:

            let sub = StanSubscribe::builder()
                .subject(subject.clone())
                .start_position(StartPosition::First)
                .durable_name(Some("test.subject.durable".into()))
                .manual_acks(true)
                .ack_wait_in_secs(5)
                .build().unwrap();

            let stan_client = stan_client.clone().to_owned();

            stan_client
                .subscribe(sub, SyncHandler(Box::new(move |stan_msg: StanMessage| {
                    let msg = std::str::from_utf8(&stan_msg.payload).unwrap();
                    println!("> GOT :::: {:?} -> {}", stan_msg, msg);

                    // Stupid example to see that some messages get re-delivered and others don't
                    if stan_msg.sequence % 2 == 0 {
                        tokio::spawn(stan_client.acknowledge(stan_msg)
                                                .map(|_| ())
                                                .map_err(|_| ()));
                    }

                    Ok(())
                })))

In this example, it's a bit weird that that I need to clone an owned stan_client in order to move it into the closure. But I guess that in a real use case the stan_client will be stored somewhere where it can be used in a more natural way.

For me it is ok. Many thanks! ;)