mqttjs / async-mqtt

Promise wrapper over MQTT.js
MIT License
272 stars 50 forks source link

publish does not block or throw even if there is no Internet connection #41

Closed WKolaj closed 3 years ago

WKolaj commented 3 years ago

Hello,

I've encountered an issue related to losing the Internet connection while publishing data. Even if there is no Internet connection, the publish method neither throws nor blocks - it simply behaves as if the data was sent successfully. Is this behavior intentional?

If so, is there a way to check whether the published message was actually sent to the server? I would like to store the data and send it once the Internet connection returns.

Below you will find my code for testing:


const mqtt = require("async-mqtt");
const credentials = require("./credentials.json");
// Broker address passed as the first argument to mqtt.connectAsync
const connectionURL = "presiot.mciotextension.eu1.mindsphere.io";

// Options passed to mqtt.connectAsync together with connectionURL.
// NOTE(review): `host` lacks the "presiot." prefix that connectionURL has - confirm which is intended.
// NOTE(review): "mqtt/tcp" does not look like a usual protocol name (e.g. "mqtt", "mqtts") - confirm the client accepts it.
const connectionOptions = {
  port: 1883,
  clientId: "mqttjs_87654324",
  // Username is the tenant prefix followed by the name from credentials.json
  username: `presiot/${credentials.username}`,
  password: credentials.password,
  device_name: "mqttjs_87654324",
  tenant: "presiot",
  protocol: "mqtt/tcp",
  host: "mciotextension.eu1.mindsphere.io",
};

/**
 * Returns a promise that fulfils (with no value) once `ms` milliseconds
 * have elapsed on a timer.
 */
const snooze = (ms) =>
  new Promise((wake) => {
    setTimeout(wake, ms);
  });

/**
 * @description Builds a comma-separated command string carrying a random value.
 *
 * Fields, in order: 200, "variableGroup", "variableName", the random value
 * with two decimals, the unit "V", and the current time as an ISO string.
 *
 * @returns {string} the joined command
 */
let generateCommandWithRandValue = () => {
  // Random value between 0 and 10000, rounded to two decimal places
  const reading = Math.round(Math.random() * 1000000) / 100;
  // Timestamp of the moment the command is generated
  const sampledAt = new Date(Date.now()).toISOString();

  const fields = [
    200,
    "variableGroup",
    "variableName",
    reading.toFixed(2),
    "V",
    sampledAt,
  ];

  return fields.join(",");
};

/**
 * Reproduction loop for the report: connects once, then every second
 * generates a command and publishes it to "s/us" without passing any
 * publish options. Errors thrown inside an iteration are logged and the
 * loop continues; the loop itself never ends.
 */
let exec = async () => {
  // Connect once up front; a failure here rejects the promise returned by exec
  const client = await mqtt.connectAsync(connectionURL, connectionOptions);

  for (;;) {
    try {
      // Wait 1s between publishes
      await snooze(1000);

      // Build the payload for this iteration
      const command = generateCommandWithRandValue();

      //BREAKING INTERNET CONNECTION HERE

      // Publish the value to the MQTT broker
      await client.publish("s/us", command);

      //EVEN IF THERE IS NO INTERNET, PUBLISH DOES NOT THROW OR BLOCK - COMMAND IS TREATED AS SENT PROPERLY

      console.log(`Command send properly: ${command}`);
    } catch (err) {
      console.log(err);
    }
  }
};

exec();
WKolaj commented 3 years ago

It seems I have found the solution.

The issue lies in QoS being set to 0 by default, which does not guarantee that the data is received by the server. If QoS is set to 1, publish does not resolve (it hangs) until it gets a response from the server.

Now you can use a timeout to detect this hanging and reject - which is what the tryPublishing function below does.

After the rejection, it is possible to store the data - e.g. in a database or a file - and try sending it again later.


const mqtt = require("async-mqtt");
const credentials = require("./credentials.json");
// Broker address passed as the first argument to mqtt.connectAsync
const connectionURL = "presiot.mciotextension.eu1.mindsphere.io";

