eclipse / paho.golang


Reconnection failure #230

Closed izarraga closed 7 months ago

izarraga commented 7 months ago

Describe the bug: After running for some hours, the driver lost its connection and could not reconnect.

Debug output

Jan 19 13:58:16 server fxx003[188851]: mqtt connection up
Jan 19 13:58:16 server fxx003[188851]: mqtt subscription made
Jan 19 13:58:16 server fxx003[188851]: [DEBUG] queue AwaitConnection
Jan 19 13:58:16 server fxx003[188851]: [DEBUG] queue got connection
Jan 19 19:40:39 server fxx003[188851]: [DEBUG] handleError received error: EOF
Jan 19 19:40:39 server fxx003[188851]: [DEBUG] handleError passed error on: EOF
Jan 19 19:40:39 server fxx003[188851]: client error: EOF
Jan 19 19:40:39 server fxx003[188851]: [DEBUG] mainLoop: connection to server lost (EOF); will reconnect
Jan 19 19:40:39 server fxx003[188851]: [DEBUG] connection down
Jan 19 19:40:39 server fxx003[188851]: [DEBUG] queue AwaitConnection
Jan 19 19:40:39 server fxx003[188851]: mqtt connection up
Jan 19 19:40:39 server fxx003[188851]: [DEBUG] queue got connection
Jan 19 19:40:39 server fxx003[188851]: failed to subscribe (writev tcp 10.205.10.3:39224->10.205.10.3:2000: writev: broken pipe). This is likely to mean no messages will be received.mqtt subscription made
Jan 19 19:40:39 server fxx003[188851]: [DEBUG] handleError received error: EOF
Jan 19 19:40:39 server fxx003[188851]: [DEBUG] handleError passed error on: EOF
Jan 19 19:40:39 server fxx003[188851]: client error: EOF
Jan 19 19:40:39 server fxx003[188851]: [DEBUG] mainLoop: connection to server lost (EOF); will reconnect
Jan 19 19:40:39 server fxx003[188851]: [DEBUG] connection down
Jan 19 19:40:39 server fxx003[188851]: [DEBUG] queue AwaitConnection
Jan 19 19:40:39 server fxx003[188851]: mqtt connection up
Jan 19 19:40:39 server fxx003[188851]: [DEBUG] queue got connection
Jan 19 19:40:39 server fxx003[188851]: [DEBUG] handleError received error: EOF
Jan 19 19:40:39 server fxx003[188851]: [DEBUG] handleError passed error on: EOF
Jan 19 19:40:39 server fxx003[188851]: client error: EOF
Jan 19 19:40:49 server fxx003[188851]: failed to subscribe (context deadline exceeded). This is likely to mean no messages will be received.mqtt subscription made
Jan 19 19:40:49 server fxx003[188851]: [DEBUG] mainLoop: connection to server lost (EOF); will reconnect
Jan 19 19:40:49 server fxx003[188851]: [DEBUG] connection down
Jan 19 19:40:49 server fxx003[188851]: [DEBUG] queue AwaitConnection
Jan 19 19:40:49 server fxx003[188851]: mqtt connection up
Jan 19 19:40:49 server fxx003[188851]: [DEBUG] queue got connection
Jan 19 19:40:49 server fxx003[188851]: [DEBUG] handleError received error: EOF
Jan 19 19:40:49 server fxx003[188851]: [DEBUG] handleError passed error on: EOF
Jan 19 19:40:49 server fxx003[188851]: client error: EOF
Jan 19 19:40:59 server fxx003[188851]: failed to subscribe (context deadline exceeded). This is likely to mean no messages will be received.mqtt subscription made
Jan 19 19:40:59 server fxx003[188851]: [DEBUG] mainLoop: connection to server lost (EOF); will reconnect
Jan 19 19:40:59 server fxx003[188851]: [DEBUG] connection down
Jan 19 19:40:59 server fxx003[188851]: [DEBUG] queue AwaitConnection
Jan 19 19:40:59 server fxx003[188851]: mqtt connection up
Jan 19 19:40:59 server fxx003[188851]: [DEBUG] queue got connection
Jan 19 19:40:59 server fxx003[188851]: [DEBUG] handleError received error: EOF
Jan 19 19:40:59 server fxx003[188851]: [DEBUG] handleError passed error on: EOF
Jan 19 19:40:59 server fxx003[188851]: client error: EOF

Software used: https://github.com/eclipse/paho.golang/commit/ef0065fea247d5fb388fa82390cedce694e844e1

