bertmelis / espMqttClient

MQTT 3.1.1 client library for the Espressif devices ESP8266 and ESP32 on the Arduino framework.
https://www.emelis.net/espMqttClient/
MIT License

Large payload with stream #126

Closed Laxilef closed 6 months ago

Laxilef commented 6 months ago

Hi, great library, thanks! I have a question: is it possible to publish a payload in chunks, as with Print/Stream? I'm using ArduinoJson to build a large object, serialize it and publish it to MQTT. ArduinoJson supports chunked serialization to a Stream, a Print or a custom Writer.

Right now I'm using this code:

  bool publish(const char* topic, JsonDocument& doc) {
    if (this->client == nullptr) {
      return false;
    }

    doc[FPSTR(HA_DEVICE)][FPSTR(HA_IDENTIFIERS)][0] = devicePrefix;
    doc[FPSTR(HA_DEVICE)][FPSTR(HA_SW_VERSION)] = deviceVersion;

    if (deviceManufacturer) {
      doc[FPSTR(HA_DEVICE)][FPSTR(HA_MANUFACTURER)] = deviceManufacturer;
    }

    if (deviceModel) {
      doc[FPSTR(HA_DEVICE)][FPSTR(HA_MODEL)] = deviceModel;
    }

    if (deviceName) {
      doc[FPSTR(HA_DEVICE)][FPSTR(HA_NAME)] = deviceName;
    }

    if (deviceConfigUrl) {
      doc[FPSTR(HA_DEVICE)][FPSTR(HA_CONF_URL)] = deviceConfigUrl;
    }

    Log.straceln("MQTT", topic);

    size_t docSize = measureJson(doc);
    //char* buffer = (char*) malloc(docSize * sizeof(*buffer));
    char buffer[docSize + 1];
    size_t length = serializeJson(doc, buffer, docSize + 1);
    doc.clear();
    doc.shrinkToFit();

    bool pubResult = false;
    if (length != 0) {
      pubResult = this->client->publish(topic, 0, true, buffer);
    }
    //free(buffer);

    if (this->yieldCallback != nullptr) {
      this->yieldCallback(yieldArg);
    }

    return pubResult;
  }

But this doesn't work well. There seems to be a memory leak, and after a while the heap runs out. I guess this is because there are three copies in memory: the doc, the buffer and the packet (after calling publish). As you can see, I also tried allocating the buffer on the heap instead of the stack, and that works even worse. After each reconnection I publish about 50 messages of roughly 400 to 1500 characters in sequence. After each publish I call yield to switch context and run other tasks. But after a while the heap runs out. I can see in the logs that the heap shrinks during serializeJson/publish. Strangely, this doesn't always happen immediately; sometimes only after 100-600 minutes. So the leak doesn't show up every time.

That's why I had the idea of filling the packets from a custom writer, without an intermediate buffer. Is that possible? Or do you have other ideas on how this could be done? Thank you!

bertmelis commented 6 months ago

There is no immediate solution. You can write in chunks (see the example), but espMqttClient makes the calls, not ArduinoJson, so it doesn't fit your use case. This feature doesn't exist at the moment.

On the other hand, your code most probably has a bug. In your code sample alone I already spot two errors: dynamic-size arrays are not allowed (variable-length arrays are not standard C++), and you are writing past the end of the array.

So the fastest way to solve your problem is to fix your memory leak bug. In the long run (no ETA, sorry) I might think about a "writer" method.
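
For reference, a minimal sketch of the buffer handling with those two issues fixed: a fixed-size heap allocation (measureJson plus one byte for the terminator) that is freed on every path. It reuses the publish(topic, qos, retain, payload) call from the snippet above; as described later in this thread, the library copies the payload into its own packet, so the buffer can be freed right after publish() returns.

    size_t docSize = measureJson(doc);              // JSON length, excluding the '\0'
    char* buffer = (char*) malloc(docSize + 1);     // fixed-size allocation, +1 for '\0'
    if (buffer == nullptr) {
      return false;                                 // allocation failed: skip the publish
    }

    size_t length = serializeJson(doc, buffer, docSize + 1);

    bool pubResult = false;
    if (length != 0) {
      pubResult = this->client->publish(topic, 0, true, buffer);
    }

    free(buffer);                                   // always released, no leak on any path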

lumapu commented 6 months ago

I can see heap problems with ESP8266 as well. Check out this file:

https://github.com/bertmelis/espMqttClient/blob/main/src%2FOutbox.h

This implements a linked list that creates a new object for each message; after the message is sent, the memory is freed. With this design the heap gets fragmented. On ESP8266 in particular you can print the heap fragmentation, which in my view is a really good indicator of the health of your ESP. My goal is to keep fragmentation below 10%, or at least at a stable level (not increasing). As a workaround I only pass one message to MQTT at a time and then wait for the next _client.loop() call.
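
A minimal sketch of such monitoring, assuming the ESP8266 Arduino core's ESP helpers (getFreeHeap, getMaxFreeBlockSize, getHeapFragmentation); printing these periodically shows whether fragmentation keeps creeping up:

    #include <Arduino.h>

    // ESP8266 only: getHeapFragmentation()/getMaxFreeBlockSize() are not in the ESP32 core.
    void printHeapStats() {
      Serial.printf("free: %u B, max block: %u B, frag: %u%%\n",
                    (unsigned) ESP.getFreeHeap(),
                    (unsigned) ESP.getMaxFreeBlockSize(),
                    (unsigned) ESP.getHeapFragmentation());
    }

    void setup() {
      Serial.begin(115200);
    }

    void loop() {
      static uint32_t lastPrint = 0;
      if (millis() - lastPrint > 5000) {   // log every 5 seconds
        lastPrint = millis();
        printHeapStats();
      }
      // ... mqttClient.loop() and the rest of the application ...
    }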

I hope this changes in the future because the workaround is neither nice nor cheap. I don't yet have a good idea of what an implementation that fits all users could look like.

One idea I had: reserve a single pool that is held for the whole runtime, never growing or shrinking. The messages are placed inside this pool. Once the pool is full, the return value of the publish function signals that the message can't be accepted. The problem is that the maximum message length is then limited.
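
A rough, purely illustrative sketch of that idea (hypothetical code, not part of espMqttClient): a pool of fixed-size slots allocated once, where enqueue() simply refuses the message instead of allocating, which also makes the length limitation explicit:

    #include <stdint.h>
    #include <string.h>

    constexpr size_t POOL_SLOTS = 8;
    constexpr size_t MAX_LEN    = 1536;   // hard upper limit per message

    struct MessagePool {
      uint8_t data[POOL_SLOTS][MAX_LEN];  // allocated once, never grows or shrinks
      size_t  len[POOL_SLOTS] = {};
      size_t  head = 0;                   // oldest queued message
      size_t  count = 0;                  // slots in use

      bool enqueue(const uint8_t* msg, size_t msgLen) {
        if (msgLen > MAX_LEN || count == POOL_SLOTS) {
          return false;                   // too long or pool full: caller must retry later
        }
        size_t tail = (head + count) % POOL_SLOTS;
        memcpy(data[tail], msg, msgLen);
        len[tail] = msgLen;
        count++;
        return true;
      }

      // Called from the loop: hand the oldest message to the transport.
      const uint8_t* front(size_t& msgLen) {
        if (count == 0) return nullptr;
        msgLen = len[head];
        return data[head];
      }

      void pop() {                        // release the slot once the message is sent
        if (count == 0) return;
        len[head] = 0;
        head = (head + 1) % POOL_SLOTS;
        count--;
      }
    };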

bertmelis commented 6 months ago

The list (and the packets) indeed cause memory fragmentation. Since my own devices aren't that feature-packed, I never had any problems, but I can imagine that isn't the case for everyone.

A memory pool is something I already had in mind, but I never got around to studying the subject. Limiting the queue size is easy, but some caveats can make it very tricky (QoS handling, the connect packet on reconnect).

Laxilef commented 6 months ago

It seems @lumapu is right. When I started monitoring heap fragmentation, I saw values above 60. But if I comment out the publish, the value stays at 2-6. It seems that at some point another block takes up the space and I get an exception. @bertmelis, do I understand correctly that packets are sent from the loop call?

bertmelis commented 6 months ago

Yes, messages are sent in the loop call until the TCP buffer is full; sending continues on the next loop call. So every time a control packet has to be sent, an outbox list item and a packet (with its underlying buffer) are created on the heap. They are a cause of memory fragmentation; that's how the library works. Adding a memory pool may improve things, but it is not as straightforward as it seems and far from memory-efficient: MQTT packets can vary from a single byte up to megabytes. Furthermore, you need to reserve some space to handle QoS and (re)connect packets...

As long as you have sufficient free memory, fragmentation is not a problem. Also, when you send/receive messages in bursts, fragmentation returns to pre-burst levels because all requested memory is freed, unless other parts of the program throw their own dynamic memory into the mix.

If you encounter exceptions because of this, I'd like to see what is causing them. Do you have any error codes, a decoded stack trace...?

Laxilef commented 6 months ago
    0x40239aac: do_memp_malloc_pool at /local/users/gauchard/arduino/arduino_esp8266/origin/tools/sdk/lwip2/builder/lwip2-src/src/core/memp.c:255
    0x4023d2b3: tcp_create_segment at /local/users/gauchard/arduino/arduino_esp8266/origin/tools/sdk/lwip2/builder/lwip2-src/src/core/tcp_out.c:190
    0x4024221c: ip4_output_if_opt at /local/users/gauchard/arduino/arduino_esp8266/origin/tools/sdk/lwip2/builder/lwip2-src/src/core/ipv4/ip4.c:1577
    0x40238944: std::char_traits::assign(char&, char const&) at /workdir/arena.x86_64/gcc-gnu/xtensa-lx106-elf/libstdc++-v3/include/bits/char_traits.h:329
    0x40238944: std::__cxx11::basic_string, std::allocator >::_M_set_length(unsigned int) at /workdir/arena.x86_64/gcc-gnu/xtensa-lx106-elf/libstdc++-v3/include/bits/basic_string.h:219
    0x40238944: std::__cxx11::basic_string, std::allocator >::_M_append(char const*, unsigned int) at /workdir/arena.x86_64/gcc-gnu/xtensa-lx106-elf/libstdc++-v3/include/bits/basic_string.tcc:374
    0x40232cc5: memcpy_P at /workdir/repo/newlib/newlib/libc/sys/xtensa/string_pgmspace.c:135
    0x40232cba: memcpy_P at /workdir/repo/newlib/newlib/libc/sys/xtensa/string_pgmspace.c:137
    0x4023562c: __ssputs_r at /workdir/repo/newlib/newlib/libc/stdio/nano-vfprintf.c:179
    0x4023562c: __ssputs_r at /workdir/repo/newlib/newlib/libc/stdio/nano-vfprintf.c:179
    0x402356f0: __ssputs_r at /workdir/repo/newlib/newlib/libc/stdio/nano-vfprintf.c:232
    0x402356f0: __ssputs_r at /workdir/repo/newlib/newlib/libc/stdio/nano-vfprintf.c:232
    0x402356f0: __ssputs_r at /workdir/repo/newlib/newlib/libc/stdio/nano-vfprintf.c:232
    0x402356f0: __ssputs_r at /workdir/repo/newlib/newlib/libc/stdio/nano-vfprintf.c:232
    0x402356f0: __ssputs_r at /workdir/repo/newlib/newlib/libc/stdio/nano-vfprintf.c:232
    0x4023562c: __ssputs_r at /workdir/repo/newlib/newlib/libc/stdio/nano-vfprintf.c:179
    0x402356f0: __ssputs_r at /workdir/repo/newlib/newlib/libc/stdio/nano-vfprintf.c:232
    0x4022fcac: _printf_i at /workdir/repo/newlib/newlib/libc/stdio/nano-vfprintf_i.c:246
    0x402356f0: __ssputs_r at /workdir/repo/newlib/newlib/libc/stdio/nano-vfprintf.c:232
    0x402358ed: _svfprintf_r at /workdir/repo/newlib/newlib/libc/stdio/nano-vfprintf.c:528
    0x40232ac9: _vsnprintf_r at /workdir/repo/newlib/newlib/libc/stdio/vsnprintf.c:71
    0x40232ac9: _vsnprintf_r at /workdir/repo/newlib/newlib/libc/stdio/vsnprintf.c:71
    0x40232cba: memcpy_P at /workdir/repo/newlib/newlib/libc/sys/xtensa/string_pgmspace.c:137
    0x40232cba: memcpy_P at /workdir/repo/newlib/newlib/libc/sys/xtensa/string_pgmspace.c:137
    0x4023562c: __ssputs_r at /workdir/repo/newlib/newlib/libc/stdio/nano-vfprintf.c:179
    0x402356f0: __ssputs_r at /workdir/repo/newlib/newlib/libc/stdio/nano-vfprintf.c:232
    0x402356f0: __ssputs_r at /workdir/repo/newlib/newlib/libc/stdio/nano-vfprintf.c:232
    0x4023562c: __ssputs_r at /workdir/repo/newlib/newlib/libc/stdio/nano-vfprintf.c:179
    0x4022fb7d: _printf_i at /workdir/repo/newlib/newlib/libc/stdio/nano-vfprintf_i.c:196
    0x402356f0: __ssputs_r at /workdir/repo/newlib/newlib/libc/stdio/nano-vfprintf.c:232
    0x4023562c: __ssputs_r at /workdir/repo/newlib/newlib/libc/stdio/nano-vfprintf.c:179
    0x4022fcac: _printf_i at /workdir/repo/newlib/newlib/libc/stdio/nano-vfprintf_i.c:246
    0x402356f0: __ssputs_r at /workdir/repo/newlib/newlib/libc/stdio/nano-vfprintf.c:232
    0x40242e48: sntp_recv at /local/users/gauchard/arduino/arduino_esp8266/origin/tools/sdk/lwip2/builder/lwip2-src/src/apps/sntp/sntp.c:440
    0x4023f6d0: dhcp_recv at /local/users/gauchard/arduino/arduino_esp8266/origin/tools/sdk/lwip2/builder/lwip2-src/src/core/ipv4/dhcp.c:1763
    0x4023e3c4: tcpip_tcp_timer at /local/users/gauchard/arduino/arduino_esp8266/origin/tools/sdk/lwip2/builder/lwip2-src/src/core/timeouts.c:145
    0x40242cec: sntp_request at /local/users/gauchard/arduino/arduino_esp8266/origin/tools/sdk/lwip2/builder/lwip2-src/src/apps/sntp/sntp.c:609

But I'm not sure this particular stack trace is related to the library. It looks like the maximum free block is too small because of fragmentation.

bertmelis commented 6 months ago

What makes you think fragmentation causes this? What is the reset code?

Laxilef commented 6 months ago

This is my guess. I saw in the logs how the maximum free block kept shrinking, even though the available heap was more than 10 kB. For now I unfortunately had to go back to pubsubclient, because I have large messages and there is no way to split them. I didn't save the reset code :(

bertmelis commented 6 months ago

You can send large messages, only not the way you want. I might add this in the future, but adding a stream-like interface doesn't really fit the paradigm.

Laxilef commented 6 months ago

Okay, could we add the ability to append data to a packet after it has been created? For example, this is what I do now:

    size_t docSize = measureJson(doc);
    uint8_t* buffer = (uint8_t*) malloc(docSize * sizeof(*buffer));
    size_t length = serializeJson(doc, buffer, docSize);

    size_t written = 0;
    if (length != 0) {
      if (this->client->beginPublish(topic, docSize, true)) {
        for (size_t offset = 0; offset < docSize; offset += 128) {
          size_t packetSize = offset + 128 <= docSize ? 128 : docSize - offset;
          written += this->client->write(buffer + offset, packetSize);
        }

        this->client->flush();
      }
    }
    free(buffer);

That is, I first send a packet with the headers and the total data length, and then send the data in chunks of 128 bytes.

As far as I can see, your library creates a packet with headers and data together. Maybe we could create a packet with the headers first and then append packets with data? https://github.com/bertmelis/espMqttClient/blob/main/src/MqttClient.cpp#L152C1-L156C4

I think I could write a writer with a small static buffer and pass it to serializeJson. That way we get rid of the large buffer, but the problem of storing the packets remains. In theory, we could create a vector and pre-allocate a memory pool for it.
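
A minimal sketch of that idea, assuming ArduinoJson's duck-typed writer support (any class with write(uint8_t) and write(const uint8_t*, size_t) can be a serialization target) and the pubsubclient beginPublish()/write()/endPublish() calls used above; ChunkedMqttWriter and publishJson are made-up names:

    #include <ArduinoJson.h>
    #include <PubSubClient.h>

    struct ChunkedMqttWriter {
      PubSubClient& client;
      uint8_t buf[128];        // small fixed buffer instead of one big allocation
      size_t pos = 0;

      explicit ChunkedMqttWriter(PubSubClient& c) : client(c) {}

      size_t write(uint8_t b) {
        buf[pos++] = b;
        if (pos == sizeof(buf)) flushChunk();   // forward a full 128-byte chunk
        return 1;
      }

      size_t write(const uint8_t* data, size_t size) {
        for (size_t i = 0; i < size; i++) write(data[i]);
        return size;
      }

      void flushChunk() {
        if (pos > 0) {
          client.write(buf, pos);               // stream the chunk to the broker
          pos = 0;
        }
      }
    };

    bool publishJson(PubSubClient& client, const char* topic, JsonDocument& doc) {
      size_t docSize = measureJson(doc);        // total payload length announced up front
      if (!client.beginPublish(topic, docSize, true)) {
        return false;
      }
      ChunkedMqttWriter writer(client);
      serializeJson(doc, writer);               // serialize straight into the writer
      writer.flushChunk();                      // push the last partial chunk
      return client.endPublish();
    }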

bertmelis commented 6 months ago

I suppose you now use pubsubclient, which is a great library. However, it only supports sending QoS 0. I try to support all QoS levels and therefore keep packets in memory until they are acknowledged. This also means that in case of retransmission, the writer function would have to run again.

I'll try to work out something. It is possible to adapt the "large payload" feature to have a mechanism like pubsubclient's. Mind that I will also cap the max QoS for such messages.

PS: while reviewing the current code I spotted a few bugs which cause the "large payload" feature to fail.

Laxilef commented 6 months ago

Yes, for now I had to go back to pubsubclient, but that library is no longer being developed and has its own bugs, so I'm looking for another one :)

> I'll try to work out something. It is possible to adapt the "large payload" feature to have a mechanism like pubsubclient's. Mind that I will also cap the max QoS for such messages.

That would be awesome!

> PS: while reviewing the current code I spotted a few bugs which cause the "large payload" feature to fail.

Are you talking about the code I gave above?

bertmelis commented 6 months ago

No, the bugs are in my own code, in the library. I believe the current "large payload" code is broken.

No ETA for a solution.