nats-io / nats.go

Golang client for NATS, the cloud native messaging system.
https://nats.io
Apache License 2.0

consumer is already bound to a subscription #1269

Open sylr opened 1 year ago

sylr commented 1 year ago

Hello,

I have a JetStream subscription bound to a consumer. Periodically I unsubscribe, do a few things, and re-subscribe.

Sometimes the re-subscribe fails with "consumer is already bound to a subscription". I suspect that re-subscribing too fast might be a factor in this.

Here is a reproduction:

package main

import (
    "fmt"
    "time"

    "github.com/nats-io/nats.go"
)

const (
    streamName   = "STREAM"
    streamSubj   = "stream.>"
    consumerName = "unsubscribe"
)

var ch = make(chan *nats.Msg)

func main() {
    var err error
    var sub *nats.Subscription

    nc, err := nats.Connect("nats://127.0.0.1:4221")
    if err != nil {
        panic(err)
    }

    js, err := nc.JetStream()
    if err != nil {
        panic(err)
    }

    _, err = js.AddConsumer(streamName, &nats.ConsumerConfig{
        Name:           consumerName,
        Durable:        consumerName,
        DeliverSubject: nc.NewInbox(),
        AckWait:        time.Second,
        AckPolicy:      nats.AckAllPolicy,
        DeliverPolicy:  nats.DeliverNewPolicy,
        ReplayPolicy:   nats.ReplayInstantPolicy,
        FlowControl:    true,
        MaxAckPending:  1,
        MaxDeliver:     1,
        Heartbeat:      3 * time.Second,
        FilterSubject:  streamSubj,
    })
    if err != nil {
        panic(err)
    }

    go func() {
        for range ch {
        }
    }()

    for i := 0; ; i++ {
        if sub, err = js.ChanSubscribe(streamSubj, ch, nats.Bind(streamName, consumerName)); err != nil {
            fmt.Println(i)
            panic(err)
        }
        if err = sub.Unsubscribe(); err != nil {
            panic(err)
        }
    }
}

hofit-cylus commented 5 months ago

It happens to me as well. Also, I'm not sure that re-subscribing too fast is the only factor, since my code retries for 5 seconds. Has anyone else encountered this issue?

piotrpio commented 5 months ago

Hello @sylr, @hofit-cylus. Sorry it took so long - I am looking at the issue now and will let you know when I have a solution.

Walms commented 1 month ago

Did you have any luck finding a solution?

piotrpio commented 1 month ago

The issue stems from the fact that the server tracks bound interest asynchronously, so when the client asks for consumer info in Subscribe(), that state may not be updated yet. After unsubscribing and re-subscribing there is therefore a possible race, which the client can do little about. Changing this behavior on the server would make checking for consumer info heavier (and it is already a resource-heavy operation).

As a workaround, it is possible to pass the nats.SkipConsumerLookup() option to the subscribe call. It makes the client skip the explicit consumer info request that verifies whether the consumer named in Bind() exists.