open-telemetry / opentelemetry-collector-contrib

Contrib repository for the OpenTelemetry Collector

[processor/interval]: time-based batching #34906

Open sh0rez opened 2 weeks ago

sh0rez commented 2 weeks ago

Component(s)

processor/interval

Is your feature request related to a problem? Please describe.

intervalprocessor exports all metrics strictly on the interval. with sufficient scale this poses challenges: metrics are collected over e.g. 60 seconds and then flushed all at once, leading to spikes and silence instead of a constant load on the network and the receiving side.

Describe the solution you'd like

Distribute metrics export over the entire interval.

I suggest this "sharding" be done at the stream level, grouping the streams like so (pseudocode):

const interval = 60 * time.Second

// one bucket per second of the interval; each bucket keeps only the
// latest datapoint per stream
var streams [60]map[identity.Stream]metric.DataPoint

func ingest(in map[identity.Stream]metric.DataPoint) {
  for id, dp := range in {
    k := id.Hash() % 60
    streams[k][id] = dp // overwrite: only the most recent datapoint is kept
  }
}

func export() {
  for ts := range time.Tick(time.Second) {
    k := ts.Second() % 60
    next.ConsumeMetrics(streams[k]) // flush only this second's bucket
    clear(streams[k])               // start the next interval empty
  }
}
github-actions[bot] commented 2 weeks ago

Pinging code owners:

ArthurSens commented 2 weeks ago

I wonder if we could discard old samples during ingestion. Like, do not store an array of datapoints, just replace the old one if we receive another with a more recent timestamp 🤔
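
A minimal sketch of that replacement rule, assuming an illustrative DataPoint type with a Timestamp() accessor (names here are illustrative, not the processor's actual types):

// store dp only if it is newer than what we already hold for this stream;
// older or duplicate samples are discarded at ingestion time
func upsert(bucket map[identity.Stream]metric.DataPoint, id identity.Stream, dp metric.DataPoint) {
  if old, ok := bucket[id]; ok && old.Timestamp() >= dp.Timestamp() {
    return // existing datapoint is at least as recent; drop the incoming one
  }
  bucket[id] = dp
}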

sh0rez commented 2 weeks ago

oh that's absolutely the case here, sorry if my pseudocode wasn't clear enough.

we only store the last datapoint per stream, but shard the stored streams into 60 maps so that we can flush one map every second instead of all of them every minute, distributing load more evenly over the course of a minute.

note that we store streams in a map[identity.Stream]metric.DataPoint, of which we have 60 (as an array)

say you have a stream with id cbf29ce484222325: it would go into streams[17] (because 0xcbf29ce484222325 % 60 = 17). once the clock hits xx:xx:17, that set is sent to the next pipeline step and cleared
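
as a standalone illustration of that bucket calculation (plain Go, not processor code):

package main

import "fmt"

func main() {
  const streamHash uint64 = 0xcbf29ce484222325
  fmt.Println(streamHash % 60) // prints 17 → flushed when the clock reads xx:xx:17
}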

RichieSams commented 2 weeks ago

Interesting... I like the idea. For any given data stream, we're still aggregating at the given interval, but overall we're doing flushes at an interval / 60 rate (which could be configurable), to reduce the spikiness.
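
A sketch of how a configurable shard count might look, generalizing the pseudocode above (Shards is a hypothetical option, and id.Hash() is treated as an integer as in the pseudocode, not an existing intervalprocessor setting):

type Config struct {
  Interval time.Duration // total aggregation interval, e.g. 60s
  Shards   int           // number of flush buckets; 1 restores a single flush per interval
}

// each shard still flushes once per Interval, but shards are staggered
// Interval/Shards apart, so exports spread across the whole interval
func shardFor(id identity.Stream, cfg Config) int {
  return int(id.Hash() % uint64(cfg.Shards))
}

func flushPeriod(cfg Config) time.Duration {
  return cfg.Interval / time.Duration(cfg.Shards)
}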

crobert-1 commented 2 weeks ago

Issue filed by code owner, and another has voiced support. Removing needs triage with the understanding that discussion is still happening here.