knolleary / pubsubclient

A client library for the Arduino Ethernet Shield that provides support for MQTT.
http://pubsubclient.knolleary.net/
MIT License
3.82k stars 1.47k forks source link

Publish() perodically will fail to send to MQTT server #958

Open ceilingduster opened 2 years ago

ceilingduster commented 2 years ago

I'm completely stumped.

If I were to replace my .publish() with something simple like .publish("topic", 1);... it'll work flawlessly. If I send it a String of base64 encoded data (which is approximately 128 characters), it'll eventually fail if run long enough. No fault occurs, just no data is sent and fails silently.

Has anyone experienced this?

#include <M5Core2.h>
#include <Arduino.h>
#include <lvgl.h>
#include <Wire.h>
#include <SPI.h>
#include <Adafruit_NeoPixel.h>
#include <WiFi.h>
#include <AsyncTCP.h>
#include <ESPAsyncWebServer.h>
#include <ArduinoNvs.h>
#include <WiFiClientSecure.h>
#include <base64.h>

// mqtt
#include <PubSubClient.h>

// project libs
#include "patternlib.h"
#include "screenlib.h"
#include "randomlibs.h"

#include <ArduinoJson.h>

// create a task handle for mqtt updates
TaskHandle_t TaskMQTTUpdate;

// wifi mac
String macaddress = WiFi.macAddress();

// mqtt root ca for hivemq
static const char *root_ca PROGMEM = R"EOF(
-----BEGIN CERTIFICATE-----
-----END CERTIFICATE-----
)EOF";

// mqtt wifi
WiFiClientSecure espClient;
PubSubClient mqtt_client(espClient);

// callback function for the rings
void ringsComplete() {}
PatternLib rings(TOTALPIXELS, PIN, RGBTYPE + NEO_KHZ800, &ringsComplete);

// define API Key globally
String APIKey;

// mqtt server hostname
const char *mqtt_server = "172.16.0.20";

void mqtt_callback(char *topic, byte *message, unsigned int length)
{
  String payload;

  // convert byte to String
  for (int i = 0; i < length; i++)
    payload += (char)message[i];

  Serial.println(payload);
}

String report_light_status()
{
  // for mqtt and web status msgs
  DynamicJsonDocument json_doc(1024);
  String output;

  constexpr uint32_t blue_offset{0x00};
  constexpr uint32_t green_offset{0x08}; // 8 decimal
  constexpr uint32_t red_offset{0x10};   // 16 decimal
  constexpr uint32_t byte_mask{0xFF};
  constexpr uint32_t blue_mask{byte_mask << blue_offset};
  constexpr uint32_t green_mask{byte_mask << green_offset};
  constexpr uint32_t red_mask{byte_mask << red_offset};

  for (int i = 0; i <= rings.numberRings - 1; i++)
  {
    json_doc[macaddress][i]["pattern"] = rings.ActivePattern[i];
    json_doc[macaddress][i]["interval"] = rings.Interval[i];

    // do colours
    json_doc[macaddress][i]["color"]["r"] = (rings.pixelColor[i] & red_mask >> red_offset) & byte_mask;
    json_doc[macaddress][i]["color"]["g"] = (rings.pixelColor[i] & green_mask >> green_offset) & byte_mask;
    json_doc[macaddress][i]["color"]["b"] = (rings.pixelColor[i] & blue_mask >> blue_offset) & byte_mask;
  }
  json_doc["uptime"] = (unsigned int)millis(); // cast long to int
  json_doc["schema_version"] = "1.0";

  serializeJson(json_doc, output);
  String encoded = base64::encode(output);
  return encoded;
}

void mqtt_connect()
{
  // connect mqtt
  espClient.setCACert(root_ca);
  espClient.setInsecure();
  mqtt_client.setServer(mqtt_server, 8883);

  if (mqtt_client.connect(macaddress.c_str()))
  {
    Serial.println("Attempting to connect to MQTT broker ...");
    mqtt_client.setCallback(&mqtt_callback);
    Serial.print("Subscribing: ");
    Serial.println(mqtt_client.subscribe("towerlight/command"));
  }
  else
  {
    Serial.println("Could not connect to MQTT broker.");
  }
}

