tobiasschuerg / InfluxDB-Client-for-Arduino

Simple library for sending measurements to an InfluxDB with a single network request. Supports ESP8266 and ESP32.
MIT License
381 stars 94 forks source link

Trouble Batching Data #166

Closed eric-w-dyn closed 2 years ago

eric-w-dyn commented 3 years ago

I've followed many examples and instructions in this repository, but am still having issues batching data.

My situation: I’m currently using an ESP32 iOT microcontroller to collect some data and am sending the data to a local InfluxDB server for storage. I’m using the Arduino InfluxDB client to connect the microcontroller to the database and am wanting to batch my data to increase my sample rate. I’ve looked at batching examples and instructions on GitHub but it doesn’t seem to work. In my testing, I can either only get the data to come in synchronously or it doesn’t make it into InfluxDB at all.

Here is my code, in case it is helpful:

/**
 * Secure Write Example code for InfluxDBClient library for Arduino
 * Enter WiFi and InfluxDB parameters below
 *
 * Demonstrates connection to any InfluxDB instance accesible via:
 *  - unsecured http://...
 *  - secure https://... (appropriate certificate is required)
 *  - InfluxDB 2 Cloud at https://cloud2.influxdata.com/ (certificate is preconfigured)
 * Measures signal level of all visible WiFi networks including signal level of the actually connected one
 * This example demonstrates time handling, how to write measures with different priorities, batching and retry
 * Data can be immediately seen in a InfluxDB 2 Cloud UI - measurements wifi_status and wifi_networks
 **/
#include <Arduino.h>
#include <math.h>
#if defined(ESP32)
#include <WiFiMulti.h>
WiFiMulti wifiMulti;
#define DEVICE "ESP32"
#elif defined(ESP8266)
#include <ESP8266WiFiMulti.h>
ESP8266WiFiMulti wifiMulti;
#define DEVICE "ESP8266"
#define WIFI_AUTH_OPEN ENC_TYPE_NONE
#endif

#include <InfluxDbClient.h>
#include <InfluxDbCloud.h>

// WiFi AP SSID
#define WIFI_SSID "Dynamiq Guest"
// WiFi password
#define WIFI_PASSWORD "comeflywithme"
// InfluxDB v2 server url, e.g. https://eu-central-1-1.aws.cloud2.influxdata.com (Use: InfluxDB UI -> Load Data -> Client Libraries)
#define INFLUXDB_URL "http://10.255.255.226:8086"
// InfluxDB v2 server or cloud API authentication token (Use: InfluxDB UI -> Load Data -> Tokens -> <select token>)
#define INFLUXDB_TOKEN "-qRM4acj3ic_Cf7hmeymLhrJzrXQVcRDgQEANPNlplMXddUEFtSjtWJdazJnZ-_gQH7FjMDYQ6yRYp_4ed5mzQ=="
// InfluxDB v2 organization id (Use: InfluxDB UI -> Settings -> Profile -> <name under tile> )
#define INFLUXDB_ORG "Dynamiq"
// InfluxDB v2 bucket name (Use: InfluxDB UI -> Load Data -> Buckets)
#define INFLUXDB_BUCKET "Wifi_Testing"
// Set timezone string according to https://www.gnu.org/software/libc/manual/html_node/TZ-Variable.html
// Examples:
//  Pacific Time:   "PST8PDT"
//  Eastern:        "EST5EDT"
//  Japanesse:      "JST-9"
//  Central Europe: "CET-1CEST,M3.5.0,M10.5.0/3"
#define TZ_INFO "CET-1CEST,M3.5.0,M10.5.0/3"
// NTP servers the for time synchronization.
// For the fastest time sync find NTP servers in your area: https://www.pool.ntp.org/zone/
#define NTP_SERVER1  "pool.ntp.org"
#define NTP_SERVER2  "time.nis.gov"
#define WRITE_PRECISION WritePrecision::MS
#define MAX_BATCH_SIZE 20
#define WRITE_BUFFER_SIZE 40

