moscajs / mosca

MQTT broker as a module
mosca.io

Not able to retrieve the persisted messages #578

Open KohliDev opened 7 years ago

KohliDev commented 7 years ago

Mosca is set up with MongoDB persistence, as described in this issue:

https://github.com/mcollina/mosca/issues/215

  1. Client A subscribes to a topic with QoS 1 using paho-mqtt.
  2. Client A disconnects.
  3. Client B publishes a message to the same topic with QoS 1 while Client A is offline.
  4. Client A reconnects and subscribes to the same topic. Sometimes it receives the retained message and sometimes it does not, even though it was offline for less than one hour.
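
For reference, the behaviour these steps are expected to produce can be sketched with a toy in-memory model. This is only an illustration of persistent-session semantics as MQTT 3.1.1 describes them, assuming QoS 1 throughout; `ToyBroker` and its methods are hypothetical names and do not reflect how Mosca or any real broker is implemented.

```python
class ToyBroker:
    """Toy model of MQTT session handling (QoS 1 only, exact topic match)."""

    def __init__(self):
        # client_id -> {"topics": set, "queue": list, "online": bool, "clean": bool}
        self.sessions = {}

    def connect(self, client_id, clean_session):
        """Connect a client; return the messages queued while it was offline."""
        session = self.sessions.get(client_id)
        if clean_session or session is None:
            # Start a fresh session, discarding any previous state.
            self.sessions[client_id] = {
                "topics": set(), "queue": [], "online": True, "clean": clean_session,
            }
            return []
        # Resume the stored session and hand over the queued messages.
        session["online"] = True
        pending, session["queue"] = session["queue"], []
        return pending

    def subscribe(self, client_id, topic):
        self.sessions[client_id]["topics"].add(topic)

    def disconnect(self, client_id):
        session = self.sessions[client_id]
        if session["clean"]:
            del self.sessions[client_id]  # a clean session leaves no state behind
        else:
            session["online"] = False     # a persistent session keeps its subscriptions

    def publish(self, topic, payload):
        """Queue the payload for every offline persistent subscriber of `topic`."""
        for session in self.sessions.values():
            if topic in session["topics"] and not session["online"]:
                session["queue"].append(payload)


broker = ToyBroker()
broker.connect("sample", clean_session=False)  # step 1: connect ...
broker.subscribe("sample", "test/offline")     # ... and subscribe
broker.disconnect("sample")                    # step 2
broker.publish("test/offline", "h100")         # step 3
print(broker.connect("sample", clean_session=False))  # step 4: prints ['h100']
```

With `clean_session=True` in the same sequence, the model returns an empty list on reconnect, because the session is dropped at disconnect.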

I see the same inconsistency with MQTT.fx, but it works fine with the mosquitto client:

mosquitto_sub -d -c -i sample -q 1 -t test/offline

I need to find the cause of this inconsistency.

I am using the following sample paho-mqtt Python code for subscribing:

import paho.mqtt.client as mqtt

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, rc):
    print("Connected with result code "+str(rc))
    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe("test/offline")

# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))

client = mqtt.Client(client_id="sample", clean_session=False)
client.on_connect = on_connect
client.on_message = on_message

client.connect("server-ip", 1883, 60)  # replace "server-ip" with the broker's address

# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()

mcollina commented 7 years ago

This? https://github.com/mcollina/mosca/issues/215#issuecomment-74189580.

MongoDB persistence has a 1h TTL: https://github.com/mcollina/mosca/blob/master/lib/persistence/mongo.js#L35-L46.

Note the persistence flag.
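
As a rough illustration of what a 1h TTL means for an offline client, here is a minimal sketch that assumes expiry depends only on the age of the stored entry. The names `ONE_HOUR` and `is_expired` are hypothetical and are not part of Mosca; the actual expiry logic lives in the persistence code linked above.

```python
from datetime import datetime, timedelta

ONE_HOUR = timedelta(hours=1)  # the default TTL mentioned above


def is_expired(stored_at, now, ttl=ONE_HOUR):
    """Return True if an entry stored at `stored_at` is at least `ttl` old at `now`."""
    return now - stored_at >= ttl


stored = datetime(2017, 1, 1, 12, 0, 0)
print(is_expired(stored, stored + timedelta(minutes=59)))  # False: still within the TTL
print(is_expired(stored, stored + timedelta(minutes=61)))  # True: older than the TTL
```

Under this assumption, a client that stays offline for less than the TTL should still find its stored entries when it reconnects.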

KohliDev commented 7 years ago

Yes, I am using the same configuration: https://github.com/mcollina/mosca/issues/215#issuecomment-74189580

mcollina commented 7 years ago

Try changing the TTL.

KohliDev commented 7 years ago

I am still facing the inconsistency. I believe a particular topic is not tied to a client ID, so the retained messages are not published only to that client.

mcollina commented 7 years ago

Can you please provide detailed instructions to reproduce, including source code examples, client (possibly MQTT.js) and so on?

KohliDev commented 7 years ago

Mosca-Server.js

var mosca = require('mosca');
var mongoUrl = 'mongodb://localhost:27017/mqtt';
var ascoltatore = {
  //using ascoltatore
  type: 'mongo',
  url: mongoUrl,
  pubsubCollection: 'ascoltatori',
  mongo: {}
};

var settings = {
  port: 1883,
  persistence: {
    factory: mosca.persistence.Mongo,
    url: mongoUrl,
  },
  backend: ascoltatore
};
var server = new mosca.Server(settings);
server.on('ready', setup);

server.on('clientConnected', function(client) {
  console.log('client connected', client.id);
});

// fired when a message is received
server.on('published', function(packet, client) {
  console.log('Published', packet.payload);
  if (client !== undefined && client !== null) {
    console.log('\tclient', client.id);
  }
});

// fired when the mqtt server is ready
function setup() {
  console.log('Mosca server is up and running');
}

Normal Publisher: mosquitto_pub -d -i client1 -q 1 -t test/offline1 -m "h100"

Subscriber using Paho-MQTT:

import paho.mqtt.client as mqtt

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, rc):
    print("Connected with result code "+str(rc))
    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe("test/offline")

# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))

client = mqtt.Client(client_id="sample", clean_session=False)
client.on_connect = on_connect
client.on_message = on_message

client.connect("localhost", 1883, 60)

# Blocking call that processes network traffic, dispatches callbacks and
# handles reconnecting.
# Other loop*() functions are available that give a threaded interface and a
# manual interface.
client.loop_forever()

I start the subscriber and publish a message, which the subscriber receives. Then I purposely bring the subscriber down, publish an offline message with QoS=1, and start the subscriber again. Sometimes it retrieves the message and sometimes it does not. I tried changing the client ID and re-subscribing, but I face the same issue.

mcollina commented 7 years ago

A couple of notes on your code:

a. Your publisher issues a PUBLISH on topic test/offline1, while your subscriber subscribes to test/offline.
b. The QoS of the subscriber is 0 (see https://pypi.python.org/pypi/paho-mqtt/1.1#subscribe-unsubscribe), so there is no offline message support.
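
Both points can be checked without a broker. The sketch below is a simplified model, not code from Mosca or paho-mqtt: `topic_matches` and `effective_qos` are hypothetical helpers. The first applies the MQTT wildcard rules for `+` and `#` (ignoring the special handling of `$` topics), and the second uses the rule that a message is delivered at the lower of the publish QoS and the subscription QoS.

```python
def topic_matches(topic_filter, topic):
    """Return True if `topic` matches the MQTT subscription `topic_filter`.

    Supports '+' (exactly one level) and '#' (this level and everything below).
    """
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(filter_levels):
        if level == "#":
            return True
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    return len(filter_levels) == len(topic_levels)


def effective_qos(publish_qos, subscription_qos):
    """QoS at which a message is delivered to one subscriber."""
    return min(publish_qos, subscription_qos)


# Point a: the publisher's topic does not match the subscriber's filter.
print(topic_matches("test/offline", "test/offline1"))  # False
# Point b: a QoS 1 publish reaches a QoS 0 subscription at QoS 0.
print(effective_qos(1, 0))  # 0
```

In the subscriber above, passing the QoS explicitly, for example `client.subscribe("test/offline", qos=1)`, and publishing to the same topic would address both points.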

KohliDev commented 7 years ago

Thanks for pointing out the issues. The main cause was that I was stopping the client session completely (disconnect). If only a network disconnect happens and the client session is retained, I'm able to get the messages, but not when the client session is closed. This is the default working mechanism of the MQTT protocol with QoS=1; I figured it out a bit late :-)