influxdata / telegraf

Agent for collecting, processing, aggregating, and writing metrics, logs, and other arbitrary data.
https://influxdata.com/telegraf
MIT License
14.63k stars 5.58k forks source link

Need a more flexable histogram / tdigest #6440

Closed PhoenixRion closed 3 years ago

PhoenixRion commented 5 years ago

Feature Request

Groupon needed a more flexible histogram aggregation. Support for central re-aggregation is also needed. Central aggregations is currently accomplished outside of telegraf.

Proposal: https://github.com/PhoenixRion/telegraf/tree/master/plugins/aggregators/tdigestagg

Configuration:

## TDigest Compression
  ## This value corresponds to the number of centroids the histogram will use
  ## Higher values increase size of data but also precision of calculated percentiles
  compression = 30.0

[[aggregators.tdigestagg.bucketing]]
  ## List of tags that will not be considered for aggregation and not emitted.
  ## Empty list is valid
  exclude_tags=[host]

  ## "source" is required in output by SLA but typically mapped from another input tag
  ## If source_tag_key is not set on an input point, a default value will be set and
  ## a sla_violation tag will be added
  source_tag_key=service

  ## Optional: Default value is "atom"
  ## "atom" is required for output by SLA.  Tag can be submitted with input points or mapped
  ## from another input tag.  If "atom" or configured replacement tag is not set on an input
  ## point, a default value will be set and a sla_violation tag will be added
  atom_replacement_tag_key=az

Local Aggregation Output:

{
  "fields": {
    "count": 2,
    "max": 0.12385434728741589,
    "med": 0.0742942278114139,
    "min": 0.024734108335411897
  },
  "name": "cpu_usage_nice",
  "tags": {
    "atom": "sea1",
    "az": "sea1",
    "cpu": "cpu-total",
    "env": "dev",
    "host": "rion-laptop",
    "rion": "tdigest-test",
    "source": "rion-laptop"
  },
  "timestamp": 1569347320
}

Central Aggregation Output:

{
  "fields": {
    "sum._utility": 1230.0,
    "centroids": "[{97.97979797979798 1} {97.97979797979798 1} {98 1} {98 1} {98 1} {98 1} {98 1} {98 1} {98.00990099009901 2} {98.01980198019803 2} {98.01980198019803 2} {98.01980198019803 2} {98.98989898989899 1} {98.98989898989899 2} {99 1} {99 2} {99 2} {99 2} {99 2} {99 2} {99 2} {99 2} {99 2} {99 2} {99 2} {99 2} {99.00990099009901 2} {99.00990099009901 2} {99.00990099009901 2} {100 2} {100 2} {100 1} {100 1} {100 1} {100 1} {100 1} {100 1} {100 1}]",
    "compression": 30
  },
  "name": "cpu_usage_idle",
  "tags": {
    "cpu": "cpu1",
    "source": "rion-laptop",
    "az": "snc1",
    "env": "dev",
    "service": "awesome",
    "aggregates": "max,min,count,p99,p95,avg,med",
    "bucket_key": "cpu_usage_idle_awesome_snc1_dev"
  },
  "timestamp": 1532630290113371000
}

Current behavior:

Statically defined bucket boundaries Aggregations not generated based on histogram

Desired behavior:

Dynamic histogram buckets Ability to emit histogram for central aggregation Arbitrary list of aggregation buckets

Use case

Mathematically accurate percentiles for metrics across multiple sources.

danielnelson commented 5 years ago

I've learned that we have a t-digest library for Go that we are using in Flux: github.com/influxdata/tdigest. It would make a lot of sense to share this library, it is fast, we would have completely consistent behavior with Flux, and it would be less code to maintain. I'm not sure that it would be possible to access the centroids currently, but I think this could probably be added, we can work out the details with @goller.

[[aggregators.tdigestagg.bucketing]]
  ## List of tags that will not be considered for aggregation and not emitted.
  ## Empty list is valid
  exclude_tags=[host]

Should be able to use tagexclude metric filters for this.

  ## "source" is required in output by SLA but typically mapped from another input tag
  ## If source_tag_key is not set on an input point, a default value will be set and
  ## a sla_violation tag will be added
  source_tag_key=service

