influxdata / telegraf

Agent for collecting, processing, aggregating, and writing metrics, logs, and other arbitrary data.
https://influxdata.com/telegraf
MIT License

"Histogram" statistics aggregator plugin #1662

Closed pauldix closed 6 years ago

pauldix commented 7 years ago

Feature Request

I'm attempting to bring discussion about #380 and #1364 into one place so we can talk about the design and implementation to move things forward. This has been requested pretty frequently so I think it's worth looking into.

Proposal

Give users the ability to pre-aggregate data in Telegraf. The most common use case for this is to calculate aggregates or downsamples that would get stored in other retention policies. I assume that users will want to aggregate everything from an input plugin or measurement or all measurements matching a pattern.

This might make sense to be implemented as a new type called middleware rather than as an input or output plugin. However, it would need to be able to map between input and output plugins if we wanted to do something like that. Having that mapping would be tricky because we'd need a method for identifying each input and output, which currently doesn't exist.

Alternately, it could just be implemented as part of the influxdb output plugin. This would probably keep things simpler in the short term. Doing it as middleware could be tricky because each output plugin has different options and you may want to set different options for different aggregations.

So we'll go with the updated InfluxDB output plugin for our example.

Desired behavior:

If we implement it as something that could be added as an InfluxDB output plugin, you might have the following.

# Configuration for influxdb server to send metrics to
[[outputs.influxdb]]
  ## this one sends the raw data to whatever RP is configured on that UDP endpoint
  urls = ["udp://localhost:8089"] # UDP endpoint example
  precision = "s"
  udp_payload = 512

[[outputs.influxdb]]
  ## this one sends the short aggregates to a different endpoint/RP
  urls = ["udp://localhost:10050"] # UDP endpoint example
  precision = "s"
  udp_payload = 512

  [rollup]
    # measurements = [] # could match against specific measurement names
    measurement_match = "*" # match all measurements
    # fields = [] # could match against specific field names
    fields_match = "*" # match all fields
    functions = ["min", "max", "first", "last", "sum", "count"]
    periods = ["5m", "10m"]

[[outputs.influxdb]]
  ## this one sends longer aggregates to yet another endpoint/RP
  urls = ["udp://localhost:10050"] # UDP endpoint example
  precision = "s"
  udp_payload = 512

  [rollup]
    # measurements = [] # could match against specific measurement names
    measurement_match = "*" # match all measurements
    # fields = [] # could match against specific field names
    fields_match = "*" # match all fields
    functions = ["min", "max", "first", "last", "sum", "count"]
    periods = ["1h"]

Use case

Gives users the ability to calculate downsamples and aggregates in a decentralized way, amortizing the cost of computing the aggregates across their Telegraf infrastructure as opposed to only in the database.
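
To make the proposed `functions` and `periods` options concrete, here is a minimal Python sketch of a rollup over fixed windows. It is not Telegraf code: `parse_period`, `rollup` and the `(timestamp, value)` point format are hypothetical names chosen for illustration, and it assumes windows are aligned to multiples of the period.

```python
from collections import defaultdict

# Hypothetical helper: parse durations such as "5m" or "1h" into seconds.
UNITS = {"s": 1, "m": 60, "h": 3600}


def parse_period(text):
    return int(text[:-1]) * UNITS[text[-1]]


# The six functions named in the proposed config.
FUNCTIONS = {
    "min": min,
    "max": max,
    "first": lambda values: values[0],
    "last": lambda values: values[-1],
    "sum": sum,
    "count": len,
}


def rollup(points, period, functions):
    """Group (timestamp, value) points into fixed windows and aggregate each.

    Returns {window_start: {function_name: result}}.
    """
    width = parse_period(period)
    windows = defaultdict(list)
    for timestamp, value in sorted(points):
        # Assumption: windows start at multiples of the period.
        windows[timestamp - timestamp % width].append(value)
    return {
        start: {name: FUNCTIONS[name](values) for name in functions}
        for start, values in windows.items()
    }


points = [(0, 1.0), (60, 3.0), (299, 2.0), (300, 10.0)]
result = rollup(points, "5m", ["min", "max", "first", "last", "sum", "count"])
```

Note that this version buffers every point of a window in memory, which is one of the costs of supporting periods inside the agent.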

alimousazy commented 7 years ago

hi @pauldix:

Check my implementation as middleware (filters). I tested it with different kinds of output plugins and it works fine, since it passes metrics on exactly as they came from the input plugin. I get your idea about routing, which is implemented in Fluentd, where the user can match metrics based on tags and route them to different outputs, but I think this is a different problem to tackle, maybe in the future.

Here is a sample (working example)

 [[filter.histogram]]
  bucketsize = 20
  flush_interval = "30s"
  rollup = [
    "(Name interface_rollup) (Tag interface en*) (Functions mean 0.90) (Pass true)",
    "(Name cpu_rollup) (Measurements cpu*) (Functions mean sum)",
    "(Name disk_rool) (Tag fstype hfs) (Functions 0.90)",
  ]

where:

Name is the name of the rollup.

Tag is the tag to match (supports globs). You can match on multiple tags: (Tag tag1 bla) (Tag tag2 foo).

Functions lists the functions to apply: mean, sum, min, max, variance, or a number for a percentile (e.g. 0.90).

Pass is a flag that says whether to pass the original metric through or drop it.
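
For readers trying to picture how such rollup strings could be interpreted, here is a rough Python sketch. It is not the parser from the pull request: `parse_rollup` and `matches` are hypothetical names, and the sketch assumes that multiple Tag clauses must all match and that an omitted Pass means the original metric is dropped.

```python
import re
from fnmatch import fnmatchcase


def parse_rollup(spec):
    """Parse "(Key arg ...) (Key arg ...)" into a dictionary."""
    rule = {"name": None, "tags": {}, "measurements": [], "functions": [], "pass": False}
    for clause in re.findall(r"\(([^()]*)\)", spec):
        key, *args = clause.split()
        if key == "Name":
            rule["name"] = args[0]
        elif key == "Tag":
            rule["tags"][args[0]] = args[1]  # tag key -> glob pattern
        elif key == "Measurements":
            rule["measurements"].extend(args)
        elif key == "Functions":
            rule["functions"].extend(args)
        elif key == "Pass":
            rule["pass"] = args[0] == "true"
        else:
            raise ValueError(f"unknown clause: {key}")
    return rule


def matches(rule, measurement, tags):
    """Return True if a metric satisfies the rule's measurement and tag globs."""
    patterns = rule["measurements"]
    if patterns and not any(fnmatchcase(measurement, p) for p in patterns):
        return False
    # Assumption: every Tag clause has to match (logical AND).
    return all(
        key in tags and fnmatchcase(tags[key], pattern)
        for key, pattern in rule["tags"].items()
    )


rule = parse_rollup(
    "(Name interface_rollup) (Tag interface en*) (Functions mean 0.90) (Pass true)"
)
```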

I think changing all the output plugins would require a lot of effort. I hope that at some point we will reach a sophisticated routing mechanism like the one implemented in Fluentd. Besides, aggregation should happen in one place; it is a waste of space and CPU to implement and run it in multiple places.

The big question: why haven't I used TOML?

TOML is pretty limited when it comes to nested configuration.

Data flows

Input                                                  output2
Input 2 ---- Metric a --> Filter --> Metric rollup --> output1
Input 3

Sample output:

{"fields":{"bytes_recv.mean":3.0057850165e+09,"bytes_recv.p0.90":3.005787266e+09,"bytes_sent.mean":2.65015301e+08,"bytes_sent.p0.90":2.6501842e+08,"drop_in.mean":0,"drop_in.p0.90":0,"drop_out.mean":12,"drop_out.p0.90":12,"err_in.mean":0,"err_in.p0.90":0,"err_out.mean":0,"err_out.p0.90":0,"packets_recv.mean":2.8129555e+06,"packets_recv.p0.90":2.812976e+06,"packets_sent.mean":1.440443e+06,"packets_sent.p0.90":1.440471e+06},"name":"interface_rollup","tags":{"host":"L-SNVT0RAFD5-M.local","interface":"en0"},"timestamp":1472178570}

Other filters may follow after we merge the pull request, like:

  1. Metric shaping (renaming tags and fields, changing values).
  2. Bandwidth limitation.
  3. Blocking metrics.
  4. Routing.
  5. Metric statistics.
  6. Splitting a metric into multiple metrics based on fields.
  7. Combining metrics into one metric.

sparrc commented 7 years ago

A few of my thoughts on this:

  1. we should use TOML throughout
  2. for the purposes of aggregation, I think there should just be a single type of histogram plugin. I can't think of another use-case of a general "aggregator" type of plugin, and I think that it would be OK to create a separate "transformation" or "filter" plugin later for transforming/filtering metrics.
  3. tagpass/tagdrop should be supported in the same way that it is currently supported for input/output plugins.
  4. measurement and field filtering should be a list of glob-supported filters. The filtering should support both "include" and "exclude" options for measurements and fields.
  5. percentiles should be a top-level configuration option

This looks fairly similar to what @pauldix proposed, but differs in that it is a general type that can aggregate metrics from all inputs and send them on to all outputs.

[[histograms]]
  ## measurements to calculate histogram data for
  ## only one of measurement_include & measurement_exclude should be defined
  measurement_include = ["*"]
  # measurement_exclude = []
  ## fields to calculate histogram data for
  ## only one of field_include & field_exclude should be defined
  field_include = ["*"]
  # field_exclude = []
  ## If true, drop the original metric field(s), only sending the aggregates
  ## to output plugins.
  drop_original = false
  ## Histogram functions to calculate for each field
  functions = ["min", "max", "first", "last", "sum", "count", "stddev"]
  ## quantiles to collect for each metric field.
  quantiles = [0.50, 0.95, 0.99]
  [[histograms.tagdrop]]
    cpu = ["cpu0", "cpu1", "cpu2"]
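
The include/exclude and tagdrop semantics described above can be sketched in a few lines of Python. This is only an illustration under assumptions: `make_filter` and `tag_dropped` are hypothetical helpers, not Telegraf functions, and glob matching is approximated with Python's `fnmatch`.

```python
from fnmatch import fnmatchcase


def make_filter(include=None, exclude=None):
    """Build a name filter from glob lists; only one list may be given."""
    if include and exclude:
        raise ValueError("define only one of include and exclude")
    if include:
        return lambda name: any(fnmatchcase(name, p) for p in include)
    if exclude:
        return lambda name: not any(fnmatchcase(name, p) for p in exclude)
    return lambda name: True  # no filter configured: accept everything


def tag_dropped(tags, tagdrop):
    """Return True if any tag value matches a tagdrop pattern for its key."""
    return any(
        key in tags and any(fnmatchcase(tags[key], p) for p in patterns)
        for key, patterns in tagdrop.items()
    )


measurement_ok = make_filter(include=["*"])
field_ok = make_filter(exclude=["usage_guest*"])
tagdrop = {"cpu": ["cpu0", "cpu1", "cpu2"]}
```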

sparrc commented 7 years ago

Still one open question: How would we support metric "periods"? Do we want to support this within telegraf? A few problems that can arise:

  1. how do we separate metrics from different periods when they get sent onto output plugins?
  2. how do we prevent users from shooting themselves in the foot by storing too many metrics in memory to calculate periods?
  3. what would be the resulting timestamp of the aggregated metric?

My preference is not to support periods, and instead only calculate running cumulative metrics. This is the way that statsd does it, for example, and I think it's fair to leave the calculation of metric periods up to queryable datastores (influxdb, prometheus, etc.)
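
As an illustration of what "running cumulative metrics" could look like, the Python sketch below keeps a constant amount of state per field and never buffers samples. It is an assumption-laden example, not the eventual Telegraf implementation: the `RunningStats` class is hypothetical, and the standard deviation uses Welford's online algorithm with the sample (n - 1) denominator.

```python
import math


class RunningStats:
    """Cumulative count/sum/min/max/stddev in O(1) memory."""

    def __init__(self):
        self.count = 0
        self.total = 0.0
        self.minimum = math.inf
        self.maximum = -math.inf
        self._mean = 0.0
        self._m2 = 0.0  # running sum of squared deviations from the mean

    def add(self, value):
        self.count += 1
        self.total += value
        self.minimum = min(self.minimum, value)
        self.maximum = max(self.maximum, value)
        # Welford's update: adjust the mean, then accumulate the deviation.
        delta = value - self._mean
        self._mean += delta / self.count
        self._m2 += delta * (value - self._mean)

    def stddev(self):
        """Sample standard deviation; 0.0 until two values have been seen."""
        if self.count < 2:
            return 0.0
        return math.sqrt(self._m2 / (self.count - 1))


stats = RunningStats()
for sample in [2, 4, 4, 4, 5, 5, 7, 9]:
    stats.add(sample)
```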

sparrc commented 7 years ago

We will also need to consider how we are going to handle metrics that are already counters.

For example, in the net plugin there are fields for bytes_recv and bytes_sent. These fields are already supplied as summed counters and are always increasing. For that reason they will very quickly overflow if we try tracking them in a histogram (sum will become huge very quickly).

For this feature, we might need to first begin adding statsd-style types to metrics (ie, counters, gauges, etc). This won't be too hard for system metrics, but will be challenging for other plugins.

From the outset we would likely need to simply assume that all metrics are gauges unless specified otherwise by the input plugin.
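
One way to picture the counter problem: before a counter such as bytes_recv is fed into a histogram, it could be turned into per-sample increases. The Python sketch below shows that idea only; `CounterDelta` and `to_aggregatable` are hypothetical names, and treating a decreasing value as a counter reset is an assumption, not something decided in this thread.

```python
class CounterDelta:
    """Turn an always-increasing counter into per-sample increases."""

    def __init__(self):
        self._previous = None

    def update(self, value):
        previous, self._previous = self._previous, value
        if previous is None:
            return None  # first sample: no baseline yet
        if value < previous:
            return None  # assumed counter reset: start a new baseline
        return value - previous


def to_aggregatable(kind, tracker, value):
    """Return the value to aggregate, or None if the sample should be skipped."""
    if kind == "counter":
        return tracker.update(value)
    return value  # gauges (the assumed default) are aggregated as-is


tracker = CounterDelta()
deltas = [to_aggregatable("counter", tracker, v) for v in [100, 150, 10, 30]]
```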

alimousazy commented 7 years ago

@sparrc don't worry about memory consumption: I'm using a streaming algorithm whose memory usage is fixed by the number of buckets, see https://www.vividcortex.com/blog/2013/07/08/streaming-approximate-histograms/
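
To show why a bucket limit bounds memory, here is a simplified Python sketch of a centroid-merging streaming histogram. It is written for illustration and is not taken from the pull request or the linked post: the `StreamingHistogram` class is hypothetical, and its quantile estimate simply returns a bucket centroid rather than interpolating.

```python
import bisect


class StreamingHistogram:
    """Approximate histogram that never holds more than max_bins buckets."""

    def __init__(self, max_bins):
        self.max_bins = max_bins
        self.bins = []  # sorted list of [centroid, count]

    def add(self, value):
        centroids = [b[0] for b in self.bins]
        index = bisect.bisect_left(centroids, value)
        if index < len(self.bins) and self.bins[index][0] == value:
            self.bins[index][1] += 1
        else:
            self.bins.insert(index, [value, 1])
        if len(self.bins) > self.max_bins:
            self._merge_closest()

    def _merge_closest(self):
        # Merge the two neighbouring buckets whose centroids are closest.
        gaps = [self.bins[i + 1][0] - self.bins[i][0] for i in range(len(self.bins) - 1)]
        i = gaps.index(min(gaps))
        (c1, n1), (c2, n2) = self.bins[i], self.bins[i + 1]
        merged = [(c1 * n1 + c2 * n2) / (n1 + n2), n1 + n2]
        self.bins[i:i + 2] = [merged]

    def count(self):
        return sum(n for _, n in self.bins)

    def quantile(self, q):
        """Centroid of the bucket where the cumulative count reaches q."""
        if not self.bins:
            raise ValueError("histogram is empty")
        target = q * self.count()
        seen = 0
        for centroid, n in self.bins:
            seen += n
            if seen >= target:
                return centroid
        return self.bins[-1][0]


hist = StreamingHistogram(max_bins=20)
for value in range(1, 1001):
    hist.add(float(value))
```

However many values are added, the histogram holds at most `max_bins` buckets, which is the fixed-memory property described above; the price is that quantiles are approximate.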

I can only support one period (every 1 minute, for example); multiple periods are tricky.

I'm assuming that all the metrics are gauges at the source, but support for other metric types can be added as well.

One problem I had with TOML is that Histogram should only have one instance, and I'm not sure how to represent this:

[[filter.histogram]]
  bucketsize = 20
  flush_interval = "30s"
  rollup = [
    "(Name interface_rollup) (Tag interface en*) (Functions mean 0.90) (Pass true)",
    "(Name cpu_rollup) (Measurements cpu*) (Functions mean sum)",
    "(Name disk_rool) (Tag fstype hfs) (Functions 0.90)",
  ]

TOML version (Not working)

[[filter.histogram]]
  bucketsize = 20
  flush_interval = "30s"
  [[filter.histogram.rollup1]]
    name=interface_rollup
    tag=interface en* 
    functions=[mean, 0.90]
    pass= true
  [[filter.histogram.rollup2]]
    name=bal2
    tag=tag2
    functions=[mean, 0.90, sum]
    pass= true

sparrc commented 7 years ago

I believe the correct form would be:

[[filter.histogram]]
  bucketsize = 20
  flush_interval = "30s"
  [[filter.histogram.rollup]]
    name="interface_rollup"
    tag="interface en*"
    functions=["mean", 0.90]
    pass= true
  [[filter.histogram.rollup]]
    name="bal2"
    tag="tag2"
    functions=["mean", 0.90, "sum"]
    pass= true

alimousazy commented 7 years ago

@sparrc I will change the config to TOML over the weekend. Could you please review the code and let me know if there is anything else to be done?

alimousazy commented 7 years ago

@pauldix do you agree with the above?

sparrc commented 7 years ago

@alimousazy I would prefer if your implementation looked like the one that I wrote earlier:

[[filters.histogram]]
  ## measurements to calculate histogram data for
  ## only one of measurement_include & measurement_exclude should be defined
  measurement_include = ["*"]
  # measurement_exclude = []
  ## fields to calculate histogram data for
  ## only one of field_include & field_exclude should be defined
  field_include = ["*"]
  # field_exclude = []
  ## If true, drop the original metric field(s), only sending the aggregates
  ## to output plugins.
  drop_original = false
  ## Histogram functions to calculate for each field
  functions = ["min", "max", "first", "last", "sum", "count", "stddev"]
  ## quantiles to collect for each metric field.
  quantiles = [0.50, 0.95, 0.99]
  [[filters.histogram.tagdrop]]
    cpu = ["cpu0", "cpu1", "cpu2"]

If you wanted to make that [[filter.histogram]], that would be fine as well.

The reason I say this is that you can define multiple [[histogram]] instances, so there isn't much point in defining multiple "rollups" within a single histogram; the user can just define multiple "histograms".

Note that it should also support tagdrop in the same form as input/output plugins.

sparrc commented 7 years ago

You can also add support for a period to be added to that, but the period should support aggregating continuously from the beginning as well, rather than only within specified periods.

And the last thing is that we still haven't answered what the timestamp should be set to: the end of the period? The middle? I'd like to see what other similar products do for this.

daviesalex commented 7 years ago

@sparrc Given that this is going to require a fairly detailed config, could we not make that configurable? Per http://stackoverflow.com/questions/23847332/rrdtool-outputs-wrong-numbers that is what rrdtool does. (-t argument). Manually inspecting some of our ganglia .rrd files (i.e rrdtool dump /path/hostname/load_one.rrd) shows that ganglia is snapping to the "end" timestamp of the period for downsampled data, so (to us) that seems like a sensible default.

sparrc commented 7 years ago

sure, config option would work, I agree that end seems like the best default
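
A configurable timestamp alignment with "end" as the default could look roughly like the Python sketch below. The function names and the `align` option values are hypothetical, and the sketch assumes integer Unix timestamps and periods aligned to multiples of the period length.

```python
def period_bounds(timestamp, period):
    """Return (start, end) of the period containing timestamp, in seconds."""
    start = timestamp - timestamp % period
    return start, start + period


def aggregate_timestamp(timestamp, period, align="end"):
    """Pick the timestamp to stamp on the aggregated metric."""
    start, end = period_bounds(timestamp, period)
    if align == "end":
        return end
    if align == "start":
        return start
    if align == "middle":
        return start + period // 2
    raise ValueError(f"unknown alignment: {align}")


# A sample taken at 1472178570 with a 5-minute period.
stamped = aggregate_timestamp(1472178570, 300)
```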

alimousazy commented 7 years ago

@sparrc just to answer your questions:

  1. To get the best performance there should be one instance of Histogram. Imagine the amount of message passing that would happen with 5 instances, in addition to the timers, unless there is a way to merge all the instance configs into one big config and pass it to Histogram (Filter), which would give all the other filters the same treatment: multi-config -> one instance. (If you agree on this model I will be more than happy to do it.) I assume that blocking channels and timers have a performance impact; based on your experience, what do you think?
  2. I can add a flushing period per rollup to the configuration, but it must be a multiple of a minute, since that is easy to implement and requires only a single one-minute timer.
  3. Currently I'm using the flush time as the rollup metric time, same as statsd.

sparrc commented 7 years ago

1- To get the best performance there should be one instance of Histogram. Imagine the amount of message passing that would happen with 5 instances, in addition to the timers, unless there is a way to merge all the instance configs into one big config and pass it to Histogram (Filter), which would give all the other filters the same treatment: multi-config -> one instance. (If you agree on this model I will be more than happy to do it.) I assume that blocking channels and timers have a performance impact; based on your experience, what do you think?

I would say the main performance hits are: (1) you need to keep a histogram object per-rollup and (2) you need to check every metric per-rollup. Either way, you need to do both of these things, so for me it's OK to define multiple histograms. The number of channels that the metric passes through shouldn't have a very large impact as they are quite lightweight data structures. Timers also do not need to block metrics.

The metrics should pass through uninhibited, just matched and their value added.
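
The "matched and their value added" model with several histogram instances can be pictured with the short Python sketch below. It is illustrative only: `HistogramInstance` and `process` are hypothetical names, metrics are simplified to `(measurement, value)` pairs, and channels and timers are left out entirely.

```python
from fnmatch import fnmatchcase


class HistogramInstance:
    """One configured histogram: a measurement glob plus running totals."""

    def __init__(self, name, measurement_glob):
        self.name = name
        self.measurement_glob = measurement_glob
        self.count = 0
        self.total = 0.0

    def observe(self, measurement, value):
        # Each instance checks the metric against its own filter.
        if fnmatchcase(measurement, self.measurement_glob):
            self.count += 1
            self.total += value


def process(metric, histograms):
    """Let every histogram look at the metric, then pass it on unchanged."""
    measurement, value = metric
    for histogram in histograms:
        histogram.observe(measurement, value)
    return metric


histograms = [
    HistogramInstance("cpu_rollup", "cpu*"),
    HistogramInstance("everything", "*"),
]
passed = [process(m, histograms) for m in [("cpu", 10.0), ("mem", 5.0)]]
```

Either way the cost is one filter check per metric per instance plus one set of running state per instance, which is the same work a single instance with several rollups would have to do.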

2- I can add a flushing period per rollup to the configuration, but it must be a multiple of a minute, since that is easy to implement and requires only a single one-minute timer.

I'm not sure I understand your reasoning here. Timers are not expensive, so it should be fine to base it on seconds.

3- Currently I'm using the flush time as the rollup metric time same as statsd.

I don't think that "flush" interval should be applied to histograms, it's more like a histogram "period"

sparrc commented 7 years ago

see here for discussion of filter plugins (a pre-requirement for supporting histograms): https://github.com/influxdata/telegraf/issues/1726

jasonkeller commented 7 years ago

Would this support blobbing and derivatives? I've got a looking glass project on my plate where I'll be querying roughly 1600-1800 devices via SNMP, pulling the octet counters and graphing a time-based derivative to show interface bandwidth for each device. However, a request also came in asking to retain a daily average of said bandwidth for two years to glean trend lines (again, on each separate device) :( . Could this potentially cover that requirement?

danielnelson commented 7 years ago

@taishan69 You could have InfluxDB or Kapacitor run a continuous query: https://docs.influxdata.com/influxdb/v1.2/guides/downsampling_and_retention/

jasonkeller commented 7 years ago

I don't think I can use Kapacitor with Elasticsearch as a backend, or did I misread the documentation?

danielnelson commented 6 years ago

The histogram aggregator #2387 contributed by @vlamug has been merged for version 1.4.0. Another aggregator to keep an eye on is the "basic stats" aggregator #2167.