moscajs / mosca

MQTT broker as a module
mosca.io
3.2k stars 508 forks source link

Retrieves messages in mongo after reconnect #616

Closed jinvillaz-zz closed 7 years ago

jinvillaz-zz commented 7 years ago

Hello. I have a problem retrieving messages from Mongo after reconnecting to the Mosca broker. The messages are saved so that they can be sent to the client after it reconnects, but the client receives only a buffer with length 0. With Redis I don't have any problems.

broker code:

let redis = require('redis');  
// MongoDB configuration for the Mosca broker used in this reproduction.
// `mongoSetting` is handed to the server as its `backend`, and `persistence`
// selects mosca.persistence.Mongo; both point at the same database URL.
// NOTE(review): `mosca` is referenced below but never required in this snippet —
// presumably `require('mosca')` was dropped when the code was pasted; confirm.
let mongoSetting = {
    type: 'mongo',
    url: 'mongodb://localhost:27017/mqtt',
    pubsubCollection: 'mosca',
    mongo: {}
}; 
let settings = {
    port: 1884,
    backend: mongoSetting,
    persistence: {
        factory: mosca.persistence.Mongo,
        url: 'mongodb://localhost:27017/mqtt',
        ttl: {
            // 60 * 60 * 1000 = 3,600,000 — presumably milliseconds, i.e. one hour;
            // confirm the unit against the Mosca persistence documentation.
            subscriptions: 60 * 60 * 1000,
            packets: 60 * 60 * 1000
        }
    }
}; 
//Redis setting persistence
// let redisSetting = {
//     type: 'redis',
//     redis: redis,
//     port: 6379,
//     return_buffers: true, // to handle binary payloads
//     host: "localhost"
// };
// let settings = {
//     port: 1884,
//     backend: redisSetting,
//     persistence: {
//         factory: mosca.persistence.Redis
//     }
// }; 
// Callback for the broker's 'ready' event; only logs a message.
let setup = () => {
    console.log('Mosca server is up and running');  
};
// Create the broker from `settings` and log when it is ready and whenever a
// client connects (printing the connecting client's id).
let server = new mosca.Server(settings);
server.on('ready', setup);
server.on('clientConnected', (client) => {  
    console.log('client connected', client.id);
});
// Ten seconds after start-up, publish five QoS 1, non-retained messages to
// '/agents', one every 500 ms (i runs 1..5; the interval is cleared once i hits 6).
setTimeout(() => {
    let i = 1;
    let interval = setInterval(() => {
        let data = JSON.stringify({data: 'hello client ' + i});
        let message = {
            topic: '/agents',
            // The payload is a JSON *string*, not a Buffer. According to the
            // maintainer replies in this thread, the Mongo persistence expects a
            // Buffer payload, and a string here is what yields zero-length
            // payloads when offline packets are replayed.
            payload: data,
            qos: 1,
            retain: false
        };
        server.publish(message, function () {
            // NOTE(review): if this callback fires asynchronously, `i` has already
            // been incremented below, so the logged number may be one ahead of the
            // message that was actually sent — confirm.
            console.log('send message.! ' + i);
        });
        i += 1;
        if (i === 6) {
            clearInterval(interval);
        }
    }, 500);
}, 10000);
// Handler for the broker's 'published' event.
// When `client` is truthy, the packet's topic and payload are logged and the
// payload is parsed as JSON; otherwise the whole packet is logged as a
// server-side publish.
server.on('published', (packet, client) => {
    if (client) {
        console.log('published topic from client: ', packet.topic);
        let message = packet.payload;
        console.log('published message from client: ', typeof message, message);
        // JSON.parse throws on a payload that is not valid JSON; there is no
        // try/catch here, so such a payload raises inside this handler.
        let messageObject = JSON.parse(message);
        console.log('published messageObject from client: ', typeof messageObject, messageObject);
    } else {
        console.log('published server: ', packet);
    }
});

client code:


// Client set-up: the client id is the first command-line argument, or, when
// none is given, 'mqttjs_' followed by up to 8 hex characters taken from
// Math.random().
// NOTE(review): `mqtt` is used below but never required in this snippet —
// presumably `require('mqtt')` was dropped when the code was pasted; confirm.
let param = process.argv[2];
let clientId = param || 'mqttjs_' + Math.random().toString(16).substr(2, 8);
// Same port as the broker's `port: 1884` setting.
let host = 'mqtt://localhost:1884';
let options = {
    clientId: clientId,
    // Non-clean session — presumably so the broker keeps this client's
    // subscription and stored QoS 1 packets across reconnects; see the MQTT.js
    // `clean` option documentation to confirm.
    clean: false
};
let client = mqtt.connect(host, options);
// On a client error: log it and end the client.
client.on('error', function (err) {
    console.log(err);
    client.end();
});
// On every 'connect': subscribe to '/agents' at QoS 1, then log the client id
// and the argument passed to the handler.
client.on('connect', function (params) {
    client.subscribe('/agents', {qos: 1});
    console.log('client connected:' + clientId);
    console.log(params);
});
// Handler for incoming messages. A non-empty payload is parsed as JSON and
// logged; an empty payload (the symptom reported in this issue) only logs the
// topic it arrived on.
client.on('message', function (topic, message, packet) {
    if (message.length > 0) {
        // JSON.parse throws if the payload is not valid JSON; it is not guarded here.
        let messageParsed = JSON.parse(message);
        // For a parsed JSON object, toString() yields '[object Object]', so this
        // line does not show the content; the next console.log prints the object.
        console.log('Received Message:= ' + messageParsed.toString() + '\nOn topic:= ' + topic);
        console.log(messageParsed);
    } else {
        console.log('message length 0. from: ', topic);
    }
});
// On the client's 'close' event, log that this client id disconnected.
client.on('close', function () {
    console.log(clientId + ' disconnected');
});```
mcollina commented 7 years ago

Were you able to track down the source of the problem? Which version of MongoDB are you using?

jinvillaz-zz commented 7 years ago

I am using mongodb 3.2 and node 6.9.4.
I don't know exactly what the problem is. I could verify that the data is saved in Mongo and that Mosca sends it to the client, but the client only receives messages with length 0. I also tested the same code with Redis, and it works correctly. (Screenshots: datasave, clientterminal)

mcollina commented 7 years ago

I think payload should be a Buffer to work properly.

mitsos1os commented 7 years ago

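Yes, I had the same problem. The payload should definitely be a Buffer, or else retrieving offline packets from Mongo will not work and will always return a zero-length buffer. This is due to the line here: https://github.com/mcollina/mosca/blob/021517e0112169d6e345234ec5bb66cca394aff9/lib/persistence/mongo.js#L354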

The proper solution would be for the code to check if the returned payload was a Buffer or simply return the payload otherwise... I could issue a pull request with this change

mcollina commented 7 years ago

Please do!

Il giorno ven 21 lug 2017 alle 14:27 Dimitris Halatsis < notifications@github.com> ha scritto:

Yes, I had the same problem. The payload should definitely be a Buffer, or else retrieving offline packets from Mongo will not work and will always return a zero-length buffer. This is due to the line here: https://github.com/mcollina/mosca/blob/021517e0112169d6e345234ec5bb66cca394aff9/lib/persistence/mongo.js#L354

The proper solution would be for the code to check if the returned payload was a Buffer or simply return the payload otherwise... I could issue a pull request with this change

— You are receiving this because you commented.

Reply to this email directly, view it on GitHub https://github.com/mcollina/mosca/issues/616#issuecomment-316988020, or mute the thread https://github.com/notifications/unsubscribe-auth/AADL41kE7TXvfwJInMuaGPnAb8agvKxwks5sQJlBgaJpZM4MV3SY .