Open theRosyProject opened 8 months ago
I was able to achieve the desired result of accessing the last published payload and printing it to the serial output. I used a mix of trial and error, with the help of ChatGPT, and made the following modifications to the uMQTTBroker.h and uMQTTBroker.cpp files. The final versions of both files are attached below.
uMQTTBroker *uMQTTBroker::TheBroker;
// Get access to last payload F.Dallo char uMQTTBroker::lastPayload[uMQTTBroker::MAX_PAYLOAD_SIZE] = {0};
/**
 * @brief Read-only access to the stored payload buffer.
 * @return Pointer to the first character of lastPayload.
 */
const char* uMQTTBroker::getLastPayload() {
  return &lastPayload[0];
}
/**
 * @brief Mark the stored payload as empty by writing a NUL into its first byte.
 */
void uMQTTBroker::clearLastPayload() {
  *lastPayload = '\0';
}
// end F.Dallo
bool uMQTTBroker::_onConnect(struct espconn *pesp_conn, uint16_t client_count) {
IPAddress connAddr(pesp_conn->proto.tcp->remote_ip[0], pesp_conn->proto.tcp->remote_ip[1],
pesp_conn->proto.tcp->remote_ip[2], pesp_conn->proto.tcp->remote_ip[3]);
return TheBroker->onConnect(connAddr, client_count);
}
void uMQTTBroker::_onDisconnect(struct espconn *pesp_conn, const char *client_id) {
IPAddress connAddr(pesp_conn->proto.tcp->remote_ip[0], pesp_conn->proto.tcp->remote_ip[1],
pesp_conn->proto.tcp->remote_ip[2], pesp_conn->proto.tcp->remote_ip[3]);
TheBroker->onDisconnect(connAddr, (String)client_id);
}
/**
 * @brief Static authentication callback: converts the C strings to String and
 *        forwards them to the instance's onAuth().
 * @note  pesp_conn is not used by this wrapper.
 * @return Whatever TheBroker->onAuth() returns.
 */
bool uMQTTBroker::_onAuth(const char* username, const char *password, const char* client_id, struct espconn *pesp_conn) {
  return TheBroker->onAuth(String(username),
                           String(password),
                           String(client_id));
}
/**
 * @brief Static data callback registered via MQTT_server_onData().
 *
 * Stores a NUL-terminated copy of the payload in lastPayload (truncated to
 * MAX_PAYLOAD_SIZE - 1 bytes) and then forwards the message to the instance's
 * virtual onData() handler.
 *
 * @param args       Callback argument; not used by this wrapper.
 * @param topic      Topic bytes; topic_len bytes are copied and NUL-terminated locally.
 * @param topic_len  Number of bytes in topic.
 * @param data       Payload bytes.
 * @param length     Number of bytes in data.
 */
void uMQTTBroker::_onData(uint32_t *args, const char* topic, uint32_t topic_len, const char *data, uint32_t length) {
  char topic_str[topic_len+1];
  os_memcpy(topic_str, topic, topic_len);
  topic_str[topic_len] = '\0';

  // Get access to last payload F.Dallo
  // A separate copy length is used so the truncation applied to the stored
  // copy does not change the length handed to onData() below.
  const uint32_t max_copy = static_cast<uint32_t>(MAX_PAYLOAD_SIZE) - 1;
  const uint32_t copy_len = (length < max_copy) ? length : max_copy;
  if (data != nullptr && copy_len > 0) {
    os_memcpy(lastPayload, data, copy_len);
  }
  lastPayload[copy_len] = '\0';
  // end F.Dallo

  // NOTE(review): in the pasted text this call sat behind a "//" on the same
  // line as the code above; it is restored here so that subclasses overriding
  // onData() keep receiving messages - confirm this matches the intent.
  TheBroker->onData((String)topic_str, data, length);
}
/**
 * @brief Stores the broker parameters, records this instance in TheBroker and
 *        registers the static callbacks with the MQTT server.
 * @param portno               Port number later passed to MQTT_server_start().
 * @param max_subscriptions    Subscription limit later passed to MQTT_server_start().
 * @param max_retained_topics  Retained-topic limit later passed to MQTT_server_start().
 */
