mochi-mqtt / server

The fully compliant, embeddable high-performance Go MQTT v5 server for IoT, smarthome, and pubsub

Possible issue with Will retain flag #153

Closed: dgduncan closed this issue 1 year ago

dgduncan commented 1 year ago

When testing, I discovered there may be an issue with the retain flag when the Will is published. When the Will is set with retain true, the retain flag does not actually seem to be set when it is published. I can see the message if I am subscribed to the topic at the moment it is published; however, the message is not there when I subscribe again.

mochi-co commented 1 year ago

Thanks @dgduncan - I will try to look into this as soon as I can. What MQTT protocol/client are you using?

dgduncan commented 1 year ago

@mochi-co paho.mqtt.golang, MQTT 3.1.1. I will attempt to do some investigation myself, as it was late when I posted this.

dgduncan commented 1 year ago

I am beginning to wonder if this isn't an LWT problem but rather a retain flag problem. Upon further testing, the Will message seems to be publishing normally; however, multiple messages on the topic are being retained, in a seemingly random order. I have run a few tests where I publish multiple messages from the same client to a topic:

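// Publish the same retained QoS 1 payload to the topic twice in succession.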
token2 = mqttclient.Publish("LWT/admin", 1, true, "connected")
fmt.Println(token2.WaitTimeout(60 * time.Second))

token2 = mqttclient.Publish("LWT/admin", 1, true, "connected")
fmt.Println(token2.WaitTimeout(60 * time.Second))

And when I subscribe to that topic:

[DEBUG] [client]   Connect()
[DEBUG] [store]    memorystore initialized
[DEBUG] [client]   about to write new connect msg
[DEBUG] [client]   socket connected to broker
[DEBUG] [client]   Using MQTT 3.1.1 protocol
[DEBUG] [net]      connect started
[DEBUG] [net]      received connack
[DEBUG] [client]   startCommsWorkers called
[DEBUG] [client]   client is connected/reconnected
[DEBUG] [net]      incoming started
[DEBUG] [net]      startIncomingComms started
[DEBUG] [net]      outgoing started
[DEBUG] [net]      startComms started
[DEBUG] [pinger]   keepalive starting
[DEBUG] [client]   startCommsWorkers done
[DEBUG] [store]    enter Resume
[DEBUG] [store]    exit resume
[DEBUG] [client]   exit startClient
[DEBUG] [client]   enter Subscribe
[DEBUG] [client]   SUBSCRIBE: dup: false qos: 1 retain: false rLength: 0 MessageID: 1 topics: [LWT/admin]
[DEBUG] [client]   sending subscribe message, topic: LWT/admin
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [net]      startIncomingComms: inboundFromStore complete
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [net]      startIncoming Received Message
[DEBUG] [net]      startIncoming Received Message
[DEBUG] [net]      startIncomingComms: got msg on ibound
[DEBUG] [net]      startIncomingComms: received publish, msgId: 1
[DEBUG] [net]      logic waiting for msg on ibound
[DEBUG] [net]      startIncomingComms: got msg on ibound
[DEBUG] [net]      startIncomingComms: received publish, msgId: 1
[DEBUG] [net]      obound priority msg to write, type *packets.SubscribePacket
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [client]   exit Subscribe
[DEBUG] [net]      logic waiting for msg on ibound
connected
[DEBUG] [net]      putting puback msg on obound
[DEBUG] [store]    memorystore del: message 1 was deleted
[DEBUG] [net]      done putting puback msg on obound
connected
[DEBUG] [net]      putting puback msg on obound
[WARN]  [store]    memorystore del: message 1 not found
[DEBUG] [net]      done putting puback msg on obound
[DEBUG] [net]      obound priority msg to write, type *packets.PubackPacket
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [net]      obound priority msg to write, type *packets.PubackPacket
[DEBUG] [net]      outgoing waiting for an outbound message
[DEBUG] [net]      startIncoming Received Message
[DEBUG] [net]      startIncomingComms: got msg on ibound
[WARN]  [store]    memorystore del: message 1 not found
[DEBUG] [net]      startIncomingComms: received suback, id: 1
[DEBUG] [net]      startIncomingComms: granted qoss [145]
[DEBUG] [net]      logic waiting for msg on ibound

I received the message twice. I have also done this with one publish to the LWT topic and another publish using the client Will, and when resubscribing to the same topic, multiple messages are received in a seemingly random order.

mochi-co commented 1 year ago

Thanks @dgduncan - I will look at this as soon as I can πŸ‘πŸ»

mochi-co commented 1 year ago

@dgduncan I've had a small look into this issue.

Retained messages seem to be working normally, so that's good news.

However, I noticed you are using QoS 1 for the publish and subscribe. MQTT implements offline message queuing for messages published at QoS 1 and 2, so if you disconnect and reconnect without setting the clean session (v3) or clean start (v5) flag, the client will inherit the queued QoS messages, which are delivered after a successful connack.
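For example, a minimal sketch of ruling that out by connecting with the clean session flag set, using paho.mqtt.golang (the broker address and client ID are placeholders):

package main

import (
    "log"

    mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
    // With clean session set, the broker discards any queued QoS 1/2
    // messages from a previous session for this client ID instead of
    // redelivering them on reconnect.
    opts := mqtt.NewClientOptions().
        AddBroker("tcp://localhost:1883"). // placeholder broker address
        SetClientID("test-client").        // placeholder client ID
        SetCleanSession(true)

    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        log.Fatal(token.Error())
    }
    defer client.Disconnect(250)
}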

There is a known issue with message ordering which may also be affecting this (#150), and I'm hoping to resolve that in the near future once I decide on the correct solution.

Does this sound like it might explain the behaviour you are experiencing?

dgduncan commented 1 year ago

Hey @mochi-co, thank you for looking into this. I am currently traveling and have not been able to test this properly in a known environment.

My understanding of MQTT retain is that when publishing at QoS 1 or 2 with retain true, the broker stores the latest message on that topic and publishes it to subscribers of that topic regardless of clean session state. Please correct me if I am wrong about this.

In my testing I found that I would receive two retained messages. In my experience with other brokers, that was never the case. I am still relatively new to MQTT and could be misunderstanding the interaction between retain and clean session.
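For what it's worth, a rough sketch of the check I have in mind, assuming a local broker and a fresh clean-session subscriber that should only ever see the latest retained payload (client IDs are illustrative):

package main

import (
    "fmt"
    "time"

    mqtt "github.com/eclipse/paho.mqtt.golang"
)

func connect(id string) mqtt.Client {
    opts := mqtt.NewClientOptions().
        AddBroker("tcp://localhost:1883"). // assumed local broker
        SetClientID(id).
        SetCleanSession(true)
    c := mqtt.NewClient(opts)
    if t := c.Connect(); t.Wait() && t.Error() != nil {
        panic(t.Error())
    }
    return c
}

func main() {
    // Publish two retained QoS 1 messages to the same topic.
    pub := connect("retain-pub")
    pub.Publish("LWT/admin", 1, true, "first").Wait()
    pub.Publish("LWT/admin", 1, true, "second").Wait()
    pub.Disconnect(250)

    // A brand-new clean-session subscriber should receive exactly one
    // retained message: "second".
    sub := connect("retain-sub")
    sub.Subscribe("LWT/admin", 1, func(_ mqtt.Client, m mqtt.Message) {
        fmt.Printf("received %q retained=%v\n", m.Payload(), m.Retained())
    }).Wait()
    time.Sleep(2 * time.Second)
    sub.Disconnect(250)
}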

mochi-co commented 1 year ago

@dgduncan I think this is very possible and would explain what you're experiencing; it's not something I'd thought of. When a client reconnects, publishing pending inflights and publishing retained messages are two separate calls. Potentially, if Client A subscribes to Topic and then disconnects, and Client B publishes a retained message Hello with QoS 1/2 to Topic, then when Client A reconnects it would first receive the pending inflight for Hello, and then receive the retained message Hello again when the subscription is transferred.
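To illustrate, a rough reproduction sketch of that sequence with paho.mqtt.golang, assuming a local broker (client IDs and topic are illustrative):

package main

import (
    "fmt"
    "time"

    mqtt "github.com/eclipse/paho.mqtt.golang"
)

func connect(id string, clean bool) mqtt.Client {
    opts := mqtt.NewClientOptions().
        AddBroker("tcp://localhost:1883"). // assumed local broker
        SetClientID(id).
        SetCleanSession(clean).
        SetDefaultPublishHandler(func(_ mqtt.Client, m mqtt.Message) {
            fmt.Printf("received %q retained=%v\n", m.Payload(), m.Retained())
        })
    c := mqtt.NewClient(opts)
    if t := c.Connect(); t.Wait() && t.Error() != nil {
        panic(t.Error())
    }
    return c
}

func main() {
    // Client A subscribes at QoS 1 with a persistent session, then drops off.
    a := connect("client-a", false)
    a.Subscribe("Topic", 1, nil).Wait() // nil routes messages to the default handler
    a.Disconnect(250)

    // Client B publishes a retained QoS 1 "Hello" while A is away.
    b := connect("client-b", true)
    b.Publish("Topic", 1, true, "Hello").Wait()
    b.Disconnect(250)

    // A resumes its session and re-subscribes: the duplicate shows up as the
    // queued inflight plus the re-issued retained copy of the same message.
    a = connect("client-a", false)
    a.Subscribe("Topic", 1, nil).Wait()
    time.Sleep(2 * time.Second)
    a.Disconnect(250)
}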

I'll have to look into this, but it might be a week or so depending on how busy I get. Probably the simplest solution will be to add some kind of message merge and send both the inflights and the retained messages out at the same time, but I will need to investigate more thoroughly to ensure nothing more broad-reaching is happening.

Let me know if you discover anything else that confirms or rules anything out. Thank you for raising it, either way! I will try to get it fixed.

dgduncan commented 1 year ago

@mochi-co thank you for looking into this. If I have some time I will look into this as well to the best of my ability.

I appreciate you and everyone else working so hard on this project. For a personal project of mine, I have been writing some really cool hooks to integrate cloud auth and messaging patterns where MQTT isn’t really viable, and I think this project has a lot of potential :)

mochi-co commented 1 year ago

I've investigated this, and it occurs because we resend retained messages when a client inherits an existing session, but per https://groups.google.com/g/mqtt/c/ZUpadJna_tA this is a misreading of the specification. As a result, we end up with the same packet duplicated: the retained message has a packet id of 0, while the QoS message retains its original outgoing packet id (e.g. 7), so the two cannot be reconciled into a single message.

As retained messages should only be sent when a client sends a SUBSCRIBE packet, and not on reconnect, the solution is to remove the retained message re-issuance when a session takeover occurs. I will patch this momentarily.

mochi-co commented 1 year ago

This should now be resolved in https://github.com/mochi-co/mqtt/releases/tag/v2.2.0 - let me know! πŸ‘πŸ»

dgduncan commented 1 year ago

@mochi-co I will check this out and update with the results. Thank you for looking into this!

dgduncan commented 1 year ago

Just as an update: I have not tested the changes yet, but I will this week and will either keep the issue open or close it.

mochi-co commented 1 year ago

@dgduncan I'll go ahead and close this for now, but if you find it is still a problem, comment and I'll reopen :)

dadebue commented 1 year ago

Hey, unfortunately this problem still exists.

My test case

Findings

Potential problem / solution

Could it be that the problem occurs because s.sendLWT() is missing a call to s.retainMessage() when the LWT message is retained? So something like this:

func (s *Server) sendLWT(cl *Client) {

    ...

    if pk.FixedHeader.Retain { // [MQTT-3.3.1-5] ![MQTT-3.3.1-8]
        s.retainMessage(cl, pk)
    }

    s.publishToSubscribers(pk)                      // [MQTT-3.1.2-8]
    atomic.StoreUint32(&cl.Properties.Will.Flag, 0) // [MQTT-3.1.2-10]
    s.hooks.OnWillSent(cl, pk)
}

instead of this:

func (s *Server) sendLWT(cl *Client) {

    ...

    s.publishToSubscribers(pk)                      // [MQTT-3.1.2-8]
    atomic.StoreUint32(&cl.Properties.Will.Flag, 0) // [MQTT-3.1.2-10]
    s.hooks.OnWillSent(cl, pk)
}

Thanks and best regards,
Marco