nats-io / nats.go

Golang client for NATS, the cloud native messaging system.
https://nats.io
Apache License 2.0

`KeyValue.Watch()` blocks forever for certain values of the `keys` parameter #1608

Closed (mihaitodor closed this issue 6 months ago)

mihaitodor commented 7 months ago

Observed behavior

The kv.Watch("\"001.>\"") call blocks forever if the NATS KV bucket contains a key starting with 001.

Additionally, if you also try kv.Watch(""), you get a rather cryptic error: nats: jetstream not enabled.

Expected behavior

kv.Watch() should reject invalid keys or, at the very least, it shouldn't block forever.

Also, kv.Watch("") should return a more informative error message, maybe saying that the keys parameter cannot be set to an empty string.

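Server and client version

nats-server 2.10.12 (Docker image nats:2.10.12) and nats.go v1.34.1, as used in the steps to reproduce below.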

Host environment

OSX Sonoma 14.4.1 on Macbook Pro M3 Max

Steps to reproduce

Run the following shell commands:

> docker run --rm -it -p4222:4222 nats:2.10.12 --js
> nats kv add foobar
> nats kv put foobar 001.123 abc

Create a go.mod file with the following contents:

module test

go 1.22.1

require github.com/nats-io/nats.go v1.34.1

require (
    github.com/klauspost/compress v1.17.2 // indirect
    github.com/nats-io/nkeys v0.4.7 // indirect
    github.com/nats-io/nuid v1.0.1 // indirect
    golang.org/x/crypto v0.18.0 // indirect
    golang.org/x/sys v0.16.0 // indirect
)

And then write the following code in a file called main.go:

package main

import (
    "context"
    "log"
    "time"

    "github.com/nats-io/nats.go"
)

func main() {
    conn, err := nats.Connect("localhost:4222")
    if err != nil {
        log.Fatal(err)
    }

    js, err := conn.JetStream()
    if err != nil {
        log.Fatal(err)
    }

    kv, err := js.KeyValue("foobar")
    if err != nil {
        log.Fatal(err)
    }

    // watcher, err := kv.Watch("001.>")
    watcher, err := kv.Watch("\"001.>\"")
    if err != nil {
        log.Fatal(err)
    }
    defer func() {
        if err := watcher.Stop(); err != nil {
            log.Fatal(err)
        }
    }()

    var keys []any
    ctx, done := context.WithTimeout(context.Background(), 3*time.Second)
    defer done()
loop:
    for {
        select {
        case entry := <-watcher.Updates():
            if entry == nil {
                break loop
            }
            keys = append(keys, entry.Key())
        case <-ctx.Done():
            log.Fatal("timeout")
        }
    }
    log.Printf("%v", keys)
}

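If you execute go mod tidy && go run main.go, the program exits after 3 seconds with a log message containing timeout. If you uncomment line 27 of main.go (the kv.Watch("001.>") call) and comment out line 28 (the kv.Watch("\"001.>\"") call), you should see [001.123] instead.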

Additionally, if you try kv.Watch(""), you'll get a rather cryptic error: nats: jetstream not enabled.

(cc @codegangsta)

mihaitodor commented 7 months ago

Update: I also investigated switching to the new github.com/nats-io/nats.go/jetstream API and I can still reproduce the issue with the following code:

package main

import (
    "context"
    "log"
    "time"

    "github.com/nats-io/nats.go"
    "github.com/nats-io/nats.go/jetstream"
)

func main() {
    conn, err := nats.Connect("localhost:4222")
    if err != nil {
        log.Fatal(err)
    }

    js, err := jetstream.New(conn)
    if err != nil {
        log.Fatal(err)
    }

    kv, err := js.KeyValue(context.Background(), "foobar")
    if err != nil {
        log.Fatal(err)
    }

    // watcher, err := kv.Watch(context.Background(), "001.>")
    watcher, err := kv.Watch(context.Background(), "\"001.>\"")
    if err != nil {
        log.Fatal(err)
    }
    defer func() {
        if err := watcher.Stop(); err != nil {
            log.Fatal(err)
        }
    }()

    var keys []any
    ctx, done := context.WithTimeout(context.Background(), 3*time.Second)
    defer done()
loop:
    for {
        select {
        case entry := <-watcher.Updates():
            if entry == nil {
                break loop
            }
            keys = append(keys, entry.Key())
        case <-ctx.Done():
            log.Fatal("timeout")
        }
    }
    log.Printf("%v", keys)
}

The new API does let me pass a context to kv.Watch(), which makes it easier to abort after a timeout, but I still think the kv.Watch() call should reject invalid keys.

Also, the nats: jetstream not enabled error is still returned for kv.Watch(context.Background(), "").

Jarema commented 7 months ago

Thanks for the detailed report!

We will take a look soon.

piotrpio commented 6 months ago

@mihaitodor there were 2 issues:

Both will now return an error since both are essentially invalid arguments: nats: invalid key: keys cannot be empty and must be a valid NATS subject.
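For readers on client versions without these fixes, a caller-side check along the following lines might catch both cases before calling kv.Watch(). This is only a sketch and not the validation implemented in nats.go: validateWatchKey and errInvalidWatchKey are hypothetical names, and the allowed character set for literal tokens is an assumption.

```go
package main

import (
	"errors"
	"fmt"
	"regexp"
	"strings"
)

// errInvalidWatchKey is a hypothetical error for this sketch; it only mirrors
// the wording quoted above and is not the library's error value.
var errInvalidWatchKey = errors.New("invalid key: keys cannot be empty and must be a valid NATS subject")

// literalToken matches one dot-separated token. The character set is an
// assumption made for this illustration.
var literalToken = regexp.MustCompile(`^[-/_=a-zA-Z0-9]+$`)

// validateWatchKey (hypothetical helper) rejects empty keys, empty tokens,
// tokens with unexpected characters such as quotes, and a ">" wildcard that
// is not the final token. "*" is accepted as a whole token anywhere.
func validateWatchKey(key string) error {
	if key == "" {
		return errInvalidWatchKey
	}
	tokens := strings.Split(key, ".")
	for i, tok := range tokens {
		switch {
		case tok == "*":
			// Single-token wildcard, allowed in any position.
		case tok == ">":
			if i != len(tokens)-1 {
				return errInvalidWatchKey
			}
		case !literalToken.MatchString(tok):
			return errInvalidWatchKey
		}
	}
	return nil
}

func main() {
	// The three keys discussed in this issue.
	for _, key := range []string{"001.>", "\"001.>\"", ""} {
		fmt.Printf("%q -> %v\n", key, validateWatchKey(key))
	}
}
```

Under these assumptions "001.>" passes, while the quoted key and the empty string are both rejected up front.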

mihaitodor commented 6 months ago

That's awesome, thank you very much for the quick fixes!

piotrpio commented 6 months ago

Both PRs are merged and will be included in the next releases of nats-server and nats.go.