nats-io / nats.go

Golang client for NATS, the cloud native messaging system.
https://nats.io
Apache License 2.0

KV updating not available: UpdateStream only runs when there are no changes #1538

Closed MashinaMashina closed 7 months ago

MashinaMashina commented 7 months ago

Observed behavior

When I call nats.jetstream.CreateKeyValue again after changing the TTL, I get the error "nats: stream name already in use".

Expected behavior

I call nats.jetstream.CreateKeyValue and the TTL is updated.

Server and client version

nats-io/nats.go version 1.31.0
NATS server in Docker: nats:2.9-alpine3.18

Host environment

No response

Steps to reproduce

... connect to nats

// Assumption: `jetstream` is a nats.JetStreamContext, e.g. from nc.JetStream().

// no error
_, err := jetstream.CreateKeyValue(&nats.KeyValueConfig{
    Bucket:       "kv_name",
    MaxValueSize: 1024,
    TTL:          time.Minute,
    Storage:      nats.FileStorage,
})

// Expected no error, but got "stream name already in use"
_, err = jetstream.CreateKeyValue(&nats.KeyValueConfig{
    Bucket:       "kv_name",
    MaxValueSize: 1024,
    TTL:          0,
    Storage:      nats.FileStorage,
})

==================

This behavior occurs because of this code (https://github.com/nats-io/nats.go/blob/main/kv.go#L489):

if errors.Is(err, ErrStreamNameAlreadyInUse) {
    if si, _ = js.StreamInfo(scfg.Name); si != nil {
        // To compare, make the server's stream info discard
        // policy same than ours.
        si.Config.Discard = scfg.Discard
        // Also need to set allow direct for v2.9.x+
        si.Config.AllowDirect = scfg.AllowDirect
        if reflect.DeepEqual(&si.Config, scfg) {
            si, err = js.UpdateStream(scfg)
        }
    }
}

In my opinion, it should look like this:

if errors.Is(err, ErrStreamNameAlreadyInUse) {
    if si, _ = js.StreamInfo(scfg.Name); si != nil {
        // To compare, make the server's stream info discard
        // policy same than ours.
        si.Config.Discard = scfg.Discard
        // Also need to set allow direct for v2.9.x+
        si.Config.AllowDirect = scfg.AllowDirect
        if reflect.DeepEqual(&si.Config, scfg) {
            err = nil
        } else {
            si, err = js.UpdateStream(scfg)
        }
    }
}

If the configs are equal, do nothing; otherwise, update the stream. Currently the update runs only when there are no changes, which is wrong.
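The proposed branching can be sketched in isolation, without a NATS server. This is only an illustration of the decision logic: `bucketConfig`, `action`, and `decide` are hypothetical names, not part of nats.go, and the struct is a simplified stand-in for the stream config compared in kv.go.

```go
package main

import (
	"fmt"
	"reflect"
)

// bucketConfig is a hypothetical, simplified stand-in for the stream
// configuration that kv.go compares; it is not the library's type.
type bucketConfig struct {
	Name         string
	MaxValueSize int32
	TTLSeconds   int64
}

// action names what the caller would do after a "name already in use" error.
type action string

const (
	actionNone   action = "none"
	actionUpdate action = "update"
)

// decide follows the proposed branch: identical configs need no work,
// differing configs trigger an update.
func decide(existing, desired bucketConfig) action {
	if reflect.DeepEqual(existing, desired) {
		return actionNone
	}
	return actionUpdate
}

func main() {
	onServer := bucketConfig{Name: "kv_name", MaxValueSize: 1024, TTLSeconds: 60}

	unchanged := onServer // same settings as the existing bucket
	changed := onServer
	changed.TTLSeconds = 0 // TTL changed, as in the reproduction steps

	fmt.Println(decide(onServer, unchanged)) // prints "none"
	fmt.Println(decide(onServer, changed))   // prints "update"
}
```

With the current code the two outcomes are effectively swapped: the update is issued only in the case where `decide` would return "none".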

ripienaar commented 7 months ago

We do need a KV Update function, but maybe make it a specific function rather than overloading this code path. This code path is mainly there to upgrade the configuration of old buckets to current internal needs, afaik.

piotrpio commented 7 months ago

Hello @MashinaMashina, thanks for creating the issue. As @ripienaar said, we'll be adding an UpdateKeyValue() method for that, rather than modifying create. Possibly we'll also add CreateOrUpdate to match what we have for streams and consumers.

piotrpio commented 7 months ago

We've added UpdateKeyValue and CreateOrUpdateKeyValue methods in a recent PR: https://github.com/nats-io/nats.go/pull/1549

It is now available in release v1.33.1. Thank you for creating the issue!