MattBrittan commented 7 months ago

Unfortunately I think we are going to need some more info. I've got a couple of instances that have been running since this commit went live so it's not a global problem; I've just restarted the broker and confirmed that the clients reconnected without issue.

From the logs it appears that the connection is coming up but then immediately failing. Do you have access to the broker logs? They might help us see what is going on. It would also be useful to know how you are setting up the client and what it's doing (looks like just subscribing?). A reproducible example would be perfect, but I know this can be difficult to create!

izarraga commented 7 months ago

Maybe these server logs can help.

Jan 19 13:58:06 newtajo aikoMQTT[188589]: client disconnected: fxx003
Jan 19 13:58:06 newtajo aikoMQTT[188589]: time=2024-01-19T13:58:06.418+01:00 level=WARN msg="" listener=t1 error=EOF
Jan 19 13:58:16 newtajo aikoMQTT[188589]: client connected: fxx003 user: proto:5
Jan 19 19:40:39 newtajo aikoMQTT[188589]: client disconnected: fxx003
Jan 19 19:40:39 newtajo aikoMQTT[188589]: time=2024-01-19T19:40:39.546+01:00 level=WARN msg="" listener=t1 error="malformed packet: invalid utf-8 string: malformed packet: topic"
Jan 19 19:40:39 newtajo aikoMQTT[188589]: client connected: fxx003 user: proto:5
Jan 19 19:40:39 newtajo aikoMQTT[188589]: client disconnected: fxx003
Jan 19 19:40:39 newtajo aikoMQTT[188589]: time=2024-01-19T19:40:39.548+01:00 level=WARN msg="" listener=t1 error="malformed packet: invalid utf-8 string: malformed packet: topic"
Jan 19 19:40:39 newtajo aikoMQTT[188589]: client connected: fxx003 user: proto:5
Jan 19 19:40:39 newtajo aikoMQTT[188589]: client disconnected: fxx003
Jan 19 19:40:39 newtajo aikoMQTT[188589]: time=2024-01-19T19:40:39.550+01:00 level=WARN msg="" listener=t1 error="malformed packet: invalid utf-8 string: malformed packet: topic"
Jan 19 19:40:49 newtajo aikoMQTT[188589]: client connected: fxx003 user: proto:5
Jan 19 19:40:49 newtajo aikoMQTT[188589]: client disconnected: fxx003
Jan 19 19:40:49 newtajo aikoMQTT[188589]: time=2024-01-19T19:40:49.551+01:00 level=WARN msg="" listener=t1 error="malformed packet: invalid utf-8 string: malformed packet: topic"
Jan 19 19:40:59 newtajo aikoMQTT[188589]: client connected: fxx003 user: proto:5
Jan 19 19:40:59 newtajo aikoMQTT[188589]: client disconnected: fxx003
Jan 19 19:40:59 newtajo aikoMQTT[188589]: time=2024-01-19T19:40:59.552+01:00 level=WARN msg="" listener=t1 error="malformed packet: invalid utf-8 string: malformed packet: topic"
Jan 19 19:41:09 newtajo aikoMQTT[188589]: client connected: fxx003 user: proto:5
Jan 19 19:41:09 newtajo aikoMQTT[188589]: client disconnected: fxx003
Jan 19 19:41:09 newtajo aikoMQTT[188589]: time=2024-01-19T19:41:09.554+01:00 level=WARN msg="" listener=t1 error="malformed packet: invalid utf-8 string: malformed packet: topic"
Jan 19 19:41:09 newtajo aikoMQTT[188589]: client connected: fxx003 user: proto:5
Jan 19 19:41:09 newtajo aikoMQTT[188589]: client disconnected: fxx003
Jan 19 19:41:09 newtajo aikoMQTT[188589]: time=2024-01-19T19:41:09.556+01:00 level=WARN msg="" listener=t1 error="malformed packet: invalid utf-8 string: malformed packet: topic"
Jan 19 19:41:19 newtajo aikoMQTT[188589]: client connected: fxx003 user: proto:5

izarraga commented 7 months ago

The code is practically the same as the one in the example:

const (
    MQTT_USER = "aiko"
    MQTT_PWD  = "aiko"
    MQTT_QOS  = 1
    MQTT_PUB  = "d"
    MQTT_SUB  = "s"
)

