eclipse / paho.mqtt.golang

Other
2.73k stars 533 forks source link

client loses connection to broker (on Raspberry Pi 2) #32

Closed jinie closed 8 years ago

jinie commented 8 years ago

I'm seeing a weird issue with the client, which only happens when running on a Raspberry Pi 2.

ratatosk ➜  ~  go version
go version go1.3.3 linux/arm

I'm using the following code to connect to the broker

url := fmt.Sprintf("tcp://%s:%d", host, port)
    opts := MQTT.NewClientOptions().AddBroker(url)
    opts.SetDefaultPublishHandler(OnMessage)
    opts.SetConnectionLostHandler(ConnectionLost)

    logger.Debug("Connecting to ", url)
    c := MQTT.NewClient(opts)
    if token := c.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }
    logger.Debug("Subscribing to ", topic)
    //subscribe to the topic /go-mqtt/sample and request messages to be delivered
    //at a maximum qos of zero, wait for the receipt to confirm the subscription
    if token := c.Subscribe(topic, 2, nil); token.Wait() && token.Error() != nil {
        fmt.Println(token.Error())
        os.Exit(1)
    }

And when running on my MacBook, it runs just fine, but when running on my RPi2 it will disconnect from the broker every 30-60 seconds, and while it seems to reconnect (per the logs), it never receives anything, it simply just "hangs".

Here's a debug trace

[net]      Received Message
[net]      logic got msg on ibound
[net]      received pubrel, id: 33
[net]      logic waiting for msg on ibound
[net]      obound priority msg to write, type *packets.PubcompPacket
[net]      outgoing waiting for an outbound message
13:58:49.209 UpdateLastUp ▶ DEBUG  Updating last update for  fenrir : 28-0014550
75dff : 2016-02-24 13:58:49.110356 +0000 UTC
[pinger]   keepalive sending ping
[net]      Received Message
[net]      logic got msg on ibound
[net]      received pingresp
[net]      logic waiting for msg on ibound
[net]      outgoing stopped
[net]      incoming stopped
13:59:29.146 ConnectionLo ▶ ERROR  Connection Lost: pingresp not received, disconnecting
[client]   enter reconnect
[client]   about to write new connect msg
[client]   socket connected to broker
[client]   Using MQTT 3.1.1 protocol
[net]      connect started
[net]      received connack
[client]   client is reconnected
[net]      outgoing started
[net]      outgoing waiting for an outbound message
[net]      logic started
[net]      logic waiting for msg on ibound
[pinger]   keepalive starting
[pinger]   keepalive sending ping
[net]      incoming started
[net]      Received Message
[net]      logic got msg on ibound
[net]      received pingresp
[net]      logic waiting for msg on ibound
[pinger]   keepalive sending ping
[net]      Received Message
[net]      logic got msg on ibound
[net]      received pingresp
[net]      logic waiting for msg on ibound
[pinger]   keepalive sending ping
[net]      outgoing stopped
[net]      incoming stopped
13:59:49.158 ConnectionLo ▶ ERROR  Connection Lost: pingresp not received, disconnecting
[client]   enter reconnect
[client]   about to write new connect msg
[client]   socket connected to broker
[client]   Using MQTT 3.1.1 protocol
[net]      connect started
[net]      received connack
[client]   client is reconnected
[net]      outgoing started
[net]      outgoing waiting for an outbound message
[pinger]   keepalive starting
[net]      incoming started
[net]      logic started
[net]      logic waiting for msg on ibound
[net]      outgoing stopped
[net]      incoming stopped
13:59:59.158 ConnectionLo ▶ ERROR  Connection Lost: pingresp not received, disconnecting
[client]   enter reconnect
[client]   about to write new connect msg
[client]   socket connected to broker
[client]   Using MQTT 3.1.1 protocol
[net]      connect started
[net]      received connack
[client]   client is reconnected
[net]      outgoing started
[net]      outgoing waiting for an outbound message
[pinger]   keepalive starting
[net]      incoming started
[net]      logic started
[net]      logic waiting for msg on ibound
[pinger]   keepalive sending ping
[net]      Received Message
[net]      logic got msg on ibound
[net]      received pingresp
[net]      logic waiting for msg on ibound
[pinger]   keepalive sending ping
[net]      Received Message
[net]      logic got msg on ibound
[net]      received pingresp
[net]      logic waiting for msg on ibound
[pinger]   keepalive sending ping
[net]      Received Message
[net]      logic got msg on ibound
[net]      received pingresp
[net]      logic waiting for msg on ibound
[pinger]   keepalive sending ping
[net]      Received Message
[net]      logic got msg on ibound
[net]      received pingresp
[net]      logic waiting for msg on ibound
[pinger]   keepalive sending ping
[net]      Received Message
[net]      logic got msg on ibound
[net]      received pingresp
[net]      logic waiting for msg on ibound
[pinger]   keepalive sending ping
[net]      Received Message
[net]      logic got msg on ibound
[net]      received pingresp
[net]      logic waiting for msg on ibound
[pinger]   keepalive sending ping
[net]      Received Message
[net]      logic got msg on ibound
[net]      received pingresp
[net]      logic waiting for msg on bound
jinie commented 8 years ago

I just tried a build with go-1.6.0, same results.

alsm commented 8 years ago

Interesting, I don't have an RPi2 to hand, it'll take me a couple of days to get one and have a look at this.

jinie commented 8 years ago

Most if the time it will trigger within the first minute or so, and other times it runs for hours before triggering. Also, after the initial reconnect, it stops receiving messages. The application monitors temperature around the house, and receives 4-8 readings every minute, and despite the appearance of being connected, nothing is received.

The broker is Mosquitto, running on the same host, and connection url is tcp://localhost:1883

jinie commented 8 years ago

