nats-io / stan.go

NATS Streaming System
https://nats.io
Apache License 2.0

Using Connection Lost Handler Correctly #330

Closed cemremengu closed 4 years ago

cemremengu commented 4 years ago

As far as I understand, we have to use SetConnectionLostHandler to reconnect and resubscribe when the connection is lost, since this is not automatic. What is a good way to achieve this? Doing the following feels a bit recursive:

sc, err = stan.Connect(
    opts.Cluster,
    opts.Cid,
    stan.Pings(10, 5),
    stan.SetConnectionLostHandler(func(_ stan.Conn, reason error) {
        sc, err = stan.Connect(
            opts.Cluster,
            opts.Cid,
            stan.Pings(10, 5),
            stan.SetConnectionLostHandler(func(_ stan.Conn, reason error) {
                nc, err := stan.Connect(...... keeps going)

            }), stan.MaxPubAcksInflight(300))

    }), stan.MaxPubAcksInflight(300))

Could you please give some hints?

kozlovic commented 4 years ago

@cemremengu First, you may want to set stan.Pings() to values that allow for the maximum server downtime or network partition that you want your application to recover from.
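As a rough illustration of how to pick those values: stan.Pings(interval, maxOut) takes the number of seconds between PINGs and the number of PINGs that may go unanswered, so the client gives up after roughly interval * maxOut seconds. The helpers below are hypothetical (they are not part of stan.go) and only do that arithmetic.

```go
package main

import (
	"fmt"
	"time"
)

// pingTolerance estimates how long the client waits before it reports the
// connection as lost when configured with stan.Pings(interval, maxOut).
// Hypothetical helper: the real timing is approximate.
func pingTolerance(interval, maxOut int) time.Duration {
	return time.Duration(interval*maxOut) * time.Second
}

// maxOutFor returns the smallest maxOut whose tolerance covers the given
// downtime (in seconds) at the given PING interval (in seconds).
// Hypothetical helper; interval values below 1 are treated as 1.
func maxOutFor(downtimeSeconds, interval int) int {
	if interval < 1 {
		interval = 1
	}
	return (downtimeSeconds + interval - 1) / interval // round up
}

func main() {
	fmt.Println(pingTolerance(10, 5)) // prints "50s"
	fmt.Println(maxOutFor(120, 10))   // prints "12"
}
```

So stan.Pings(10, 5) from the question tolerates about 50 seconds of silence, and surviving a two-minute restart at the same interval would need something like stan.Pings(10, 12).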

I set the default stan.Pings() values far too aggressively. This was an ill-conceived attempt to make new users realize quickly that their applications will not be able to publish or receive messages after restarting a NATS Streaming server that uses a memory store (since no state is recovered). The side effect is that for experienced users who know this and run a streaming server with a persistent store, the default pings cause the client to give up too quickly if the server takes a bit of time to restart.

Regardless, here is an example of a very crude way of connecting and reconnecting. I would first create an object that holds your specific configuration.

type conn struct {
    sync.RWMutex
    Cluster string
    Cid     string
    sc      stan.Conn
}

Then this could have a connect() API as such:

func (c *conn) connect(opts ...stan.Option) {
    var sc stan.Conn
    var err error
    // Try up to 10 times, pausing 2 seconds after each failure.
    for i := 0; i < 10; i++ {
        log.Printf("Connecting, attempt: %v", i+1)
        sc, err = stan.Connect(c.Cluster, c.Cid, opts...)
        if err == nil {
            break
        }
        log.Printf("Error connecting: %v", err)
        time.Sleep(2 * time.Second)
    }
    // sc is still nil if every attempt failed; log.Fatal exits the process.
    if sc == nil {
        log.Fatal("Unable to connect after repeated attempts")
    }
    log.Printf("Connected to clusterID: [%s] clientID: [%s]", c.Cluster, c.Cid)
    // Publish the new connection under the write lock so readers see it.
    c.Lock()
    c.sc = sc
    c.Unlock()
}
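Because connect() is also called from the connection lost handler, a log.Fatal there ends the whole process once the retries run out. If that is not what you want, one possible variant is to return the error and let the caller decide. The sketch below is not taken from stan.go: connectWithRetry, dial and errNotReady are hypothetical names, and dial stands in for a closure around stan.Connect so that the example runs with the standard library only.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errNotReady is a hypothetical error used to simulate a failed connect.
var errNotReady = errors.New("server not ready")

// connectWithRetry calls dial up to attempts times, sleeping wait between
// failures. It returns nil on the first success, or the last error wrapped
// with the attempt count instead of exiting the process.
func connectWithRetry(attempts int, wait time.Duration, dial func() error) error {
	if attempts < 1 {
		attempts = 1
	}
	var err error
	for i := 0; i < attempts; i++ {
		if err = dial(); err == nil {
			return nil
		}
		// No need to sleep after the final failed attempt.
		if i < attempts-1 {
			time.Sleep(wait)
		}
	}
	return fmt.Errorf("unable to connect after %d attempts: %w", attempts, err)
}

func main() {
	calls := 0
	// Simulated dial: fails twice, then succeeds.
	err := connectWithRetry(10, 10*time.Millisecond, func() error {
		calls++
		if calls < 3 {
			return errNotReady
		}
		return nil
	})
	fmt.Println(calls, err) // prints "3 <nil>"
}
```

In real code the dial closure would call stan.Connect and store the resulting connection under the lock, as connect() does above.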

You would initially call it like this:

    c := &conn{
        Cluster: "test-cluster",
        Cid:     "client",
    }

    var sopts []stan.Option
    sopts = append(sopts, stan.Pings(10, 5))
    sopts = append(sopts, stan.SetConnectionLostHandler(func(_ stan.Conn, reason error) {
        log.Printf("Lost connection, will attempt to recreate it")
        c.connect(sopts...)
    }))

    c.connect(sopts...)
    sc := c.getConn()

Note that since the connection sc changes after a reconnect, everywhere in your code where you use sc you would likely want to call a function that returns the current one:

func (c *conn) getConn() stan.Conn {
    c.RLock()
    sc := c.sc
    c.RUnlock()
    return sc
}
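To show why the accessor matters, here is a minimal sketch in which every publish looks up the current connection instead of holding on to an old sc. It makes some assumptions so that it runs without a server: publisher is a hypothetical one-method interface mirroring stan.Conn's Publish(subject string, data []byte) error, and setConn, publish and fakeConn are illustrative names that do not appear in the code above.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// publisher is a hypothetical stand-in for the single stan.Conn method
// this sketch needs.
type publisher interface {
	Publish(subject string, data []byte) error
}

type conn struct {
	sync.RWMutex
	sc publisher
}

// getConn returns the current connection under the read lock.
func (c *conn) getConn() publisher {
	c.RLock()
	sc := c.sc
	c.RUnlock()
	return sc
}

// setConn swaps in a new connection, as connect() does after a reconnect.
func (c *conn) setConn(sc publisher) {
	c.Lock()
	c.sc = sc
	c.Unlock()
}

// publish fetches the current connection on every call, so it always uses
// the most recently established one.
func (c *conn) publish(subject string, data []byte) error {
	sc := c.getConn()
	if sc == nil {
		return errors.New("not connected")
	}
	return sc.Publish(subject, data)
}

// fakeConn records what was published; it replaces a real connection here.
type fakeConn struct {
	sent []string
}

func (f *fakeConn) Publish(subject string, data []byte) error {
	f.sent = append(f.sent, subject+":"+string(data))
	return nil
}

func main() {
	c := &conn{}
	first, second := &fakeConn{}, &fakeConn{}

	c.setConn(first)
	_ = c.publish("orders", []byte("a"))

	c.setConn(second) // simulates the handler creating a new connection
	_ = c.publish("orders", []byte("b"))

	fmt.Println(first.sent, second.sent) // prints "[orders:a] [orders:b]"
}
```

The same idea applies to any other call you make on the connection: code that cached the original sc would keep using a connection that has already been lost.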