nats-io / nats.go

Golang client for NATS, the cloud native messaging system.
https://nats.io
Apache License 2.0

Message duplication with js.Unsubscribe() #1036

Closed dblokhin closed 2 years ago

dblokhin commented 2 years ago

Defect

If we call Unsubscribe() on a subscription, messages are delivered again on the next run, despite using manual ack or a nats.Durable subscription.

If we comment out the Unsubscribe() call for the first run against a new (or cleared) stream and then use Unsubscribe() as usual, we get the expected behavior: each message is delivered once.

Versions of nats.go and the nats-server if one was involved:

OS/Container environment:

docker container: nats:latest

Steps or code to reproduce the issue:

package main

import (
    "fmt"
    "log"
    "time"

    "github.com/nats-io/nats.go"
)

const (
    topic = "topic"
    queue = "queue"
)

func main() {
    // nats connection
    natsCli, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatalf("failed nats: %v", err)
    }

    // js client
    js, err := natsCli.JetStream()
    if err != nil {
        log.Fatalf("failed jetstream: %v", err)
    }

    _, err = js.AddStream(&nats.StreamConfig{
        Name:     "EPD",
        Subjects: []string{"*"},
    })

    if err != nats.ErrStreamNameAlreadyInUse && err != nil {
        log.Fatalf("failed jetstream: %v", err)
    }

    sub, err := js.QueueSubscribe(topic, queue, handler)
    if err != nil {
        log.Fatalf("failed subscribe: %v", err)
    }

    if err := publicEvent(js); err != nil {
        log.Fatalf("failed to publish event: %v", err)
    }

    time.Sleep(time.Second / 2)

    // ATTENTION HERE:
    // If we call Unsubscribe() on a new stream (just created, or one without any messages),
    // the handler reads all messages from the very beginning of the stream on every run (even with manual ack).
    if err := sub.Unsubscribe(); err != nil { 
        log.Printf("failed to unsub: %v", err)
    }
}

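// publicEvent publishes the current time to the topic and returns any publish error to the caller.
func publicEvent(js nats.JetStreamContext) error {
    if _, err := js.Publish(topic, []byte(time.Now().String())); err != nil {
        return fmt.Errorf("failed to publish: %w", err)
    }

    return nil
}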

func handler(msg *nats.Msg) {
    fmt.Printf("got: %s\n", msg.Data)
}

Expected result:

[rock@hommy EPD]$ go run EPD/cmd/nats
got: 2022-08-09 11:52:06.995043332 +0300 MSK m=+0.005896941
[rock@hommy EPD]$ go run EPD/cmd/nats
got: 2022-08-09 11:52:08.609378401 +0300 MSK m=+0.004604409
[rock@hommy EPD]$ go run EPD/cmd/nats
got: 2022-08-09 11:52:09.914021915 +0300 MSK m=+0.003794646
[rock@hommy EPD]$ go run EPD/cmd/nats
got: 2022-08-09 11:52:11.094996655 +0300 MSK m=+0.004569221
[rock@hommy EPD]$ go run EPD/cmd/nats
got: 2022-08-09 11:52:12.750914547 +0300 MSK m=+0.004184066

Actual result:

[rock@hommy EPD]$ go run EPD/cmd/nats
got: 2022-08-09 11:52:06.995043332 +0300 MSK m=+0.005896941
[rock@hommy EPD]$ go run EPD/cmd/nats
got: 2022-08-09 11:52:06.995043332 +0300 MSK m=+0.005896941
got: 2022-08-09 11:52:08.609378401 +0300 MSK m=+0.004604409
[rock@hommy EPD]$ go run EPD/cmd/nats
got: 2022-08-09 11:52:06.995043332 +0300 MSK m=+0.005896941
got: 2022-08-09 11:52:08.609378401 +0300 MSK m=+0.004604409
got: 2022-08-09 11:52:09.914021915 +0300 MSK m=+0.003794646
[rock@hommy EPD]$ go run EPD/cmd/nats
got: 2022-08-09 11:52:06.995043332 +0300 MSK m=+0.005896941
got: 2022-08-09 11:52:08.609378401 +0300 MSK m=+0.004604409
got: 2022-08-09 11:52:09.914021915 +0300 MSK m=+0.003794646
got: 2022-08-09 11:52:11.094996655 +0300 MSK m=+0.004569221
[rock@hommy EPD]$ go run EPD/cmd/nats
got: 2022-08-09 11:52:06.995043332 +0300 MSK m=+0.005896941
got: 2022-08-09 11:52:08.609378401 +0300 MSK m=+0.004604409
got: 2022-08-09 11:52:09.914021915 +0300 MSK m=+0.003794646
got: 2022-08-09 11:52:11.094996655 +0300 MSK m=+0.004569221
got: 2022-08-09 11:52:12.750914547 +0300 MSK m=+0.004184066
kozlovic commented 2 years ago

This is because, for JetStream subscriptions whose consumer was created by the library, Unsubscribe() deletes that consumer. This is documented here: https://pkg.go.dev/github.com/nats-io/nats.go#Subscription.Unsubscribe and may change in the future with our JetStream API simplification project.

The issue boils down to the fact that the library auto-creates the JetStream consumer, which can then lead to this unexpected (but documented) behavior. Because the consumer is deleted on Unsubscribe(), its delivery and ack state is lost, and the next run creates a fresh consumer that starts from the beginning of the stream. To prevent that, add a call to AddConsumer before your QueueSubscribe call and the consumer will not be removed on Unsubscribe() (don't forget to specify the DeliverGroup in the ConsumerConfig when adding the consumer). Note that AddConsumer will be idempotent the next time you restart your application, assuming that you don't change the content of the ConsumerConfig in code.