Turns out it also happens on my MacBook, after running 3-4 days straight, i finally see the same bug, which indicates to me that it's likely a timer issue - given the much more limited resources on the RPi2 vs my "yesteryear" MacBook.

mezzato commented 8 years ago

Hi, I had a similar issue on my Raspberry, which I manged to fix by changing the way Client.lastContact is updated. The issue is related to the KeepAlive ping not being sent and thus to the broker disconnecting the client.

According to: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349238

The issue was in the method func alllogic(c *Client) in net.go. The instruction c.lastContact.update() is fired for each received message regardless of the type, which in turn causes the Client.lastContact to be updated too often and a new ping message is not sent.

Only control packets sent (like a ping message) count for the keepalive logic, not any received package, so for keepalive purposes the only significant inbound message is a ping response. I moved c.lastContact.update() inside the case *packets.PingrespPacket and the broker does not disconnect the client anymore.

I can provide a pull request (code details) if needed.

alsm commented 8 years ago

There was an update I made not too long ago to the ping functions, are you running the latest code from here? Be interesting if it's still not fixed as it resolved the reported issues at the time

mezzato commented 8 years ago

Hi,

I have not tested the latest master version, so it might have been fixed by now, in this case I apologise.

jinie commented 8 years ago

I am running the latest version (downloaded last week).

Frej ➜  paho.mqtt.golang git:(master) git status
On branch master
Your branch is up-to-date with 'origin/master'.
nothing to commit, working directory clean
Frej ➜  paho.mqtt.golang git:(master) git pull
Already up-to-date.
jinie commented 8 years ago

The way I see it, there may actually be two bugs. 1) client loses connection on Raspberry Pi 2 2) reconnect fails to resubscribe (or similar)

I've been thinking if 1 could be related to the Pi's lack of a RTC. If so, it would be a bug in Go itself.

mezzato commented 8 years ago

If it is of any help I confirm that the code is working on a Raspberry Pi model B+ with version:

https://github.com/eclipse/paho.mqtt.golang/commit/dbd8d5c40a582eb9adacde36b47932b3a3ad0034

and the fix I previously mentioned. Go version is 1.6.

jinie commented 8 years ago

I've been playing around with this some more. It happens within 5-60 minutes on a RPi2 and RPi3 running Raspbian-Jessie (Lite), but I can't seem to trigger it on a RPi2 running FreeBSD 11, so perhaps it could be a timer issue in Go related to Linux ? I've seen it fail to resubscribe/reconnect once on a retina macbook pro (with running time over a week).

All clients are connected to the same mosquito broker instance.

alsm commented 8 years ago

Something I've just realised, the default property for CleanSession is true, this means every time you connect the session is considered to be a brand new session, subscriptions you made to the server previously will not be maintained. But at the same time the default for autoreconnect is also true. In combination this means that if you connect, subscribe then lose your connection, when the autoreconnect kicks in and reconnects you the server has thrown away your subscriptions. So that behaviour is expected, you would want to set cleansession to false, or do your subscribing in an OnConnectHandler. This still doesn't explain the ping issues though.

jinie commented 8 years ago

@alsm When i explicitly set CleanSession(false), it fails with

16:24:07.372    SetupMQTT ▶ DEBUG  Connecting to  tcp://mqtt.thoughtcrime.dk:1883
panic: Identifier rejected

But it does appear to reconnect and resubscribe when i set the clientID. I was under the impression that if i don't set the clientId, one would be generated at runtime ?

alsm commented 8 years ago

Not quite, there is a facility in MQTT 3.1.1 that if you connect with no client id the server can internally assign you one, but that is only valid for cleansession true connections. The client library never autogenerates a client id for you.

ash-owl commented 8 years ago

There's a bug in the keepalive mechanism: a race between resetting pingRespTimer in ping.go and stopping it in net.go. I have a patch but I haven't polished it yet.

ash-owl commented 8 years ago

If you change the KeepAlive value to say 1*time.Second you should see the connection drop faster -- the default is 30 seconds.

brocaar commented 8 years ago

I'm experiencing the same behaviour where the client looses the connection with the mqtt due to pingresp not received errors. See https://github.com/brocaar/loraserver/issues/5 for some logging output of both the application and mosquitto.

Environment: Digitalocean Ubuntu 15.10 machine, Mosquitto 1.4.3, Go 1.6.

@ash-lshift are able to share your patch?

brocaar commented 8 years ago

I did some more testing and I'm able to reproduce the issue within 10-15 seconds when setting KeepAlive to 5 seconds (Go 1.6, OS X, Mosquitto 1.4.8 running on same machine).

Script:

package main

import (
    "log"
    "os"
    "os/signal"
    "syscall"
    "time"

    "github.com/codegangsta/cli"
    "github.com/eclipse/paho.mqtt.golang"
)

func init() {
    mqtt.DEBUG = log.New(os.Stderr, "DEBUG - ", log.LstdFlags)
    mqtt.CRITICAL = log.New(os.Stderr, "CRITICAL - ", log.LstdFlags)
    mqtt.WARN = log.New(os.Stderr, "WARN - ", log.LstdFlags)
    mqtt.ERROR = log.New(os.Stderr, "ERROR - ", log.LstdFlags)

}

func run(c *cli.Context) {
    opts := mqtt.NewClientOptions()
    opts.AddBroker(c.String("mqtt-server"))
    opts.SetUsername(c.String("mqtt-username"))
    opts.SetPassword(c.String("mqtt-password"))
    opts.SetOnConnectHandler(onConnected)
    opts.SetConnectionLostHandler(onConnectionLost)
    opts.SetKeepAlive(time.Second * 5)

    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        log.Fatal(token.Error())
    }

    sigChan := make(chan os.Signal)
    signal.Notify(sigChan, os.Interrupt, syscall.SIGTERM)
    log.Println(<-sigChan)
}

