nats-io / nats.rs

Rust client for NATS, the cloud native messaging system.
Apache License 2.0

Allow for a separate `Acker` type from a jetstream Message #919

Closed thomastaylor312 closed 1 year ago

thomastaylor312 commented 1 year ago

Use Case:

It would be nice if a jetstream `Message` could return or carry some sort of `Acker` type that can be used to ack the message once its data has been consumed. To avoid allocations in our code, we often consume the message payload rather than borrowing it, so we don't end up with two copies of everything (the raw payload and the newly parsed bytes). But once you do that, you can't call any of the ack methods, because they borrow the whole message in order to read the reply subject of the inner NATS message. One example of this:

if let Err(e) = republish(
    &msg.context,
    publish_topic.clone(),
    // `headers` and `payload` are moved out of `msg.message` here...
    msg.message.headers.unwrap_or_default(),
    msg.message.payload,
)
.await
{
    error!(error = ?e, "Unable to republish message. Will nak and retry");
    // ...so every later call that borrows `msg` as a whole is rejected.
    if let Err(e) = msg
        .ack_with(async_nats::jetstream::AckKind::Nak(None))
        .await
    {
        warn!(error = ?e, "Unable to nak. This message will timeout and redeliver");
    }
} else if let Err(e) = msg.double_ack().await {
    if let Err(e) = msg
        .ack_with(async_nats::jetstream::AckKind::Nak(None))
        .await
    {
        // There isn't much we can do if this happens as this means we
        // successfully published, but couldn't nak
        error!(error = ?e, "Unable to ack. This message will timeout and redeliver a duplicate message");
    }
}

Compiler error:

borrow of partially moved value: `msg`
partial move occurs because `msg.message.payload` has type `bytes::Bytes`, which does not implement the `Copy` trait

Another example of us needing to capture the whole message when all we need to do is ack: https://github.com/wasmCloud/wadm/blob/4beff2813dcedc5723e474e4a2948fa232a7e68f/src/consumers/commands.rs#L110-L117

Proposed Change:

My suggestion would be to have something that is either a data member of the jetstream `Message` or can be constructed using something like `msg.acker()`, and that can be stored for acking the message later.
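To make the idea concrete, here is a rough, std-only sketch of what such a handle could look like. Everything in it is hypothetical: `MockClient`, the simplified `Message` and `AckKind`, and the synchronous `ack_with` are stand-ins for illustration, not the async-nats types and not necessarily the shape of whatever gets implemented. The point is only that the acker copies the small pieces it needs (the reply subject and a client handle), so the payload can be moved out afterwards.

```rust
use std::sync::{Arc, Mutex};

/// Simplified stand-in for the ack kinds (hypothetical).
#[derive(Debug, Clone, PartialEq)]
pub enum AckKind {
    Ack,
    Nak,
}

/// Stand-in for a client handle; it only records what would be published.
#[derive(Clone, Default)]
pub struct MockClient {
    log: Arc<Mutex<Vec<(String, AckKind)>>>,
}

impl MockClient {
    fn publish(&self, subject: &str, kind: AckKind) {
        self.log.lock().unwrap().push((subject.to_string(), kind));
    }

    /// Everything "published" so far, for inspection.
    pub fn published(&self) -> Vec<(String, AckKind)> {
        self.log.lock().unwrap().clone()
    }
}

/// Simplified stand-in for a jetstream message (hypothetical).
pub struct Message {
    pub payload: Vec<u8>,
    pub reply: Option<String>,
    pub client: MockClient,
}

/// Holds only what is needed to ack: the reply subject and a client handle.
pub struct Acker {
    reply: Option<String>,
    client: MockClient,
}

impl Message {
    /// Build an acker that does not borrow from the message afterwards.
    pub fn acker(&self) -> Acker {
        Acker {
            reply: self.reply.clone(),
            client: self.client.clone(),
        }
    }
}

impl Acker {
    pub fn ack_with(&self, kind: AckKind) -> Result<(), &'static str> {
        let reply = self.reply.as_deref().ok_or("message has no reply subject")?;
        self.client.publish(reply, kind);
        Ok(())
    }
}

/// Take the acker first, then move the payload out without cloning it.
pub fn consume(msg: Message) -> (Vec<u8>, Acker) {
    let acker = msg.acker();
    let payload = msg.payload; // partial move is fine: `msg` is not used again
    (payload, acker)
}
```

With something like this, the republish example above would call `consume(msg)` (or `msg.acker()`) up front, hand the payload to `republish`, and then ack or nak through the stored `Acker`.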

Who Benefits From The Change(s)?

Anyone trying to reduce allocations in their code that consumes NATS messages.

Alternative Approaches

I don't think there are other alternative approaches right now. Currently the workaround is to clone the payload.

Jarema commented 1 year ago

Thanks @thomastaylor312 !

That is a good idea and will get implemented, probably for 0.30.0.

caspervonb commented 1 year ago

A payload clone won't be very expensive, as it is `Bytes`, but semantically I do agree.
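For context on the cost: `Bytes` is a reference-counted buffer, so cloning it hands out another handle to the same memory rather than copying the bytes. The snippet below is only a std-only analogy using `Arc<[u8]>` (the helper name `clone_and_inspect` is made up, and this is not how `Bytes` is implemented internally); it just shows what "clone without copying the data" means.

```rust
use std::sync::Arc;

/// Clone a shared buffer and report (same allocation?, handle count while the clone is alive).
pub fn clone_and_inspect(buf: &Arc<[u8]>) -> (bool, usize) {
    let copy = Arc::clone(buf); // bumps a reference count; no bytes are copied
    (Arc::ptr_eq(buf, &copy), Arc::strong_count(buf))
}
```

So the clone workaround mostly costs a counter update, which is why the objection here is about ergonomics more than performance.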

Jarema commented 1 year ago

Implemented in #938