eclipse / paho.mqtt.golang

Goroutine leak when connectionUp(true) returns an error #675

Closed kiqi007 closed 3 months ago

kiqi007 commented 4 months ago

Issue Description: We encountered a goroutine leak when using the MQTT package (paho.mqtt.golang v1.4.2). The application runs the following reconnect loop:

    for {
        select {
        case <-m.checkDone:
            hlog.Info("mqtt check done")
            return
        default:
            if m.client == nil || !m.client.IsConnectionOpen() {
                if nil != m.client {
                    m.client.Disconnect(1000)
                }
                err := m.Connect() // create a new client, and do connect
                if nil != err {
                    hlog.Error("mqtt Connect error", elog.String("host", m.host), elog.Int("port", m.port), elog.FieldErr(err))
                } else {
                    hlog.Info("mqtt Connect success", elog.String("host", m.host), elog.Int("port", m.port))
                }
            }
            <-m.connCheckTicker.C
        }
    }
goroutine pprof:
goroutine profile: total 2787
1339 @ 0xe2e556 0xe3edac 0xe3ed86 0xe57ae5 0xe77032 0x16e6c25 0x16e43dc 0xe5ca61
#   0xe57ae4    sync.runtime_Semacquire+0x24                            /usr/local/go/src/runtime/sema.go:56
#   0xe77031    sync.(*WaitGroup).Wait+0x51                         /usr/local/go/src/sync/waitgroup.go:136
#   0x16e6c24   github.com/eclipse/paho%2emqtt%2egolang.(*client).startCommsWorkers+0x4a4   /home/jenkins/go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.4.2/client.go:608
#   0x16e43db   github.com/eclipse/paho%2emqtt%2egolang.(*client).Connect.func1+0x49b       /home/jenkins/go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.4.2/client.go:282

1339 @ 0xe2e556 0xe3edac 0xe3ed86 0xe57c05 0xe755a5 0x16e7cc9 0x16e7c9e 0x16e5cc5 0x16e5c06 0x16e5b87 0xe5ca61
#   0xe57c04    sync.runtime_SemacquireMutex+0x24                       /usr/local/go/src/runtime/sema.go:71
#   0xe755a4    sync.(*Mutex).lockSlow+0x164                            /usr/local/go/src/sync/mutex.go:162
#   0x16e7cc8   sync.(*Mutex).Lock+0xa8                             /usr/local/go/src/sync/mutex.go:81
#   0x16e7c9d   github.com/eclipse/paho%2emqtt%2egolang.(*client).stopCommsWorkers+0x7d     /home/jenkins/go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.4.2/client.go:711
#   0x16e5cc4   github.com/eclipse/paho%2emqtt%2egolang.(*client).disconnect+0x24       /home/jenkins/go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.4.2/client.go:498
#   0x16e5c05   github.com/eclipse/paho%2emqtt%2egolang.(*client).Disconnect.func1.1+0x25   /home/jenkins/go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.4.2/client.go:451
#   0x16e5b86   github.com/eclipse/paho%2emqtt%2egolang.(*client).Disconnect.func1+0x4a6    /home/jenkins/go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.4.2/client.go:470

Local Reproduction Steps: Apply the following changes to the library code (lines marked "issue:"), then simulate network fluctuations.

// github.com/eclipse/paho.mqtt.golang/client.go:574 startCommsWorkers
        c.workers.Add(1)
        go keepalive(c, conn)  // issue:  This goroutine will change the connect status to disconnecting
    }

    // matchAndDispatch will process messages received from the network. It may generate acknowledgements
    // It will complete when incomingPubChan is closed and will close ackOut prior to exiting
    incomingPubChan := make(chan *packets.PublishPacket)
    c.workers.Add(1) // Done will be called when ackOut is closed
    ackOut := c.msgRouter.matchAndDispatch(incomingPubChan, c.options.Order, c)

    time.Sleep(10 * time.Second) // issue:  Simulate the situation where keepalive has already converted status to disconnecting, but connectionUp has not yet been executed

    // The connection is now ready for use (we spin up a few go routines below). It is possible that
    // Disconnect has been called in the interim...
    if err := connectionUp(true); err != nil {
        DEBUG.Println(CLI, err)
        close(c.stop) // Tidy up anything we have already started
        close(incomingPubChan)
        c.workers.Wait()       // issue: Goroutine will block at this location
        c.conn.Close()

Issue Cause: In the code above, c.workers.Add(1) is called for ackOut, but when connectionUp(true) returns an error nothing calls Done() for it before c.workers.Wait() runs, so Wait() blocks indefinitely. To resolve this, a matching call to Done() needs to be placed on that error path.

MattBrittan commented 4 months ago

Hmmm, so we probably need to drain ackOut and then call c.workers.Done() before calling c.workers.Wait().

kiqi007 commented 4 months ago

Hey there! Any idea when we might tackle this issue? Also, could anyone lend a hand with fixing it? I stumbled upon this while assisting my colleagues in debugging a memory leak, but I'm not very familiar with the MQTT library itself. So, if someone could help fix this problem, that would be great!

MattBrittan commented 4 months ago

@kiqi007 I flagged it as "Help Wanted" because I doubt it's something I will get around to in the foreseeable future (I don't ever call Disconnect personally and am investing more time in the v5 client).