func onConnected(c mqtt.Client) {
    log.Println("onConnected was called")
    if token := c.Subscribe("application/+/node/+/rx", 0, func(c mqtt.Client, msg mqtt.Message) {
        log.Printf("topic: %s, payload: %s", msg.Topic(), msg.Payload())
    }); token.Wait() && token.Error() != nil {
        log.Fatal(token.Error())
    }
}

func onConnectionLost(c mqtt.Client, err error) {
    log.Printf("onConnectionLost was called with error: %s", err)
}

func main() {
    app := cli.NewApp()
    app.Name = "loratestapp"
    app.Usage = "test application to test the loraserver"
    app.Action = run
    app.Flags = []cli.Flag{
        cli.StringFlag{
            Name:   "mqtt-server",
            Usage:  "MQTT server",
            Value:  "tcp://localhost:1883",
            EnvVar: "MQTT_SERVER",
        },
        cli.StringFlag{
            Name:   "mqtt-username",
            Usage:  "MQTT username",
            EnvVar: "MQTT_USERNAME",
        },
        cli.StringFlag{
            Name:   "mqtt-password",
            Usage:  "MQTT password",
            EnvVar: "MQTT_PASSWORD",
        },
    }
    app.Run(os.Args)
}

Log output

DEBUG - 2016/04/02 22:08:19 [client]   Connect()
DEBUG - 2016/04/02 22:08:19 [client]   about to write new connect msg
DEBUG - 2016/04/02 22:08:19 [client]   socket connected to broker
DEBUG - 2016/04/02 22:08:19 [client]   Using MQTT 3.1.1 protocol
DEBUG - 2016/04/02 22:08:19 [net]      connect started
DEBUG - 2016/04/02 22:08:19 [net]      received connack
DEBUG - 2016/04/02 22:08:19 [store]    memorystore initialized
DEBUG - 2016/04/02 22:08:19 [client]   client is connected
WARN - 2016/04/02 22:08:19 [store]    memorystore wiped
DEBUG - 2016/04/02 22:08:19 [client]   exit startClient
DEBUG - 2016/04/02 22:08:19 [net]      outgoing started
DEBUG - 2016/04/02 22:08:19 [net]      outgoing waiting for an outbound message
2016/04/02 22:08:19 onConnected was called
DEBUG - 2016/04/02 22:08:19 [net]      logic started
DEBUG - 2016/04/02 22:08:19 [net]      logic waiting for msg on ibound
DEBUG - 2016/04/02 22:08:19 [pinger]   keepalive starting
DEBUG - 2016/04/02 22:08:19 [net]      incoming started
DEBUG - 2016/04/02 22:08:19 [client]   enter Subscribe
DEBUG - 2016/04/02 22:08:19 [client]   SUBSCRIBE: dup: false qos: 1 retain: false rLength: 0MessageID: 0 topics: [application/+/node/+/rx]
DEBUG - 2016/04/02 22:08:19 [client]   exit Subscribe
DEBUG - 2016/04/02 22:08:19 [net]      obound priority msg to write, type *packets.SubscribePacket
DEBUG - 2016/04/02 22:08:19 [net]      outgoing waiting for an outbound message
DEBUG - 2016/04/02 22:08:19 [net]      Received Message
DEBUG - 2016/04/02 22:08:19 [net]      logic got msg on ibound
DEBUG - 2016/04/02 22:08:19 [net]      received suback, id: 1
DEBUG - 2016/04/02 22:08:19 [net]      granted qoss [0]
DEBUG - 2016/04/02 22:08:19 [net]      logic waiting for msg on ibound
DEBUG - 2016/04/02 22:08:24 [pinger]   keepalive sending ping
DEBUG - 2016/04/02 22:08:24 [net]      Received Message
DEBUG - 2016/04/02 22:08:24 [net]      logic got msg on ibound
DEBUG - 2016/04/02 22:08:24 [net]      received pingresp
DEBUG - 2016/04/02 22:08:24 [net]      logic waiting for msg on ibound
ERROR - 2016/04/02 22:08:31 [net]      incoming stopped with error
ERROR - 2016/04/02 22:08:31 [net]      logic got error
DEBUG - 2016/04/02 22:08:31 [net]      outgoing stopped
DEBUG - 2016/04/02 22:08:31 [pinger]   keepalive stopped
DEBUG - 2016/04/02 22:08:31 [client]   enter reconnect
DEBUG - 2016/04/02 22:08:31 [client]   about to write new connect msg
2016/04/02 22:08:31 onConnectionLost was called with error: EOF
DEBUG - 2016/04/02 22:08:31 [client]   socket connected to broker
DEBUG - 2016/04/02 22:08:31 [client]   Using MQTT 3.1.1 protocol
DEBUG - 2016/04/02 22:08:31 [net]      connect started
DEBUG - 2016/04/02 22:08:31 [net]      received connack
DEBUG - 2016/04/02 22:08:31 [client]   client is reconnected
DEBUG - 2016/04/02 22:08:31 [net]      outgoing started
DEBUG - 2016/04/02 22:08:31 [net]      outgoing waiting for an outbound message
2016/04/02 22:08:31 onConnected was called
DEBUG - 2016/04/02 22:08:31 [net]      incoming started
DEBUG - 2016/04/02 22:08:31 [client]   enter Subscribe
DEBUG - 2016/04/02 22:08:31 [net]      logic started
DEBUG - 2016/04/02 22:08:31 [pinger]   keepalive starting
DEBUG - 2016/04/02 22:08:31 [client]   SUBSCRIBE: dup: false qos: 1 retain: false rLength: 0MessageID: 0 topics: [application/+/node/+/rx]
DEBUG - 2016/04/02 22:08:31 [client]   exit Subscribe
DEBUG - 2016/04/02 22:08:31 [net]      obound priority msg to write, type *packets.SubscribePacket
DEBUG - 2016/04/02 22:08:31 [net]      outgoing waiting for an outbound message
DEBUG - 2016/04/02 22:08:31 [net]      logic waiting for msg on ibound
DEBUG - 2016/04/02 22:08:31 [net]      Received Message
DEBUG - 2016/04/02 22:08:31 [net]      logic got msg on ibound
DEBUG - 2016/04/02 22:08:31 [net]      received suback, id: 1
DEBUG - 2016/04/02 22:08:31 [net]      granted qoss [0]
DEBUG - 2016/04/02 22:08:31 [net]      logic waiting for msg on ibound
DEBUG - 2016/04/02 22:08:36 [pinger]   keepalive sending ping
DEBUG - 2016/04/02 22:08:36 [net]      Received Message
DEBUG - 2016/04/02 22:08:36 [net]      logic got msg on ibound
DEBUG - 2016/04/02 22:08:36 [net]      received pingresp
DEBUG - 2016/04/02 22:08:36 [net]      logic waiting for msg on ibound
ERROR - 2016/04/02 22:08:43 [net]      incoming stopped with error
ERROR - 2016/04/02 22:08:43 [net]      logic got error
DEBUG - 2016/04/02 22:08:43 [net]      outgoing stopped
DEBUG - 2016/04/02 22:08:43 [pinger]   keepalive stopped
DEBUG - 2016/04/02 22:08:43 [client]   enter reconnect
DEBUG - 2016/04/02 22:08:43 [client]   about to write new connect msg
2016/04/02 22:08:43 onConnectionLost was called with error: EOF
DEBUG - 2016/04/02 22:08:43 [client]   socket connected to broker
DEBUG - 2016/04/02 22:08:43 [client]   Using MQTT 3.1.1 protocol
DEBUG - 2016/04/02 22:08:43 [net]      connect started
DEBUG - 2016/04/02 22:08:43 [net]      received connack
DEBUG - 2016/04/02 22:08:43 [client]   client is reconnected
DEBUG - 2016/04/02 22:08:43 [net]      outgoing started
DEBUG - 2016/04/02 22:08:43 [net]      outgoing waiting for an outbound message
2016/04/02 22:08:43 onConnected was called
DEBUG - 2016/04/02 22:08:43 [net]      incoming started
DEBUG - 2016/04/02 22:08:43 [pinger]   keepalive starting
DEBUG - 2016/04/02 22:08:43 [net]      logic started
DEBUG - 2016/04/02 22:08:43 [net]      logic waiting for msg on ibound
DEBUG - 2016/04/02 22:08:43 [client]   enter Subscribe
DEBUG - 2016/04/02 22:08:43 [client]   SUBSCRIBE: dup: false qos: 1 retain: false rLength: 0MessageID: 0 topics: [application/+/node/+/rx]
DEBUG - 2016/04/02 22:08:43 [client]   exit Subscribe
DEBUG - 2016/04/02 22:08:43 [net]      obound priority msg to write, type *packets.SubscribePacket
DEBUG - 2016/04/02 22:08:43 [net]      outgoing waiting for an outbound message
DEBUG - 2016/04/02 22:08:43 [net]      Received Message
DEBUG - 2016/04/02 22:08:43 [net]      logic got msg on ibound
DEBUG - 2016/04/02 22:08:43 [net]      received suback, id: 1
DEBUG - 2016/04/02 22:08:43 [net]      granted qoss [0]
DEBUG - 2016/04/02 22:08:43 [net]      logic waiting for msg on ibound
DEBUG - 2016/04/02 22:08:48 [pinger]   keepalive sending ping
DEBUG - 2016/04/02 22:08:48 [net]      Received Message
DEBUG - 2016/04/02 22:08:48 [net]      logic got msg on ibound
DEBUG - 2016/04/02 22:08:48 [net]      received pingresp
DEBUG - 2016/04/02 22:08:48 [net]      logic waiting for msg on ibound
ERROR - 2016/04/02 22:08:55 [net]      incoming stopped with error
ERROR - 2016/04/02 22:08:55 [net]      logic got error
DEBUG - 2016/04/02 22:08:55 [net]      outgoing stopped
DEBUG - 2016/04/02 22:08:55 [pinger]   keepalive stopped
DEBUG - 2016/04/02 22:08:55 [client]   enter reconnect
DEBUG - 2016/04/02 22:08:55 [client]   about to write new connect msg
2016/04/02 22:08:55 onConnectionLost was called with error: EOF
DEBUG - 2016/04/02 22:08:55 [client]   socket connected to broker
DEBUG - 2016/04/02 22:08:55 [client]   Using MQTT 3.1.1 protocol
DEBUG - 2016/04/02 22:08:55 [net]      connect started
DEBUG - 2016/04/02 22:08:55 [net]      received connack
DEBUG - 2016/04/02 22:08:55 [client]   client is reconnected
DEBUG - 2016/04/02 22:08:55 [net]      outgoing started
2016/04/02 22:08:55 onConnected was called
DEBUG - 2016/04/02 22:08:55 [net]      outgoing waiting for an outbound message
DEBUG - 2016/04/02 22:08:55 [net]      logic started
DEBUG - 2016/04/02 22:08:55 [net]      logic waiting for msg on ibound
DEBUG - 2016/04/02 22:08:55 [net]      incoming started
DEBUG - 2016/04/02 22:08:55 [client]   enter Subscribe
DEBUG - 2016/04/02 22:08:55 [pinger]   keepalive starting
DEBUG - 2016/04/02 22:08:55 [client]   SUBSCRIBE: dup: false qos: 1 retain: false rLength: 0MessageID: 0 topics: [application/+/node/+/rx]
DEBUG - 2016/04/02 22:08:55 [client]   exit Subscribe
DEBUG - 2016/04/02 22:08:55 [net]      obound priority msg to write, type *packets.SubscribePacket
DEBUG - 2016/04/02 22:08:55 [net]      outgoing waiting for an outbound message
DEBUG - 2016/04/02 22:08:55 [net]      Received Message
DEBUG - 2016/04/02 22:08:55 [net]      logic got msg on ibound
DEBUG - 2016/04/02 22:08:55 [net]      received suback, id: 1
DEBUG - 2016/04/02 22:08:55 [net]      granted qoss [0]
DEBUG - 2016/04/02 22:08:55 [net]      logic waiting for msg on ibound
DEBUG - 2016/04/02 22:09:00 [pinger]   keepalive sending ping
DEBUG - 2016/04/02 22:09:00 [net]      Received Message
DEBUG - 2016/04/02 22:09:00 [net]      logic got msg on ibound
DEBUG - 2016/04/02 22:09:00 [net]      received pingresp
DEBUG - 2016/04/02 22:09:00 [net]      logic waiting for msg on ibound
ERROR - 2016/04/02 22:09:07 [net]      incoming stopped with error
ERROR - 2016/04/02 22:09:07 [net]      logic got error
DEBUG - 2016/04/02 22:09:07 [pinger]   keepalive stopped
DEBUG - 2016/04/02 22:09:07 [net]      outgoing stopped
DEBUG - 2016/04/02 22:09:07 [client]   enter reconnect
DEBUG - 2016/04/02 22:09:07 [client]   about to write new connect msg
2016/04/02 22:09:07 onConnectionLost was called with error: EOF
DEBUG - 2016/04/02 22:09:07 [client]   socket connected to broker
DEBUG - 2016/04/02 22:09:07 [client]   Using MQTT 3.1.1 protocol
DEBUG - 2016/04/02 22:09:07 [net]      connect started
DEBUG - 2016/04/02 22:09:07 [net]      received connack
DEBUG - 2016/04/02 22:09:07 [client]   client is reconnected
DEBUG - 2016/04/02 22:09:07 [net]      incoming started
DEBUG - 2016/04/02 22:09:07 [net]      outgoing started
2016/04/02 22:09:07 onConnected was called
DEBUG - 2016/04/02 22:09:07 [net]      outgoing waiting for an outbound message
DEBUG - 2016/04/02 22:09:07 [net]      logic started
DEBUG - 2016/04/02 22:09:07 [net]      logic waiting for msg on ibound
DEBUG - 2016/04/02 22:09:07 [pinger]   keepalive starting
DEBUG - 2016/04/02 22:09:07 [client]   enter Subscribe
DEBUG - 2016/04/02 22:09:07 [client]   SUBSCRIBE: dup: false qos: 1 retain: false rLength: 0MessageID: 0 topics: [application/+/node/+/rx]
DEBUG - 2016/04/02 22:09:07 [client]   exit Subscribe
DEBUG - 2016/04/02 22:09:07 [net]      obound priority msg to write, type *packets.SubscribePacket
DEBUG - 2016/04/02 22:09:07 [net]      outgoing waiting for an outbound message
DEBUG - 2016/04/02 22:09:07 [net]      Received Message
DEBUG - 2016/04/02 22:09:07 [net]      logic got msg on ibound
DEBUG - 2016/04/02 22:09:07 [net]      received suback, id: 1
DEBUG - 2016/04/02 22:09:07 [net]      granted qoss [0]
DEBUG - 2016/04/02 22:09:07 [net]      logic waiting for msg on ibound
DEBUG - 2016/04/02 22:09:12 [pinger]   keepalive sending ping
DEBUG - 2016/04/02 22:09:12 [net]      Received Message
DEBUG - 2016/04/02 22:09:12 [net]      logic got msg on ibound
DEBUG - 2016/04/02 22:09:12 [net]      received pingresp
DEBUG - 2016/04/02 22:09:12 [net]      logic waiting for msg on ibound
alsm commented 8 years ago