// InfluxDB client instance with preconfigured InfluxCloud certificate
InfluxDBClient client(INFLUXDB_URL, INFLUXDB_ORG, INFLUXDB_BUCKET, INFLUXDB_TOKEN, InfluxDbCloud2CACert);
// InfluxDB client instance without preconfigured InfluxCloud certificate for insecure connection 
//InfluxDBClient client(INFLUXDB_URL, INFLUXDB_ORG, INFLUXDB_BUCKET, INFLUXDB_TOKEN);

// Data point
Point sensorSine("sine_wave");
Point sensorCosine("cosine_wave");

// Number for loops to sync time using NTP
int iterations = 0;
double t = 0;

void setup() {
  Serial.begin(9600);

  // Setup wifi
  WiFi.mode(WIFI_STA);
  wifiMulti.addAP(WIFI_SSID, WIFI_PASSWORD);

  Serial.print("Connecting to wifi");
  while (wifiMulti.run() != WL_CONNECTED) {
    Serial.print(".");
    delay(500);
  }
  Serial.println();

  // Add tags
  sensorSine.addTag("device", DEVICE);
  sensorSine.addTag("SSID", WiFi.SSID());
  sensorCosine.addTag("device", DEVICE);
  sensorCosine.addTag("SSID", WiFi.SSID());

  // Alternatively, set insecure connection to skip server certificate validation 
  client.setInsecure();

  // Accurate time is necessary for certificate validation and writing in batches
  // Syncing progress and the time will be printed to Serial.
  timeSync(TZ_INFO, NTP_SERVER1, NTP_SERVER2);

  // Check server connection
  if (client.validateConnection()) {
    Serial.print("Connected to InfluxDB: ");
    Serial.println(client.getServerUrl());
  } else {
    Serial.print("InfluxDB connection failed: ");
    Serial.println(client.getLastErrorMessage());
  }

  // Enable messages batching and retry buffer
  client.setWriteOptions(WriteOptions().writePrecision(WRITE_PRECISION).batchSize(MAX_BATCH_SIZE).bufferSize(WRITE_BUFFER_SIZE));
}

void loop() {
  // Sync time for batching once per hour
  if (iterations++ >= 100) {
    timeSync(TZ_INFO, NTP_SERVER1, NTP_SERVER2);
    iterations = 0;
  }

  for(int i = 0; i < MAX_BATCH_SIZE; i++){
    // Report RSSI of currently connected network
    sensorSine.clearFields();
    sensorCosine.clearFields();
    //sensorSine.setTime(time(nullptr));
    //sensorCosine.setTime(time(nullptr));
    sensorSine.addField("sin", 10*cos(0.5*t)+5);
    sensorCosine.addField("cos", 10*sin(0.5*t)+5);

    // Print what are we exactly writing
    Serial.print("Writing: ");
    Serial.print(client.pointToLineProtocol(sensorSine));
    Serial.print("\t");
    Serial.println(client.pointToLineProtocol(sensorCosine));

    // Write point into buffer - high priority measure
    client.writePoint(sensorSine);
    client.writePoint(sensorCosine);

    // Clear fields for next usage. Tags remain the same.
    sensorSine.clearFields();
    sensorCosine.clearFields();
    t = t + 0.01;
  }
  client.flushBuffer();

    // If no Wifi signal, try to reconnect it
    if (wifiMulti.run() != WL_CONNECTED) {
      Serial.println("Wifi connection lost");
    }

  // End of the iteration - force write of all the values into InfluxDB as single transaction
  Serial.println("Flushing data into InfluxDB");
  if (!client.flushBuffer()) {
    Serial.print("InfluxDB flush failed: ");
    Serial.println(client.getLastErrorMessage());
    Serial.print("Full buffer: ");
    Serial.println(client.isBufferFull() ? "Yes" : "No");
  }

  // Wait 10s
  //Serial.println("Wait 10s");
  //delay(2000);
}
vlastahajek commented 3 years ago

