eclipse / paho.mqtt.golang

Wait() hangs after Publish() call #598

Closed: moneyease closed this issue 2 years ago

moneyease commented 2 years ago

Under certain circumstances, our wrapper function never returns:

// Publish initiates a notification on a topic with the provided payload.
func (c *Client) Publish(topic string, payload []byte) error {
    token := c.mqttClient.Publish(topic, 1, false, payload)
    token.Wait()
    if token.Error() != nil {
        return token.Error()
    }
    return nil
}

I sent the process a SIGABRT and found this in the goroutine dump:

sync.runtime_Semacquire(0xc0004f6028)
        /usr/local/go/src/runtime/sema.go:56 +0x45
sync.(*WaitGroup).Wait(0xc0004f6020)
        /usr/local/go/src/sync/waitgroup.go:130 +0x65
github.com/eclipse/paho%2emqtt%2egolang.startComms.func3(0xc0004f6020, 0xc000482ae0)
        /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/net.go:432 +0x2f
created by github.com/eclipse/paho%2emqtt%2egolang.startComms
        /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/net.go:431 +0x2a5

goroutine 49 [select]:
github.com/eclipse/paho%2emqtt%2egolang.(*client).startCommsWorkers.func2(0xc00000e028, 0xc00000e0c0, 0xc000482120, 0xc0003c66c0)
        /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/client.go:602 +0xde
created by github.com/eclipse/paho%2emqtt%2egolang.(*client).startCommsWorkers
        /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/client.go:597 +0x4b3

goroutine 42 [chan receive, 4129 minutes]:
github.com/eclipse/paho%2emqtt%2egolang.(*baseToken).Wait(0xc0003873c0, 0xc0003a3000)
        /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/token.go:73 +0x38
go.panw.local/pangolin/pubsub.(*Client).Publish(0xc000187e88, 0xc0003a3000, 0x20, 0xc00023c690, 0xe2, 0xf0, 0x0, 0x2)
        /go/pkg/mod/go.panw.local/pangolin@v1.1.12/pubsub/pubsub.go:207 +0xaf
main.publishIPUserUpdateStatus(0xc00009ffc0, 0x3f, 0xc000081020, 0x55, 0xbaa3ef, 0x33, 0xbacbf5, 0x39, 0xba5ab4, 0x2b, ...)

Any help?

MattBrittan commented 2 years ago

Please see the README for the information required when reporting bugs. My guess would be that you have a handler that is blocking (see the common issues section of the README), but without a full list of goroutines (or the logs) I can only guess.
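One common way to keep a message handler from blocking is to have it do nothing except hand the message to a worker goroutine through a buffered channel, so the library's delivery goroutine returns immediately. This matters more with `SetOrderMatters(true)` (used in the connect options later in this thread), where handlers are called one at a time. The sketch below is self-contained, so `message` is a hypothetical stand-in for paho's `mqtt.Message`, and `dispatcher`, `handle` and `shutdown` are illustrative names, not library API. Dropping on a full queue is one policy choice among several.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for paho's mqtt.Message; only the
// fields this sketch needs are modelled.
type message struct {
	Topic   string
	Payload []byte
}

// dispatcher decouples message delivery from message processing. Its handle
// method never blocks: if the queue is full, the message is counted as
// dropped instead of stalling the goroutine that delivered it.
type dispatcher struct {
	queue   chan message
	wg      sync.WaitGroup
	mu      sync.Mutex
	dropped int
}

// newDispatcher starts one worker goroutine that runs process for each
// queued message, in arrival order.
func newDispatcher(size int, process func(message)) *dispatcher {
	d := &dispatcher{queue: make(chan message, size)}
	d.wg.Add(1)
	go func() {
		defer d.wg.Done()
		// Slow work (database writes, further publishes, ...) runs here,
		// off the goroutine that delivered the message.
		for m := range d.queue {
			process(m)
		}
	}()
	return d
}

// handle is what would be registered as the message handler.
func (d *dispatcher) handle(m message) {
	select {
	case d.queue <- m:
	default:
		d.mu.Lock()
		d.dropped++
		d.mu.Unlock()
	}
}

// shutdown must only be called once no more messages will be delivered;
// it waits for the worker to drain the queue.
func (d *dispatcher) shutdown() {
	close(d.queue)
	d.wg.Wait()
}

func main() {
	var processed []string
	d := newDispatcher(8, func(m message) {
		processed = append(processed, m.Topic)
	})
	d.handle(message{Topic: "a"})
	d.handle(message{Topic: "b"})
	d.shutdown()
	fmt.Println(processed, d.dropped) // prints "[a b] 0"
}
```

With QoS 1 traffic, a larger buffer or logging each drop may be preferable to silent dropping; the essential property is only that `handle` returns quickly.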

moneyease commented 2 years ago

Thanks. The full stack trace is below.

There is nothing unusual here: our client sends IoT messages that are written to AWS DynamoDB.

Version: github.com/eclipse/paho.mqtt.golang@v1.3.5

Connect options:

func (c *Client) Connect() error {
    tlsConf, err := formTLSConfig(&c.ClientCertInfo)
    if err != nil {
        return err
    }
    opts := MQTT.NewClientOptions()
    opts.AddBroker(c.BrokerEndpoint)
    opts.SetClientID(c.ClientID)
    opts.SetCleanSession(true)
    opts.SetTLSConfig(tlsConf)
    opts.SetOrderMatters(true)
    opts.SetResumeSubs(true)
    opts.SetDefaultPublishHandler(c.DefaultMessageHandler)
    opts.SetConnectionLostHandler(c.connLostHandler)
    c.gpcssub = make(map[string][]SubscribeTopic)
    // OnConnect handler that re-subscribes to the topics we were tracking.
    opts.OnConnect = c.doOnConnect
    c.mqttClient = MQTT.NewClient(opts)
    if token := c.mqttClient.Connect(); token.Wait() && token.Error() != nil {
        return token.Error()
    }
    return nil
}

Broker: AWS IoT Core (MQTT)

SIGABRT: abort
PC=0x46da41 m=0 sigcode=0

goroutine 0 [idle]:
runtime.futex(0x105a310, 0x80, 0x0, 0x0, 0x7ffc00000000, 0x7ffc78daac30, 0x300bbd280, 0x105a1c0, 0x7ffc78daac58, 0x40c1ff, ...)
    /usr/local/go/src/runtime/sys_linux_amd64.s:579 +0x21
runtime.futexsleep(0x105a310, 0x0, 0xffffffffffffffff)
    /usr/local/go/src/runtime/os_linux.go:44 +0x46
runtime.notesleep(0x105a310)
    /usr/local/go/src/runtime/lock_futex.go:159 +0x9f
runtime.mPark()
    /usr/local/go/src/runtime/proc.go:1340 +0x39
runtime.stopm()
    /usr/local/go/src/runtime/proc.go:2257 +0x92
runtime.findrunnable(0xc00003e800, 0x0)
    /usr/local/go/src/runtime/proc.go:2916 +0x72e
runtime.schedule()
    /usr/local/go/src/runtime/proc.go:3125 +0x2d7
runtime.park_m(0xc000400600)
    /usr/local/go/src/runtime/proc.go:3274 +0x9d
runtime.mcall(0x0)
    /usr/local/go/src/runtime/asm_amd64.s:327 +0x5b

goroutine 1 [chan receive, 11229 minutes]:
main.main()
    apps/ipsyncd/ipsync_fw_client.go:795 +0x857

goroutine 6 [select]:
go.opencensus.io/stats/view.(*worker).start(0xc00011c300)
    /go/pkg/mod/go.opencensus.io@v0.23.0/stats/view/worker.go:276 +0xcd
created by go.opencensus.io/stats/view.init.0
    /go/pkg/mod/go.opencensus.io@v0.23.0/stats/view/worker.go:34 +0x68

goroutine 35 [IO wait, 11229 minutes]:
internal/poll.runtime_pollWait(0x7ff36fd8c4e0, 0x72, 0x0)
    /usr/local/go/src/runtime/netpoll.go:222 +0x55
internal/poll.(*pollDesc).wait(0xc00011c718, 0x72, 0x0, 0x0, 0xb8c199)
    /usr/local/go/src/internal/poll/fd_poll_runtime.go:87 +0x45
internal/poll.(*pollDesc).waitRead(...)
    /usr/local/go/src/internal/poll/fd_poll_runtime.go:92
internal/poll.(*FD).Accept(0xc00011c700, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
    /usr/local/go/src/internal/poll/fd_unix.go:401 +0x212
net.(*netFD).accept(0xc00011c700, 0xc60b90, 0xc000036118, 0xb89bf1)
    /usr/local/go/src/net/fd_unix.go:172 +0x45
net.(*UnixListener).accept(0xc0002e2240, 0xba829b, 0x2f, 0xc000543f90)
    /usr/local/go/src/net/unixsock_posix.go:162 +0x32
net.(*UnixListener).Accept(0xc0002e2240, 0xba829b, 0x2f, 0xc000543f90, 0x1)
    /usr/local/go/src/net/unixsock.go:260 +0x65
main.IdMgrResetHandler()
    apps/ipsyncd/ipsync_fw_client.go:915 +0x202
created by main.main
    apps/ipsyncd/ipsync_fw_client.go:756 +0x489

goroutine 23 [select]:
github.com/eclipse/paho%2emqtt%2egolang.keepalive(0xc0003c66c0, 0x7ff36fb2c480, 0xc000420380)
    /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/ping.go:44 +0x1c5
created by github.com/eclipse/paho%2emqtt%2egolang.(*client).startCommsWorkers
    /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/client.go:542 +0x71d

goroutine 24 [chan receive]:
github.com/eclipse/paho%2emqtt%2egolang.(*router).matchAndDispatch.func2(0xc000482120, 0xc000416690, 0xc000482180, 0xc0003c66c0, 0x7ff396a14501, 0xc0004f6010, 0xc000482180, 0xc000482240, 0xc0004822a0, 0xc000482300)
    /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/router.go:170 +0x405
created by github.com/eclipse/paho%2emqtt%2egolang.(*router).matchAndDispatch
    /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/router.go:169 +0x165

goroutine 34 [sleep]:
time.Sleep(0x6fc23ac00)
    /usr/local/go/src/runtime/time.go:193 +0xd2
main.monitorMsgBusConnection(0xc00009ffc0, 0x3f, 0xc000081020, 0x55, 0xbaa3ef, 0x33, 0xbacbf5, 0x39, 0xba5ab4, 0x2b, ...)
    apps/ipsyncd/ipsync_server_comm.go:37 +0x36
created by main.subscribeToIPSyncUpdates
    apps/ipsyncd/ipsync_server_comm.go:54 +0x438

goroutine 26 [select]:
github.com/eclipse/paho%2emqtt%2egolang.(*client).startCommsWorkers.func1(0xc0003c66c0, 0xc0004823c0, 0xc000482360, 0xc00000e008)
    /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/client.go:567 +0x154
created by github.com/eclipse/paho%2emqtt%2egolang.(*client).startCommsWorkers
    /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/client.go:564 +0x37f

goroutine 27 [IO wait]:
internal/poll.runtime_pollWait(0x7ff36fd8c5c8, 0x72, 0xffffffffffffffff)
    /usr/local/go/src/runtime/netpoll.go:222 +0x55
internal/poll.(*pollDesc).wait(0xc000480b98, 0x72, 0x1700, 0x178b, 0xffffffffffffffff)
    /usr/local/go/src/internal/poll/fd_poll_runtime.go:87 +0x45
internal/poll.(*pollDesc).waitRead(...)
    /usr/local/go/src/internal/poll/fd_poll_runtime.go:92
internal/poll.(*FD).Read(0xc000480b80, 0xc0004b7000, 0x178b, 0x178b, 0x0, 0x0, 0x0)
    /usr/local/go/src/internal/poll/fd_unix.go:166 +0x1d5
net.(*netFD).Read(0xc000480b80, 0xc0004b7000, 0x178b, 0x178b, 0x177e, 0xc000420520, 0xd)
    /usr/local/go/src/net/fd_posix.go:55 +0x4f
net.(*conn).Read(0xc00000ebc8, 0xc0004b7000, 0x178b, 0x178b, 0x0, 0x0, 0x0)
    /usr/local/go/src/net/net.go:183 +0x91
crypto/tls.(*atLeastReader).Read(0xc0003ae0a8, 0xc0004b7000, 0x178b, 0x178b, 0x177e, 0x105a1c0, 0x0)
    /usr/local/go/src/crypto/tls/conn.go:776 +0x63
bytes.(*Buffer).ReadFrom(0xc0004205f8, 0xc53500, 0xc0003ae0a8, 0x40b725, 0xab68e0, 0xb65da0)
    /usr/local/go/src/bytes/buffer.go:204 +0xbe
crypto/tls.(*Conn).readFromUntil(0xc000420380, 0xc564a0, 0xc00000ebc8, 0x5, 0xc00000ebc8, 0x61)
    /usr/local/go/src/crypto/tls/conn.go:798 +0xf3
crypto/tls.(*Conn).readRecordOrCCS(0xc000420380, 0x0, 0x0, 0xc000085e58)
    /usr/local/go/src/crypto/tls/conn.go:605 +0x115
crypto/tls.(*Conn).readRecord(...)
    /usr/local/go/src/crypto/tls/conn.go:573
crypto/tls.(*Conn).Read(0xc000420380, 0xc00042206e, 0x1, 0x1, 0x0, 0x0, 0x0)
    /usr/local/go/src/crypto/tls/conn.go:1276 +0x165
io.ReadAtLeast(0x7ff36fb2c4d8, 0xc000420380, 0xc00042206e, 0x1, 0x1, 0x1, 0xc000072400, 0x461260, 0xc000482420)
    /usr/local/go/src/io/io.go:328 +0x87
io.ReadFull(...)
    /usr/local/go/src/io/io.go:347
github.com/eclipse/paho.mqtt.golang/packets.ReadPacket(0x7ff36fb2c4d8, 0xc000420380, 0x2, 0x2, 0x0, 0x0)
    /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/packets/packets.go:115 +0xa5
github.com/eclipse/paho%2emqtt%2egolang.startIncoming.func1(0x7ff36fb2c4d8, 0xc000420380, 0xc00040a010, 0xc00040a000, 0xc000482420)
    /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/net.go:119 +0xfd
created by github.com/eclipse/paho%2emqtt%2egolang.startIncoming
    /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/net.go:117 +0x131

goroutine 28 [select]:
github.com/eclipse/paho%2emqtt%2egolang.startIncomingComms.func1(0xc00000e0d0, 0xc00000e0d8, 0xc000482480, 0xc664d0, 0xc0003c66c0)
    /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/net.go:170 +0x17c
created by github.com/eclipse/paho%2emqtt%2egolang.startIncomingComms
    /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/net.go:159 +0x17d

goroutine 29 [select]:
github.com/eclipse/paho%2emqtt%2egolang.startOutgoingComms.func1(0xc00000e0e8, 0xc00000e0f8, 0xc00000e100, 0xc0004829c0, 0xc664d0, 0xc0003c66c0, 0xc66478, 0xc000420380)
    /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/net.go:272 +0x185
created by github.com/eclipse/paho%2emqtt%2egolang.startOutgoingComms
    /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/net.go:259 +0x1bf

goroutine 30 [chan receive]:
github.com/eclipse/paho%2emqtt%2egolang.startComms.func1(0xc000482480, 0xc000482ae0, 0xc000482960, 0xc000482a80, 0xc0004f6020)
    /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/net.go:401 +0x46
created by github.com/eclipse/paho%2emqtt%2egolang.startComms
    /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/net.go:400 +0x23e

goroutine 31 [chan receive, 11229 minutes]:
github.com/eclipse/paho%2emqtt%2egolang.startComms.func2(0xc0004829c0, 0xc000482ae0, 0xc0004f6020)
    /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/net.go:424 +0x77
created by github.com/eclipse/paho%2emqtt%2egolang.startComms
    /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/net.go:423 +0x274

goroutine 32 [semacquire, 11229 minutes]:
sync.runtime_Semacquire(0xc0004f6028)
    /usr/local/go/src/runtime/sema.go:56 +0x45
sync.(*WaitGroup).Wait(0xc0004f6020)
    /usr/local/go/src/sync/waitgroup.go:130 +0x65
github.com/eclipse/paho%2emqtt%2egolang.startComms.func3(0xc0004f6020, 0xc000482ae0)
    /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/net.go:432 +0x2f
created by github.com/eclipse/paho%2emqtt%2egolang.startComms
    /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/net.go:431 +0x2a5

goroutine 49 [select]:
github.com/eclipse/paho%2emqtt%2egolang.(*client).startCommsWorkers.func2(0xc00000e028, 0xc00000e0c0, 0xc000482120, 0xc0003c66c0)
    /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/client.go:602 +0xde
created by github.com/eclipse/paho%2emqtt%2egolang.(*client).startCommsWorkers
    /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/client.go:597 +0x4b3

goroutine 43 [chan receive, 4 minutes]:
main.getRNandMUAddress(0xc000039031, 0x8)
    apps/ipsyncd/ipsync_fw_client.go:561 +0xc5
created by main.main
    apps/ipsyncd/ipsync_fw_client.go:793 +0x807

goroutine 42 [chan receive, 4129 minutes]:
github.com/eclipse/paho%2emqtt%2egolang.(*baseToken).Wait(0xc0003873c0, 0xc0003a3000)
    /go/pkg/mod/github.com/eclipse/paho.mqtt.golang@v1.3.5/token.go:73 +0x38
go.panw.local/pangolin/pubsub.(*Client).Publish(0xc000187e88, 0xc0003a3000, 0x20, 0xc00023c690, 0xe2, 0xf0, 0x0, 0x2)
    /go/pkg/mod/go.panw.local/pangolin@v1.1.12/pubsub/pubsub.go:207 +0xaf
main.publishIPUserUpdateStatus(0xc00009ffc0, 0x3f, 0xc000081020, 0x55, 0xbaa3ef, 0x33, 0xbacbf5, 0x39, 0xba5ab4, 0x2b, ...)
    apps/ipsyncd/ipsync_fw_client.go:366 +0x412
main.ipSyncHandler()
    apps/ipsyncd/ipsync_server_comm.go:132 +0x6ed
created by main.main
    apps/ipsyncd/ipsync_fw_client.go:792 +0x7d7

rax    0xca
rbx    0x105a1c0
rcx    0xffffffffffffffff
rdx    0x0
rdi    0x105a310
rsi    0x80
rbp    0x7ffc78daac20
rsp    0x7ffc78daabd8
r8     0x0
r9     0x0
r10    0x0
r11    0x286
r12    0x431c0625855d9
r13    0x1059ce0
r14    0x1
r15    0x0
rip    0x46da41
rflags 0x286
cs     0x33
fs     0x0
gs     0x0

MattBrittan commented 2 years ago

I have taken a look at the stack trace and cannot see anything unusual (the library appears to be functioning as expected), which makes it difficult to say anything further without more information (a minimal reproducible example would be ideal, but logs can also help). Unfortunately, tracing this kind of issue can be quite time-consuming and requires a lot of information (often the issue turns out to be something external to the library).

Note: You may want to try the latest code (go get github.com/eclipse/paho.mqtt.golang@master). I don't think any changes have been made that would resolve the issue, but working with the latest code makes things simpler for me.

moneyease commented 2 years ago

Thanks, Matt. This is a production issue: some clients ended up in this state, and we have no reliable way to capture traces. Could you give me a sense of the potential problems? I see no goroutine held by the application. A) What writes to (closes) the b.complete channel that the token's Wait() blocks on? B) Would it be wise to use WaitTimeout() instead to mitigate the issue? If it returns false, does that mean a problem is in the making?

Thanks

MattBrittan commented 2 years ago

Unfortunately, this is not really something I can answer without logs; I cannot even really make a guess. The fact that the stack trace shows a goroutine waiting at packets.go:115 seems to indicate that the library is functional (it is waiting for something to come in over the network, as opposed to being deadlocked elsewhere). Another goroutine is waiting at net.go:272 (https://github.com/eclipse/paho.mqtt.golang/blob/v1.3.5/net.go#L272) and will handle any publish requests, so it looks like the publish packet has probably been sent (this goroutine is waiting for another packet to send).

For historic reasons the library is fairly complex (it has evolved over time with many authors and aims to fully manage the connection for you); the new MQTT v5 client is a lot simpler. In the past there have been a range of deadlocks, and while all of the ones I'm aware of have been fixed, it's also possible that the issue is with your broker (I have no experience with DynamoDB) or something network-related (the connection dropping without the client being notified, although the keepalive should detect that).

A) The token's channel is closed in flowComplete (called when the PUBACK is received) and when an error is set (I don't think this will happen with the options you have, as the client will try to reconnect). B) Wait always returns true; WaitTimeout returns false if it times out. Personally, I'd run Wait (or WaitTimeout) in a goroutine and just log the result (or perhaps log if the message is not sent within a given time period).

moneyease commented 2 years ago

I appreciate your suggestions. I will look into updating to the v5 library.

MattBrittan commented 2 years ago

Note that the v5 library is a total rewrite, so it works quite differently. Also, some functionality (e.g. persistence) is not yet implemented.

MattBrittan commented 2 years ago

I'm going to close this issue because, without more information, there is really nothing I can do (I cannot even ascertain whether the issue is in this library or elsewhere in your code). Please feel free to reopen this if you are able to provide additional information.