moscajs / mosca

MQTT broker as a module
mosca.io
3.2k stars 508 forks source link

When is the "retain" flag forwarded to a subscribed client? #612

Closed jmholzinger closed 7 years ago

jmholzinger commented 7 years ago

I've noticed when receiving messages on the client the packet's retain flag is not set properly for retained messages.

I took a look at the code and it looks like the flag is being ignored when the client forwards the packet to the subscribers. I changed the code here and it seems to work as I would expect, but I'm not sure if this will have any side-effects. Please take a look, if necessary I can create a pull request to apply the fix as described below.

lib/client.js line 224

var packet = {
    topic: topic,
    payload: payload,
    qos: qos,
    messageId: newId
};

Here you can see I added the "retain" flag at the end of the packet:

var packet = {
    topic: topic,
    payload: payload,
    qos: qos,
    messageId: newId,
    retain: options.retain
};
psorowka commented 7 years ago

The Mosca implementation is correct. The behavior required by the MQTT spec is the following:

Therefore, a client can differentiate between "live" packets and "stored" packets it receives by inspecting the retain flag in received packets.

See the forwardRetained function which cares about the second case.

From the spec:

When sending a PUBLISH Packet to a Client the Server MUST set the RETAIN flag to 1 if a message is sent as a result of a new subscription being made by a Client [MQTT-3.3.1-8]. It MUST set the RETAIN flag to 0 when a PUBLISH Packet is sent to a Client because it matches an established subscription regardless of how the flag was set in the message it received

jmholzinger commented 7 years ago

The retain flag is always set to "false", it is not omitted.

See above, the flag is always set to "false".

psorowka commented 7 years ago

yes of course. The "flag" is just a Bit in the packet. It is always present. When I said "not set", I meant "set to false", excuse my bad wording :)

jmholzinger commented 7 years ago

Okay, thank you for the quick feedback.

I assume you meant the client cannot differentiate, if there is a method aside from the "retain" flag it would certainly be useful to know.

psorowka commented 7 years ago

No, I actually meant can differentiate. I am doing this, it works perfectly with Mosca as the Spec requires, at least with Redis persistence

jmholzinger commented 7 years ago

I see, so I should only expect to see the "retain" flag set on an incoming message if and only if the client was offline when the message was first retained? If the message was received previously and the client reconnects the "retained" message will merely be forwarded back to the client?

psorowka commented 7 years ago

it should be like this:


  Client A              Broker            Client B           Client C   

      │                   │   Subscribe       │                  │      
      │                   │◀──────────────────│                         
      │ Publish "foobar"  │                   │                  │      
      ├──────────────────▶│                   │                         
      │   retain true     │                   │                  │      
      │                   │ Forward "foobar"  │                         
      │                   ├──────────────────▶│                  │      
      │                   │   retain false    │                         
      │                   │                   ▼                  │      
      │                   │              Reconnect                      
      │                   │   Subscribe       │                  │      
      │                   │◀──────────────────│                         
      │                   │ Forward "foobar"  │                  │      
      │                   ├──────────────────▶│                         
      │                   │   retain true     │                  │      
      │                   │                   │                         
      │                   │                   ▼                  │      
      ▼                   │                                   Connect   
                          │    Subscribe                         │      
                          │ ◀────────────────────────────────────│      
                          │  Forward "foobar"                    │      
                          │─────────────────────────────────────▶│      
                          │    retain true                       │      
                          ▼                                      ▼      
jmholzinger commented 7 years ago

Okay, this makes sense. Thanks for taking the time to help me out, I suppose I'll have to approach my problem differently.

chartinger commented 6 years ago

@psorowka Could you point out where exactly in the code the retain flag is set? (Memory persistance)

Like the OP said, it seems to me Mosca never sets the retain flag. While the retained messages are sent out via

https://github.com/mcollina/mosca/blob/9686ce8bd72e882905c0149ae5a7ff875bfaabb3/lib/persistence/abstract.js#L86

The retained flag is not set in the new packet at https://github.com/mcollina/mosca/blob/9686ce8bd72e882905c0149ae5a7ff875bfaabb3/lib/client.js#L224

Using the same code on the mosquitto test server and mosca have different outcomes.

psorowka commented 6 years ago

I am not using Mosca anymore and don't know what might have changed within the year. But as far as I know this behaves like the spec requires.

From my understanding the flag is never explicitly set because it had been stored with the incoming packet in the persistence (no matter which one)

chartinger commented 6 years ago

If i understand the retain flag correctly, this gist says otherwise:

https://gist.github.com/chartinger/aeb8859d49b5a092c7cd13e1b797d2c3#file-moscaretain-js

Changing the url to the mosquitto server works as expected. I used the same fix as the OP before finding this issue.

psorowka commented 6 years ago

Hm I agree, your code should do the right thing and yes obviously Mosca is not behaving as required here.

I think problem is that client.forward does not merge the given options parameter from the persistence into the new packet.

From the blame I don't see any changes but I was so sure that it did work correctly...

Sorry for confusing this.