eclipse-paho / paho.mqtt.golang

SubscribeToken never completes if WebSocket gets closed #555

Closed lukestoward closed 2 years ago

lukestoward commented 2 years ago

I was recently upgrading our service dependencies and upgraded this module from v1.1.1 -> v1.3.5 and found that one of our integration tests started failing. The scenario essentially creates a client and connects to a broker. It then attempts to subscribe to a topic that does not exist. This results in the following client error being received: [ERROR] [client] Connect comms goroutine - error triggered websocket: close 1005 (no status). This error is read from the ibound channel after the outbound priority packets.SubscribePacket is written to the connection.

The main issue here is that the call to token.Wait() blocks forever. I tried to dig through and understand what's happening, and I think the issue is that when this error gets read from the ibound channel (net.go) it gets passed to another channel and is eventually read off the commsErrors channel (client.go), but it never gets propagated to the original SubscribeToken, meaning the baseToken.complete channel is never populated or closed.

I found that if I propagated the connection error by adding c.index[c.lastIssuedID].setError(err) below ERROR.Println(CLI, "Connect comms goroutine - error triggered", err), the call to token.Wait() did return. I doubt this is a proper fix, but perhaps it helps in some way?

The broker used in our integration testing is an AWS IoT Core broker, and the mqtt client is configured like so:

connOpts := iotMQTT.NewClientOptions()
connOpts.AddBroker(url)
connOpts.SetClientID(clientId)
connOpts.SetCleanSession(true)
connOpts.SetAutoReconnect(true)
connOpts.SetKeepAlive(2 * time.Second)

iotClient := iotMQTT.NewClient(connOpts)
token := iotClient.Connect()
if !token.Wait() || token.Error() != nil {
    return nil, fmt.Errorf("error connecting to iot: %s", token.Error())
}

token = iotClient.Subscribe("non-existing-topic", 1, nil) // reuse token; a second := in the same scope would not compile
token.Wait() // <- blocks forever when error "websocket: close 1005 (no status)" is received.

I found that if I upgrade from v1.1.1 -> v1.2.0 I encounter the same issue, but with a different error message. I believe various changes since v1.2.0 have changed the error message/handling, but I only dug through v1.3.5 as we'd like to use the latest version.

I've attached the stdout logs with debug enabled. stdout_log.txt

The AWS Broker logs the following message when attempting to subscribe to the non-existing-topic:

{
    "timestamp": "2021-11-02 12:15:42.720",
    "logLevel": "ERROR",
    "traceId": "OMITTED",
    "accountId": "OMITTED",
    "status": "Failure",
    "eventType": "Subscribe",
    "protocol": "MQTT",
    "topicName": "non-existing-topic",
    "clientId": "OMITTED",
    "principalId": "OMITTED",
    "sourceIp": "OMITTED",
    "sourcePort": 59414,
    "reason": "AUTHORIZATION_FAILURE",
    "details": "Authorization Failure"
}

MattBrittan commented 2 years ago

Thanks for logging this and providing plenty of detail! I'm not too surprised to see this issue; it's probably something that is hidden from most users (as common practice is to subscribe from the On Connect callback). I'll take a look at it within the next week or so.

MattBrittan commented 2 years ago

It looks like this issue is due to a combination of things (including issues around dealing with the large number of options the library offers); when the connection is lost we conditionally clean up the message IDs (releasing tokens) here:

if c.options.CleanSession && !reconnect {
   c.messageIds.cleanUp()
}

cleanUp() terminates the tokens:

for _, token := range mids.index {
        switch token.(type) {
        case *PublishToken:
            token.setError(fmt.Errorf("connection lost before Publish completed"))
        case *SubscribeToken:
            token.setError(fmt.Errorf("connection lost before Subscribe completed"))
        case *UnsubscribeToken:
            token.setError(fmt.Errorf("connection lost before Unsubscribe completed"))
        case nil:
            continue
        }
        token.flowComplete()
    }

The above does not get called if we are automatically reconnecting because the session will be resumed and we don't want any conflicting message IDs allocated in the meantime. The attempt to resend happens in resume:

if isKeyOutbound(key) {
            switch p := packet.(type) {
            case *packets.SubscribePacket:
                if subscription {
                    DEBUG.Println(STR, fmt.Sprintf("loaded pending subscribe (%d)", details.MessageID))
                    subPacket := packet.(*packets.SubscribePacket)
                    token := newToken(packets.Subscribe).(*SubscribeToken)
                    token.messageID = details.MessageID
                    token.subs = append(token.subs, subPacket.Topics...)
                    c.claimID(token, details.MessageID)
                    select {
                    case c.oboundP <- &PacketAndToken{p: packet, t: token}:
                    case <-c.stop:
                        DEBUG.Println(STR, "resume exiting due to stop")
                        return
                    }
                } else {
                    c.persist.Del(key) // Unsubscribe packets should not be retained following a reconnect
                }

So I suspect that this issue will not happen if options.ResumeSubs is true. This is because c.claimID(token, details.MessageID) automatically terminates any token already in the store before adding a new one. So it looks like the solution here will be to check for, and terminate any token in the store when doing the c.persist.Del(key) - a small change.

One challenge is going to be testing this; we use Mosquitto for testing and I cannot think of a way to reliably simulate this situation.

I'd note that the way this all works is a bit confusing (after reconnect all existing tokens are terminated but the messages are still sent out). While I'd like to simplify this I think it's likely to break existing usages (it's been like this for years) so suspect that just closing out the tokens is all that's really needed.

MattBrittan commented 2 years ago

Thinking about this further, while the above is all true, it is not the root cause. Subscription packets are not part of the session, and the only reason this library adds them to the store is the non-standard ResumeSubs option. As such, I think a more sensible approach is to clear them out of the message ID map if ResumeSubs is false (and not add them to the store at all in that case).

I have merged PR #557 into @master; can you please see if this resolves the issue for you? I cannot think of a way of testing this with Mosquitto, so currently it's fixed 'in theory'!
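The approach described above (clearing subscription tokens out of the message ID map when ResumeSubs is false) can be sketched roughly as follows. This is a simplified model for illustration only, not the code merged in the PR: token, idStore, fail and onConnectionLost are hypothetical names, and the sketch omits the locking a real client would need.

```go
package main

import (
	"errors"
	"fmt"
)

type tokenKind int

const (
	publishKind tokenKind = iota
	subscribeKind
)

// token is a simplified stand-in for the library's tokens.
type token struct {
	kind     tokenKind
	complete chan struct{}
	err      error
}

func newToken(k tokenKind) *token {
	return &token{kind: k, complete: make(chan struct{})}
}

// fail records err and closes complete, releasing anyone blocked in Wait.
// Calling it on an already completed token is a no-op.
func (t *token) fail(err error) {
	select {
	case <-t.complete:
	default:
		t.err = err
		close(t.complete)
	}
}

// Wait blocks until the token completes and returns its error.
func (t *token) Wait() error {
	<-t.complete
	return t.err
}

// idStore maps message IDs to in-flight tokens (hypothetical model, no mutex).
type idStore struct{ index map[uint16]*token }

// onConnectionLost completes and removes subscribe tokens when subscriptions
// will not be resumed; publish tokens are left in place for the resend.
func (s *idStore) onConnectionLost(resumeSubs bool) {
	if resumeSubs {
		return
	}
	for id, t := range s.index {
		if t.kind == subscribeKind {
			t.fail(errors.New("connection lost before Subscribe completed"))
			delete(s.index, id) // deleting during range is safe in Go
		}
	}
}

func main() {
	pub, sub := newToken(publishKind), newToken(subscribeKind)
	store := &idStore{index: map[uint16]*token{1: pub, 2: sub}}
	store.onConnectionLost(false)
	fmt.Println(sub.Wait(), len(store.index))
}
```

The point of the model is that the subscribe token's Wait returns with an error instead of blocking, while the publish token stays in the map so its message ID is not reused.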

lukestoward commented 2 years ago

@MattBrittan thank you for explaining the root cause and implementing a fix so quickly. I tested the fix this morning and my integration test is now passing 🎉 . The call to .Wait() no longer blocks indefinitely.

I have another separate issue, but I wondered if you could clarify the expected behaviour before I treat this as a new issue. I have another integration test that attempts to publish a message to a non-existing topic. The expectation is that after the call to token.Wait() returns, token.Error() would signal that the publish failed. In v1.1.1 this is true, the error message "Connection lost before Publish completed" is returned. However, in v1.3.5 error is nil in this scenario.

Would you expect that publishing to a non-existing or unauthorised topic would return an error? The AWS MQTT broker logs this publish attempt with "status": "Failure".

If you think this is a bug, I will raise a separate issue. Thanks again 😄

MattBrittan commented 2 years ago

Would you expect that publishing to a non-existing or unauthorised topic would return an error?

These are AWS specific things; the MQTT spec does not really offer support for this kind of thing (a topic is valid if it meets the requirements in section 4) and:

[MQTT-3.3.5-2] | If a Server implementation does not authorize a PUBLISH to be performed by a Client; it has no way of informing that Client. It MUST either make a positive acknowledgement, according to the normal QoS rules, or close the Network Connection.

Unfortunately if it just closes the connection the client has no way to tie that into the message. That means that if the message was QOS1+ then it will be resent when the connection comes back up (and an infinite loop will result). Unfortunately I don't think there is anything we can do about that...
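The resend loop described above can be illustrated with a toy simulation. Everything here is hypothetical (brokerAccepts, publishWithResend, and the maxReconnects cap that exists only so the simulation terminates); it models the interaction between a broker that closes the connection on an unauthorised PUBLISH and a client that resends QoS 1 messages after each reconnect.

```go
package main

import "fmt"

// brokerAccepts models a broker that, per [MQTT-3.3.5-2], either acknowledges
// the PUBLISH (true) or closes the network connection (false).
func brokerAccepts(topic string, authorized map[string]bool) bool {
	return authorized[topic]
}

// publishWithResend models a QoS 1 publish that is resent after every
// reconnect. maxReconnects is a hypothetical cap; without it the loop would
// never end for an unauthorised topic.
func publishWithResend(topic string, authorized map[string]bool, maxReconnects int) (attempts int, acked bool) {
	for attempts = 1; attempts <= maxReconnects+1; attempts++ {
		if brokerAccepts(topic, authorized) {
			return attempts, true
		}
		// The broker closed the connection; the client reconnects and
		// resends the same message on the next iteration.
	}
	return maxReconnects + 1, false
}

func main() {
	authorized := map[string]bool{"allowed-topic": true}
	fmt.Println(publishWithResend("allowed-topic", authorized, 3))
	fmt.Println(publishWithResend("non-existing-topic", authorized, 3))
}
```

For the unauthorised topic every attempt fails in the same way, so the only thing that stops the loop in this model is the artificial cap.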

lukestoward commented 2 years ago

It MUST either make a positive acknowledgement, according to the normal QoS rules, or close the Network Connection.

Closing the connection is the signal I was expecting to receive. In v1.1.1 this is signalled via token.Error(), however in v1.3.5 no error is signalled via token.Error(). This appears to be a client behavioural change rather than an AWS specific thing.

MattBrittan commented 2 years ago

In v1.1.1 this is signalled via token.Error(), however in v1.3.5 no error is signalled via token.Error(). This appears to be a client behavioural change rather than an AWS specific thing.

That's quite possible - v1.1 dates back to 2017 and much of the client has been rewritten (and heaps of options added). This change is probably the cause.

If you have AutoReconnect enabled then the idea is that the client is managing the connection from the end user's perspective and the reconnection should be invisible. This means that upon reconnection the CleanSession flag is false and Publish packets will be resent. Because of this it does not really make sense to error out the token (the end user should assume that the message will be delivered eventually). Thinking about this, I can see a few issues with how it's working (mainly re QOS2), but as I don't use CleanSession=true myself it's not something I really encounter. As noted in my original comment, I think this could be tidied up a bit (there is probably no need to end all tokens upon reconnect).