influxdata / influxdb

timeWeightedAvg should not be greater than maximal value / integral should not extrapolate based on incline #23929

Open bnord01 opened 1 year ago

bnord01 commented 1 year ago

Steps to reproduce: Consider the following query:

import "date"
import "array"

t0 = today()
t1 = date.add(to: t0, d: 11h)
t2 = date.add(to: t0, d: 12h)
t3 = date.add(to: t0, d: 13h)
end = date.add(to: t0, d: 1d)

input = array.from(rows: [{_time: t1, _value: 1},{_time: t2, _value: 0},{_time: t3, _value: 1}])

input
    |> range(start: t0, stop: end)
    |> yield(name: "input")
    |> timeWeightedAvg(unit: 1s)
    |> duplicate(column: "_start", as: "_time")
    |> yield(name: "timeWeightedAvg")

input
    |> range(start: t0, stop: end)
    |> window(every: 1d)
    |> integral(unit: 1d, interpolate: "linear")
    |> duplicate(column: "_start", as: "_time")
    |> yield(name: "integral")

Expected behavior: The timeWeightedAvg should not be greater than the maximal input value. It should either be computed only over the span covered by the existing data points, or extrapolate more conservatively and not be so sensitive to fluctuations in the boundary values.

Actual behavior: timeWeightedAvg/integral produces nonsensical values, 6 in the above example, even though all input values are between 0 and 1. It appears that integral extrapolates based on the slope at the boundary values, which makes it extremely sensitive to fluctuations there.
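For what it's worth, 6 is exactly the value that linear extrapolation of the boundary slopes out to the window edges would produce (a back-of-the-envelope check, not taken from the implementation): the slope between t1 (value 1) and t2 (value 0) is -1 per hour, so extrapolating 11 hours back from t1 gives a value of 12 at the start of the window, and symmetrically 12 at the end. The trapezoidal integral is then (12+1)/2 * 11h + (1+0)/2 * 1h + (0+1)/2 * 1h + (1+12)/2 * 11h = 144 value-hours, and 144h / 24h = 6, which matches the reported result.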


This gets worse when the fluctuations at the bounding data points happen over a shorter timespan. If we also make the span during which the data points are 0 larger, the integral explodes (same query as above, with these definitions instead):

t0 = today()
t1 = date.add(to: t0, d: 11h)
t2 = date.add(to: t0, d: 11h1s)
t3 = date.add(to: t0, d: 13h)
t4 = date.add(to: t0, d: 13h1s)
end = date.add(to: t0, d: 1d)

input = array.from(rows: [{_time: t1, _value: 1},{_time: t2, _value: 0},{_time: t3, _value: 0},{_time: t4, _value: 1}])

This yields a timeWeightedAvg of 18150, even though all data points are still between 0 and 1 and the series is 0 for most of the time span that actually contains data.
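Again, this is consistent with the boundary-slope extrapolation hypothesis (my own estimate): the slope between t1 and t2 is now -1 per second, so the extrapolated value at t0 is 39601, and roughly the same at the end of the window. The trapezoidal integral over the day then comes out to about 1.57e9 value-seconds, and dividing by the 86400 s window length gives approximately 18150.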


This makes timeWeightedAvg and integral mostly useless in scenarios with unevenly distributed data points (such as IoT data), which is exactly where one would especially want a time-weighted average.

Environment info:

alberanid commented 1 year ago

We are experiencing the same problem. The correct solution is surely to fix the built-in timeWeightedAvg function.

In the meantime, we found this example of a supposedly correct implementation: https://github.com/influxdata/flux/issues/2891#issuecomment-677915219

Unfortunately, it seems to be written for an older version of InfluxDB and I'm unable to adapt it; can someone help?

My current version is:

myTimeWeightedAvg = (tables=<-, unit) => tables
  |> reduce(fn: (r, accumulator) => {
    t = (int(v: r._time) - accumulator._time) / int(v:unit)
    va = r._value + accumulator.prev
    value = if accumulator.start then 0.0 else accumulator._value + 0.5 * va * float(v: t)
    total = if accumulator.start then 0 else accumulator.total + t
    return { start: false, _time: r._time, prev: r._value, total: total, _value: value }
  }, identity: { start: true, _time: 0, total: 0, prev: 0.0, _value: 0.0 })
  |> map(fn: (r) => ( { r with _value: if r.total == 0 then 0.0 else r._value / float(v: r.total)  } ))

from(bucket: "test3")
  |> range(start: 2022-12-02T15:15:00.000Z, stop: 2022-12-02T15:25:00.000Z)
  |> filter(fn: (r) => r["_measurement"] == "items")
  |> filter(fn: (r) => r["_field"] == "temp")
  |> filter(fn: (r) => r["address"] == "1/1/1")
  |> filter(fn: (r) => r["type"] == "dtp_1")
  |> window(every: 1m)
  |> myTimeWeightedAvg(unit: 1m)

but I get:

 runtime error @2:6-8:76: reduce: type conflict: int != time
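
The type conflict comes from the reduce accumulator: the identity record declares _time as an int (0), but the function returns _time: r._time, which is a time value, and reduce requires the returned record to have exactly the same type as the identity. Converting the time to an int in the returned record should clear that particular error. An untested sketch of the same function with only that change:

myTimeWeightedAvg = (tables=<-, unit) => tables
  |> reduce(fn: (r, accumulator) => {
    // time since the previous point, expressed in multiples of `unit`
    t = (int(v: r._time) - accumulator._time) / int(v: unit)
    va = r._value + accumulator.prev
    value = if accumulator.start then 0.0 else accumulator._value + 0.5 * va * float(v: t)
    total = if accumulator.start then 0 else accumulator.total + t
    // keep _time as an int so the returned record matches the identity record type
    return {start: false, _time: int(v: r._time), prev: r._value, total: total, _value: value}
  }, identity: {start: true, _time: 0, total: 0, prev: 0.0, _value: 0.0})
  |> map(fn: (r) => ({r with _value: if r.total == 0 then 0.0 else r._value / float(v: r.total)}))

The rest of the pipeline (window(every: 1m) |> myTimeWeightedAvg(unit: 1m)) can stay as is.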