monstrenyatko / ArduinoMqtt

MQTT client for Arduino
MIT License

Receiving messages on subscription as publish packets #19

Closed. rapasal closed this issue 5 years ago

rapasal commented 5 years ago

I am getting some weird behavior: an incoming message on the subscribed topic is received as a publish packet but, instead of being passed to my callback function, it is reported as an unexpected message.

If it is relevant, I am using Sloeber (Eclipse), the board is an ESP12F and I am connecting using WiFiClientSecure. Publishing works perfectly.

The log displays the following when a message is published by the broker on the subscription topic.

MQTT - Process message, type: 3
MQTT - Publish received, qos: 0
MQTT - Unexpected message

I have wrapped the MQTT functionality in a separate class. Code is as follows.

in project.cpp

void setup () {
  wifiWrapper.connect("ssid", "password");
  mqttWrapper.setup();
  mqttWrapper.loadCertificate();
  yield();
  mqttWrapper.connect("mqtt-server.com", 8883, "username", "password");
  mqttWrapper.mqtt->subscribe("test/subscribe", MqttClient::QOS0, subCallback);
  yield();
}

void loop () {
  mqttWrapper.mqtt->yield(10000);
}

in project.h

void subCallback (MqttClient::MessageData& md) {
  const MqttClient::Message& msg = md.message;
  char payload[msg.payloadLen + 1];
  memcpy(payload, msg.payload, msg.payloadLen);
  payload[msg.payloadLen] = '\0';
  Serial.println(payload);
}

in mqttWrapper.cpp

void MqttWrapper::setup () {
  MqttClient::System *mqttSystem = new System;
  MqttClient::Logger *mqttLogger = new MqttClient::LoggerImpl<HardwareSerial>(Serial);
  MqttClient::Network * mqttNetwork = new MqttClient::NetworkClientImpl<WiFiClient>(wifiClientSecure, *mqttSystem);
  MqttClient::Buffer *mqttSendBuffer = new MqttClient::ArrayBuffer<MQTT_BUFFER_SIZE>();
  MqttClient::Buffer *mqttRecvBuffer = new MqttClient::ArrayBuffer<MQTT_BUFFER_SIZE>();
  MqttClient::MessageHandlers *mqttMessageHandlers = new MqttClient::MessageHandlersImpl<MQTT_SIMULTANEOUS_SUBS>();

  MqttClient::Options mqttOptions;
  mqttOptions.commandTimeoutMs = 10000;

  mqtt = new MqttClient(
    mqttOptions, *mqttLogger, *mqttSystem, *mqttNetwork, *mqttSendBuffer,
    *mqttRecvBuffer, *mqttMessageHandlers
  );
}

int8_t MqttWrapper::loadCertificate () {
  yield();
  int8_t status = 0;
  Serial.println("Loading CA certificate...");
  if (!SPIFFS.begin()) {
    Serial.println("Failed to mount SPIFFS");
    status = -1;
    return status;
  }
  Serial.println("SPIFFS mounted");
  Serial.println(caCertPath);
  File caCert = SPIFFS.open(caCertPath, "r");
  if (!caCert) {
    Serial.println("Failed to open CA certificate");
    status = -2;
    return status;
  }
  Serial.println("CA certificate opened");

  if (!wifiClientSecure.loadCACert(caCert)) {
    Serial.println("CA certificate failed to load into client");
    status = -3;
    return status;
  }
  Serial.println("CA certificate loaded into client");
  status = 1;
  return status;
}

int8_t MqttWrapper::connect (char server[STR_BUFFER_LENGTH], uint16_t port, char username[STR_BUFFER_LENGTH], char password[STR_BUFFER_LENGTH]) {
  int8_t status = 0;
  Serial.println("Attempting TCP/MQTT connection...");

  wifiClientSecure.connect(server, port);
  if (wifiClientSecure.connected()) {
    Serial.println("Secure TCP connection established");
  } else {
    Serial.println("Secure TCP connection failed");
    status = -1;
    return status;
  }

  Serial.println("Attempting MQTT connection");

  MqttClient::ConnectResult connectResult;
  MQTTPacket_connectData options = MQTTPacket_connectData_initializer;
  options.MQTTVersion = 4;
  options.clientID.cstring = deviceId;
  options.cleansession = true;
  options.keepAliveInterval = 15;
  options.username.cstring = username;
  options.password.cstring = password;

  MqttClient::Error::type returnCode = mqtt->connect(options, connectResult);

  if (returnCode == MqttClient::Error::SUCCESS) {
    Serial.println("MQTT connection successful");
    status = 1;
  } else {
    Serial.println("MQTT connection failed");
    status = 0;
  }

  return status;
}

Any glaring errors? Or any ideas where to start looking?

monstrenyatko commented 5 years ago

Hi @rapasal. Which broker do you use? Please provide a full debug log; see #define MQTT_LOG_ENABLED 1.

I didn't notice anything obviously wrong at first glance, so this needs debugging. Try adding more logs to the void deliverMessage(MQTTString& topic, Message& message) {} method. It looks like the client can't match the incoming topic with the value used on subscribe. You use a clean session, so the broker should not send messages until you subscribe and set the handler... Verify the topic value that the broker actually sends.
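To make the suspected failure mode concrete, here is a minimal sketch of topic-based dispatch. It is not the library's deliverMessage implementation: the names `Handler` and `deliver` are hypothetical, and it only does exact string comparison (wildcard filters are left out for brevity). The point is that if no stored topic equals the incoming one, nothing is delivered, which is the situation the log reports as an unexpected message.

```cpp
#include <cstddef>
#include <cstring>

// Hypothetical handler table entry (an assumption for illustration).
// The topic is kept as a pointer, not as a copy.
struct Handler {
    const char* topic;                       // nullptr marks an unused slot
    void (*callback)(const char* payload);   // called on a topic match
};

// Calls every handler whose stored topic equals the incoming topic.
// Returns false when nothing matched, i.e. the message is "unexpected".
template <std::size_t N>
bool deliver(const Handler (&handlers)[N], const char* topic, const char* payload) {
    bool delivered = false;
    for (const Handler& h : handlers) {
        if (h.topic != nullptr && std::strcmp(h.topic, topic) == 0) {
            h.callback(payload);
            delivered = true;
        }
    }
    return delivered;
}
```

Logging both the stored topic and the incoming topic inside such a loop shows immediately which side holds the wrong value.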

Try to reproduce the same behavior using a public MQTT broker like Mosquitto and on an x86 platform. See example. It could be easier to debug on a PC.

rapasal commented 5 years ago

I reformatted to bring the code listed above out of the MqttWrapper class and it works fine! For some reason wrapping it up introduces this behavior. I will do some experimenting today with some logic wrapped and some just in setup() and loop() to see if I can spot what breaks it. I'll add some extra logs like you suggest too.

rapasal commented 5 years ago

OK, so I have been playing around and found that the problem is when calling mqtt->subscribe(...). In my example above I had a hard coded topic. I got myself in a muddle before posting and was actually calling this with a char array instead of a hard coded string when I got the error.

char subTopic[64];
functionToPopulateSubTopic(subTopic);
mqttWrapper.mqtt->subscribe(subTopic, MqttClient::QOS0, subCallback);

When I hard code the topic as in my initial example, it actually works! When I pass the char array in, I get the error described. Having the code in a wrapper has no effect. I can confirm that running the ConnectEsp8266WiFiClient example in the Arduino IDE also shows the same error when using a char array as the subscription topic.

When I pass a char array to mqtt->publish(...) it works fine. I noticed that subscribe and publish convert the topic argument into an MQTTString differently, and I tried using the conversion from publish in the subscribe method, but no luck.

I added the following line to deliverMessage(...) inside the for loop:

MQTT_LOG_PRINTFLN("Incoming topic: %s", (char*)(mMessageHandlers.get()[i].topic));

This threw up something strange... When I hard code the subscription address this prints the correct topic. When I pass the subscription address as a char array it prints the publish topic! I know that the char array I pass definitely contains the subscription topic as I print it right before calling subscribe(...).

I'm not sure what the solution here is. Hard coding isn't really an option as I want to be able to store the topics in arrays so that they can be edited at run time.

monstrenyatko commented 5 years ago

@rapasal I think your problem is a string allocated on the stack. When you use this library or the original Paho library, you have to assume that nothing is copied by default. You have full control over memory, which is good for embedded systems. In the default implementation the topic is stored by reference, not copied. Look at

I guess this is exactly what you need to avoid hardcoded topics. If you know the maximum topic length, I would suggest using MessageHandlersStaticImpl.
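The difference between storing a topic by reference and storing a copy can be sketched as follows. This is an illustration under stated assumptions, not the library's code: `ByReferenceSlot`, `CopyingSlot`, the two subscribe helpers and the 64-byte size are all hypothetical. In the reported case the array lived on the stack and its lifetime had ended, which is undefined behavior; the sketch instead reuses a buffer that is still alive, which reproduces the same symptom safely.

```cpp
#include <cstdio>
#include <cstring>

// Hypothetical slot that keeps only the caller's pointer (no copy).
struct ByReferenceSlot {
    const char* topic = nullptr;
};

// Hypothetical slot that owns a fixed-size copy of the topic.
struct CopyingSlot {
    char topic[64] = {0};
};

void subscribeByReference(ByReferenceSlot& slot, const char* topic) {
    slot.topic = topic;  // remembers where the caller's buffer lives
}

void subscribeByCopy(CopyingSlot& slot, const char* topic) {
    // snprintf truncates if needed and always null-terminates.
    std::snprintf(slot.topic, sizeof(slot.topic), "%s", topic);
}

// Subscribes both slots from one scratch buffer, then reuses that buffer.
void reuseScratchBuffer(ByReferenceSlot& byRef, CopyingSlot& byCopy,
                        char (&scratch)[64]) {
    std::strcpy(scratch, "test/subscribe");
    subscribeByReference(byRef, scratch);
    subscribeByCopy(byCopy, scratch);
    std::strcpy(scratch, "test/publish");  // same memory, new contents
}
```

After `reuseScratchBuffer` returns, the by-reference slot reads back the publish topic while the copying slot still holds the subscription topic, which matches the log output described earlier in the thread.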

I see no problem with the default MessageHandlers implementation if you guarantee that the topic pointer stays valid until you unsubscribe. You can organize your own topic storage or write a custom MessageHandlers implementation if you wish.
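One way to organize such topic storage is a small fixed-capacity pool that outlives every subscription. The sketch below is only an assumption about how this could look: `TopicStore`, `add` and `release` are hypothetical names and are not part of the library. Each stored topic gets a stable address that stays valid until it is released, so topics can still be built at run time.

```cpp
#include <cstddef>
#include <cstring>

// Hypothetical fixed-capacity topic pool (not a library class).
// Slots: number of topics, MaxLen: bytes per topic including the terminator.
template <std::size_t Slots, std::size_t MaxLen>
class TopicStore {
public:
    // Copies the topic into a free slot and returns a stable pointer to it.
    // Returns nullptr if the topic is empty, too long, or the pool is full.
    const char* add(const char* topic) {
        const std::size_t len = std::strlen(topic);
        if (len == 0 || len >= MaxLen) {
            return nullptr;
        }
        for (auto& slot : mSlots) {
            if (slot[0] == '\0') {  // an empty string marks a free slot
                std::memcpy(slot, topic, len + 1);
                return slot;
            }
        }
        return nullptr;
    }

    // Frees the slot behind a pointer returned by add().
    // Call this only after unsubscribing from that topic.
    void release(const char* stored) {
        for (auto& slot : mSlots) {
            if (slot == stored) {
                slot[0] = '\0';
            }
        }
    }

private:
    char mSlots[Slots][MaxLen] = {};
};
```

Declared at file scope or as a member of a long-lived object, for example `static TopicStore<4, 64> topics;`, the pointer returned by `topics.add(subTopic)` could be passed to subscribe in place of the temporary array. The capacity of 4 topics and 64 bytes is a placeholder to adjust.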

The library is flexible enough to provide everything externally with no need for library modification.

rapasal commented 5 years ago

Yes! That's just what I needed. Thanks for your help. Great library by the way!