@eric-w-dyn, thanks for using this client.

I don't see a big problem in your code. All points should be written unless there is an error. How do you check data?

Few points to improve:

eric-w-dyn commented 2 years ago

@vlastahajek Thanks for the tips. I think I got batching working properly. However, I'm still not happy for the results that I'm getting. Do you have any configuration tips to minimize the time it take to flush the batch? I'm collecting data from an analog sensor at 80Hz, thus I would like to write a continuous stream to InfluxDB at 80Hz. I'm getting about 80Hz within each batch, but there are relatively long gaps between each batch where I lose information. I have attached a screenshot of my Grafana dashboard that shows these gaps. I would expect an ESP32 chip to be able to handle concurrent collection and networking considering the multicore design specifically for these types of applications... image

vlastahajek commented 2 years ago

@eric-w-dyn, when writing data less more then 2 per minute, you should set HTTP reuse:

client.setHTTPOptions(HTTPOptions().connectionReuse(true);

if you gather data at a different time than adding points, keep time for it.

If you will still encounter a lag, please enable debug and post the debug output here.

eric-w-dyn commented 2 years ago

@vlastahajek I already had HTTP reuse enabled. And yes, I am syncing with a time server to ensure the points have the correct time.

Here is some output from debug:

Writing: cosine_wave cos=4.79
449.189 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
449.270 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
Writing: cosine_wave cos=4.96
449.395 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
449.476 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
Writing: cosine_wave cos=5.13
449.600 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
449.681 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
Writing: cosine_wave cos=5.31
449.805 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
449.886 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
Writing: cosine_wave cos=5.49
450.010 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
450.091 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
Writing: cosine_wave cos=5.67
450.215 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
450.296 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
Writing: cosine_wave cos=5.85
450.421 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
450.502 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
Writing: cosine_wave cos=6.03
450.626 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
450.707 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
Writing: cosine_wave cos=6.21
450.831 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
450.912 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
Writing: cosine_wave cos=6.40
451.036 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
451.117 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
Writing: cosine_wave cos=6.59
451.241 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
451.323 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
Writing: cosine_wave cos=6.78
451.447 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
451.528 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
Writing: cosine_wave cos=6.97
451.652 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
451.733 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
Writing: cosine_wave cos=7.16
451.857 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
451.938 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
Writing: cosine_wave cos=7.35
452.062 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
452.143 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
Writing: cosine_wave cos=7.54
452.267 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
452.349 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
Writing: cosine_wave cos=7.74
452.473 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
452.554 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
Writing: cosine_wave cos=7.93
452.678 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
452.759 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
Writing: cosine_wave cos=8.13
452.883 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
452.964 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
Writing: cosine_wave cos=8.32
453.088 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
453.169 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
Writing: cosine_wave cos=8.52
453.294 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
453.375 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
Flushing data into InfluxDB
453.500 [D] Writing batch, batchpointer: 0, size 100
453.552 [D] Writing to http://192.168.1.8:8086/api/v2/write?org=Dynamiq&bucket=Edoras&precision=ms
453.655 [D] Sending:
cosine_wave cos=1.51 1639416077220
cosine_wave cos=1.41 1639416077425
cosine_wave cos=1.31 1639416077630
cosine_wave cos=1.21 1639416077836
cosine_wave cos=1.12 1639416078041
cosine_wave cos=1.03 1639416078246
cosine_wave cos=0.94 1639416078451
cosine_wave cos=0.86 1639416078656
cosine_wave cos=0.78 1639416078862
cosine_wave cos=0.70 1639416079067
cosine_wave cos=0.63 1639416079272
cosine_wave cos=0.56 1639416079477
cosine_wave cos=0.50 1639416079682
cosine_wave cos=0.44 1639416079888
cosine_wave cos=0.38 1639416080093
cosine_wave cos=0.33 1639416080298
cosine_wave cos=0.28 1639416080503
cosine_wave cos=0.24 1639416080708
cosine_wave cos=0.19 1639416080914
cosine_wave cos=0.16 1639416081119
cosine_wave cos=0.12 1639416081324
cosine_wave cos=0.09 1639416081529
cosine_wave cos=0.07 1639416081734
cosine_wave cos=0.05 1639416081940
cosine_wave cos=0.03 1639416082145
cosine_wave cos=0.02 1639416082350
cosine_wave cos=0.01 1639416082555
cosine_wave cos=0.00 1639416082760
cosine_wave cos=0.00 1639416082966
cosine_wave cos=0.00 1639416083171
cosine_wave cos=0.01 1639416083376
cosine_wave cos=0.02 1639416083581
cosine_wave cos=0.03 1639416083787
cosine_wave cos=0.05 1639416083992
cosine_wave cos=0.07 1639416084197
cosine_wave cos=0.10 1639416084402
cosine_wave cos=0.13 1639416084607
cosine_wave cos=0.17 1639416084813
cosine_wave cos=0.20 1639416085018
cosine_wave cos=0.25 1639416085223
cosine_wave cos=0.29 1639416085428
cosine_wave cos=0.34 1639416085633
cosine_wave cos=0.40 1639416085839
cosine_wave cos=0.45 1639416086044
cosine_wave cos=0.52 1639416086249
cosine_wave cos=0.58 1639416086454
cosine_wave cos=0.65 1639416086659
cosine_wave cos=0.72 1639416086865
cosine_wave cos=0.80 1639416087070
cosine_wave cos=0.88 1639416087275
cosine_wave cos=0.96 1639416087480
cosine_wave cos=1.05 1639416087685
cosine_wave cos=1.14 1639416087891
cosine_wave cos=1.24 1639416088096
cosine_wave cos=1.33 1639416088301
cosine_wave cos=1.44 1639416088506
cosine_wave cos=1.54 1639416088712
cosine_wave cos=1.65 1639416088917
cosine_wave cos=1.76 1639416089122
cosine_wave cos=1.88 1639416089327
cosine_wave cos=1.99 1639416089532
cosine_wave cos=2.12 1639416089738
cosine_wave cos=2.24 1639416089943
cosine_wave cos=2.37 1639416090148
cosine_wave cos=2.50 1639416090353
cosine_wave cos=2.63 1639416090558
cosine_wave cos=2.77 1639416090764
cosine_wave cos=2.91 1639416090969
cosine_wave cos=3.05 1639416091174
cosine_wave cos=3.20 1639416091379
cosine_wave cos=3.34 1639416091584
cosine_wave cos=3.49 1639416091790
cosine_wave cos=3.65 1639416091995
cosine_wave cos=3.80 1639416092200
cosine_wave cos=3.96 1639416092405
cosine_wave cos=4.12 1639416092610
cosine_wave cos=4.29 1639416092816
cosine_wave cos=4.45 1639416093021
cosine_wave cos=4.62 1639416093226
cosine_wave cos=4.79 1639416093431
cosine_wave cos=4.96 1639416093637
cosine_wave cos=5.13 1639416093842
cosine_wave cos=5.31 1639416094047
cosine_wave cos=5.49 1639416094252
cosine_wave cos=5.67 1639416094457
cosine_wave cos=5.85 1639416094663
cosine_wave cos=6.03 1639416094868
cosine_wave cos=6.21 1639416095073
cosine_wave cos=6.40 1639416095278
cosine_wave cos=6.59 1639416095483
cosine_wave cos=6.78 1639416095689
cosine_wave cos=6.97 1639416095894
cosine_wave cos=7.16 1639416096099
cosine_wave cos=7.35 1639416096304
cosine_wave cos=7.54 1639416096509
cosine_wave cos=7.74 1639416096715
cosine_wave cos=7.93 1639416096920
cosine_wave cos=8.13 1639416097125
cosine_wave cos=8.32 1639416097330
cosine_wave cos=8.52 1639416097535

457.354 [D] Dropped batch, batchpointer: 1
457.368 [D] Success: 1, _bufferPointer: 1, _batchPointer: 1, _writeBuffer[_bufferPointer]_0x0
457.466 [D] Buffer empty
Writing: cosine_wave cos=8.72
457.525 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
457.606 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
Writing: cosine_wave cos=8.92
457.730 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
457.811 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
Writing: cosine_wave cos=9.12
457.935 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
458.016 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
Writing: cosine_wave cos=9.32
458.140 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
458.221 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
Writing: cosine_wave cos=9.52
458.346 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
458.427 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
Writing: cosine_wave cos=9.72
458.551 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
458.632 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
Writing: cosine_wave cos=9.92
458.756 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
458.837 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
Writing: cosine_wave cos=10.12
458.962 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
459.043 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
Writing: cosine_wave cos=10.32
459.169 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
459.250 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
Writing: cosine_wave cos=10.52
459.375 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
459.456 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
Writing: cosine_wave cos=10.72
vlastahajek commented 2 years ago

The log shows it takes nearly 4s to write a batch. And this is the gab shown in the chart when you are not reading data.

Try sending smaller batches more frequently. But you will still get gaps.

As you want to read 80Hz data and you are using ESP32, I would suggest reading a sensor on core 0 (using xTaskCreatePinnedToCore) and writing data to a queue that would be read on in the loop (core 1) and written to the database.

eric-w-dyn commented 2 years ago

Yeah, I've tried splitting tasks into different cores a little already, but haven't yet reached a working program. My understanding is that ESP32 should already split networking tasks into a different core by default. Perhaps I am misled in that understanding...

vlastahajek commented 2 years ago

My understanding is that core 0 is used for radio (WiFi, BLE) management, not networking. I have a custom 433Mhz ASK sensors network. ESP32 acts as a gateway that writes data from sensors to a database. As writing takes time and had to put the radio receiving task to core 0 to don't miss 433MHz radio packets.

Where do have the server? I tried your use case and when I have a server on rpi on the same network, it's written almost instantly. Such long times are usually on remote SSL server (e.g. AWS)

If nothing changes in your setup to speed up writing, as sending 80 lines takes 4s (total roundtrip), you won't be able to write it at such speed. Even you will create data on core 0, writing takes so long that writing task will be a lot slower than reading task.

eric-w-dyn commented 2 years ago

My InfluxDB database is just local on my computer. Everything stays local. I'll try xTaskCreatePinnedToCore a little more and see where that takes me. Thanks for the advice

vlastahajek commented 2 years ago

If you have a server on the local computer, writing shouldn't be a problem. No more than 100ms, which should be sufficient for you use case. If writing takes seconds, there must be a problem somewhere else.
When you turn off debug (writing of debug info adds some delay) and measure flushing, what is the time? E.g. if batch-size is 100:

 uint32_t d = millis();
 if (!client.writePoint(sensorCosine)) {
    Serial.print("InfluxDB write failed: ");
    Serial.println(client.getLastErrorMessage());
  }
  d = millis()-d;
  if(i==99) {
      Serial.printf("Sending took %dms\n", d);
  }
eric-w-dyn commented 2 years ago

Ok, I just tried that. For some reason, it's working a lot faster now... I'm getting much faster write speeds (up to 1000 Hz), but I'm still getting little breaks in the data stream. See screenshot below: image

In debug mode, I notice that it says "Flushing buffer: is oversized false [...]" after every write record (see output below). This doesn't seem write. The whole point of batching is to avoid flushing data after every point.

5.415 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
5.504 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
5.583 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
5.673 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
5.752 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
5.842 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
5.921 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
6.011 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0
6.090 [D] Flushing buffer: is oversized false, is timeout false, is buffer full false
6.179 [D] writeRecord: bufferPointer: 0, batchPointer: 0, _bufferCeiling: 0

This is confirmed in Grafana. Every time I refresh my dashboard, I see new data (even when I refresh it at like 2 Hz). It shouldn't be batching and flushing this frequently as I have my max batch size set to 2000 currently.

Here is my current loop() method, in case you want to inspect it:

void loop() {
  for (int i = 0; i < MAX_BATCH_SIZE; i++){
    sensorCosine.addField("cos", 10.0*cos(0.04*t)+10.0);

    if (!client.writePoint(sensorCosine)) {
        Serial.print("InfluxDB write failed: ");
        Serial.println(client.getLastErrorMessage());
    }

      t += 0.01;
      sensorCosine.clearFields();
  }

  // If no Wifi signal, try to reconnect it
  if (wifiMulti.run() != WL_CONNECTED) {
    Serial.println("Wifi connection lost");
  }

  // End of the iteration - force write of all the values into InfluxDB as single transaction
  Serial.println("Flushing data into InfluxDB");
  if (!client.flushBuffer()) {
    Serial.print("InfluxDB flush failed: ");
    Serial.println(client.getLastErrorMessage());
    Serial.print("Full buffer: ");
    Serial.println(client.isBufferFull() ? "Yes" : "No");
  }
}
vlastahajek commented 2 years ago

The debug output [D] Flushing buffer: is oversized false, is timeout false, is buffer full false just says: there is no reason for automatic flushing.

If you have milliseconds precicion, some points will be overwritten because of the same timestamp, as writting to buffer is fast.

You don't need to call client.flushBuffer(), as data is written automatically when buffer if full. This would happen at the last cycle of the for loop, however, not in case of batch size 2000. More bellow.

Actually, maximum supported batch size is 255 and higher number are trimmed to this. It is due to memory restrictions, mainly on ESP8266 device. Regular points are usually long (several tags and fields), so this limit is set to prevent OOM crash.

eric-w-dyn commented 2 years ago

Oh ok. I wasn't aware that 255 was the max batch size. I remember reading somewhere that the optimal batch size for Influx is 2000, but it must not be applicable in my situation.

Since ESP32 has more memory, is it possible to change the library code to increase the maximum support batch size?

vlastahajek commented 2 years ago

If fact, recommended bachsize is 5000. However this is meant for normal computer/server, not embeded device.
Even 2000 points will be a problem. Maximum allocable memory even on ESP32 is only 110K, despite there is more RAM available. This means you would need to be sure that points will have at maximum 50 chars!

GeorgeDeac commented 2 years ago

I have struggled with this also. I have a high data rate application with 8k sample rate on esp32 reading vibration data with external psram and did a custom implementation for buffering to it. However the overhead is too big when sending packets even when queuing from the external buffer and for some reason I encounter random crashes when I still have heap space, I guess line protocol just isn't good for this. Can you recommend a binary alternative of it, I've tried looking up the header specifications for influx to do something, but it's pretty hard.

eric-w-dyn commented 2 years ago

I’ve had better success recently using Bluetooth rather than Wifi. I’m using Python with the Bleak library to ingest the data into Influx. I can get close to 1kHz sample rate with no batching

GeorgeDeac commented 2 years ago

Internet communication is required in this project. The main issue with line protocol here is the use of strings in multiple buffers and then they are copied in the tcp packet buffer again from what I reckon, that's a lot of overhead when you have a lot of points. I guess a tcp packet can't be build as a stream of strings to create it while sending it sequentially in order to save resources, or isn't implemented. Best option is to send binary data and I would have preferred to send directly to the db with this, tried with mqtt before but had some problems configuring the broker with the latest telegraf server. Another potential solution would be if I try to patch this lib to use the psram directly for its buffers, I'm not sure that would help either considering the overhead.

vlastahajek commented 2 years ago

I have good news for you.
I have already rewritten posting data by directly streaming batch data. This means, no more max alloc block size limitation. As each line is separately allocated, it should allow a big batch on ESP32. Increasing batch size limit is not done yet, but you can try zero-alloc writing in the feat/stream_batch branch

GeorgeDeac commented 2 years ago

I was away for the holidays, will go testing it 👍. This feature should be mainstream