nats-io / stan.go

NATS Streaming System
https://nats.io
Apache License 2.0
706 stars 117 forks source link

(clientID already registered) is it possible to reconnect like this ? #334

Closed artem-webdev closed 3 years ago

artem-webdev commented 3 years ago

I apologize for closing this question - https://github.com/nats-io/stan.go/issues/333

having understood a little in the code, I realized that the problem can be bypassed by reconnecting to nats Do you think it is safe? here is an example solution



import (
    "fmt"
    "github.com/nats-io/nats.go"
    "github.com/nats-io/stan.go"
    "log"
    "strings"
    "sync"
)

// create global store
var MutexStoreNatsConnect sync.RWMutex
var StoreNatsConnect = make(map[string]*nats.Conn)

// getter store
func GetStoreNatsConnects(key string) (nc *nats.Conn, err error) {

    defer MutexStoreNatsConnect.RUnlock()
    MutexStoreNatsConnect.RLock()

    var ok bool
    if nc, ok = StoreNatsConnect[key]; ok {
        return nc, err
    }

    return nc, fmt.Errorf("nats connect with clientId %s not found \r\n", key)

}

// setter store
func SetStoreNatsConnects(key string, nc *nats.Conn) {
    MutexStoreNatsConnect.Lock()
    StoreNatsConnect[key] = nc
    MutexStoreNatsConnect.Unlock()
}

const (
    StanServers                          = "localhost:4221,localhost:4222,localhost:4223"
    StanClusterId                        = "queue-messages"
    ErrorValueClientIdAlreadyRegistered = "clientID already registered"
)

// get new object stan connect
func NewStan(clientId string) (stan.Conn, error) {

    return stan.Connect(
        StanClusterId,
        clientId,
        stan.NatsURL(StanServers),
        stan.Pings(5, 10),
        stan.SetConnectionLostHandler(func(_ stan.Conn, reason error) {
            log.Fatal(reason.Error())

        }),
    )

}

// wrap stan connect
func GetConnectStan(clientId string) (stan.Conn, error) {

    conn, err := NewStan(clientId)

    // we catch the error and try to reconnect the connection nats
    if err != nil {
        switch {
        case strings.Contains(err.Error(), ErrorValueClientIdAlreadyRegistered):

            nc, errGetter := GetStoreNatsConnects(clientId)
            if errGetter != nil {
                return conn, fmt.Errorf(errGetter.Error(), err.Error())
            }
            if nc != nil && nc.IsConnected() {
                nc.Close()
            }
            conn, err := NewStan(clientId)
            // replace connect in store nats
            if conn != nil {
                SetStoreNatsConnects(clientId, conn.NatsConn())
            }
            return conn, err

        default:
            return conn, err
        }
    }

    // add store nats connect
    if conn != nil {
        SetStoreNatsConnects(clientId, conn.NatsConn())
    }

    return conn, err

}

type ServiceProxy struct {
    StanConn stan.Conn
}

func main() {

    proxy := ServiceProxy{}
    var err error
    proxy.StanConn, err = GetConnectStan("test-client-1")
    if err != nil {
        log.Fatal(err.Error(), proxy.StanConn)
    }

    //for some unknown reason, the pointer became nil
    proxy.StanConn = nil

    // ok connected 
    if proxy.StanConn == nil {
        proxy.StanConn, err = GetConnectStan("test-client-1")
        if err != nil {
            log.Fatal(err.Error(), proxy.StanConn)
        }
    }

}
kozlovic commented 3 years ago

I think we are having a communication problem here. I have repeated many times that the connection that you lost the pointer to is still valid! There is no way you will be able to create a new connection with the same client ID. The server will reject the new one because it can contact and get a response from the "old" one.

Instead of doing all that you do above, why you don't simply store in the sync map the actual STAN (not NATS) connection the first time it gets created? The "key" would be the client ID. Then, the proxy.StanConn == nil issue disappears because when you need access to the connection you will look it up from the sync map.

I don't know enough, obviously, of the actual application to see how easy that would be, but I believe that you should attack the issue at the root (losing reference to the pointer) instead of trying to patch it this way. As I said many times, with the way NATS Streaming client and server are designed, trying to create a new connection with same client ID while one is still valid will not work.

artem-webdev commented 3 years ago

I probably agree with you. Thanks for the help !