eclipse / paho.mqtt.golang

Other
2.73k stars 533 forks source link

Race Condition when subscribing to a topic with a retained message #663

Closed andig closed 8 months ago

andig commented 8 months ago

I'm listening to topics with retained messages:

token := m.Client.Subscribe(topic, m.Qos, func(c paho.Client, msg paho.Message) {
    //...
})

select {
case <-time.After(request.Timeout):
    return fmt.Errorf("subscribe: %s: %w", topic, api.ErrTimeout)
case <-token.Done():
    return nil
}

I would expect that- when Subscribe() token handling completes- the callback will have been invoked if a retained message exists and no error occured. That seems not always to be the case.

There used to be a similar issue in the java version: https://github.com/eclipse/paho.mqtt.java/issues/432.

Here is a related log:

[DEBUG] [client]   Connect()
[DEBUG] [store]    memorystore initialized
[DEBUG] [client]   about to write new connect msg
[DEBUG] [client]   socket connected to broker
[DEBUG] [client]   Using MQTT 3.1.1 protocol
[DEBUG] [net]      connect started
[DEBUG] [net]      received connack
[DEBUG] [client]   startCommsWorkers called
[DEBUG] [client]   client is connected/reconnected
[DEBUG] [net]      incoming started
[DEBUG] [net]      startIncomingComms started
[DEBUG] [net]      outgoing started
[DEBUG] [net]      startComms started
[DEBUG] [client]   startCommsWorkers done
[DEBUG] [client]   exit startClient
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [net]      startIncomingComms: inboundFromStore complete
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [pinger]   keepalive starting
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [client]   enter Subscribe
[DEBUG] [client]   SUBSCRIBE: dup: false qos: 1 retain: false rLength: 0 MessageID: 1 topics: [price]
[DEBUG] [client]   sending subscribe message, topic: price
[DEBUG] [client]   exit Subscribe
[DEBUG] [net]      obound priority msg to write, type *packets.SubscribePacket
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [net]      startIncoming Received Message
[DEBUG] [net]      startIncoming Received Message
[DEBUG] [net]      startIncomingComms: got msg on ibound
[DEBUG] [net]      startIncomingComms: received suback, id: 1
[DEBUG] [net]      startIncomingComms: granted qoss [1]
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [net]      startIncomingComms: got msg on ibound
[DEBUG] [net]      startIncomingComms: received publish, msgId: 0
[DEBUG] [net]      logic waiting for msg on ibound

I'm wondering what this means:

SUBSCRIBE: dup: false qos: 1 retain: false rLength: 0 MessageID: 2 topics: [price]

Subscribe() has no way to specify retain- should this matter?

MattBrittan commented 8 months ago

I would expect that- when Subscribe() token handling completes- the callback will have been invoked if a retained message exists and no error occured. That seems not always to be the case.

Your expectation is incorrect; the Subscribe token completes when a SUBACK packet is received (indicating that the server has processed the subscription request). THE MQTT spec states:

The Server is permitted to start sending PUBLISH packets matching the Subscription before the Server sends the SUBACK Packet.

and

Any existing retained messages matching the Topic Filter MUST be re-sent, but the flow of publications MUST NOT be interrupted [MQTT-3.8.4-3].

It does not state whether retained messages are sent before or after the SUBACK. In your example the messages are being received after the SUBACK.

There used to be a similar issue in the java version: https://github.com/eclipse/paho.mqtt.java/issues/432.

That was quite a different issue (the Java client was not passing messages received before the SUBACK was received onto the appropriate handler; this client registers the handler before sending the SUBSCRIBE request).

Subscribe() has no way to specify retain- should this matter?

This is just a generic record of the packet fixed header (the flag only has meaning for PUBLISH packets).

Note: As you are subscribing with QOS1 be mindful of the potential issue mentioned in the readme:

When QOS1+ subscriptions have been created previously and you connect with CleanSession set to false it is possible that the broker will deliver retained messages before Subscribe can be called. To process these messages either configure a handler with AddRoute or set a DefaultPublishHandler. If there is no handler (or DefaultPublishHandler) then inbound messages will not be acknowledged. Adding a handler (even if it's opts.SetDefaultPublishHandler(func(mqtt.Client, mqtt.Message) {})) is highly recommended to avoid inadvertently hitting inflight message limits.

andig commented 8 months ago

Thank youl @MattBrittan for the detailed explanation. I admit not being accustomed with the spec. The specified behavior seems unfortunate though. As far as I understand, there is no way for a client to identify if a server has a retained value waiting for him other that wait- how long would depend on server load, network, etc.

Much appreciated, and Merry Christmas šŸŽ…šŸ».

MattBrittan commented 8 months ago

As far as I understand, there is no way for a client to identify if a server has a retained value waiting for him other that wait- how long would depend on server load, network, etc.

Correct; retained messages provide initial values when a connection is established; there is no real provision for systems to connect, get retained messages and disconnect (i.e. the MQTT protocol provides no simple mechanism for this).

I'll close this off as I believe your question has been answered; have a great Christmas!