uMQTTBroker::uMQTTBroker(uint16_t portno, uint16_t max_subscriptions, uint16_t max_retained_topics)
    : _portno(portno),
      _max_subscriptions(max_subscriptions),
      _max_retained_topics(max_retained_topics) {
  // The static callbacks reach the instance through this pointer.
  TheBroker = this;

  MQTT_server_onConnect(_onConnect);
  MQTT_server_onDisconnect(_onDisconnect);
  MQTT_server_onAuth(_onAuth);
  MQTT_server_onData(_onData);
}
/**
 * @brief Starts the MQTT server with the parameters stored by the constructor.
 * @note  The bool result of MQTT_server_start() is discarded.
 */
void uMQTTBroker::init() {
  static_cast<void>(
      MQTT_server_start(_portno, _max_subscriptions, _max_retained_topics));
}
/**
 * @brief Default connect handler; ignores its arguments.
 * @return Always true.
 */
bool uMQTTBroker::onConnect(IPAddress addr, uint16_t client_count) {
  (void)addr;
  (void)client_count;
  return true;
}
/**
 * @brief Default disconnect handler; ignores its arguments and does nothing.
 */
void uMQTTBroker::onDisconnect(IPAddress addr, String client_id) {
  (void)addr;
  (void)client_id;
}
/**
 * @brief Default authentication handler; ignores its arguments.
 * @return Always true.
 */
bool uMQTTBroker::onAuth(String username, String password, String client_id) {
  (void)username;
  (void)password;
  (void)client_id;
  return true;
}
/**
 * @brief Default data handler; ignores its arguments and does nothing.
 */
void uMQTTBroker::onData(String topic, const char *data, uint32_t length) {
  (void)topic;
  (void)data;
  (void)length;
}
/**
 * @brief Publishes a raw byte buffer through MQTT_local_publish().
 * @param topic        Topic name.
 * @param data         Payload bytes.
 * @param data_length  Number of payload bytes.
 * @param qos          Passed through to MQTT_local_publish().
 * @param retain       Passed through to MQTT_local_publish().
 * @return Result of MQTT_local_publish().
 */
bool uMQTTBroker::publish(String topic, uint8_t* data, uint16_t data_length, uint8_t qos, uint8_t retain) {
  // The C API takes a non-const uint8_t*, so constness is cast away here.
  uint8_t *topic_bytes = reinterpret_cast<uint8_t*>(const_cast<char*>(topic.c_str()));
  return MQTT_local_publish(topic_bytes, data, data_length, qos, retain);
}
/**
 * @brief Reports the value returned by MQTT_server_countClientCon().
 */
uint16_t uMQTTBroker::getClientCount() {
  const uint16_t count = MQTT_server_countClientCon();
  return count;
}
/**
 * @brief Looks up the client id for the given index.
 * @param index      Index passed to MQTT_server_getClientId().
 * @param client_id  Receives the id; left untouched when the lookup yields NULL.
 * @return true if an id was found, false otherwise.
 */
bool uMQTTBroker::getClientId(uint16_t index, String &client_id) {
  const char *id = MQTT_server_getClientId(index);
  if (id != nullptr) {
    client_id = id;
    return true;
  }
  return false;
}
/**
 * @brief Looks up the remote IP address of the client at the given index.
 * @param index  Index passed to MQTT_server_getClientPcon().
 * @param addr   Receives the remote IP; left untouched when the lookup yields NULL.
 * @return true if a connection was found, false otherwise.
 */
bool uMQTTBroker::getClientAddr(uint16_t index, IPAddress& addr) {
  const struct espconn *conn = MQTT_server_getClientPcon(index);
  const bool found = (conn != nullptr);
  if (found) {
    addr = conn->proto.tcp->remote_ip;
  }
  return found;
}
/**
 * @brief Publishes a String payload through MQTT_local_publish().
 * @param topic   Topic name.
 * @param data    Payload; data.length() bytes are sent.
 * @param qos     Passed through to MQTT_local_publish().
 * @param retain  Passed through to MQTT_local_publish().
 * @return Result of MQTT_local_publish().
 */
