nats-io / nats.rs

Rust client for NATS, the cloud native messaging system.
Apache License 2.0

JetStream: `SampleFrequency` is not set on Pull Consumers #1299

Closed (bengsparks closed 1 month ago)

bengsparks commented 1 month ago

Observed behavior

Creating a pull consumer with `sample_frequency` set to any value does not apply the setting: the consumer is created, but its sampling rate stays unset. Note that I have not checked whether the same occurs for push consumers. See the example below.

Expected behavior

The created consumer should not contain `SampleFrequency: ""`; it should carry the value passed in `sample_frequency`.

Server and client version

nats.rs

[package]
name = "async_nats_consumer_sample_frequency"
version = "0.1.0"
edition = "2021"

[dependencies]
async-nats = "0.35.1"
tokio = { version = "1.39", features = ["macros", "rt-multi-thread"] }

nats-server

$ docker run -p 4222:4222 nats:latest --version     
nats-server: v2.10.18

nats-cli

$ nats --version
(devel)

Host environment

No response

Steps to reproduce

Consumer Creation (with Cargo.toml above)

```rs
use async_nats::jetstream::{consumer::pull, stream};

#[tokio::main]
async fn main() {
    let connection = async_nats::connect("localhost:4222").await.unwrap();
    let js = async_nats::jetstream::new(connection);

    let stream = js
        .create_stream(stream::Config {
            name: "StreamWithSampledConsumer".into(),
            ..Default::default()
        })
        .await
        .unwrap();

    let mut consumer = stream
        .create_consumer(pull::Config {
            name: Some("SampledConsumer".into()),
            durable_name: Some("SampledConsumer".into()),
            description: Some("See below to check that Ack Sampling has been set to 100%!".to_string()),
            sample_frequency: 100, // <--- sample all the messages
            ..Default::default()
        })
        .await
        .unwrap();

    // Read using provided config
    // Output: 0
    println!("in-memory sample frequency {}", consumer.cached_info().config.sample_frequency);

    // Updating does nothing
    let new_config = consumer.info().await.unwrap();
    // Output: 0
    println!("fetched sample frequency {}", new_config.config.sample_frequency);
}
```

Double-check consumer by looking up information with nats-cli

```console
[nix-shell:~/monitor]$ nats consumer info
[localhost] ? Select a Stream StreamWithSampledConsumer
[localhost] ? Select a Consumer SampledConsumer
Information for Consumer StreamWithSampledConsumer > SampledConsumer created 2024-08-06T16:26:26+02:00

Configuration:

                    Name: SampledConsumer
             Description: See below to check that Ack Sampling has been set to 100%!
               Pull Mode: true
          Deliver Policy: All
              Ack Policy: Explicit
                Ack Wait: 30.00s
           Replay Policy: Instant
         Max Ack Pending: 1,000
       Max Waiting Pulls: 512

State:

  Last Delivered Message: Consumer sequence: 0 Stream sequence: 0
    Acknowledgment Floor: Consumer sequence: 0 Stream sequence: 0
        Outstanding Acks: 0 out of maximum 1,000
    Redelivered Messages: 0
    Unprocessed Messages: 0
           Waiting Pulls: 0 of maximum 512
```

Triple-check consumer by manually setting the sample frequency with nats-cli

```console
[nix-shell:~/monitor]$ nats consumer edit --sample 100
[localhost] ? Select a Stream StreamWithSampledConsumer
[localhost] ? Select a Consumer SampledConsumer
Differences (-old +new):
  api.ConsumerConfig{
  	... // 18 identical fields
  	RateLimit:       0,
  	ReplayPolicy:    s"Instant",
- 	SampleFrequency: "",
+ 	SampleFrequency: "100",
  	HeadersOnly:     false,
  	MaxRequestBatch: 0,
  	... // 7 identical fields
  }
[localhost] ? Really edit Consumer StreamWithSampledConsumer > SampledConsumer Yes
Information for Consumer StreamWithSampledConsumer > SampledConsumer created 2024-08-06T16:26:26+02:00

Configuration:

                    Name: SampledConsumer
             Description: See below to check that Ack Sampling has been set to 100%!
               Pull Mode: true
          Deliver Policy: All
              Ack Policy: Explicit
                Ack Wait: 30.00s
           Replay Policy: Instant
           Sampling Rate: 100
         Max Ack Pending: 1,000
       Max Waiting Pulls: 512

State:

  Last Delivered Message: Consumer sequence: 0 Stream sequence: 0
    Acknowledgment Floor: Consumer sequence: 0 Stream sequence: 0
        Outstanding Acks: 0 out of maximum 1,000
    Redelivered Messages: 0
    Unprocessed Messages: 0
           Waiting Pulls: 0 of maximum 512
```

Jarema commented 1 month ago

Hey!

Thanks for reporting this one. This happens because the actual field name is `sample_freq`, not `sample_frequency`.

Are you interested in contributing and fixing the serde part?:)
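
To illustrate why a mismatched field name fails silently rather than with an error, here is a minimal standard-library sketch. It is a toy model, not the async-nats code: `ConsumerConfig`, `to_wire`, and `server_sample_frequency` are hypothetical names invented for this illustration, and the real client serializes with serde rather than a hand-built map. The assumption it encodes is that the receiving side reads only the key it knows (`sample_freq`) and ignores any other key.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a consumer config; NOT the async-nats type.
pub struct ConsumerConfig {
    pub name: String,
    pub sample_frequency: u8,
}

/// Toy serializer: writes the sampling rate under whatever key the client
/// believes the server expects.
pub fn to_wire(config: &ConsumerConfig, sample_key: &str) -> BTreeMap<String, String> {
    let mut wire = BTreeMap::new();
    wire.insert("name".to_string(), config.name.clone());
    wire.insert(sample_key.to_string(), config.sample_frequency.to_string());
    wire
}

/// Toy server: reads only the key it knows (`sample_freq`) and silently
/// ignores every other key, so a misnamed field yields an empty value.
pub fn server_sample_frequency(wire: &BTreeMap<String, String>) -> String {
    wire.get("sample_freq").cloned().unwrap_or_default()
}
```

In this model, calling `to_wire(&config, "sample_frequency")` leaves the server-side value empty, while `to_wire(&config, "sample_freq")` carries it through. In serde-based code, the usual way to decouple a Rust field name from its serialized name is the `#[serde(rename = "...")]` field attribute; whether the eventual fix uses exactly that is not stated in this thread.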

bengsparks commented 1 month ago

Certainly I can :)

bengsparks commented 1 month ago

PR #1300 should fix this.