emitter-io / go

Go/Golang client for emitter
Eclipse Public License 1.0

Reconnection fails #17

Open · kamermans opened this issue 5 years ago

kamermans commented 5 years ago

Hi, I've been trying to use emitter for 4 months now, but I am constantly plagued with reconnection issues. Basically, whenever there is a network interruption, the Go client seems to notice the problem, but it neither reconnects nor panics, which leaves the client stuck in a disconnected state.

This is happening in both the v1 and v2 code, and with both standalone emitter servers as well as clustered servers. The problem is much more common when running the emitter behind an AWS Application Load Balancer with a client TTL above 30 seconds (I suspect the AWS ALB is killing the connection more frequently in this case).

I previously found some issues in the dependency, paho.mqtt.golang, related to auto-reconnection, so I implemented an OnConnectionLost() handler that tries to reconnect, but it doesn't seem to help either. It seems the only way to reliably reconnect is to panic the whole thing and let my process supervisor restart my application.

Is anyone else experiencing this problem, and/or is there something I can do to gather conclusive information about the problem?

I've tried the stable version, and also the latest master via my application's Gopkg.toml:

[[constraint]]
  branch = "master"
  # version = "2.0.4"
  name = "github.com/emitter-io/go"

[[override]]
  branch = "master"
  # version = "c606a4c5dacdea0598aed291be3120479c509e43"
  name = "github.com/eclipse/paho.mqtt.golang"

Thanks!

MichaelHipp commented 5 years ago

Yes. And the worst part is that sometimes it indicates it is connected but won't receive any messages.

kamermans commented 5 years ago

Yes, I have seen this condition as well: typically after an attempted reconnection, the status indicates that I am connected, even though no messages are received.

kamermans commented 5 years ago

With paho AutoReconnect turned on, I can see it attempting to reconnect while the connection is interrupted:

[pinger]   ping check 1
[pinger]   ping check 3
[pinger]   ping check 5
[pinger]   keepalive sending ping
[pinger]   ping check 2
[pinger]   ping check 4
[pinger]   ping check 6
[pinger]   ping check 8
[pinger]   ping check 10
[pinger]   ping check 12
[net]      error triggered, stopping
[net]      outgoing stopped
[net]      incoming stopped
[msgids]   cleaned up
2019/04/09 03:23:21 emitter: connection lost, due to pingresp not received, disconnecting
[client]   enter reconnect
[client]   about to write new connect msg
[client]   websocket.Dial wss://steve-test:1443: dial tcp 172.16.20.31:1443: i/o timeout
[client]   Reconnect failed, sleeping for 1 seconds
[client]   about to write new connect msg
[client]   websocket.Dial wss://steve-test:1443: dial tcp 172.16.20.31:1443: i/o timeout
[client]   Reconnect failed, sleeping for 2 seconds
[client]   about to write new connect msg
[client]   websocket.Dial wss://steve-test:1443: dial tcp 172.16.20.31:1443: i/o timeout
[client]   Reconnect failed, sleeping for 4 seconds
[client]   about to write new connect msg
[client]   socket connected to broker
[client]   Using MQTT 3.1.1 protocol
[net]      connect started
[net]      received connack
[client]   client is reconnected
[net]      incoming started
2019/04/09 03:23:58 Emitter attempting a connection...
[pinger]   keepalive starting
[net]      logic started
[net]      logic waiting for msg on ibound
[net]      outgoing started
[net]      outgoing waiting for an outbound message
[pinger]   ping check 2
[pinger]   ping check 4
[pinger]   ping check 6
[pinger]   keepalive sending ping
[net]      Received Message
[net]      logic got msg on ibound
[net]      received pingresp
[net]      logic waiting for msg on ibound
[pinger]   ping check 2
[pinger]   ping check 4
[pinger]   ping check 6
[pinger]   keepalive sending ping
[net]      Received Message
[net]      logic got msg on ibound
[net]      received pingresp

Ultimately, it does seem to reconnect at the MQTT level, but it doesn't appear to resubscribe to the previously subscribed channels. I imagine something needs to happen in the emitter Go client, but I don't know enough about the underlying code yet.

kamermans commented 5 years ago

[CC @tiddnet @mudhoney @paulborile]

I've built a self-contained example of the problem: https://github.com/kamermans/emitter-reconnect-issue

To reproduce my issue, clone the repo above, then run docker-compose build and docker-compose up.

It will start a stack including an emitter server, a sender client that publishes to a channel, and a receiver client that subscribes to it.

Once it's started, give it about 20 seconds to stabilize so you can get a feel for the output (the sender client sending messages to the channel, the receiver client printing them, pings and keepalives throughout, the server chatting about client status, etc.).

Now, run this command in a separate window to stop the server:

docker exec -ti emitter-test-server supervisorctl stop emitter-test-server

Wait a few seconds, and start it again:

docker exec -ti emitter-test-server supervisorctl start emitter-test-server

You will see both clients complaining about the disconnection and reconnecting. However, after auto-reconnection, the receiver client doesn't display any more messages; it's as if it is no longer subscribed to the channel. This is the problem I'm reporting.

kamermans commented 5 years ago

Ok, so I have made some progress here. When the client loses a connection and then auto-reconnects, you need to resubscribe to all the channels again. Using the OnConnect() handler is working for me in my limited testing:

emitterClient.OnConnect(func(c *emitter.Client) {
    log.Println("OnConnect triggered, subscribing...")

    // Subscribe to the channel(s)
    emitterClient.Subscribe(channelKey, channel, func(client *emitter.Client, msg emitter.Message) {
        log.Printf("Message received: [%s] %s", msg.Topic(), msg.Payload())
    })
})

You should register OnConnect() before you connect the first time, otherwise it will miss the initial connection. I suspect this is what causes the problem that @MichaelHipp noted (no messages after reconnection).
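
For anyone else hitting this, here is a minimal, self-contained sketch of that ordering (broker address, channel key and channel name are placeholders; the calls are the same v2 API used elsewhere in this thread):

package main

import (
    "log"

    emitter "github.com/emitter-io/go/v2"
)

func main() {
    channelKey := "<channel-key>" // placeholder
    channel := "test/"            // placeholder

    c := emitter.NewClient(
        emitter.WithBrokers("tcp://127.0.0.1:8080"), // placeholder broker
        emitter.WithAutoReconnect(true))

    // Register the handler first: it fires on the initial connect and
    // again after every auto-reconnect, re-establishing the subscription.
    c.OnConnect(func(_ *emitter.Client) {
        c.Subscribe(channelKey, channel, func(_ *emitter.Client, msg emitter.Message) {
            log.Printf("Message received: [%s] %s", msg.Topic(), msg.Payload())
        })
    })

    // Only connect once the handler is in place.
    if err := c.Connect(); err != nil {
        log.Fatalf("Error on Client.Connect(): %v", err)
    }

    select {} // block forever
}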

If OnConnect() seems reliable, I suppose we could update this library to track the active channel subscriptions and resubscribe to them if the connection is lost and then regained, something like the sketch below.
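
A rough, untested sketch of that idea, against the public API used in this thread (resubscriber is a hypothetical helper, not part of the library):

import (
    "log"
    "sync"

    emitter "github.com/emitter-io/go/v2"
)

// resubscriber remembers every subscription and replays it from OnConnect,
// so a reconnect automatically restores all channel subscriptions.
type resubscriber struct {
    mu     sync.Mutex
    client *emitter.Client
    subs   []func() error // deferred Subscribe calls
}

// Subscribe records the subscription and performs it immediately.
func (r *resubscriber) Subscribe(key, channel string, handler func(*emitter.Client, emitter.Message)) error {
    sub := func() error { return r.client.Subscribe(key, channel, handler) }
    r.mu.Lock()
    r.subs = append(r.subs, sub)
    r.mu.Unlock()
    return sub()
}

// resubscribeAll is meant to be registered via client.OnConnect(...).
func (r *resubscriber) resubscribeAll(_ *emitter.Client) {
    r.mu.Lock()
    defer r.mu.Unlock()
    for _, sub := range r.subs {
        if err := sub(); err != nil {
            log.Printf("resubscribe failed: %v", err)
        }
    }
}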

kamermans commented 5 years ago

I've found a discussion about this problem on the eclipse paho MQTT client project: https://github.com/eclipse/paho.mqtt.golang/issues/22

TL;DR: set CleanSession(false) and the client will resubscribe you when you reconnect. https://godoc.org/github.com/eclipse/paho.mqtt.golang#ClientOptions.SetCleanSession

I'll change this to the default and add an Emitter option in a PR.
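
For reference, this is roughly what the setting looks like on a raw paho client (a sketch with a hypothetical helper function; broker address and client ID are placeholders, and since the emitter client wraps these options, the Emitter-side hook may end up with a different name):

import mqtt "github.com/eclipse/paho.mqtt.golang"

func newPersistentSessionClient() mqtt.Client {
    opts := mqtt.NewClientOptions().
        AddBroker("tcp://127.0.0.1:8080").  // placeholder broker address
        SetClientID("my-stable-client-id"). // persistent sessions need a stable client ID
        SetCleanSession(false).             // broker keeps the session and subscriptions across reconnects
        SetAutoReconnect(true)
    return mqtt.NewClient(opts)
}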

joeblew99 commented 5 years ago

It seems that eclipse/paho.mqtt.golang#22 is fixed and closed.

Has this fixed the reconnection issue for Emitter, though?

kamermans commented 5 years ago

The issue is still happening, even after using this auto-resubscribe code:

func ConnectionLostHandler(client *emitter.Client, reason error) {
    log.Println("Emitter connection lost, reconnecting.  Reason: ", reason.Error())

    emitterClient.Disconnect(100 * time.Millisecond)
    if connErr := emitterClient.Connect(); connErr != nil {
        log.Fatalf("Error on Client.Connect(): %v", connErr)
    }
}

emitterClient.OnConnect(func(client *emitter.Client) {
    // Subscribe to the channel
    if subErr := emitterClient.Subscribe(channelKey, channel, nil); subErr != nil {
        log.Fatalf("Error on Client.Subscribe(): %v", subErr)
    }
    log.Printf("Subscribed to channel '%s'", channel)
    ConnectHandler(client)
})
emitterClient.OnDisconnect(ConnectionLostHandler)

I am still seeing output like this in my logs:

2019/07/04 17:41:44 Emitter connection lost, reconnecting.  Reason:  pingresp not received, disconnecting
2019/07/04 17:41:45 Subscribed to channel 'somechannelname'
2019/07/04 17:41:45 Emitter attempting a connection...
2019/07/04 17:42:39 Emitter connection lost, reconnecting.  Reason:  pingresp not received, disconnecting
2019/07/04 17:42:39 Subscribed to channel 'somechannelname'
2019/07/04 17:42:39 Emitter attempting a connection...
2019/07/04 17:49:57 Emitter connection lost, reconnecting.  Reason:  pingresp not received, disconnecting
2019/07/04 17:49:57 Subscribed to channel 'somechannelname'
2019/07/04 17:49:57 Emitter attempting a connection...
2019/07/04 18:20:09 Emitter connection lost, reconnecting.  Reason:  pingresp not received, disconnecting
2019/07/04 18:20:09 Subscribed to channel 'somechannelname'
2019/07/04 18:20:09 Emitter attempting a connection...
2019/07/04 18:21:03 Emitter connection lost, reconnecting.  Reason:  pingresp not received, disconnecting
2019/07/04 18:21:03 Subscribed to channel 'somechannelname'
2019/07/04 18:21:03 Emitter attempting a connection...
2019/07/04 18:24:45 Emitter connection lost, reconnecting.  Reason:  pingresp not received, disconnecting
2019/07/04 18:24:45 Subscribed to channel 'somechannelname'

I have 48 servers running the emitter client and 5-10 of them randomly get stuck in this condition. The only fix is for me to restart the application completely. I am running 3 emitter servers in different continents with latency-based DNS routing, so each client uses the nearest/fastest server.

I wonder if my emitterClient.Disconnect(100 * time.Millisecond) in the ConnectionLostHandler is racing with the next line, emitterClient.Connect(), or something like that.

Any ideas? At this point, I'm seriously considering scrapping emitter for this project and moving to something else, but I really like the simplicity, the channel authorization model and the native Go code.

kelindar commented 5 years ago

I'll have a look over the weekend to see if we can reproduce this issue.

kamermans commented 5 years ago

Thanks! I'm also running a couple of versions of it on a separate machine that will Slack me if it disconnects :). I've been using the master branch of paho via a go.mod replace, but it seems to behave the same.

Florimond commented 5 years ago

I have a client on one machine that just runs a loop incrementing 'i' and publishing it.

The other client, on another machine, has this code:

package main

import (
    "fmt"

    emitter "github.com/emitter-io/go/v2"
)

func main() {
    key := "zG0gBzTYXKYDLRA0n-O6cU2J5pbabnl_"
    channel := "test/"
    done := make(chan bool)

    c := emitter.NewClient(
        emitter.WithBrokers("tcp://192.168.0.10:8080"),
        emitter.WithAutoReconnect(true))

    c.OnError(func(_ *emitter.Client, err emitter.Error) {
        fmt.Println(err.Message)
    })

    c.OnDisconnect(func(client *emitter.Client, e error) {
        fmt.Println("######### Disconnected: ", e.Error())
    })

    c.OnConnect(func(_ *emitter.Client) {
        fmt.Println("######### Connected.")
        c.Subscribe(key, channel, func(_ *emitter.Client, msg emitter.Message) {
            fmt.Println(string(msg.Payload()))
        })
    })

    c.Connect()
    <-done
}

Naively, what I do is disconnect the Wi-Fi for 30 seconds or more, then reconnect it. The output looks like this:

3090
3091
3092
3093
######### Disconnected:  pingresp not received, disconnecting
######### Connected.
3250
3251
3252
3253
3254

kamermans commented 5 years ago

Thanks for checking into it @Florimond - I constructed a similar test, which ran successfully for several weeks and reconnected correctly after disconnects (which I introduced with iptables). In my scenario, nearly all servers are listening to a channel indefinitely; when a message arrives, they perform a task internally and send a confirmation back on a different channel. Some of the clients disconnect and don't reconnect, but it's not 100% reproducible - there seems to be an element of chance.

Is there some sort of verbose logging I can enable on the client to help diagnose the issue?

kelindar commented 5 years ago

It seems it may be a paho MQTT issue: https://github.com/eclipse/paho.mqtt.golang/issues/328

kamermans commented 5 years ago

Interesting, this certainly looks related, thanks!

kamermans commented 5 years ago

I think I've found the issue that's been haunting me, and it's with my code:

func ConnectionLostHandler(client *emitter.Client, reason error) {
    log.Println("Emitter connection lost, reconnecting.  Reason: ", reason.Error())

    emitterClient.Disconnect(100 * time.Millisecond)
    if connErr := emitterClient.Connect(); connErr != nil {
        log.Fatalf("Error on Client.Connect(): %v", connErr)
    }
}

The statement emitterClient.Disconnect(100 * time.Millisecond) is there to try to ensure that I am completely disconnected before I try to reconnect, but by enabling logging in the paho client I can see that this is giving me a warning: Disconnect() called but not connected (disconnected/reconnecting).

It turns out that calling Disconnect() in the emitter.OnDisconnect() handler puts the paho client in a weird state. This seems to be timing-related, but I am able to reproduce it locally.

When it is in this weird state (connection lost, then Disconnect() called before Connect()), I see these errors from the client every time a new message arrives on the channel it's subscribed to:

2019/07/19 13:40:57 [pinger]   pingresp not received, disconnecting
2019/07/19 13:40:57 [net]      error triggered, stopping
2019/07/19 13:40:57 [net]      incoming dropped a received message during shutdown
2019/07/19 13:40:57 [net]      logic stopped
2019/07/19 13:40:57 Emitter connection lost, reconnecting.  Reason:  pingresp not received, disconnecting
2019/07/19 13:40:57 [client]   Disconnect() called but not connected (disconnected/reconnecting)

I suspect the message pending on the channel is misinterpreted as a pingresp or something like that.

I am going to update all of my servers to ensure that this solves the problem. Thanks for hanging in there with me!
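
For completeness, the fixed handler is roughly this sketch: it only logs, and relies on WithAutoReconnect(true) to reconnect and on the OnConnect handler to resubscribe:

func ConnectionLostHandler(client *emitter.Client, reason error) {
    log.Println("Emitter connection lost, waiting for auto-reconnect.  Reason: ", reason.Error())

    // Do NOT call Disconnect()/Connect() here: that fights with paho's own
    // reconnect logic and leaves the client in the broken state described above.
}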

For other users, this is how you enable verbose logging of the paho client:

import (
    "log"
    "os"

    mqtt "github.com/eclipse/paho.mqtt.golang"
)

func init() {
    // Route all paho log levels to stderr.
    logger := log.New(os.Stderr, "", log.LstdFlags)
    mqtt.ERROR = logger
    mqtt.CRITICAL = logger
    mqtt.WARN = logger
    mqtt.DEBUG = logger
}