bool uMQTTBroker::publish(String topic, String data, uint8_t qos, uint8_t retain) {
  // The C API takes non-const uint8_t*, so constness is cast away here.
  uint8_t *topic_bytes = reinterpret_cast<uint8_t*>(const_cast<char*>(topic.c_str()));
  uint8_t *payload_bytes = reinterpret_cast<uint8_t*>(const_cast<char*>(data.c_str()));
  return MQTT_local_publish(topic_bytes, payload_bytes, data.length(), qos, retain);
}
/**
 * @brief Subscribes to a topic through MQTT_local_subscribe().
 * @param topic  Topic name.
 * @param qos    Passed through to MQTT_local_subscribe().
 * @return Result of MQTT_local_subscribe().
 */
bool uMQTTBroker::subscribe(String topic, uint8_t qos) {
  uint8_t *topic_bytes = reinterpret_cast<uint8_t*>(const_cast<char*>(topic.c_str()));
  return MQTT_local_subscribe(topic_bytes, qos);
}
/**
 * @brief Unsubscribes from a topic through MQTT_local_unsubscribe().
 * @param topic  Topic name.
 * @return Result of MQTT_local_unsubscribe().
 */
bool uMQTTBroker::unsubscribe(String topic) {
  uint8_t *topic_bytes = reinterpret_cast<uint8_t*>(const_cast<char*>(topic.c_str()));
  return MQTT_local_unsubscribe(topic_bytes);
}
void uMQTTBroker::cleanupClientConnections() {
MQTT_server_cleanupClientCons();
}
// C interface of the underlying MQTT broker.
// The pointer declarators ('*') below were missing from the pasted text; they
// are restored so the declarations agree with how the functions are called
// (buffers passed as uint8_t*, callbacks passed as function pointers, results
// compared against NULL).
extern "C" {

// Interface for starting the broker

bool MQTT_server_start(uint16_t portno, uint16_t max_subscriptions, uint16_t max_retained_topics);

// Callbacks for message reception, username/password authentication, and client connection

typedef void (*MqttDataCallback)(uint32_t *args, const char *topic, uint32_t topic_len, const char *data, uint32_t length);
typedef bool (*MqttAuthCallback)(const char *username, const char *password, const char *client_id, struct espconn *pesp_conn);
typedef bool (*MqttConnectCallback)(struct espconn *pesp_conn, uint16_t client_count);
typedef void (*MqttDisconnectCallback)(struct espconn *pesp_conn, const char *client_id);

void MQTT_server_onData(MqttDataCallback dataCb);
void MQTT_server_onAuth(MqttAuthCallback authCb);
void MQTT_server_onConnect(MqttConnectCallback connectCb);
void MQTT_server_onDisconnect(MqttDisconnectCallback disconnectCb);

// Interface for local pub/sub interaction with the broker

bool MQTT_local_publish(uint8_t *topic, uint8_t *data, uint16_t data_length, uint8_t qos, uint8_t retain);
bool MQTT_local_subscribe(uint8_t *topic, uint8_t qos);
bool MQTT_local_unsubscribe(uint8_t *topic);

// Interface to cleanup after STA disconnect

void MQTT_server_cleanupClientCons();

// Interface for persistence of retained topics
// Topics can be serialized to a buffer and reinitialized later after reboot
// Application is responsible for saving and restoring that buffer (i.e. to/from flash)

void clear_retainedtopics();
int serialize_retainedtopics(char *buf, int len);
bool deserialize_retainedtopics(char *buf, int len);

// Interface for getting some infos on the currently connected clients
// MQTT_server_getClientId() and MQTT_server_getClientPcon() return NULL on invalid indices

uint16_t MQTT_server_countClientCon();
const char *MQTT_server_getClientId(uint16_t index);
const struct espconn *MQTT_server_getClientPcon(uint16_t index);
}
/**
 * C++ wrapper around the C MQTT broker interface.
 *
 * The static _on*() functions are registered as C callbacks and forward to the
 * virtual on*() handlers of the instance stored in TheBroker, so behaviour is
 * customised by overriding the virtual handlers in a subclass.
 */
class uMQTTBroker
{
private:
    // Instance that the static callbacks forward to.
    static uMQTTBroker *TheBroker;
    uint16_t _portno;
    uint16_t _max_subscriptions;
    uint16_t _max_retained_topics;