I've fixed this immediate problem in the latest commit, I was reseting the ping timer to the wrong value, pingTimeout instead of KeepAlive, so in any situation where pingTimeout was larger than KeepAlive it would fail to send a ping appropriately.

jinie commented 8 years ago

I'm still seeing the error. Running with cleansession now, and a generated client-id, and it still gets stuck in a

2016-04-15 19:19:32,stdout,"21:19:32.748 ConnectionLo â–¶ ERROR  Connection Lost: pingresp not received, disconnecting

loop

bobintornado commented 8 years ago

It happens to me, too, disconnecting 30~50 times a day.

brocaar commented 8 years ago

I can confirm the same issue... Sometimes it works perfect for a long time, sometimes it is disconnecting every ~ 1 min with the error pingresp not received, disconnecting.

weitzj commented 8 years ago

Same thing for me. Running in an Alpine 3.3 Docker container with Static compiled Golang 1.6.2 (CGO_ENABLED=0)

lazyeights commented 8 years ago

Checking the stack trace I see the code blocks at a WaitGroup.Wait() in internalConnLost at https://github.com/eclipse/paho.mqtt.golang/blob/master/client.go#L413

My guess is that incoming is not shutting down on the lost connection so it never calls c.workers.Done(), maybe due to a race condition where ReadPacket is successful but a disconnect is handled before the inbound channel write succeeds in this block:

    for {
        if cp, err = packets.ReadPacket(c.conn); err != nil {
            break
        }
        DEBUG.Println(NET, "Received Message")
        c.ibound <- cp
    }