void update_mqtt()
{
  if (mqtt_client.state() != MQTT_CONNECTED)
  {
    mqtt_connect();
  }
  else
  {
    const char * output = report_light_status().c_str();
    mqtt_client.publish("towerlight/update", output);
    Serial.println("Updating MQTT broker ...");
  }
}

int lastUpdate = millis();
void MQTTUpdate( void * parameter) {
  for(;;) {    
    if (millis() - lastUpdate >= 5000) {
      update_mqtt();
      lastUpdate = millis();
    }
    mqtt_client.loop();
    delay(50);
  }
}

void start_webserver()
{
}

// for storing the nvs values
String nvs_ssid_config;
String nvs_psk_config;

// on connection, write nvs values
static void WiFiEvent(WiFiEvent_t event)
{
  switch (event)
  {
  case SYSTEM_EVENT_STA_CONNECTED:
    Serial.println("Saving WiFi NVS SSID and PSK.");

    // Save values to NVS
    bool res;
    res = NVS.setString("ssid", WiFi.SSID());
    res = NVS.setString("psk", WiFi.psk());

    setMessage(""); // wifi is connected, clear the spinner label
    start_webserver();

    // for debug purposes
    rings.pixelColor[1] = rings.Color(255, 255, 0);
    rings.Interval[1] = 250;
    rings.ActivePattern[1] = rings.GetPattern(6);
    updateRing(1, String(rings.ActivePattern[1]).c_str());

    break;
  case SYSTEM_EVENT_STA_DISCONNECTED:
    setMessage("Reconnecting ...");
    WiFi.reconnect();
    break;
  }
}

void setup()
{
  // initialize NVS
  NVS.begin();

  // initialize the screen
  tft_lv_initialization();
  init_disp_driver();
  init_touch_driver();
  start_screen_task();

  // generate an API Key
  APIKey = generateAPIKey();

  WiFi.mode(WIFI_AP_STA);
  WiFi.onEvent(WiFiEvent);
  WiFi.setAutoReconnect(true); // auto reconnect

  nvs_ssid_config = NVS.getString("ssid");
  nvs_psk_config = NVS.getString("psk");

  // if no nvs settings, start smart config
  if (nvs_ssid_config == NULL || nvs_ssid_config == "")
  {
    Serial.println("Starting Smart Config.");
    setMessage("Use Smart Config on your phone.");
    WiFi.beginSmartConfig();
  }
  else
  { // otherwise connect to the wifi creds and network we have
    setMessage("Connecting ...");
    Serial.println("Retrieved wifi settings for SSID and PSK: ");
    Serial.print(nvs_ssid_config);
    Serial.print("/");
    Serial.println(nvs_psk_config);
    WiFi.begin(nvs_ssid_config.c_str(), nvs_psk_config.c_str());
  }

  mqtt_client.setKeepAlive(15);
  mqtt_client.setSocketTimeout(15);

  xTaskCreatePinnedToCore(
      MQTTUpdate, /* Function to implement the task */
      "TaskMQTTUpdates", /* Name of the task */
      10000,  /* Stack size in words */
      NULL,  /* Task input parameter */
      0,  /* Priority of the task */
      &TaskMQTTUpdate,  /* Task handle. */
      0); /* Core where the task should run */

  // give it time to catch up
  delay(500);
}

void loop()
{
  lv_task_handler();
  rings.Update();
}
ceilingduster commented 2 years ago

Is there a maximum packet size that publish() supports?

knolleary commented 2 years ago

Yes. As described in the readme and the docs, the default max packet size is 256 bytes. for a publish, that needs to cover the topic and payload, plus a handful of header bytes.