This should be done outside of the plugin, I think that it can use the global_tags section or it is possible to add it in the override processor for more control.

  ## Optional: Default value is "atom"
  ## "atom" is required for output by SLA.  Tag can be submitted with input points or mapped
  ## from another input tag.  If "atom" or configured replacement tag is not set on an input
  ## point, a default value will be set and a sla_violation tag will be added
  atom_replacement_tag_key=az

I don't entirely understand this but it should probably be separate from this plugin. It may be possible to do this with clever use of the rename processor. If the "atom" tag is not set then rename the "az" tag.


It appears there are two modes for the aggregator. In the "local" mode a set of quantiles are requested and in the "central" mode the centroid data is sent to the server so that the t-digest algorithm can be calculated later with different quantile values?

Here is how I suggest we layout the "local" mode aggregations:

Same in line protocol:

cpu,cpu=cpu-total,quantile=min usage_nice_tdigest=0.12,usage_idle_tdigest=0.042
cpu,cpu=cpu-total,quantile=median usage_nice_tdigest=0.12,usage_idle_tdigest=0.042
cpu,cpu=cpu-total,quantile=max usage_nice_tdigest=0.12,usage_idle_tdigest=0.042
cpu,cpu=cpu-total usage_nice_tdigest_count=2,usage_idle_tdigest_count=2

For "central" mode metrics, something like this based on what data we need to save:

{
  "fields": {
    "usage_nice_centroid": 1,
    "usage_idle_centroid": 1,
  },
  "name": "cpu",
  "tags": {
    "cpu": "cpu-total",
    "centroid": "97.979797",
  },
  "timestamp": 1569347320
}
{
  "fields": {
    "usage_nice_centroid": 1,
    "usage_idle_centroid": 1
  },
  "name": "cpu",
  "tags": {
    "cpu": "cpu-total",
    "centroid": "98",
  },
  "timestamp": 1569347320
}
{
  "fields": {
    "usage_nice_centroid_sum": 1230.0
    "usage_nice_centroid_compression": 30
    "usage_idle_centroid_sum": 1230.0
    "usage_idle_centroid_compression": 30
  },
  "name": "cpu",
  "tags": {
    "cpu": "cpu-total",
  },
  "timestamp": 1569347320
}

Same in line protocol:

cpu,cpu=cpu-total,centroid=97.97 usage_nice_centroid=1,usage_idle_centroid=1
cpu,cpu=cpu-total,centroid=98 usage_nice_centroid=1,usage_idle_centroid=1
cpu,cpu=cpu-total usage_nice_centroid_sum=1230.0,usage_nice_centroid_compression=1000,usage_idle_centroid_sum=1230.0,usage_idle_centroid_compression=1000,

This would align the storage model with our plans for other histogram/quantile data such as in #4415 or the histogram aggregator, and I believe it would be more friendly for frontends like Grafana/Chronograf.

PhoenixRion commented 5 years ago

Single TDigest library

The only reason I didn't use any library that was published was the fact that they all seem to hide the centroids and that is necessary for sending the data to a central aggregator. Using the same library as Flux could benefit people sending data to InfluxDB and will likely have no adverse impact on anyone else. I think the conversation around the central output format could likely influence how that library exposes the data.

Tag manipulation outside of the plugin

The tag manipulation that is done within the plugin was implemented there to support multiple bucketing configurations. The most common example is that you want to aggregate data once for each host that is running your service and then again for the service as a whole, excluding the host info.

[[aggregators.tdigestagg.bucketing]]
  exclude_tags=[host,az]
  source_tag_key=service
  atom_replacement_tag_key=env
[[aggregators.tdigestagg.bucketing]]
  exclude_tags=[]
  source_tag_key=host
  atom_replacement_tag_key=az

We did not want to go the route of multiple instances of the plugin to reduce duplication of common tasks between configurations.

Local aggregation output format

The field name "quantile" is inaccurrate but that could be changed to "aggregation" or something similar. I would add a tag "aggregator": "tdigest" instead of appending to a field name The logic to group by aggregation should not be too complicated. I am not completely sold that it would be better yet. As far as how the data renders in front ends. Writing a query for usage.nice.p99 vs usage.nice && agg=p99 seems like a matter of preference. The query usage.nice.* vs usage.nice to see every aggregation seems like it might be nicer to see the aggregation as a tag but not by much.

Central aggregation output format