func mqttCfg(clientId string, mqttUrl string) autopaho.ClientConfig {
    u, err := url.Parse(mqttUrl)
    if err != nil {
        panic(err)
    }

    cfg := autopaho.ClientConfig{
        Debug: logstd.New(os.Stdout, "[DEBUG] ", 0),
        Errors: logstd.New(os.Stdout, "[ERROR] ", 0),
        //PahoDebug: logstd.New(os.Stdout, "[PAHODEBUG] ", 0),
        //PahoErrors: logstd.New(os.Stdout, "[PAHOERROR] ", 0),
        ServerUrls: []*url.URL{u},
        ConnectUsername: MQTT_USER,
        ConnectPassword: []byte(MQTT_PWD),
        KeepAlive:  20, // Keepalive message should be sent every 20 seconds
        // CleanStartOnInitialConnection defaults to false. Setting this to true will clear the session on the first connection.
        CleanStartOnInitialConnection: false,
        // SessionExpiryInterval - Seconds that a session will survive after disconnection.
        // It is important to set this because otherwise, any queued messages will be lost if the connection drops and
        // the server will not queue messages while it is down. The specific setting will depend upon your needs
        // (60 = 1 minute, 3600 = 1 hour, 86400 = one day, 0xFFFFFFFE = 136 years, 0xFFFFFFFF = don't expire)
        SessionExpiryInterval: 60,
        OnConnectionUp: func(cm *autopaho.ConnectionManager, connAck *paho.Connack) {
            fmt.Println("mqtt connection up")
            // Subscribing in the OnConnectionUp callback is recommended (ensures the subscription is reestablished if
            // the connection drops)
            if _, err := cm.Subscribe(context.Background(), &paho.Subscribe{
                Subscriptions: []paho.SubscribeOptions{
                    {Topic: MQTT_SUB + "/+", QoS: MQTT_QOS},
                },
            }); err != nil {
                fmt.Printf("failed to subscribe (%s). This is likely to mean no messages will be received.", err)
            }
            fmt.Println("mqtt subscription made")
        },
        OnConnectError: func(err error) { fmt.Printf("error whilst attempting connection: %s\n", err) },
        // eclipse/paho.golang/paho provides base mqtt functionality, the below config will be passed in for each connection
        ClientConfig: paho.ClientConfig{
            // If you are using QOS 1/2, then it's important to specify a client id (which must be unique)
            ClientID: clientId,
            // OnPublishReceived is a slice of functions that will be called when a message is received.
            // You can write the function(s) yourself or use the supplied Router
            OnPublishReceived: []func(paho.PublishReceived) (bool, error){
                func(pr paho.PublishReceived) (bool, error) {
                    //fmt.Printf("topic %s; body: %s (retain: %t)\n", pr.Packet.Topic, pr.Packet.Payload, pr.Packet.Retain)
                    //s := strings.Split(pr.Packet.Topic, "/")
                    //deviceId := s[1]
                    //processServerMsg(deviceId, pr.Packet.Payload)
                    return true, nil
                }},
            OnClientError: func(err error) { fmt.Printf("client error: %s\n", err) },
            OnServerDisconnect: func(d *paho.Disconnect) {
                if d.Properties != nil {
                    fmt.Printf("server requested disconnect: %s\n", d.Properties.ReasonString)
                } else {
                    fmt.Printf("server requested disconnect; reason code: %d\n", d.ReasonCode)
                }
            },
        },
    }

    return cfg
}

MattBrittan commented 7 months ago

Jan 19 19:40:39 newtajo aikoMQTT[188589]: time=2024-01-19T19:40:39.550+01:00 level=WARN msg="" listener=t1 error="malformed packet: invalid utf-8 string: malformed packet: topic"

A malformed topic should lead to disconnection; so I would guess that is the issue.

It looks like you are using the mochi broker; I have tested the code above (with a basic main) with that and it appears to reconnect OK. I have not been able to duplicate the "malformed packet" (without specifically creating a topic that is malformed, and in that case it behaves as you are seeing).

So currently I believe the issue is probably elsewhere in your code (i.e. you are publishing with an invalid topic). There is a question here as to whether the library should validate the topic prior to sending it (it does not currently do this). Personally I see doing so as extra overhead (the user should ensure the topic is valid).

izarraga commented 7 months ago

I don't see any topic validation in the library either. I will investigate.

Thanks @MattBrittan

izarraga commented 7 months ago

I verified that the topic is not a valid UTF-8 string; in hex it is 642f001303b500010010008b0832000000

I don't think this belongs inside the library, but it might be worth adding a check like this to the publish example:

if !utf8.ValidString(topic) {

izarraga commented 7 months ago

I'm closing the issue. Thanks.