nats-io / nats.rs

Rust client for NATS, the cloud native messaging system.
Apache License 2.0

fails to publish if client is cloned #1336

Closed ashupednekar closed 3 weeks ago

ashupednekar commented 3 weeks ago

Observed behavior

I have a function like this:

pub async fn get_jetstream() -> Result<(Client, Stream)> {
    let client = connect_with_options(
        env::var("NATS_URL").unwrap_or("localhost:30042".to_string()),
        ConnectOptions::new().name(env::var("SERVICE_NAME").unwrap_or("worker".to_string())),
    )
    .await?;
    client.publish("tasks.one", serde_json::to_vec(&json!({"msg": "hey"}))?.into()).await?;
    let ctx = jetstream::new(client.clone());
    let cfg = Config {
        name: "tasks".to_string(),
        subjects: vec!["tasks.>".to_string()],
        retention: RetentionPolicy::WorkQueue,
        ..Default::default()
    };
    Ok((client, ctx.get_or_create_stream(cfg).await?))
}

The example publish call inside this function works as expected.

Expected behavior

I'm initializing JetStream with a clone of this client object, and JetStream behaves as expected.

But when I try publishing with the returned client:

    #[traced_test]
    #[tokio::test]
    async fn test_produce() -> Result<()>{
        let (nc, _) = get_jetstream().await?;
        nc.publish("tasks.one", serde_json::to_vec(&json!({"msg": "hey"}))?.into()).await?;
        nc.publish("tasks.one", serde_json::to_vec(&json!({"msg": "hey"}))?.into()).await?;
        nc.publish("tasks.one", serde_json::to_vec(&json!({"msg": "hey"}))?.into()).await?;
        Ok(())
    }

I get no errors, but no messages get published.

Here's a screenshot illustrating nats s subjects and advisory events:

image

Server and client version

server: 2.9.20
client: async-nats = "0.37.0"

Host environment

NATS running as Docker containers

client system: image

Steps to reproduce

util

pub async fn get_jetstream() -> Result<(Client, Stream)> {
    let client = connect_with_options(
        env::var("NATS_URL").unwrap_or("localhost:30042".to_string()),
        ConnectOptions::new().name(env::var("SERVICE_NAME").unwrap_or("worker".to_string())),
    )
    .await?;
    client.publish("tasks.one", serde_json::to_vec(&json!({"msg": "hey"}))?.into()).await?;
    let ctx = jetstream::new(client.clone());
    let cfg = Config {
        name: "tasks".to_string(),
        subjects: vec!["tasks.>".to_string()],
        retention: RetentionPolicy::WorkQueue,
        ..Default::default()
    };
    Ok((client, ctx.get_or_create_stream(cfg).await?))
}

test

    #[traced_test]
    #[tokio::test]
    async fn test_produce() -> Result<()>{
        let (nc, _) = get_jetstream().await?;
        nc.publish("tasks.one", serde_json::to_vec(&json!({"msg": "hey"}))?.into()).await?;
        nc.publish("tasks.one", serde_json::to_vec(&json!({"msg": "hey"}))?.into()).await?;
        nc.publish("tasks.one", serde_json::to_vec(&json!({"msg": "hey"}))?.into()).await?;
        Ok(())
    }

ashupednekar commented 3 weeks ago

Note: there's probably something I'm doing incorrectly.

I see the same behavior when I create a second client instance, so it has nothing to do with clone.


pub async fn get_jetstream() -> Result<(Client, Stream)> {
    let client = connect_with_options(
        env::var("NATS_URL").unwrap_or("localhost:30042".to_string()),
        ConnectOptions::new().name(env::var("SERVICE_NAME").unwrap_or("worker".to_string())),
    )
    .await?;
    client.publish("tasks.one", serde_json::to_vec(&json!({"msg": "hey"}))?.into()).await?;
    let ctx = jetstream::new(client);
    let cfg = Config {
        name: "tasks".to_string(),
        subjects: vec!["tasks.>".to_string()],
        retention: RetentionPolicy::WorkQueue,
        ..Default::default()
    };
    let nc = connect_with_options(
        env::var("NATS_URL").unwrap_or("localhost:30042".to_string()),
        ConnectOptions::new().name(env::var("SERVICE_NAME").unwrap_or("worker".to_string())),
    )
    .await?; //because of nats-rs bug: https://github.com/nats-io/nats.rs/issues/1336
    Ok((nc, ctx.get_or_create_stream(cfg).await?))
}

ashupednekar commented 3 weeks ago

I updated the test to create a client directly:

    #[traced_test]
    #[tokio::test]
    async fn test_produce() -> Result<()> {
        //let (nc, _) = get_jetstream().await?;
        let nc = connect_with_options(
            settings.nats_url.clone(),
            ConnectOptions::new().name(settings.service_name.clone()),
        )
        .await?;
        nc.publish(
            "tasks.one",
            serde_json::to_vec(&json!({"msg": "hey"}))?.into(),
        )
        .await?;
        nc.publish(
            "tasks.one",
            serde_json::to_vec(&json!({"msg": "hey"}))?.into(),
        )
        .await?;
        nc.publish(
            "tasks.one",
            serde_json::to_vec(&json!({"msg": "hey"}))?.into(),
        )
        .await?;
        Ok(())
    }

It's still not publishing properly. Can someone please point out what could be amiss?

Jarema commented 3 weeks ago

You need to call flush() before the end of the test/app to ensure that everything in the buffer is sent through the socket before everything is dropped.
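The general effect described here, data sitting in a buffer until an explicit flush, can be illustrated with a small standard-library analogy. This is only a sketch and does not reproduce async-nats internals: the Vec stands in for the socket, BufWriter stands in for the client's outgoing buffer, and bytes_delivered is a hypothetical helper name introduced for illustration.

```rust
use std::io::{BufWriter, Write};

/// Writes `msg` through a buffered writer and reports how many bytes had
/// reached the underlying sink before and after an explicit flush.
/// (Hypothetical helper, for illustration only.)
fn bytes_delivered(msg: &[u8]) -> std::io::Result<(usize, usize)> {
    // The Vec plays the role of the socket; the BufWriter plays the role
    // of the client's outgoing buffer.
    let mut writer = BufWriter::new(Vec::new());

    // A small write succeeds without error but only lands in the buffer.
    writer.write_all(msg)?;
    let before = writer.get_ref().len();

    // The explicit flush pushes the buffered bytes through to the sink.
    writer.flush()?;
    let after = writer.get_ref().len();

    Ok((before, after))
}

fn main() -> std::io::Result<()> {
    let (before, after) = bytes_delivered(b"{\"msg\":\"hey\"}")?;
    println!("before flush: {before} bytes, after flush: {after} bytes");
    Ok(())
}
```

As in the tests above, the write itself returns Ok even though nothing has reached the sink yet. For the client in this thread, the corresponding step is awaiting flush() on it before the test function returns.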

ashupednekar commented 3 weeks ago

Yep, thanks. Adding nc.flush().await? at the end worked.