// Options passed to mqtt.connectAsync together with connectionURL.
// NOTE(review): `host` lacks the "presiot." prefix that connectionURL has - confirm which is intended.
// NOTE(review): "mqtt/tcp" does not look like a usual protocol name (e.g. "mqtt", "mqtts") - confirm the client accepts it.
const connectionOptions = {
  port: 1883,
  clientId: "mqttjs_87654324",
  // Username is the tenant prefix followed by the name from credentials.json
  username: `presiot/${credentials.username}`,
  password: credentials.password,
  device_name: "mqttjs_87654324",
  tenant: "presiot",
  protocol: "mqtt/tcp",
  host: "mciotextension.eu1.mindsphere.io",
};

/**
 * Returns a promise that fulfils (with no value) once `ms` milliseconds
 * have elapsed on a timer.
 */
const snooze = (ms) =>
  new Promise((done) => {
    setTimeout(done, ms);
  });

/**
 * @description Builds a comma-separated command string carrying a random value.
 *
 * Fields, in order: 200, "variableGroup", "variableName", the random value
 * with two decimals, the unit "V", and the current time as an ISO string.
 *
 * @returns {string} the joined command
 */
let generateCommandWithRandValue = () => {
  // Random value between 0 and 10000, rounded to two decimal places
  const reading = Math.round(Math.random() * 1000000) / 100;
  // Timestamp of the moment the command is generated
  const sampledAt = new Date(Date.now()).toISOString();

  const fields = [
    200,
    "variableGroup",
    "variableName",
    reading.toFixed(2),
    "V",
    sampledAt,
  ];

  return fields.join(",");
};

/**
 * Publishes `command` to the "s/us" topic with QoS 1 and fails if the
 * publish has not settled within `timeoutMs`.
 *
 * @param {object} client - object exposing `publish(topic, message, options)` that returns a promise
 * @param {string} command - payload to publish
 * @param {number} [timeoutMs=1000] - how long to wait for the publish before giving up
 * @returns {Promise<boolean>} resolves to `true` once the publish promise fulfils
 * @throws {Error} "publishing timeout!" when the publish does not settle in time;
 *   otherwise whatever error `client.publish` throws or rejects with
 */
let tryPublishing = async (client, command, timeoutMs = 1000) => {
  let timeoutHandler;

  //Timeout in case publish cannot be performed due to internet connection loss
  const timeout = new Promise((_, reject) => {
    timeoutHandler = setTimeout(() => {
      //Rejecting in case of connection timeout
      reject(new Error("publishing timeout!"));
    }, timeoutMs);
  });

  try {
    //Sending value to mqtt broker
    //QoS 1 makes publish wait for the broker, so it hangs when the connection drops
    //NOTE(review): a timeout does not cancel the pending publish; the client may
    //still deliver it later, so callers that store and resend should expect
    //possible duplicates - confirm against the client's resend behaviour
    await Promise.race([
      client.publish("s/us", command, { qos: 1 }),
      timeout,
    ]);

    return true;
  } finally {
    //Always stop the timer - on success, on publish error and after a timeout
    clearTimeout(timeoutHandler);
  }
};

/**
 * Demo loop for the workaround: connects once, then every second generates
 * a command and hands it to tryPublishing. A rejected attempt (timeout or
 * publish error) is logged and the loop continues; the loop never ends.
 */
let exec = async () => {
  // Connect once up front; a failure here rejects the promise returned by exec
  const client = await mqtt.connectAsync(connectionURL, connectionOptions);

  for (;;) {
    try {
      // Wait 1s between attempts
      await snooze(1000);

      // Build the payload for this iteration
      const command = generateCommandWithRandValue();

      // Attempt the publish - the promise rejects on a publishing timeout
      const wasPublished = await tryPublishing(client, command);

      if (wasPublished) {
        console.log(`Command send properly: ${command}`);
      }
    } catch (err) {
      console.log(err);
      // Here you can save the data to send it later
    }
  }
};

exec();
WKolaj commented 3 years ago

Using the above solution solved the issue.