nats-io / nats.rs

Rust client for NATS, the cloud native messaging system.
Apache License 2.0

Lifetime of async_nats kv::Watch vs nats kv::Watch #908

Open baadc0de opened 1 year ago

baadc0de commented 1 year ago

Use Case:

The sync NATS client's kv::Watch struct does not have a lifetime parameter, while the async_nats version does. This complicates ownership when trying to own a kv::Store and a tokio_stream StreamMap of kv::Watch instances in the same struct.

Proposed Change:

Make async_nats kv::Watch not have a lifetime parameter to match sync nats kv::Watch.
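To illustrate the ownership difference being asked for, here is a minimal std-only sketch. `Store`, `BorrowedWatch`, `OwnedWatch`, and `Watcher` are hypothetical stand-ins, not the async_nats types, and the sketch assumes the lifetime parameter ties a watch to the store it was created from. A watch that shares ownership of the store (here through `Arc`) can be kept in the same struct as the store, while a borrowing watch cannot be stored next to the value it borrows from in safe Rust.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical stand-in for a KV store handle; NOT the async_nats type.
struct Store {
    bucket: String,
}

// Borrowing design: the watch carries a lifetime tied to the store.
// A struct holding both a `Store` and a `BorrowedWatch` of that same
// store would be self-referential, which safe Rust does not allow.
struct BorrowedWatch<'a> {
    store: &'a Store,
    key: String,
}

impl<'a> BorrowedWatch<'a> {
    fn subject(&self) -> String {
        format!("{}.{}", self.store.bucket, self.key)
    }
}

impl Store {
    fn watch_borrowed(&self, key: &str) -> BorrowedWatch<'_> {
        BorrowedWatch { store: self, key: key.to_string() }
    }
}

// Owning design: the watch shares ownership of the store, so it has
// no lifetime parameter and can be moved and stored freely.
struct OwnedWatch {
    store: Arc<Store>,
    key: String,
}

impl OwnedWatch {
    fn subject(&self) -> String {
        format!("{}.{}", self.store.bucket, self.key)
    }
}

// The store and a map of its watches live in one struct.
struct Watcher {
    store: Arc<Store>,
    watches: HashMap<String, OwnedWatch>,
}

impl Watcher {
    fn new(bucket: &str) -> Self {
        Watcher {
            store: Arc::new(Store { bucket: bucket.to_string() }),
            watches: HashMap::new(),
        }
    }

    // Register a watch for `key`; the watch holds its own `Arc` to the store.
    fn add(&mut self, key: &str) {
        let watch = OwnedWatch { store: Arc::clone(&self.store), key: key.to_string() };
        self.watches.insert(key.to_string(), watch);
    }

    // Watched subjects in sorted order (HashMap iteration order is unspecified).
    fn subjects(&self) -> Vec<String> {
        let mut out: Vec<String> = self.watches.values().map(OwnedWatch::subject).collect();
        out.sort();
        out
    }
}
```

The borrowing version still works for short-lived use within a single scope (`store.watch_borrowed("a")`); the difficulty only appears once the watch has to be stored alongside its store.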

Who Benefits From The Change(s)?

Users of multiple kv::Watch in a single tokio select! loop.

Alternative Approaches

Somehow coax the Rust borrow checker into allowing the borrow. Maybe rebuild the StreamMap on each loop iteration?

baadc0de commented 1 year ago

This looks like it could work for me - the resulting Streams own their own kv::Store and can be put in a StreamMap.

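// Imports inferred from the names used below; StreamExt may come from tokio_stream instead.
use async_nats::jetstream::kv;
use async_stream::stream;
use futures::{pin_mut, Stream, StreamExt};
use serde::de::DeserializeOwned;

// `serde_err` and `nats_err` are error-mapping helpers (not shown) that produce anyhow::Error.
pub fn watch_bucket_as<T: DeserializeOwned>(store: kv::Store, key: String) -> impl Stream<Item = anyhow::Result<Option<T>>> + Unpin {
  // The generated stream takes ownership of `store`, so the borrowing Watch lives inside it.
  Box::pin(stream! {
    // Note: if creating the watch fails, the stream ends without yielding an error.
    let Ok(stream) = store.watch(key).await else { return; };
    pin_mut!(stream);

    while let Some(entry) = stream.next().await {
      match entry {
        | Ok(entry) => {
          match entry.operation {
            // A put carries a JSON payload; decode it into T.
            | kv::Operation::Put => {
              yield serde_json::from_slice(&entry.value).map_err(serde_err).map(Some);
            }
            // Deletes and purges are reported as "no value".
            | kv::Operation::Delete | kv::Operation::Purge => {
              yield Ok(None);
            }
          }
        }
        | Err(err) => {
          yield Err(nats_err(err));
        }
      }
    }
  })
}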
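
The mapping from an entry's operation to an optional value in the snippet above can be pulled out into a small pure function, which makes it easy to test without a NATS server. The following is only a sketch under stated assumptions: `Operation`, `DecodeError`, and `decode` are hypothetical names, the enum merely mirrors the three operations matched above, and `FromStr` parsing stands in for `serde_json::from_slice` so the example needs nothing beyond the standard library.

```rust
use std::fmt::Display;
use std::str::FromStr;

// Hypothetical mirror of the three operations matched in the snippet above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Operation {
    Put,
    Delete,
    Purge,
}

// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
enum DecodeError {
    NotUtf8,
    Parse(String),
}

// Put => Some(decoded value); Delete and Purge => None.
// `FromStr` is a stand-in for the JSON decoding used in the original.
fn decode<T>(op: Operation, value: &[u8]) -> Result<Option<T>, DecodeError>
where
    T: FromStr,
    T::Err: Display,
{
    match op {
        Operation::Put => {
            let text = std::str::from_utf8(value).map_err(|_| DecodeError::NotUtf8)?;
            text.trim()
                .parse::<T>()
                .map(Some)
                .map_err(|e| DecodeError::Parse(e.to_string()))
        }
        // The payload is ignored for removals.
        Operation::Delete | Operation::Purge => Ok(None),
    }
}
```

With a helper of this shape, the body of the watch loop would reduce to yielding `decode(entry.operation, &entry.value)` with the error mapped as needed.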