    static bool _onConnect(struct espconn *pesp_conn, uint16_t client_count);
    static void _onDisconnect(struct espconn *pesp_conn, const char *client_id);
    static bool _onAuth(const char* username, const char *password, const char *client_id, struct espconn *pesp_conn);
    static void _onData(uint32_t *args, const char* topic, uint32_t topic_len, const char *data, uint32_t length);

    // Get access to last payload F.Dallo
    // These declarations must be on their own lines; sharing a line with the
    // "//" comment above turns them into comment text.
    static const int MAX_PAYLOAD_SIZE = 512;    // buffer size, including the terminating NUL
    static char lastPayload[MAX_PAYLOAD_SIZE];  // NUL-terminated copy of the last payload

public:
    uMQTTBroker(uint16_t portno=1883, uint16_t max_subscriptions=30, uint16_t max_retained_topics=30);

    void init();

    // Callbacks on client actions

    virtual bool onConnect(IPAddress addr, uint16_t client_count);
    virtual void onDisconnect(IPAddress addr, String client_id);
    virtual bool onAuth(String username, String password, String client_id);
    virtual void onData(String topic, const char *data, uint32_t length);

    // Infos on currently connected clients

    virtual uint16_t getClientCount();
    virtual bool getClientId(uint16_t index, String &client_id);
    virtual bool getClientAddr(uint16_t index, IPAddress& addr);

    // Interaction with the local broker

    virtual bool publish(String topic, uint8_t* data, uint16_t data_length, uint8_t qos=0, uint8_t retain=0);
    virtual bool publish(String topic, String data, uint8_t qos=0, uint8_t retain=0);
    virtual bool subscribe(String topic, uint8_t qos=0);
    virtual bool unsubscribe(String topic);

    // Get access to last payload F.Dallo
    static const char* getLastPayload();  // pointer to the stored payload buffer
    static void clearLastPayload();       // marks the stored payload as empty

    // Cleanup all clients on Wifi connection loss

    void cleanupClientConnections();
};
// C interface of the underlying MQTT broker.
// The pointer declarators ('*') below were missing from the pasted text; they
// are restored so the declarations agree with how the functions are called
// (buffers passed as uint8_t*, callbacks passed as function pointers, results
// compared against NULL).
extern "C" {

// Interface for starting the broker

bool MQTT_server_start(uint16_t portno, uint16_t max_subscriptions, uint16_t max_retained_topics);

// Callbacks for message reception, username/password authentication, and client connection

typedef void (*MqttDataCallback)(uint32_t *args, const char *topic, uint32_t topic_len, const char *data, uint32_t length);
typedef bool (*MqttAuthCallback)(const char *username, const char *password, const char *client_id, struct espconn *pesp_conn);
typedef bool (*MqttConnectCallback)(struct espconn *pesp_conn, uint16_t client_count);
typedef void (*MqttDisconnectCallback)(struct espconn *pesp_conn, const char *client_id);

void MQTT_server_onData(MqttDataCallback dataCb);
void MQTT_server_onAuth(MqttAuthCallback authCb);
void MQTT_server_onConnect(MqttConnectCallback connectCb);
void MQTT_server_onDisconnect(MqttDisconnectCallback disconnectCb);

// Interface for local pub/sub interaction with the broker

bool MQTT_local_publish(uint8_t *topic, uint8_t *data, uint16_t data_length, uint8_t qos, uint8_t retain);
bool MQTT_local_subscribe(uint8_t *topic, uint8_t qos);
bool MQTT_local_unsubscribe(uint8_t *topic);

// Interface to cleanup after STA disconnect

void MQTT_server_cleanupClientCons();

// Interface for persistence of retained topics
// Topics can be serialized to a buffer and reinitialized later after reboot
// Application is responsible for saving and restoring that buffer (i.e. to/from flash)

void clear_retainedtopics();
int serialize_retainedtopics(char *buf, int len);
bool deserialize_retainedtopics(char *buf, int len);

// Interface for getting some infos on the currently connected clients
// MQTT_server_getClientId() and MQTT_server_getClientPcon() return NULL on invalid indices

uint16_t MQTT_server_countClientCon();
const char *MQTT_server_getClientId(uint16_t index);
const struct espconn *MQTT_server_getClientPcon(uint16_t index);
}
/**
 * C++ wrapper around the C MQTT broker interface.
 *
 * The static _on*() functions are registered as C callbacks and forward to the
 * virtual on*() handlers of the instance stored in TheBroker, so behaviour is
 * customised by overriding the virtual handlers in a subclass.
 *
 * NOTE(review): the pasted text contains this class declaration twice; a real
 * header must keep a single copy, since a second definition of the same class
 * in one translation unit is a redefinition error.
 */
