influxdata / telegraf

Agent for collecting, processing, aggregating, and writing metrics, logs, and other arbitrary data.
https://influxdata.com/telegraf
MIT License

Asynchronous InfluxDB V2 output #15621

Open LarsStegman opened 1 month ago

LarsStegman commented 1 month ago

Use Case

We want to use Telegraf to parse a large binary packet that comes in at 50 Hz. Each packet contains about 34k data points, which works out to roughly 1.7 million points per second. The problem in our case is that in the time it takes to flush all the data from one packet, another 150 packets have already come in. The machine that hosts Telegraf also runs the InfluxDB instance, so there is little to no network delay. The machine has 32 cores, but only about 2 are being used at 100%; the others are twiddling their thumbs.

What would be the best approach to fixing this? We see two ways forward:

  1. Create a custom logging program and do not use Telegraf.
  2. Create a new InfluxDB output plugin, or extend the existing one, to allow multithreaded writing. The only issue we see with this is that the Write function is called synchronously: we would need to return immediately while the results are still being written to the database asynchronously, which means we can no longer return an error should the writing fail; a rough sketch follows below this list. In our case, this is not too big of an issue, but in general this is bad.
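
To illustrate what we mean by option 2, here is a rough sketch of a fire-and-forget wrapper around a blocking write. The naming (asyncWriter, newAsyncWriter, the worker count) is our own invention; only the telegraf.Metric type and the Write signature come from Telegraf:

    package output // hypothetical package, for illustration only

    import (
        "log"

        "github.com/influxdata/telegraf"
    )

    // asyncWriter fans batches out to a pool of workers so that Write
    // can return immediately instead of blocking on the database.
    type asyncWriter struct {
        jobs chan []telegraf.Metric
    }

    func newAsyncWriter(workers int, write func([]telegraf.Metric) error) *asyncWriter {
        w := &asyncWriter{jobs: make(chan []telegraf.Metric, workers)}
        for i := 0; i < workers; i++ {
            go func() {
                for batch := range w.jobs {
                    if err := write(batch); err != nil {
                        // The caller has already returned, so the error
                        // can only be logged, not propagated.
                        log.Printf("async write failed: %v", err)
                    }
                }
            }()
        }
        return w
    }

    // Write queues the batch and reports success regardless of what
    // happens later -- exactly the drawback described above.
    func (w *asyncWriter) Write(metrics []telegraf.Metric) error {
        w.jobs <- metrics
        return nil
    }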

We understand that Telegraf is not designed for this use case at all, but we would like to discuss it with you anyway.

Expected behavior

All data is written to the database.

Actual behavior

Only 0.6% of all data is written to the database.

Additional info

No response

powersj commented 1 month ago

Hi Lars,

After chatting with the team today, we wanted to better understand the situation and the behavior you are seeing.

The problem in our case is that in the time it takes to flush all the data from one packet, another 150 packets have already come in.

Is the internal parsing and routing of the metrics the bottleneck, or is it the actual HTTP request/response of the output?

Is this a situation where you could spread the messages across multiple outputs and route them using tags to send more requests at once?

Is increasing the batch size and reducing flush interval helpful?
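
For reference, these are the agent settings I have in mind; the values below are only illustrative and would need tuning for your workload:

[agent]
  ## Illustrative values only -- tune to your workload.
  metric_batch_size = 50000      # points sent per write request
  metric_buffer_limit = 2000000  # room to absorb bursts between flushes
  flush_interval = "1s"          # flush more often than the 10s default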

In our case, this is not too big of an issue, but in general this is bad.

Agreed, I would not go down this path and would push you to the client libraries and their async clients.
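
For example, with the Go client's non-blocking WriteAPI it could look roughly like this (the URL, token, org, bucket, and sample point are placeholders):

    package main

    import (
        "log"
        "time"

        influxdb2 "github.com/influxdata/influxdb-client-go/v2"
    )

    func main() {
        // Placeholders: point these at your own instance.
        client := influxdb2.NewClient("http://localhost:8086", "my-token")
        defer client.Close()

        // The non-blocking WriteAPI buffers points and flushes them from
        // background goroutines instead of blocking on every call.
        writeAPI := client.WriteAPI("my-org", "my-bucket")

        // Write errors surface on a channel rather than as return values.
        go func() {
            for err := range writeAPI.Errors() {
                log.Printf("write error: %v", err)
            }
        }()

        p := influxdb2.NewPoint("tls_status",
            map[string]string{"source": "udp"},
            map[string]interface{}{"value": 42.0},
            time.Now())
        writeAPI.WritePoint(p)

        // Flush whatever is still buffered before shutting down.
        writeAPI.Flush()
    }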

LarsStegman commented 1 month ago

Let me give a bit more context on how we want to use Telegraf in this case. We have the Topside Lift System (TLS) on our vessel, the Pioneering Spirit, which we use to install and remove offshore installations. We are currently using a custom-written logger that writes to an SQL database, but we are in the process of moving all our data logging to InfluxDB.

The TLS control system sends UDP packets to the logger at 50 Hz; they contain metrics with the system status and sensor measurements. We were trying to parse these UDP packets using Telegraf to avoid creating a new custom logger.

Is the internal parsing and routing of the metrics the bottleneck, or is it the actual HTTP request/response of the output?

It appears that the HTTP request/response is really the bottleneck in this case.

Is this a situation where you could spread the messages across multiple outputs and route them using tags to send more requests at once?

That might work, we will try that.

Is increasing the batch size and reducing flush interval helpful?

We tried this, but it didn't give us the performance we need.

Agreed, I would not go down this path and would push you to the client libraries and their async clients.

We are leaning more towards creating a custom logger using the client library.

powersj commented 1 month ago

It appears that the HTTP request/response is really the bottleneck in this case.

What is the error condition you get due to this? Are you seeing that you overflow your buffer because this takes so long?

LarsStegman commented 1 month ago

Yeah, the buffer is overflowing because the data cannot be pumped out quickly enough. We suspect this is because the output plugin writes synchronously and single-threaded. With our custom logger we run the output asynchronously and multithreaded, and then we are able to pump data into InfluxDB quickly enough to keep up.

srebhan commented 1 month ago

Hey @LarsStegman! Thanks for sharing the details and for using Telegraf in this super cool project!

Brainstorming a bit more in the team, how about the following: you can achieve the parallelization by using multiple InfluxDB outputs pointing to the same server (assuming the DB can keep up with the amount of data). To ease the splitting across multiple outputs, we can implement a new batching processor (please feel free to suggest a better name!) which gets a batch_size and a max_splitting setting. The processor then adds a new (user-specified) tag carrying a "batch ID" in the range [0 .. max_splitting), along the lines of

    // Assumed fields: count (a running uint64 metric counter), plus
    // BatchSize, MaxSplitting and BatchTag mirroring the settings above.
    func (p *Batching) Apply(in ...telegraf.Metric) []telegraf.Metric {
        out := make([]telegraf.Metric, 0, len(in))
        for _, m := range in {
            // Every BatchSize metrics advance to the next batch ID,
            // cycling through [0 .. MaxSplitting).
            batchID := strconv.FormatUint((p.count/p.BatchSize)%p.MaxSplitting, 10)
            p.count++
            m.AddTag(p.BatchTag, batchID)
            out = append(out, m)
        }
        return out
    }
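
To make the sketch self-contained, the processor struct behind it could look roughly like this (nothing is implemented yet; the setting names match the config example below, the rest is placeholder):

    type Batching struct {
        BatchTag     string `toml:"batch_tag"`
        BatchSize    uint64 `toml:"batch_size"`
        MaxSplitting uint64 `toml:"max_splitting"`

        count uint64 // running metric counter
    }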

So if you want, e.g., 16 parallel output workers, you set up the config like

[[processors.batching]]
  batch_tag = "_batch"
  batch_size = <your agent setting of batch size>
  max_splitting = 16

[[outputs.influxdb_v2]]
  ...
  [outputs.influxdb_v2.tagpass]
    _batch = ["0"]

...

[[outputs.influxdb_v2]]
  ...
  [outputs.influxdb_v2.tagpass]
    _batch = ["15"]

This has the advantage that you can apply this to all outputs and even processors...

What do you think?

LarsStegman commented 1 month ago

Hey @LarsStegman! Thanks for sharing the details and for using Telegraf in this super cool project!

Hi @srebhan, thanks for thinking more about this! We really like using Telegraf as it makes connecting to our IoT sensors much easier, more scalable and more reliable than all the custom connectors we had in the past. We have several custom plugins (that we can't open source, unfortunately), some that we have already open sourced, and we make extensive use of the existing plugins.

For this use case, we decided to go with a custom logger because it is a bit more flexible and we had a deadline we needed to meet.

Brainstorming a bit more in the team, how about the following: you can achieve the parallelization by using multiple InfluxDB outputs pointing to the same server (assuming the DB can keep up with the amount of data).

The DB is actually able to keep up with the data easily, but that is probably because we are running on dedicated hardware.

To ease the splitting across multiple outputs, we can implement a new batching processor (please feel free to suggest a better name!) which gets a batch_size and a max_splitting setting. The processor then adds a new (user-specified) tag carrying a "batch ID" in the range [0 .. max_splitting).

This has the advantage that you can apply this to all outputs and even processors...

What do you think?

I think this is a really interesting approach to the problem, and it might actually work in this use case as well. I like that it solves the problem in a much more generic way. It is a lot of manual configuration, though I think that is manageable since not many people will run into this problem. Also, in our case the amount of data is very predictable, so there is little need to change the configuration after the initial setup.