Breaking up the centroid objects is going to be a bad idea. The numeric pairs represent a value and a weight. These values are dynamically calculated as values are added to the histogram and neither weights or counts from any two histograms will have any logical mapping. Even in the simple example you listed, if cpu_idle had values 98 and 99 then cpu_nice would end up with values 2 and 1. In a more real world example of http timers, the odds of two histograms sharing values is quite small. Second and potentially more imporant, the more the centroid is broken up, the more pieces that will have to be put together. The recombining of the histograms from multiple hosts that must be done to create central aggregations is already a computationally intensive task. The task also has a time limit. Because new histograms are generated every 60s, every batch MUST be finished in less than that time or you will get a perpetually increasing delay for metrics processing.

I am not opposed to alterring the output format but in addition to performance, it needs to be ingestable by libraries other than what was used to encode the source histogram. Our central aggregation is not even written in GoLang, for example.

danielnelson commented 5 years ago

Tag manipulation outside of the plugin

I haven't looked at the implementation but the best and expected way to do bucketing is to create a bucket for every measurement+tagset. The user can then use tagexclude to remove tags, which byt there removal will result in the buckets be merged. I can show an example if my explanation doesn't make sense.

Local aggregation output format

The field name "quantile" is inaccurrate but that could be changed to "aggregation" or something similar.

I must have misunderstood the way this works, doesn't the local mode work by calulating one or more estimated quantiles? With a range from 0-1, the median aggregation is the 0.5 quantile, and min is the 0 quantile and the max is the 1 quantile?

usage.nice.p99 vs usage.nice && agg=p99 seems like a matter of preference.

This is essentially the difference between older systems like graphite and newer tagged time series databases. By now almost all TS databases have adopted tags because they make querying easier and the style is more extensible. Our rule of thumb is to not write multiple values to a single column.

Central aggregation output format

Storing the centroid JSON as multiple fields allows InfluxDB to store the data in an efficient manner. With higher compression settings this JSON will become to long and will cause issues on many InfluxDB installs that have a limited write payload and line size. Also it will be completely unwritable to a fair number of other outputs that don't support a string type (prometheus, graphite, etc).


I do think we need to make at least one change from my proposal above, since the centroid could have so many values and they would be presumably changing over time, this wouldn't give proper identity to the items and would have poor cardinality:

cpu,cpu=cpu-total,centroid=97.97 usage_nice_centroid=1
cpu,cpu=cpu-total,centroid=98 usage_nice_centroid=1

Using the centroid number instead as the tag would probably make more sense, there would never be more than compression different tags, and you could see how the centroids change over time though not sure if that would be interesting. It seems it should be more like:

cpu,cpu=cpu-total,centroid=1 usage_nice_centroid=97.97,usage_nice_centroid_count=1
cpu,cpu=cpu-total,centroid=2 usage_nice_centroid=98,usage_nice_centroid_count=1
PhoenixRion commented 5 years ago

Quantile would not include count or sum which are necessary for me to support. I think we are not on the same page for the concept of how the central aggregation is working. We are not storing the histogram data in our TSDB, it is being processed by an intermediate service. Perhaps there can be a flag for output formats of the centroid data.

I will be attending the Flux training at InfluxDays SF, perhaps there is less processing time sensitive method for merging histograms from multiple sources.

danielnelson commented 5 years ago

Just a quick update for those watching this issue. @PhoenixRion, @goller, and I have been discussing how exactly this plugin could come together.

Our main goal in Telegraf is to generate a data model for storing tdigests in a format that will allow post collection merging and summarization. The data format should be well documented and applicable to should be possible to use with multiple output plugins.

We would like to follow this up with functions for merging the digests and estimating quantiles using Flux.

The next items we are planning to work on are:

  1. Expose centroids in influxdata/tdigest -- @goller
  2. Update PR to use influxdata/tdigest -- @PhoenixRion
  3. Investigate storage and serialization methods in use by other systems such as Clickhouse, MS Data Explorer -- @danielnelson
danielnelson commented 4 years ago

Update to proposed format:

cpu,host=foo,name=usage_nice,centoid=1 mean=42,weight=42
cpu,host=foo,name=usage_nice,centoid=2 mean=42,weight=42
cpu,host=foo,name=usage_nice,centoid=30 mean=42,weight=42