The observed behavior is that it still reconnects, spins up, and keeps reading and writing ping keep alive packets on the timer, but the outbound message pump remains stalled indefinitely.

alsm commented 8 years ago

The last commit should, I hope, fix the ping issues. If someone with a Pi could try and let me know. Thanks.

jinie commented 8 years ago

While i'm not holding my breath, it does appear to have been running (and staying connected) for almost 10 hours. Running a couple of clients on old RPi B+ models, and a couple of servers on a newer RPi 3. It has however managed to keep running for days without timeouts before, so we'll see in a few days :)

jinie commented 8 years ago

The pingresp timeout issue appears to have been fixed, but now it no longer reconnects automatically. I call "SetAutoReconnect(true)", and i don't have a ConnectionLostHandler. I've tried with/without Clean Session set to true, but it just sits there. It gets an EOF when i force restart the broker, but nothing happens after that.

ash-owl commented 8 years ago

If anyone is interested, here is how I fixed this issue locally. Patch written against 617c801. (This patch is Copyright (c) 2016, LShift Ltd. opensource@lshift.net and offered to you under the EPL v1.0. If you decide you want to incorporate the patch I'll be happy to sign the CLA.)

diff --git a/client.go b/client.go
index 0aa6a55..dcb6a49 100644
--- a/client.go
+++ b/client.go
@@ -77,7 +77,7 @@ type Client struct {
    persist         Store
    options         ClientOptions
    pingTimer       *time.Timer
-   pingRespTimer   *time.Timer
+   pingReceived    chan interface{}
    status          connStatus
    workers         sync.WaitGroup
 }
@@ -220,8 +220,7 @@ func (c *Client) Connect() Token {
        c.errors = make(chan error)
        c.stop = make(chan struct{})
        c.pingTimer = time.NewTimer(c.options.KeepAlive)
-       c.pingRespTimer = time.NewTimer(time.Duration(10) * time.Second)
-       c.pingRespTimer.Stop()
+       c.pingReceived = make(chan interface{})

        c.incomingPubChan = make(chan *packets.PublishPacket, c.options.MessageChannelDepth)
        c.msgRouter.matchAndDispatch(c.incomingPubChan, c.options.Order, c)
diff --git a/net.go b/net.go
index 8da1854..2428050 100644
--- a/net.go
+++ b/net.go
@@ -178,8 +178,8 @@ func alllogic(c *Client) {
            switch msg.(type) {
            case *packets.PingrespPacket:
                DEBUG.Println(NET, "received pingresp")
-               c.pingRespTimer.Stop()
-               c.pingTimer.Reset(c.options.PingTimeout)
+               c.pingReceived <- msg
+               c.pingTimer.Reset(c.options.KeepAlive)
            case *packets.SubackPacket:
                sa := msg.(*packets.SubackPacket)
                DEBUG.Println(NET, "received suback, id:", sa.MessageID)
diff --git a/ping.go b/ping.go
index 62d502a..0ac0461 100644
--- a/ping.go
+++ b/ping.go
@@ -16,6 +16,7 @@ package mqtt

 import (
    "errors"
+   "time"

    "git.eclipse.org/gitroot/paho/org.eclipse.paho.mqtt.golang.git/packets"
 )
@@ -24,6 +25,7 @@ func keepalive(c *Client) {
    DEBUG.Println(PNG, "keepalive starting")

    for {
+       DEBUG.Println("waiting")
        select {
        case <-c.stop:
            DEBUG.Println(PNG, "keepalive stopped")
@@ -35,13 +37,23 @@ func keepalive(c *Client) {
            //We don't want to wait behind large messages being sent, the Write call
            //will block until it it able to send the packet.
            ping.Write(c.conn)
-           c.pingRespTimer.Reset(c.options.PingTimeout)
-       case <-c.pingRespTimer.C:
-           CRITICAL.Println(PNG, "pingresp not received, disconnecting")
-           c.workers.Done()
-           c.internalConnLost(errors.New("pingresp not received, disconnecting"))
-           c.pingTimer.Stop()
-           return
+           DEBUG.Println("starting timer")
+           pingRespTimer := time.NewTimer(c.options.PingTimeout)
+       waitForResponse:
+           for {
+               select {
+               case <-pingRespTimer.C:
+                   CRITICAL.Println(PNG, "pingresp not received, disconnecting")
+                   c.workers.Done()
+                   c.internalConnLost(errors.New("pingresp not received, disconnecting"))
+                   c.pingTimer.Stop()
+                   return
+               case <-c.pingReceived:
+                   DEBUG.Println("ping received")
+                   break waitForResponse
+               }
+           }
+           pingRespTimer.Stop()
        }
    }
 }
alsm commented 8 years ago

@jinie Is this still on an RPi? I just added an autoreconnect test and that passes here. If you can run that test and it fails/hangs see if there is any information in the pprof debug, or import pprof into your app and get the active goroutines, stack trace etc when it hangs. The pprof bits are the last import in unit_client_test.go and the go func in the init() of that file.

jinie commented 8 years ago

It's running on an RPi 3, but the client - on an RPi B+ - seems to cope just fine. The client however isn't subscribed to any topics, it only publishes data.

jinie commented 8 years ago

I ran the test on my RPi a couple of times, it passed twice, and the 3rd run got a couple of errors

DEBUG    15:59:24 [store]    memory store closed
--- FAIL: Test_ping1_idle5 (5.61 seconds)
    fvt_client_test.go:893: Connection-lost handler was called: EOF

and

DEBUG    15:59:35 [store]    store is opened at /tmp/TestStore/_get_error
WARNING  15:59:35 [store]    corrupted file detected: unexpected EOF archived at: /tmp/TestStore/_get_error/o.120.CORRUPT
DEBUG    15:59:35 [store]    store is opened at /tmp/TestStore/_all

I have no idea how to obtain the pprof data you're asking. I've attached the output from the test, not that it helps much. dump.txt

alsm commented 8 years ago

The ping test failure is unusual, but could just be coincidence, and it means the autoreconnect test succeeded all 3 times. The warning about a corrupted store file means that the store is working correctly.

Re pprof https://golang.org/pkg/net/http/pprof/ is what I'm talking about, when the test suite is running you can connect to http://127.0.0.1:6060/debug/pprof to see the goroutines that are running and the stack trace for each, very handy if any of the tests hang, also useful for debugging applications.

jinie commented 8 years ago

So far nothing seems to be hanging. I've seen the ping test fail 2/10 times now.

alsm commented 8 years ago

That's probably due to the short timeout in the test, 2 seconds, depending on how far you are from the test server the clock sync could mean you get disconnected by the server before your ping arrives, if you have a local mqtt server you can export the environment variable TEST_FVT_ADDR with the hostname of that server and it'll use that instead of iot.eclipse.org

jinie commented 8 years ago

I haven't seen the pingresp timeout since the commit, so i guess that's fixed. Through debugging i found out that it reconnects alright with autoreconnect=true, but it fails to resubscribe.

I'm using the following code to connect (i've tried with cleansession=true and cleansession=false, same results)

url := fmt.Sprintf("tcp://%s:%d", flags.host, flags.port)
    opts := MQTT.NewClientOptions().AddBroker(url)
    opts.SetDefaultPublishHandler(OnMessage)
    opts.SetAutoReconnect(true)
    opts.SetCleanSession(false)
    opts.SetConnectionLostHandler(connectionLost)
    host, err := os.Hostname()
    clientID := "mqtt-test-"
    if err != nil {
        ERROR.Println(err)
    } else {
        clientID += host
    }
    opts.SetClientID(clientID)

    DEBUG.Println("Connecting to ", url)
    c := MQTT.NewClient(opts)
    if token := c.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }
    DEBUG.Println("Subscribing to ", flags.topic)
    if token := c.Subscribe(flags.topic, 1, nil); token.Wait() && token.Error() != nil {
        fmt.Println(token.Error())
        os.Exit(1)
    }

Here'a log trace.

ERROR:[net]      logic received from error channel, other components have errored, stopping
DEBUG:[pinger]   keepalive stopped
DEBUG:[net]      outgoing stopped
ERROR:Connection Lost: EOF
DEBUG:[client]   enter reconnect
DEBUG:[client]   about to write new connect msg
DEBUG:[client]   socket connected to broker
DEBUG:[client]   Using MQTT 3.1.1 protocol
DEBUG:[net]      connect started
ERROR:[net]      connect got error read tcp 192.168.1.5:1883: connection reset by peer
DEBUG:[client]   Trying reconnect using MQTT 3.1 protocol
DEBUG:[client]   about to write new connect msg
DEBUG:[client]   socket connected to broker
DEBUG:[client]   Using MQTT 3.1 protocol
DEBUG:[net]      connect started
ERROR:[net]      connect got error read tcp 192.168.1.5:1883: connection reset by peer
DEBUG:[client]   Reconnect failed, sleeping for 1 seconds
DEBUG:[client]   about to write new connect msg
ERROR:[client]   dial tcp 192.168.1.5:1883: connection refused
WARN:[client]   failed to connect to broker, trying next
DEBUG:[client]   Reconnect failed, sleeping for 2 seconds
DEBUG:[client]   about to write new connect msg
DEBUG:[client]   socket connected to broker
DEBUG:[client]   Using MQTT 3.1 protocol
DEBUG:[net]      connect started
DEBUG:[net]      received connack
DEBUG:[client]   client is reconnected
DEBUG:[net]      outgoing started
DEBUG:[net]      outgoing waiting for an outbound message
DEBUG:[net]      logic started
DEBUG:[net]      logic waiting for msg on ibound
DEBUG:[pinger]   keepalive starting
DEBUG:[net]      incoming started
DEBUG:[pinger]   keepalive sending ping
DEBUG:[net]      Received Message
DEBUG:[net]      logic got msg on ibound
DEBUG:[net]      received pingresp
DEBUG:[net]      logic waiting for msg on ibound
DEBUG:[net]      resetting ping timers
^CINFO:Interrupted, shutting down MQ client, signal is  0x563410c0
DEBUG:[client]   disconnecting
DEBUG:[net]      obound priority msg to write, type *packets.DisconnectPacket
DEBUG:[net]      outbound wrote disconnect, stopping
WARN:[net]      logic stopped
DEBUG:[pinger]   keepalive stopped
DEBUG:[net]      incoming stopped
DEBUG:[client]   disconnected
DEBUG:[store]    memory store closed
alsm commented 8 years ago

I will make it more explicit in the documentation but the auto reconnect feature only handles reconnecting the connection. Resuming state is either handled by cleansession=false, this means the server is supposed to remember your subscriptions and will continue to deliver messages to you when you reconnect (and also deliver messages that would have been delivered while you were offline), or by resubscribing when you are connected, the OnConnectHandler function will be called at initial connect and all reconnects.

jinie commented 8 years ago

the server (mosquito 1.4.9) or cleansession=false doesn't work then :) with autoreconnect=true and cleansession=False i don't receive any messages after the disconnect. The good news is that OnConnectHandler works. If this is "working as intended", feel free to close the issue as i've not seen the bug since the last commit.

alsm commented 8 years ago

cleansession is all on the server, it's just a flag in the client that it sets in the connect packet. I just added a test to the client suite for cleansession specifically and tested it against iot.eclipse.org and a local install of mosquitto 1.4.9. You can run this one test with go test -v -run _CleanSession. Against both brokers the test succeeds. running mosquitto with -v might show some more information? I'm going to close this issue now but if you find anything related to cleansession in the client please open a new one.

Thanks to everyone who reported this issue.

jarrodsinclair commented 7 years ago

Using this sample code (https://github.com/eclipse/paho.mqtt.golang/blob/master/cmd/stdoutsub/main.go) and connecting to a local Mosquitto 1.4.11 server leads to socket error/disconnection whenever a KeepAlive is sent. As described above, setting AutoReconnect=true and CleanSession=false will lead to the connection being re-established soon after being disconnected, and provide a work-around for continued operation.

Is this expected behaviour?

I don't experience disconnections with the same KeepAlive setting in the Paho MQTT Python library or mosquitto_sub.

Thanks.

xrlin commented 6 years ago

@jarrodsinclair I meet this probem too. My mosquitto version is 1.4.8 but everything goes well when use mqttjs. Anyone can help me or give me some tips. It cannot work despite setting timeout.


func GetMQTTClient() (client MQTT.Client, err error) {
    if MQTTClient == nil {
        MQTTClient = MQTT.NewClient(options)
        token := MQTTClient.Connect()
        if token.Wait() && token.Error() != nil {
            err = token.Error()
            MQTTClient = nil
            return
        }
    }
    log.Printf("GetMQTTClient result %#v\noptions: %#v", MQTTClient, *options)
    client = MQTTClient
    return
}

func init() {
    options = MQTT.NewClientOptions()
    options.AddBroker("ws://localhost:1883")
    options.SetAutoReconnect(true)
    options.SetPingTimeout(time.Microsecond * 10)
    options.SetConnectTimeout(time.Minute * 10)
}
amenzhinsky commented 6 years ago

@xrlin, @jarrodsinclair it was fixed on the master branch already, most likely you're using the latest stable version, which is not really stable.

xrlin commented 6 years ago

@amenzhinsky Thanks for youre reply. I check the lib in go/src it seems the code has keep up with the master branch. Below is the mosquitto log when call client.Publish. This problem has agonize me for two days.

521561200: New client connected from 127.0.0.1 as mqttjs_e6c0bd2b (c1, k60).
1521561200: Sending CONNACK to mqttjs_e6c0bd2b (0, 0)
1521561204: Received SUBSCRIBE from mqttjs_e6c0bd2b
1521561204:     IMS/P2P/82 (QoS 0)
1521561204: mqttjs_e6c0bd2b 0 IMS/P2P/82
1521561204: Sending SUBACK to mqttjs_e6c0bd2b
1521561204: Received PUBLISH from mqttjs_e6c0bd2b (d0, q0, r0, m0, 'IMS/P2P/82',
 ... (49 bytes))
1521561204: Sending PUBLISH to mqttjs_e6c0bd2b (d0, q0, r0, m0, 'IMS/P2P/82', ..
. (49 bytes))
1521561208: Socket error on client webim-server, disconnecting.
1521561209: New client connected from 127.0.0.1 as webim-server (c1, k30).
1521561209: Sending CONNACK to webim-server (0, 0)
1521561253: Client webim-server has exceeded timeout, disconnecting.

The client with name like 'mqttjs_e6c0bd2b' is the client with mqttjs. It works well. Butn i use golang to publish the broker it failed.

alsm commented 6 years ago

Can you enable the debug output in the go client and post that from a session where it exhibits this behaviour?

xrlin commented 6 years ago

Here is a log between when Publish method called.

My app Name client.go:189: [client]   Connect()
My app Name memstore.go:48: [store]    memorystore initialized
My app Name client.go:206: [client]   about to write new connect msg
My app Name client.go:209: [client]   socket connected to broker
My app Name client.go:216: [client]   Using MQTT 3.1.1 protocol
My app Name client.go:397: [net]      connect started
My app Name client.go:415: [net]      received connack
My app Name client.go:279: [client]   client is connected
My app Name memstore.go:137: [store]    memorystore wiped
My app Name ping.go:27: [pinger]   keepalive starting
My app Name net.go:226: [net]      logic started
My app Name net.go:229: [net]      logic waiting for msg on ibound
My app Name net.go:122: [net]      incoming started
My app Name client.go:298: [client]   exit startClient
My app Name net.go:160: [net]      outgoing started
My app Name net.go:163: [net]      outgoing waiting for an outbound message
My app Name ping.go:46: [pinger]   ping check 5
My app Name client.go:509: [client]   enter Publish
My app Name client.go:534: [client]   sending publish message, topic: 
My app Name net.go:191: [net]      obound wrote msg, id: 1
My app Name net.go:163: [net]      outgoing waiting for an outbound message
My app Name net.go:150: [net]      incoming stopped with error EOF
My app Name net.go:331: [net]      error triggered, stopping
My app Name net.go:166: [net]      outgoing stopped
My app Name ping.go:43: [pinger]   keepalive stopped
My app Name net.go:318: [net]      logic stopped
My app Name messageids.go:55: [msgids]   cleaned up
My app Name client.go:636: Connection lost: EOF
My app Name client.go:435: [client]   Disconnect() called but not connected (disconnected/reconnecting)
My app Name client.go:306: [client]   enter reconnect
My app Name client.go:480: In disconnect and stop channel is already closed
My app Name client.go:365: [client]   Client moved to disconnected state while reconnecting, abandoning reconnect
My app Name messageids.go:55: [msgids]   cleaned up
My app Name client.go:500: [client]   disconnected
My app Name memstore.go:126: [store]    memorystore closed
brocaar commented 6 years ago

My app Name client.go:534: [client] sending publish message, topic:

Shouldn't there be a topic? I noticed that when publishing to a topic "", the client will disconnect (not sure if that is expected behavior).

xrlin commented 6 years ago

@brocaar Oh, thanks. It's my fault. It is a bug in my code. After pasted the log last day i found the mistake.