odelot / aws-mqtt-websockets

Implementation of a middleware to use AWS MQTT service through websockets, aiming the ESP8266 plataform
GNU Lesser General Public License v3.0
231 stars 67 forks source link

Cannot subscribe to 2 topics. Lots of reconnects. #33

Closed jmseight closed 6 years ago

jmseight commented 6 years ago

Hi, I am unable to subscribe to more than 2 topics. Also, there are lots of reconnections, even if I keep publishing.

The code that I changed are

void loop() {
  //keep the mqtt up and running
  if (awsWSclient.connected ()) {
    delay(2000);
    sendmessage();
    client->yield();
  } else {
    //handle reconnection
    if (connect ()) {
      subscribe ();
      sendmessage ();
    }
  }

}

and

void subMessage1(MQTT::MessageData& md) {
  Serial.print("Topic ");
  Serial.println(aws_sub1);
  messageArrived(md);
}
void subMessage2(MQTT::MessageData& md) {
  Serial.print("Topic ");
  Serial.println(aws_sub2);
  messageArrived(md);
}

and

//subscribe to a mqtt topic
void subscribe () {
  //subscript to a topic
  int rc1 = client->subscribe(aws_sub1, MQTT::QOS0, subMessage1);
  if (rc1 != 0) {
    Serial.print("rc from MQTT subscribe is ");
    Serial.println(rc1);
    return;
  }
  Serial.println("MQTT subscribed");

  int rc2 = client->subscribe(aws_sub2, MQTT::QOS0, subMessage2);
  if (rc2 != 0) {
    Serial.print("rc from MQTT subscribe is ");
    Serial.println(rc2);
    return;
  }
  Serial.println("MQTT subscribed");
}

Full code is as follows:

#include <Arduino.h>
#include <Stream.h>

#include <ESP8266WiFi.h>
#include <ESP8266WiFiMulti.h>

//AWS
#include "sha256.h"
#include "Utils.h"

//WEBSockets
#include <Hash.h>
#include <WebSocketsClient.h>

//MQTT PAHO
#include <SPI.h>
#include <IPStack.h>
#include <Countdown.h>
#include <MQTTClient.h>

//AWS MQTT Websocket
#include "Client.h"
#include "AWSWebSocketClient.h"
#include "CircularByteBuffer.h"

//AWS IOT config, change these:
char wifi_ssid[]       = "Amped_TAPR2_2.4";
char wifi_password[]   = "6aeb76df";
char aws_endpoint[]    = "aly36d4whljst.iot.us-west-2.amazonaws.com";
char aws_key[]         = "AKIAJII65UM7AOKSSBTQ";
char aws_secret[]      = "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";
char aws_region[]      = "us-west-2";
const char* aws_pub  = "$aws/things/ESPNODE/shadow/update";
const char* aws_sub1  = "espnode/lcd1";
const char* aws_sub2  = "espnode/lcd2";
int port = 443;

//MQTT config
const int maxMQTTpackageSize = 512;
const int maxMQTTMessageHandlers = 1;

ESP8266WiFiMulti WiFiMulti;

AWSWebSocketClient awsWSclient(1000);

IPStack ipstack(awsWSclient);
MQTT::Client<IPStack, Countdown, maxMQTTpackageSize, maxMQTTMessageHandlers> *client = NULL;

//# of connections
long connection = 0;

//generate random mqtt clientID
char* generateClientID () {
  char* cID = new char[23]();
  for (int i = 0; i < 22; i += 1)
    cID[i] = (char)random(1, 256);
  return cID;
}

//count messages arrived
int arrivedcount = 0;

void subMessage1(MQTT::MessageData& md) {
  Serial.print("Topic ");
  Serial.println(aws_sub1);
  messageArrived(md);
}
void subMessage2(MQTT::MessageData& md) {
  Serial.print("Topic ");
  Serial.println(aws_sub2);
  messageArrived(md);
}

