mqttjs / MQTT.js

The MQTT client for Node.js and the browser

Messages get not handled/lost when reconnecting mqtt5 client with persistent sessions #1124

Closed Shadowghost closed 4 years ago

Shadowghost commented 4 years ago

Whenever a client with a persistent session reconnects to a broker, messages can (and do) get lost because the client receives and acknowledges them before any handlers are registered. AFAIK this can't be prevented, since we cannot register handlers on the client before the connection is established. The relevant time window is between connection establishment and handler registration, because the broker does not care (and cannot know) about the latter. If I somehow missed how to register handlers before actually connecting to the broker, it'd be nice if someone could point me in the right direction.

Back to the issue, here is some more information:

Steps to reproduce

The client needs to have a delay between connecting to the broker and registering handlers (see code example).

  1. Connect a mqtt5 client with persistent session to a broker
  2. Subscribe to a topic
  3. Disconnect the client
  4. Publish something to the topic
  5. Reconnect the client
  6. Check the broker logs: you'll see a PUBACK from the client
  7. Check the client logs: you won't see the log line that should have been produced when receiving a message

Code example

The code was also tested using the handleMessage handler, with the same result.

const clientOpts = {
    protocolId: 'MQTT',
    protocolVersion: 5,
    port: 8338,
    clean: false,
    clientId: 'test',
    resubscribe: true,
    keepalive: 0,
    // MQTT 5.0 properties
    properties: {
        sessionExpiryInterval: 3600,
        receiveMaximum: 750,
        maximumPacketSize: 750,
        topicAliasMaximum: 1024,
    },
}

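const mqtt = require('mqtt')

// Excerpt from a class method: this.broker, this.logger and Utils.sleep are not shown here.
const client = mqtt.connect(this.broker, clientOpts)

// Deliberate delay that widens the gap between connect and handler registration
await Utils.sleep(1000)

client.on('message', async (topic, message) => {
    await this.reactToMessage(topic, message)
})

// Method on the same class (TypeScript signature)
async reactToMessage(topic: string, message: Buffer) {
    this.logger.verbose('Received message from ' + topic)
}

tennessine commented 4 years ago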

sub.js

const mqtt = require('mqtt');

let client = mqtt.connect({
    host: 'test.mosquitto.org',
    clientId: 'issues-1124-sub',
    clean: false
});

client.on('connect', function(connack) {
    console.log('connected');
    // console.log(connack);
    // note: QoS must be > 0
    client.subscribe('issues/1124', {
        qos: 1
    });
});

client.on('message', function(topic, message, packet) {
    console.log('received new message', topic, message.toString('utf-8'));
});

pub.js

const mqtt = require('mqtt');

let client = mqtt.connect({
    host: 'test.mosquitto.org',
    clientId: 'issues-1124-pub'
});

client.on('connect', function(connack) {
    console.log('connected');
    // console.log(connack);
    // note: QoS must be > 0
    client.publish('issues/1124', 'issues/1124 message', {
        qos: 1
    });

    client.end();
});

Notice:

  1. The consumer client should connect with clean session set to false and subscribe with QoS > 0
  2. The producer should publish with QoS > 0
  3. The broker you use should support offline message storage

Shadowghost commented 4 years ago

That's exactly what I did, and the result is what I wrote in my original post. The broker used is the latest Mosquitto, configured with persistence enabled. Producer and consumer publish/subscribe with QoS=1. My problem has nothing to do with persistence or offline messaging. It is that there is a gap between successful connection and handler registration in which incoming messages are just acknowledged and not processed.

jdiamond commented 4 years ago

That is not exactly what you did. You have this weird call to await Utils.sleep(1000) in between calling connect and adding the message handler. If you take that out, the message handler will be available before the connection is established (because it happens asynchronously).

Shadowghost commented 4 years ago

The delay is there to make the loss obvious (it forces the behaviour). You can leave it out and it will eventually happen. In either case connect is completed before the handler registration, and that's the source of the described problem. You can check that in the broker logs. But even with the delay, the client shouldn't just acknowledge incoming messages and silently not handle them at all.

jdiamond commented 4 years ago

It is impossible for the connection to be established before .on('message') is called as in @tennessine's example. If there is a bug that is losing messages, it is for some other reason.

In your code example, you purposely avoided adding a handler so of course it loses messages.

Shadowghost commented 4 years ago

Thanks for the explanation, I wasn't really getting what @tennessine's code should tell me. I also wasn't aware of the fact that the connection is only established after the handlers are registered (I didn't find anything about handler registration being mandatory for connect to be successful in the doc). I'll check my local code tomorrow again and see if I can reproduce my (supposed) error reliably without the forced delay.

Out of curiosity and to get a better understanding: Since connect waits for the handlers, why does my intentional delay break that logic? Shouldn't connect still wait for handler registration and handle the received messages with the handlers, instead of just acknowledging and not processing them (by using the default handler implementation)? If you replace the delay with a simple subscribe, you can see that subscribe is called and executed before the on('connect') handler is registered and called. Shouldn't either subscribe be called and the on('connect') handler be ignored (since the connection is already established before registering the handler), or the handlers be called first (because they're the requirement for a successful connection) and subscribe after them (because it requires a connection)?

I'm new to JavaScript and the handler concept, so thanks in advance for bearing with my (maybe stupid) questions. This problem drove me nuts yesterday, so I want to understand the implications and do it properly.

Shadowghost commented 4 years ago

To clarify the objective of this issue: Is it possible to register the handlers before calling connect?

I think my original issue description wasn't clear about that (at all).

The intentional delay in my code example should highlight that there is a race condition between connect and handler registration which can lead to message loss.

jdiamond commented 4 years ago

It's not that there is code that waits for the handlers to be set before making the connection--it's just normal JavaScript/Node.js behavior.

Any asynchronous operations started by calling connect (like creating a TCP connection) won't complete on this turn of the event loop, no matter how fast they are, but the rest of the code in the function (like setting the message handler) will finish on this turn. See Concurrency model and the event loop. But when you add await to a function, it gets broken up and run as separate tasks after each await, so you can no longer rely on the whole function completing before your async operations.
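
The ordering described above can be tried without a broker. The following is only a minimal sketch: `fakeConnect` is a hypothetical stand-in that emits a 'message' event on a later turn of the event loop, and the timer-based await plays the role of the `Utils.sleep(1000)` call from the original report. None of these names come from MQTT.js.

```javascript
const { EventEmitter } = require('node:events');

// Hypothetical stand-in for a client: the 'message' event is emitted
// asynchronously, on a later turn of the event loop, like real network I/O.
function fakeConnect() {
    const client = new EventEmitter();
    setImmediate(() => client.emit('message', 'hello'));
    return client;
}

// Handler attached synchronously, in the same turn as fakeConnect().
function registerImmediately() {
    return new Promise((resolve) => {
        const received = [];
        const client = fakeConnect();
        client.on('message', (m) => received.push(m));
        // Give the pending event time to fire, then report what arrived.
        setTimeout(() => resolve(received), 50);
    });
}

// Handler attached only after an await that really yields to the event loop.
async function registerAfterDelay() {
    const received = [];
    const client = fakeConnect();
    await new Promise((resolve) => setTimeout(resolve, 20));
    client.on('message', (m) => received.push(m)); // too late: event already emitted
    await new Promise((resolve) => setTimeout(resolve, 50));
    return received;
}
```

Awaiting `registerImmediately()` yields `['hello']`, while `registerAfterDelay()` yields an empty array, because an event emitted while no listener is attached is simply dropped.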

The way the API is currently written, you cannot add the handlers before calling connect but that's supposed to be OK. I get that it seems like there could be race conditions, but JavaScript is what it is and this is a normal pattern in Node.js. Check out the example here for net.createConnection (which mqtt.connect eventually ends up calling):

https://nodejs.org/dist/latest-v12.x/docs/api/net.html#net_net_createconnection_options_connectlistener

I'm sure that API was direct inspiration for MQTT.js.

MQTT.js clients have some other helpful behavior. When you call a method like publish or subscribe immediately after calling connect, you would expect it to fail because the connection can't possibly be made yet, right? Internally, the client knows it's not connected yet and adds the packets it wants to send to a queue which it flushes once a connection is finally established. And if the underlying TCP connection gets reset, the client will queue publishes (even qos 0) while offline and flush them after it manages to reconnect. You can mostly avoid wondering "am I currently connected" and let the client manage everything for you.
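
To illustrate the idea of queueing while offline, here is a rough sketch of the concept only. `QueueingClient`, `onConnect`, `onClose` and the `send` callback are hypothetical names invented for this example and do not reflect MQTT.js's actual internals.

```javascript
// Hypothetical illustration of "queue while offline, flush on connect".
class QueueingClient {
    constructor(send) {
        this.send = send;        // function that actually transmits a packet
        this.connected = false;
        this.queue = [];
    }

    publish(topic, payload) {
        const packet = { topic, payload };
        if (this.connected) {
            this.send(packet);
        } else {
            this.queue.push(packet); // hold the packet until a connection exists
        }
    }

    // Called once a connection (or reconnection) is established:
    // flush everything queued so far, in the order it was published.
    onConnect() {
        this.connected = true;
        while (this.queue.length > 0) {
            this.send(this.queue.shift());
        }
    }

    // Called when the underlying connection is lost.
    onClose() {
        this.connected = false;
    }
}
```

With this shape, calling `publish` before `onConnect` sends nothing immediately, and the queued packets go out in publish order once `onConnect` runs.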

Shadowghost commented 4 years ago

Thanks for the extensive explanation! Everything's resolved now.

Global-Ekons commented 4 years ago

Error 104

The system32 folder is mia as is the MQTT CLIENT ID, now it doesn't matter what I try, won't even reinstall win10, as it doesn't pass Boot.

Boot option doesn't work either, or any other command doesn't work either.

LAPTOP, DELL LATITUDE E6540, 64bit with 16gb ram. Win10 System was never reinstalled after 2016, as updates did enough, was though a Win8.1 beforehand.


mayurpatelgec commented 2 years ago

@Shadowghost,

Can you tell me how you solved your issue?

I have a very similar situation to yours.

The client misses messages when the broker wants to send them, and we have a race condition.

The only solution at the moment is to SUB first with QoS 1 and then CONNECT, so the client will be ready to handle messages as soon as the broker fires them. But I think it's not best practice to SUB first and CONNECT second.

Thank you in advance.

Shadowghost commented 2 years ago

I don't remember exactly how we fixed it, but afair the main reason it didn't work was trying to do stuff async and not registering everything directly after creating the connection. The conclusion was that you need to register the handlers and everything else directly after you create the connection. Don't do anything in between in your code, or you may lose messages.
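
One way to make that conclusion hard to violate is to wrap connect and handler registration in a single synchronous function. This is a minimal sketch, not an official pattern: `createSubscriber` and the `'example-subscriber'` client id are made up for illustration, and the library is passed in as `mqttLib` (in real use, `require('mqtt')`) so the function can be exercised without a broker.

```javascript
// Hypothetical helper: connect and register all handlers in one synchronous
// step, so no await or timer can slip in between.
// `mqttLib` is injected; in real use pass require('mqtt').
function createSubscriber(mqttLib, url, topic, onMessage) {
    const client = mqttLib.connect(url, {
        clean: false,                   // keep the persistent session
        clientId: 'example-subscriber', // placeholder id for this sketch
    });

    // Registered in the same turn of the event loop as connect().
    client.on('message', (t, payload, packet) => onMessage(t, payload, packet));
    client.on('error', (err) => console.error('mqtt error:', err.message));
    client.on('connect', () => client.subscribe(topic, { qos: 1 }));

    return client;
}
```

A call might look like `createSubscriber(require('mqtt'), 'mqtt://localhost', 'issues/1124', handler)`, where the URL and `handler` are placeholders. Any asynchronous setup should happen before this call or inside the handlers, not between connect and `on('message')`.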

dev-sonen commented 2 years ago

I think forcing it to re-establish the connection when a certain event is fired would be a good approach here.

This is my solution:

const establishConnection = () => {

    let subscriber

    subscriber = mqtt.connect( `wss://${ process.env.MQTT_SERVER }:${ process.env.MQTT_WSS_PORT }` , {

        // your options here.

    } )

    subscriber.on( 'connect' , () => {

        subscriber.subscribe( 'yourTopic/yourData' , ( err , granted ) => {

            const [ object ] = granted

            // "granted" array is empty after "reconnect"

            if( granted.length !== 0 ) {

                // template literal needs backticks for ${ } interpolation
                console.log( `connected to: ${ object.topic }` )

            } else {

                subscriber.end()
                // fires the event "end"

            }

        } )
    } )

    subscriber.on( 'message' , ( topic , message , packet ) => {

        // your crazy stuff here.

    } )

    subscriber.on( 'reconnect' , () => console.log( 'your message' ) )
    subscriber.on( 'error' , () => console.error( 'your message' ) )
    subscriber.on( 'offline' , () => console.log( 'your message' ) )
    subscriber.on( 'end' , () => {
        // triggers the event, run the function itself.
        establishConnection()
    } )

}

establishConnection()