moscajs / mosca

MQTT broker as a module
mosca.io
3.2k stars 513 forks source link

Issue with Persistence #723

Open vrst37 opened 6 years ago

vrst37 commented 6 years ago

Hello, I am noticing a strange behavior in my setup and I would appreciate your help.

Here is my server.js

I hope I have all my configuration right. Please do let me know if the configuration needs to be improved.

// Require MQTT broker
var mosca = require('mosca');

var LOCAL_MQTT_BROKER_PORT = 1885;
var LOCAL_MONGO_DB_PORT = 27018;
var ASCOLTATORE_DB_NAME = 'mqtt_db';
var MONGO_DB_URI = 'mongodb://localhost:';
var RETAIN_DB_NAME = 'mqtt_retain';
var ASCOLTATORE_DB_SIZE = 100 * 1024 * 1024 * 1024;

var asc_url = MONGO_DB_URI + LOCAL_MONGO_DB_PORT + '/' + ASCOLTATORE_DB_NAME;
var persistence_url = MONGO_DB_URI + LOCAL_MONGO_DB_PORT + '/' + RETAIN_DB_NAME;

//https://github.com/mcollina/mosca/wiki/Mosca-basic-usage
//using ascoltatore backend
var ascoltatore = {
    type: 'mongo',
    size: ASCOLTATORE_DB_SIZE,
    url: asc_url,
    pubsubCollection: 'ascoltatori',
    mongo: {}
};

// https://github.com/mcollina/mosca/issues/264
// Final settings for Mosca MQTT broker
var settings = {
    port: LOCAL_MQTT_BROKER_PORT,
    backend: ascoltatore,
    persistence: {
        factory: mosca.persistence.Mongo,
        url: persistence_url
    }
};

/**
 * This function is executed by the Mosca framework when the server is up and 
 * ready
 */
function mqtt_server_ready() {
    console.log('Mosca server is up and running');
}

// Define HTTP and MQTT servers
var mqtt_server = new mosca.Server(settings);

mqtt_server.on('ready', mqtt_server_ready);

Now I write a client (who is constantly publishing data) (python code send.py):

import paho.mqtt.client as mqtt
import time

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))

    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe("#")

# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

client.connect("10.1.0.106", 1885, 60)

client.loop_start()

while True:
    client.publish(topic="sender", payload="{}".format(time.time()), qos=1)
    time.sleep(1)

And here is another client recv.py:

import paho.mqtt.client as mqtt

# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))

    # Subscribing in on_connect() means that if we lose the connection and
    # reconnect then subscriptions will be renewed.
    client.subscribe('sender', qos=1)

# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))

client = mqtt.Client(
    client_id="12345678", clean_session=False, userdata=None, protocol=mqtt.MQTTv311
)

client.on_connect = on_connect
client.on_message = on_message

client.connect("10.1.0.107", 1885, 60)

client.loop_forever()

A quick side question about this setup:

My main question: If in recv.py I subscribe to # ("#" instead of "sender"), then the setup acts as if there is no persistence.

Here are my steps:

Can someone shed light on what is going on?

vrst37 commented 6 years ago

I confirm the following behavior:

Persistent messages are only saved when send.py is operating on QOS=1.

I think as per spec, the sender QOS should have no impact on persistence (between receiver and MQTT Broker).

mcollina commented 6 years ago

Can you try https://github.com/mcollina/aedes? It should implement the behavior you are seeking for.

vrst37 commented 6 years ago

Thanks for the reply @mcollina

Do you think Aedes is production ready?