class uMQTTBroker
{
private:
    // Instance that the static callbacks forward to.
    static uMQTTBroker *TheBroker;
    uint16_t _portno;
    uint16_t _max_subscriptions;
    uint16_t _max_retained_topics;

    static bool _onConnect(struct espconn *pesp_conn, uint16_t client_count);
    static void _onDisconnect(struct espconn *pesp_conn, const char *client_id);
    static bool _onAuth(const char* username, const char *password, const char *client_id, struct espconn *pesp_conn);
    static void _onData(uint32_t *args, const char* topic, uint32_t topic_len, const char *data, uint32_t length);

    // Get access to last payload F.Dallo
    // These declarations must be on their own lines; sharing a line with the
    // "//" comment above turns them into comment text.
    static const int MAX_PAYLOAD_SIZE = 512;    // buffer size, including the terminating NUL
    static char lastPayload[MAX_PAYLOAD_SIZE];  // NUL-terminated copy of the last payload

public:
    uMQTTBroker(uint16_t portno=1883, uint16_t max_subscriptions=30, uint16_t max_retained_topics=30);

    void init();

    // Callbacks on client actions

    virtual bool onConnect(IPAddress addr, uint16_t client_count);
    virtual void onDisconnect(IPAddress addr, String client_id);
    virtual bool onAuth(String username, String password, String client_id);
    virtual void onData(String topic, const char *data, uint32_t length);

    // Infos on currently connected clients

    virtual uint16_t getClientCount();
    virtual bool getClientId(uint16_t index, String &client_id);
    virtual bool getClientAddr(uint16_t index, IPAddress& addr);

    // Interaction with the local broker

    virtual bool publish(String topic, uint8_t* data, uint16_t data_length, uint8_t qos=0, uint8_t retain=0);
    virtual bool publish(String topic, String data, uint8_t qos=0, uint8_t retain=0);
    virtual bool subscribe(String topic, uint8_t qos=0);
    virtual bool unsubscribe(String topic);

    // Get access to last payload F.Dallo
    static const char* getLastPayload();  // pointer to the stored payload buffer
    static void clearLastPayload();       // marks the stored payload as empty

    // Cleanup all clients on Wifi connection loss

    void cleanupClientConnections();
};
Then in the Arduino code setup:
// Register a local subscription for the topic of interest (goes in setup()).
myBroker.subscribe("windsled/thespecifictopicofinterest");
and loop:
// Process the last received payload const char* payload = uMQTTBroker::getLastPayload(); if (payload[0] != '\0') { // Check if there's a new payload Serial.println(payload); // Example: print payload // Clear the payload after processing to avoid processing it multiple times uMQTTBroker::clearLastPayload(); }
Thank you for developing this library. I've been using it to program ESP8266 devices with ESPHome for sensor data collection (including temperature, relative humidity, and air quality) and to transmit the data as MQTT payloads to an ESP8266 MQTT broker that also serves as the WiFi access point. This setup works flawlessly. However, I am now looking to enhance it by processing and storing the incoming messages on an SD card. While I can already handle the data in Python on my Ubuntu system by reading the messages (mosquitto_sub) and storing them in a dataframe, I am also interested in implementing this functionality directly on the ESP8266, saving measurements to an SD card, eventually using the SD.h library, which I've used in the past. This would involve accessing the payload and storing it as a character array within the loop function. Since I am not very familiar with C, I am uncertain whether a function already exists in your library that I could use for this purpose or whether I need to modify the onData() function in the uMQTTBroker.cpp file. Any guidance or suggestions you can provide would be greatly appreciated. Thank you for your support and again, great library! Best regards.