//callback to handle mqtt messages
void messageArrived(MQTT::MessageData& md)
{
  MQTT::Message &message = md.message;

  Serial.print("Message ");
  Serial.print(++arrivedcount);
  Serial.print(" arrived: qos ");
  Serial.print(message.qos);
  Serial.print(", retained ");
  Serial.print(message.retained);
  Serial.print(", dup ");
  Serial.print(message.dup);
  Serial.print(", packetid ");
  Serial.println(message.id);
  Serial.print("Payload ");
  char* msg = new char[message.payloadlen + 1]();
  memcpy (msg, message.payload, message.payloadlen);
  Serial.println(msg);
  delete msg;
}

//connects to websocket layer and mqtt layer
bool connect () {

  if (client == NULL) {
    client = new MQTT::Client<IPStack, Countdown, maxMQTTpackageSize, maxMQTTMessageHandlers>(ipstack);
  } else {

    if (client->isConnected ()) {
      client->disconnect ();
    }
    delete client;
    client = new MQTT::Client<IPStack, Countdown, maxMQTTpackageSize, maxMQTTMessageHandlers>(ipstack);
  }

  //delay is not necessary... it just help us to get a "trustful" heap space value
  delay (1000);
  Serial.print (millis ());
  Serial.print (" - conn: ");
  Serial.print (++connection);
  Serial.print (" - (");
  Serial.print (ESP.getFreeHeap ());
  Serial.println (")");

  int rc = ipstack.connect(aws_endpoint, port);
  if (rc != 1)
  {
    Serial.println("error connection to the websocket server");
    return false;
  } else {
    Serial.println("websocket layer connected");
  }

  Serial.println("MQTT connecting");
  MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
  data.MQTTVersion = 3;
  char* clientID = generateClientID ();
  data.clientID.cstring = clientID;
  rc = client->connect(data);
  delete[] clientID;
  if (rc != 0)
  {
    Serial.print("error connection to MQTT server");
    Serial.println(rc);
    return false;
  }
  Serial.println("MQTT connected");
  return true;
}

//subscribe to a mqtt topic
void subscribe () {
  //subscript to a topic
  int rc1 = client->subscribe(aws_sub1, MQTT::QOS0, subMessage1);
  if (rc1 != 0) {
    Serial.print("rc from MQTT subscribe is ");
    Serial.println(rc1);
    return;
  }
  Serial.println("MQTT subscribed");

  int rc2 = client->subscribe(aws_sub2, MQTT::QOS0, subMessage2);
  if (rc2 != 0) {
    Serial.print("rc from MQTT subscribe is ");
    Serial.println(rc2);
    return;
  }
  Serial.println("MQTT subscribed");
}

//send a message to a mqtt topic
void sendmessage () {
  //send a message
  MQTT::Message message;
  char buf[100];
  strcpy(buf, "{\"state\":{\"reported\":{\"on\": false}, \"desired\":{\"on\": false}}}");
  message.qos = MQTT::QOS0;
  message.retained = false;
  message.dup = false;
  message.payload = (void*)buf;
  message.payloadlen = strlen(buf) + 1;
  int rc = client->publish(aws_pub, message);
}

void setup() {
  Serial.begin (115200);
  delay (2000);
  Serial.setDebugOutput(1);

  //fill with ssid and wifi password
  WiFiMulti.addAP(wifi_ssid, wifi_password);
  Serial.println ("connecting to wifi");
  while (WiFiMulti.run() != WL_CONNECTED) {
    delay(100);
    Serial.print (".");
  }
  Serial.println ("\nconnected");
  configTime(3 * 3600, 0, "pool.ntp.org", "time.nist.gov");
  //fill AWS parameters
  awsWSclient.setAWSRegion(aws_region);
  awsWSclient.setAWSDomain(aws_endpoint);
  awsWSclient.setAWSKeyID(aws_key);
  awsWSclient.setAWSSecretKey(aws_secret);
  awsWSclient.setUseSSL(true);

  if (connect ()) {
    subscribe ();
    sendmessage ();
  }

}

void loop() {
  //keep the mqtt up and running
  if (awsWSclient.connected ()) {
    delay(2000);
    sendmessage();
    client->yield();
  } else {
    //handle reconnection
    if (connect ()) {
      subscribe ();
      sendmessage ();
    }
  }

}
handgear commented 6 years ago

change maxMQTTMessageHandlers to 2 ps. uploading IAM key to open web is not a good idea

odelot commented 6 years ago

@jmseight using with pubsubclient fixed the reconnection issue