eclipse-paho / paho.mqtt.embedded-c

Paho MQTT C client library for embedded systems. Paho is an Eclipse IoT project (https://iot.eclipse.org/)
https://eclipse.org/paho
Other
1.37k stars 757 forks source link

Messages are not retained even if the flag is set to true #222

Open dervur opened 3 years ago

dervur commented 3 years ago

Hello there,

I'm trying to use a modified version of the sample code for Arduino (I'm using an ESP01 with arduino's framework) however, if I try to retain the messages as I did below in msg_online() and in the last will, they are published as not retained 99% of time. Every once in while they are set as retained, but it a very very rare case. Would you know where the problem in my code is or suggest some solutions?

This is my code below (I only cleared my Wifi and personal broker fields for privacy reasons, but I have the problem both with my broker and with hiveMQ's public broker, so I left only hivemq's broker field):

#include <Arduino.h>

#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#include <SPI.h>
#include "ESP8266WiFi.h"
#include <IPStack.h>
#include <Countdown.h>
#include <MQTTClient.h>

/******************* Wi-Fi connection options *******************/
//Enter SSID
#define ssid "" 
//Enter Password
#define wifipassword ""
/****************************************************************/

/******************* MQTT CONNECT options ***********************/
//HiveMQ open broker host and port
#define brokerhost "broker.hivemq.com"
#define brokerport 1883

//ClientID (must be unique for each device)
#define MQTTclientID "ESP01"
//Authentication
#define MQTTusername ""
#define MQTTpassword ""
//Persistent session = 0, Clean session = 1
#define MQTTsession 0
//MQTT 3.1.1 = 4, MQTT 3.0 = 3
#define MQTTver 3
//Time to keep alive the session defined in seconds
#define MQTTkeepAlive 120
//Last Will and Testament (LWT_flag = 1 to implement this)
#define LWT_flag 1
#define LWT_topic "EE579/ESP01/status"
#define LWT_payload "offline"
#define LWT_retained 1
#define LWT_qos 1
/****************************************************************/

void messageArrived(MQTT::MessageData& md)
{
  //Obtain MQTT message
  MQTT::Message &message_rec = md.message;
  //Obtain topic string
  MQTTLenString topic_rec = md.topicName.lenstring;

  char printbuf[100];
  //Print message data
  sprintf(printbuf, "Message arrived: qos %d, retained %d, dup %d, packetid %d, ", 
        message_rec.qos, message_rec.retained, message_rec.dup, message_rec.id);
  Serial.print(printbuf);
  //Print message topic
  sprintf(printbuf, "Topic: ");
  Serial.print(printbuf);
  snprintf(printbuf, topic_rec.len+1, "%s", topic_rec.data);
  Serial.println(printbuf);
  //Print payload
  sprintf(printbuf, "Payload: ");
  Serial.print(printbuf);
  snprintf(printbuf, message_rec.payloadlen+1, "%s", (char*)message_rec.payload);
  Serial.println(printbuf);
}

WiFiClient c;
IPStack ipstack(c);
MQTT::Client<IPStack, Countdown> client = MQTT::Client<IPStack, Countdown>(ipstack);

//Connect to WiFi
void connectWiFi()
{
  //Connect to WiFi
  WiFi.begin(ssid, wifipassword);
  while (WiFi.status() != WL_CONNECTED) 
  {
     delay(500);
     Serial.print("*");
  }

  Serial.println("");
  Serial.println("WiFi connection Successful");
  Serial.print("The IP Address of ESP8266 Module is: ");

  //Print the IP address
  Serial.print(WiFi.localIP());

  Serial.print("\n");
}

//Connect to MQTT broker
void connectMQTT()
{
  char printbuf[100];
  char hostname[] = brokerhost;
  int port = brokerport;
  sprintf(printbuf, "Connecting to %s:%d\n", hostname, port);
  Serial.print(printbuf);
  int rc = ipstack.connect(hostname, port);
  if (rc != 1)
  {
    printbuf[100];
    sprintf(printbuf, "rc from TCP connect is %d\n", rc);
    Serial.print(printbuf);
  }

  Serial.println("MQTT connecting");
  MQTTPacket_connectData data = MQTTPacket_connectData_initializer; 
  data.clientID.cstring = MQTTclientID;
  data.username.cstring = MQTTusername;
  data.password.cstring = MQTTpassword;
  data.cleansession = MQTTsession;
  data.willFlag = LWT_flag;      
  data.MQTTVersion = MQTTver;
  data.keepAliveInterval = MQTTkeepAlive;
  MQTTPacket_willOptions LWT = MQTTPacket_willOptions_initializer;
  LWT.topicName.cstring = LWT_topic;
  LWT.message.cstring = LWT_payload;
  LWT.retained = LWT_retained;
  LWT.qos = LWT_qos;
  data.will = LWT;

  rc = client.connect(data);
  if (rc != 0)
  {
    printbuf[100];
    sprintf(printbuf, "rc from MQTT connect is %d\n", rc);
    Serial.print(printbuf);
  }
  Serial.println("MQTT connected");

  const char* topic = "EE579/#";

  rc = client.subscribe(topic, MQTT::QOS2, messageArrived);   
  if (rc != 0)
  {
    printbuf[100];
    sprintf(printbuf, "rc from MQTT subscribe is %d\n", rc);
    Serial.print(printbuf);
  }
  Serial.println("MQTT subscribed");
}

//Publish first message "online"
void msg_online()
{
  MQTT::Message message_pub;
  //Topic
  const char* topic = "EE579/ESP01/status";
  //Payload
  const char* message = "online";
  char payload[100];
  snprintf(payload, strlen(message)+1, "%s", (char*)message);
  message_pub.payload = (void*)payload;
    message_pub.payloadlen = strlen(payload);
  //Settings
  message_pub.qos = MQTT::QOS1;
  message_pub.retained = true;
  message_pub.dup = false;
  //Publish message
  int rc = client.publish(topic, message_pub);
}

//Publish empty message to retain client online
void msg_empty()
{
  MQTT::Message message_pub;
  //Topic
  const char* topic = "e";
  //Payload
  const char* message = "";
  char payload[1];
  snprintf(payload, strlen(message)+1, "%s", (char*)message);
  message_pub.payload = (void*)payload;
    message_pub.payloadlen = strlen(payload);
  //Settings
  message_pub.qos = MQTT::QOS1;
  message_pub.retained = true;
  message_pub.dup = false;
  //Publish message
  int rc = client.publish(topic, message_pub);
}

void setup()
{
  Serial.begin(115200);

  //Connect to WiFi and then to MQTT broker
  connectWiFi();
  connectMQTT();

  //Communicate online status
  msg_online();

  delay(500);
}

void loop()
{
  //20 seconds delay
  delay(20000);

  //If client is not connected, connect it
  if (client.isConnected())
    connectMQTT();

  client.yield(1000);

  //Send empty message to keep client online
  msg_empty();

}

Kind regards, Vur

scaprile commented 3 years ago

I'm too lazy to dig in your code and find what you are doing for you. If you believe either the client or the library does not work properly then write the smallest possible example where this can be reproduced, post how you call the client or the library, what you get back, and confirm that with a network capture showing the flag is indeed not as it should.

dervur commented 3 years ago

This is the specific code I used to send the message:

//Publish first message "online"
void msg_online()
{
  MQTT::Message message_pub;
  //Topic
  const char* topic = "EE579/ESP01/status";
  //Payload
  const char* message = "online";
  char payload[100];
  snprintf(payload, strlen(message)+1, "%s", (char*)message);
  message_pub.payload = (void*)payload;
    message_pub.payloadlen = strlen(payload);
  //Settings
  message_pub.qos = MQTT::QOS1;
  message_pub.retained = true;
  message_pub.dup = false;
  //Publish message
  int rc = client.publish(topic, message_pub);
}

In the library struct Message, retained is bool, whence I set it correctly setting it to true. The publish function I call, in the library is declared as:

template<class Network, class Timer, int MAX_MQTT_PACKET_SIZE, int b>
int MQTT::Client<Network, Timer, MAX_MQTT_PACKET_SIZE, b>::publish(const char* topicName, Message& message)
{
    return publish(topicName, message.payload, message.payloadlen, message.qos, message.retained);
}

This HiveMQ public client shows if the message is retained, as you can see there is a previously published online that is retained, but the new one not. As I said, sometimes it publishes it retained but 99% of the times not... I guess it's a problem of the library, because if with the same code sometimes the message is retained but many times not, the code should be right I guess.... Screenshot 2021-04-08 151848