mochi-mqtt / server

The fully compliant, embeddable high-performance Go MQTT v5 server for IoT, smarthome, and pubsub
MIT License
1.3k stars 223 forks source link

paho golang client cannot keep stable connection #293

Closed cyclamen closed 1 year ago

cyclamen commented 1 year ago

I use the paho golang client like the codes. The client prints:

2023/09/07 12:36:17 ERROR Connect lost: error=EOF 2023/09/07 12:36:17 INFO Connected 2023/09/07 12:36:27 ERROR Connect lost: error=EOF 2023/09/07 12:36:27 INFO Connected 2023/09/07 12:37:14 ERROR Connect lost: error=EOF 2023/09/07 12:37:14 INFO Connected 2023/09/07 12:37:22 ERROR Connect lost: error=EOF 2023/09/07 12:37:22 INFO Connected 2023/09/07 12:38:58 ERROR Connect lost: error=EOF 2023/09/07 12:38:58 INFO Connected 2023/09/07 12:39:23 ERROR Connect lost: error=EOF 2023/09/07 12:39:23 INFO Connected 2023/09/07 12:39:44 ERROR Connect lost: error=EOF 2023/09/07 12:39:44 INFO Connected 2023/09/07 12:42:52 ERROR Connect lost: error=EOF 2023/09/07 12:42:52 INFO Connected 2023/09/07 12:43:51 ERROR Connect lost: error=EOF 2023/09/07 12:43:51 INFO Connected 2023/09/07 12:43:59 ERROR Connect lost: error=EOF 2023/09/07 12:43:59 INFO Connected .......

package main

import (
    "fmt"
    "github.com/eclipse/paho.mqtt.golang"
    "log/slog"
    "os"
    "time"
)

var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
    fmt.Printf("TOPIC: %s\n", msg.Topic())
    fmt.Printf("MSG: %s\n", msg.Payload())
}

func main() {
    //mqtt.DEBUG = log.New(os.Stdout, "", 0)
    //mqtt.ERROR = log.New(os.Stdout, "", 0)
    opts := mqtt.NewClientOptions().AddBroker("tcp://10.1.0.213:1883").SetClientID("abcdefghi1")
    opts.OnConnect = connectHandler
    opts.OnConnectionLost = connectLostHandler
    //opts.SetDefaultPublishHandler(messagePubHandler)
    opts.SetKeepAlive(2 * time.Second)
    opts.SetDefaultPublishHandler(f)
    opts.SetPingTimeout(1 * time.Second)

    c := mqtt.NewClient(opts)
    if token := c.Connect(); token.Wait() && token.Error() != nil {
        panic(token.Error())
    }

    if token := c.Subscribe("go-mqtt/sample", 0, nil); token.Wait() && token.Error() != nil {
        fmt.Println(token.Error())
        os.Exit(1)
    }

    for i := 0; i < 5; i++ {
        text := fmt.Sprintf("this is msg #%d!", i)
        token := c.Publish("go-mqtt/sample", 0, false, text)
        token.Wait()
    }

    time.Sleep(600 * time.Second)

    if token := c.Unsubscribe("go-mqtt/sample"); token.Wait() && token.Error() != nil {
        fmt.Println(token.Error())
        os.Exit(1)
    }

    c.Disconnect(250)

    time.Sleep(1 * time.Second)
}

var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
    slog.Info("Connected")

}

var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
    slog.Error("Connect lost:", "error", err)

}

and the server log :

12:35PM INF added hook hook=allow-all-auth 12:35PM INF attached listener address=:1883 id=t1 protocol=tcp 12:35PM INF mochi mqtt starting version=2.3.0 12:35PM INF mochi mqtt server started 12:36PM WRN error=EOF listener=t1 12:36PM WRN error="read tcp 10.1.0.213:1883->10.1.0.73:54389: i/o timeout" listener=t1 12:36PM WRN error="read tcp 10.1.0.213:1883->10.1.0.73:54431: i/o timeout" listener=t1 12:37PM WRN error="read tcp 10.1.0.213:1883->10.1.0.73:54474: i/o timeout" listener=t1 12:37PM WRN error="read tcp 10.1.0.213:1883->10.1.0.73:54643: i/o timeout" listener=t1 12:38PM WRN error="read tcp 10.1.0.213:1883->10.1.0.73:54671: i/o timeout" listener=t1 12:39PM WRN error="read tcp 10.1.0.213:1883->10.1.0.73:55054: i/o timeout" listener=t1 12:39PM WRN error="read tcp 10.1.0.213:1883->10.1.0.73:55139: i/o timeout" listener=t1 12:42PM WRN error="read tcp 10.1.0.213:1883->10.1.0.73:55217: i/o timeout" listener=t1 12:43PM WRN error="read tcp 10.1.0.213:1883->10.1.0.73:55913: i/o timeout" listener=t1 12:43PM WRN error="read tcp 10.1.0.213:1883->10.1.0.73:56168: i/o timeout" listener=t1 12:44PM WRN error="read tcp 10.1.0.213:1883->10.1.0.73:56196: i/o timeout" listener=t1 12:45PM WRN error="read tcp 10.1.0.213:1883->10.1.0.73:56332: i/o timeout" listener=t1 12:45PM WRN error="read tcp 10.1.0.213:1883->10.1.0.73:56539: i/o timeout" listener=t1

so, how to make the client be stable.

werbenhu commented 1 year ago

@cyclamen It's caused by a too short keepalive setting. For more information about keepalive, please refer to MQTT v3 spec 3.1.2.10

opts.SetKeepAlive(60 * time.Second)
cyclamen commented 1 year ago

@werbenhu

It works!Thanks!

lucasjinreal commented 1 year ago

@werbenhu hi, if just set 60s, does it too small?