marvinroger / async-mqtt-client

📶 An Arduino for ESP8266 asynchronous MQTT client implementation
MIT License
834 stars 266 forks source link

Handling partial messages (TCP Buffer Filled) #98

Closed Cosford closed 3 years ago

Cosford commented 6 years ago

Hi, a question I'm hoping someone can help with.

My application uses ~100 topics with retained messages up to 100 bytes in each. It seems that when these get sent from the broker to the ESP8266 I'm using, occasionally the message in one particular topic gets split into two calls to the onMessage callback function. Some reading seems to suggest this is the case if there is not enough space in the TCP buffer.

How could one go about handling this in the most elegant way to recombine the packet? Can this buffer be expanded from whatever the default value is, perhaps, as an alleviation?

Thanks in advance.

timpur commented 6 years ago

Have a look at the docs https://github.com/marvinroger/async-mqtt-client/blob/master/docs/3.-Memory-management.md

To my knowledge you can not change the buffer size, as i understand, onMessage is called one to one with tcp packets received and the size of each packet depends on the size selected when the firmware is built in the arduino IDE. That's why we have size_t len, size_t index, size_t total. You'll have to buffer your own messages. total refers to the total size of the payload, index refers to the offset for a particular packet of the total size, length refers to the size of this packet.

eg. with max packet size of 100 bytes (MTU)

packet # index length total
1 0 100 350
2 100 100 350
3 200 100 350
4 300 50 350

from https://github.com/marvinroger/homie-esp8266/blob/99161ce03c7ff1c8d3dc1fcc52abbf081212b68f/src/Homie/Boot/BootConfig.cpp#L417

void onMessage(..., uint8_t *data, size_t len, size_t index, size_t total) {
  if (total > MAX_SIZE) {
    Serial.println(F("Request is to large to be processed."));
  } else {
    if (index == 0) { // at start
      _tempObject = new char[total + 1];
    }
    char* buff = _tempObject + index;
    memcpy(buff, data, len);
    if (index + len == total) { // at end
      char* buff =  _tempObject + total;
      *buff = '\0'; // terminate c style string
    }
  }
}

Your buffer code could look something like ^ this is an adaptation used in homie. (this will not work out of the box)

Cosford commented 6 years ago

Thanks for your prompt and full response. I hadn't realised that this was the purpose of the 'total' argument and I can see now that it will allow me to identify what is a partial message.

As a good solution, I'll look to implement something akin to that suggested, although as a matter of curiosity, you mention "the size of each packet depends on the size selected when the firmware is built in the arduino IDE". Is it possible to modify this, statically? And if so, where could this be done?

This could function as a quick, interim solution until I have more time to devote to building a proper solution.

timpur commented 6 years ago

The TCP buffer size is down to the lower TCP/IP stack lib which is precompiled and included in the build by the Arduino IDE, you can change this in your sketch when you upload via the tools menu drop down under lwIP (think this is a 2.4 option only). The lwIP (lightweight IP) option changes the size of the packet size used, refered to as the MTU size. You can play around with this, but issues my occurred if you have an older internet connection....

Cosford commented 6 years ago

Hi again, thanks for your help. I've implemented (although yet-to-test) a solution similar to that of Homie.

The only thing bothering me about the implementation, is that for QoS 2, an acknowledgement of the message appears to be sent even on only a fragment of the message (as the library does not take responsibility for re-assembling messages split across TCP packets). I haven't read the MQTT docs enough to understand if this is conformal or not, but it seems erroneous that an acknowledgement can be sent on only a fragment of a message being delivered and thus undermining the usage of Qos2?

timpur commented 6 years ago

Good point. If that is the case then we should try fix that. I'd need to look into it more..... Thanks for the feedback

romansavrulin commented 6 years ago

@Cosford

I don't see memory free in the example given by @timpur , so there's a definitely memleak. You can use more intuitive cpp way of maintain buffer and partial messages


String payloadBuf;

mqtt.onMessage([this](char* topic, char* payload, AsyncMqttClientMessageProperties properties, size_t len, size_t index, size_t total) {
    if (index == 0) {
        payloadBuf = "";
    }

    auto pl = len;
    auto p = payload;
    while (pl--) {
        payloadBuf += *(p++);
    }

    if (index + len == total) {
        /// Do your stuff with complete payloadBuf here
    }
})
timpur commented 6 years ago

The example I gave was a non working example and lacked some critical things like memory managment.... The snipit I got for the example had memory management else where. Sorry for the confusion. Definitely free your dynamic memory !!!

bertmelis commented 3 years ago

The only thing bothering me about the implementation, is that for QoS 2, an acknowledgement of the message appears to be sent even on only a fragment of the message (as the library does not take responsibility for re-assembling messages split across TCP packets). I haven't read the MQTT docs enough to understand if this is conformal or not, but it seems erroneous that an acknowledgement can be sent on only a fragment of a message being delivered and thus undermining the usage of Qos2?

this should be taken care of here: https://github.com/marvinroger/async-mqtt-client/blob/2991968a97193aaa6402d146490b93ea671c7e02/src/AsyncMqttClient/Packets/PublishPacket.cpp#L87

Feel free to reopen if this is still an issue