nats-io / nats-server

High-Performance server for NATS.io, the cloud and edge native messaging system.
https://nats.io
Apache License 2.0

Receive watch events for TTL'd KV objects #3268

Open segfaultdoc opened 2 years ago

segfaultdoc commented 2 years ago

Feature Request

Use Case:

We currently use JetStream KV to build different in-memory cache views of the stored objects. We'd like to leverage the KV store's TTL mechanism to delete stale objects in our derived views.

Proposed Change:

Emit watch events from the server signaling when a KV object has been TTL'd.

Who Benefits From The Change(s)?

Jito Labs and others

Alternative Approaches

Build cleanup logic in app code.

derekcollison commented 2 years ago

This has been requested and we are looking into the best way to provide this.

esemeniuc commented 1 year ago

Would love to see this too

derekcollison commented 1 year ago

Yes, on our list for 2.10.

gedw99 commented 1 year ago

This would be quite useful for building a large distributed caching system with NATS as the origin store.

The secondary caches could be CDN-style servers that speak HTTP downstream and NATS upstream.

The use case is then that an external cache line can be invalidated when the NATS KV or Object item TTL expires.

alexandreLamarre commented 1 year ago

Any updates on this making it into v2.10?

derekcollison commented 1 year ago

Will not make it in 2.10.0. Still on our list.

stpnov commented 10 months ago

It's much needed, we're waiting.

derekcollison commented 10 months ago

It will be coming.

alexj212 commented 10 months ago

@derekcollison Any ETA on a target version/date?

derekcollison commented 10 months ago

The target is 2.11, in the April timeframe.

mxcoppell commented 9 months ago

If the key expiration events could optionally be distributed to a stream, that would be great. In a multi-client environment, clients could then use different stream policies to consume the expiration events.

derekcollison commented 9 months ago

They will be on the stream that caused the expiration, as system delete markers.

mxcoppell commented 9 months ago

Are there any limitations on the underlying stream for the KV store? One issue we have with Redis is that all pods need to listen to the keyspace for expiration events, which then requires deduplicating the duplicate events. If the underlying stream could use WorkQueuePolicy and a number of queue subscribers could listen to the stream, that would be perfect.

derekcollison commented 9 months ago

@mxcoppell I am not following. Could you provide a bit more information?

mxcoppell commented 9 months ago

Sure, using the following example from https://shijuvar.medium.com/using-nats-jetstream-key-value-store-in-go-85f88b0848ce:

url := os.Getenv("NATS_URL")
if url == "" {
    url = nats.DefaultURL
}

nc, _ := nats.Connect(url)
defer nc.Drain()

js, _ := nc.JetStream()

// kv is declared up front so both branches below can assign to it.
var kv nats.KeyValue
if stream, _ := js.StreamInfo("KV_discovery"); stream == nil {
    // A key-value (KV) bucket is created by specifying a bucket name.
    kv, _ = js.CreateKeyValue(&nats.KeyValueConfig{
        Bucket: "discovery",
    })
} else {
    kv, _ = js.KeyValue("discovery")
}

The KV bucket "discovery" is in the stream "KV_discovery".

My understanding of the key expiration events (which could be totally wrong) is that the events will be distributed to the stream consumers of "KV_discovery". My last question was whether the user could set a retention policy like WorkQueuePolicy on "KV_discovery", so that a group of queue subscribers could listen to the stream "KV_discovery" for the expiration events, with each expiration event delivered to one subscriber at a time rather than broadcast to all listeners.

Thanks!

derekcollison commented 9 months ago

Yes, system-level expiration or removal will be distributed to the watchers of that KV in 2.11.

mxcoppell commented 9 months ago

Derek, could you use an example to illustrate how this expiration event subscription works? And are the events broadcast to all watchers, or could there be more options, like queue subscribers?

derekcollison commented 9 months ago

They will be for all watchers. This is similar to what happens today if you use the KV library to delete a key.
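If expirations do arrive the same way library deletes do today, a derived view could handle both through one code path. The sketch below illustrates that idea only; `Op`, `Update`, and `Apply` are hypothetical stand-ins and not the client library's types. In nats.go, the comparable information is the operation reported on each watched entry (put, delete, or purge). Whether a TTL expiry will be distinguishable from an explicit delete is not settled in this thread, so the sketch treats them alike.

```go
package main

import "fmt"

// Op is a stand-in for the operation carried by a KV watch update.
// These names are illustrative, not the client library's own.
type Op int

const (
	OpPut Op = iota
	OpDelete
	OpPurge
)

// Update is a hypothetical, simplified watch update.
type Update struct {
	Key   string
	Value []byte
	Op    Op
}

// Apply folds one update into the derived view.
// It reports whether an existing key was removed from the view.
func Apply(view map[string][]byte, u Update) bool {
	switch u.Op {
	case OpDelete, OpPurge:
		_, existed := view[u.Key]
		delete(view, u.Key)
		return existed
	default:
		view[u.Key] = u.Value
		return false
	}
}

func main() {
	view := map[string][]byte{}
	updates := []Update{
		{Key: "svc.a", Value: []byte("10.0.0.1"), Op: OpPut},
		{Key: "svc.b", Value: []byte("10.0.0.2"), Op: OpPut},
		{Key: "svc.a", Op: OpDelete}, // a delete marker, whatever produced it
	}
	for _, u := range updates {
		if Apply(view, u) {
			fmt.Println("evicted", u.Key)
		}
	}
	fmt.Println(len(view), "entries remain")
}
```

Because every watcher receives every update, each process keeps its own view consistent this way. It does not provide the deliver-to-one-subscriber behaviour asked about above.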

arunsmenon88 commented 7 months ago

Hello Derek, just wanted to touch base on the availability of this feature in the upcoming 2.11 release. We couldn't find it in the pre-release notes: https://github.com/nats-io/nats-server/releases/tag/v2.11.0-preview.2. This would be a very useful feature for us, and it would be great if you could give us an update on when it will be available.

derekcollison commented 7 months ago

Still planned for 2.11; we are a bit behind schedule-wise.

arunsmenon88 commented 7 months ago

Thanks Derek

kyrylo commented 6 months ago

Hey there, just checking in.

I am also waiting for this feature. I will be using it as soon as it's released.

Thanks for the great work, @derekcollison 🙌

derekcollison commented 6 months ago

Yes, still planned for 2.11, and hoping to get this into the next preview, 2.11 preview 3.

Some very large customers are taking precedence, but it is still planned.

ashupednekar commented 3 months ago

Hi, any updates? It'd be really helpful... Are there any advisory events or something?

This is what I'm trying to achieve

package main

import (
    "log"
    "time"

    "github.com/nats-io/nats.go"
)

func main() {

    nc, err := nats.Connect("nats://localhost:30042")
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    js, err := nc.JetStream()
    if err != nil {
        log.Fatal(err)
    }

    kv, err := js.CreateKeyValue(&nats.KeyValueConfig{
        Bucket: "ashukv",
        TTL: time.Second * 5,
    })
    if err != nil {
        log.Fatal(err)
    }

    _, err = kv.Put("name", []byte("ashu"))
    if err != nil {
        log.Fatal(err)
    }

    log.Println("Key set with 5 second expiry")

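    // Subscribe to all JetStream advisories. The ">" wildcard matches any
    // number of trailing tokens; "*" matches exactly one and would miss
    // subjects such as $JS.EVENT.ADVISORY.STREAM.CREATED.<stream>.
    sub, err := nc.Subscribe("$JS.EVENT.ADVISORY.>", func(msg *nats.Msg) {
        log.Printf("Received key expiry advisory: %s\n", string(msg.Data))
    })
    if err != nil {
        log.Fatal(err)
    }
    defer sub.Unsubscribe()

    // Block forever while waiting for advisories.
    select {}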
}

derekcollison commented 3 months ago

Coming in 2.11 but not landed yet.

ckasabula commented 1 month ago

@ashupednekar

> Are there any advisory events or something?

I tested using the CLI, subscribing to ">". No messages are received on TTL expiry, advisory or otherwise.

derekcollison commented 1 month ago

Has not landed yet; it is blocked on per-message TTL, which needs to land first.

aradalvand commented 2 weeks ago

@derekcollison In your upcoming implementation, when a key is deleted via a TTL expiration, will that be a marker included in its history (assuming the history setting is set to 1 or some greater number)? That's absolutely crucial in certain scenarios.

Currently, the bucket-wide MaxAge TTL leaves no trace; there is no history to be found after the fact.

ripienaar commented 2 weeks ago

> @derekcollison In your upcoming implementation, when a key is deleted via a TTL expiration, will that be a marker included in its history (assuming the history setting is set to 1 or some greater number)? This is absolutely crucial in certain scenarios.

Yes, that is the plan, with an optional timer for how long the marker will stay.