nats-io / nats.go

Golang client for NATS, the cloud native messaging system.
https://nats.io
Apache License 2.0
5.3k stars 672 forks source link

Using Fetch() with and OrderedConsumer and a delivery policy of DeliverLastPerSubjectPolicy returns incorrect results #1662

Open cchamplin opened 2 weeks ago

cchamplin commented 2 weeks ago

Observed behavior

When using Fetch() with an OrderedConsumer and a delivery policy of DeliverLastPerSubjectPolicy multiple messages for a single subject will be returned. Instead of one message per subject. The last message for each subject is not returned.

Expected behavior

The last message per subject is returned in the results. Each subject is unique.

Server and client version

Server: 2.10.16 Client 1.36.0

Host environment

N/A

Steps to reproduce

package main

import (
    "context"
    "fmt"
    "os"
    "path/filepath"
    "time"

    server "github.com/nats-io/nats-server/v2/server"
    "github.com/nats-io/nats.go"
    "github.com/nats-io/nats.go/jetstream"
)

func main() {
    var bus, shutdown = StartLocalNATSServer()
    defer shutdown()

    var busAddress = "nats://" + bus.Addr().String() // use server address

    busConnection, err := nats.Connect(busAddress)
    if err != nil {
        panic(err)
    }

    NATSWaitConnected(busConnection) // wait connection if not connected yet

    cfg := jetstream.StreamConfig{
        Name:      "test-stream",
        Subjects:  []string{"test.>"},
        Storage:   jetstream.MemoryStorage,
        Retention: jetstream.LimitsPolicy,
        Replicas:  1,
    }

    js, err := jetstream.New(busConnection)
    if err != nil {
        panic(err)
    }

    stream, err := js.CreateStream(context.Background(), cfg)
    if err != nil {
        panic(err)
    }

    fmt.Println("Stream created")
    _ = stream

    _, err = js.Publish(context.Background(), "test.foo.1", []byte("Hello World!"))
    if err != nil {
        panic(err)
    }

    _, err = js.Publish(context.Background(), "test.foo.1", []byte("Hello World! 2"))
    if err != nil {
        panic(err)
    }

    _, err = js.Publish(context.Background(), "test.foo.1", []byte("Hello World! 3"))
    if err != nil {
        panic(err)
    }

    _, err = js.Publish(context.Background(), "test.foo.2", []byte("Hello World!"))
    if err != nil {
        panic(err)
    }

    _, err = js.Publish(context.Background(), "test.foo.2", []byte("Hello World! 2"))
    if err != nil {
        panic(err)
    }

    consumer, err := js.OrderedConsumer(context.Background(), "test-stream", jetstream.OrderedConsumerConfig{
        FilterSubjects: []string{
            "test.foo.>",
        },
        DeliverPolicy:     jetstream.DeliverLastPerSubjectPolicy,
        ReplayPolicy:      jetstream.ReplayInstantPolicy,
        InactiveThreshold: time.Second * 5,
    })
    if err != nil {
        panic(err)
    }

    consumerInfo := consumer.CachedInfo()
    fmt.Println("Consumer created, pending:", consumerInfo.NumPending)

    msgs, err := consumer.Fetch(int(consumerInfo.NumPending))
    if err != nil {
        panic(err)
    }

    for msg := range msgs.Messages() {
        fmt.Println(msg.Subject())
    }

    // Output:
    // Stream created
    // Consumer created, pending: 2
    // test.foo.1
    // test.foo.1

    // Expected output:
    // Stream created
    // Consumer created, pending: 2
    // test.foo.1
    // test.foo.2
}

func StartLocalNATSServer() (s *server.Server, shutdown func()) {
    s, err := server.NewServer(&server.Options{
        Host:           "127.0.0.1",
        Port:           server.RANDOM_PORT,
        NoLog:          true,
        NoSigs:         true,
        MaxControlLine: 2048,
        JetStream:      true,
        StoreDir:       filepath.Join(os.TempDir(), server.JetStreamStoreDir, "tmp"),
    })
    if err != nil {
        panic(err)
    }

    go func() {
        err := server.Run(s)
        if err != nil {
            panic(err)
        }
    }()

    if s == nil {
        panic("starting nats server: nil")
    }

    if !s.ReadyForConnections(5 * time.Second) {
        panic("starting nats server: timeout")
    }
    return s, func() {
        s.Shutdown()
    }
}

func NATSWaitConnected(c *nats.Conn) {
    timeout := time.Now().Add(5 * time.Second)
    for time.Now().Before(timeout) {
        if c.IsConnected() {
            return
        }
        time.Sleep(25 * time.Millisecond)
    }
    panic("nats connection timeout")
}
Jarema commented 2 weeks ago

I was able to reproduce this bug. It seems that for some reason the ordered consumer with this config is instantly recreated, and recreated consumer always resumes as last known sequence.

We will issue a fix.