influxdata / flux

Flux is a lightweight scripting language for querying databases (like InfluxDB) and working with data. It's part of InfluxDB 1.7 and 2.0, but can be run independently of those.
https://influxdata.com
MIT License

Proposal – Time Interpolation #2428

Open sanderson opened 4 years ago

sanderson commented 4 years ago

I've talked with three users just this week who want/need to be able to generate points at given intervals if no data exists – time interpolation. I've been thinking through possible implementations and have the following proposals:

Proposal 1 - Create an interpolate() function

|> interpolate(every: 10s)

This function would essentially "window" the data by the specified duration and check to see if a point exists in each window. If the window is empty, it inserts a row with a _time value calculated from the right bound of the window. All columns that are in the group key inherit the key values. All columns that are not in the group key are set to null.

You could then use fill() to replace the null values. I don't think we could fill as part of the interpolate() function because we don't necessarily know what or how many columns will be null in the output. So it's best left to the user to fill the columns as they need to.
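For illustration, a minimal sketch of how Proposal 1 could be used end to end (interpolate() here is the proposed function, not an existing API; the bucket and filters are placeholders):

from(bucket: "example")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage")
  |> interpolate(every: 10s)
  |> fill(column: "_value", usePrevious: true)

Here interpolate() would insert a null row for each empty 10s window, and fill() would then replace those nulls with the previous value.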

Proposal 2 - Add interpolation param(s) to fill()

fill(
  column: "_value",
  value: "value",
  usePrevious: false,
  interpolate: true,
  interpolateEvery: 10s
)

This solution would add interpolate and interpolateEvery parameters to the fill() function. As in Proposal 1, it would "window" the data by the specified interpolateEvery duration and check to see if a point exists in each window. If the window is empty, it inserts a row with a _time value calculated from the right bound of the window and fills the specified column with the specified fill value. All columns that are in the group key inherit the key values. All columns that are not the specified fill column and are not in the group key are dropped from the table.

The downside of this approach is that the user may want to preserve columns that would get dropped in this process.

invernizzie commented 4 years ago

Is there an update on this proposal? This is a much-needed feature when joining data, since there's no "outer" join method either.

KoenWindey commented 4 years ago

Highly recommended function!! Particularly for setups where you only record data 'on change' to preserve bandwidth. That results in big gaps in the data by design, which then impacts many mathematical operations if you can't recreate the 'missing' data on the fly.

Seems people at Timescale understand this very well: https://docs.timescale.com/latest/tutorials/tutorial-howto-visualize-missing-data-grafana

Cheers K

ggear commented 3 years ago

+1, this would be a great feature and quite necessary

giter commented 3 years ago

+1, quite necessary

ryo0ka commented 3 years ago

+1 I'm surprised this is not an option.

pbegg commented 3 years ago

+1

tedblad commented 3 years ago

+1

fredmef commented 3 years ago

+1, this would be a major and necessary feature

collinmcnulty commented 3 years ago

I think this is a critical feature. I have written up a small but challenging test case. Note that in this case the interpolation interval (how far apart result points are spaced) and the interpolation radius (how far a raw point can be from a result point while still being considered) are both 5 minutes.

| Time | Raw Data |
|------|----------|
| 1:59 | 6 |
| 2:01 | 5 |
| 2:02 | 1 |
| 2:07 | 3 |
| 2:21 | 4 |
| 2:25 | 2 |
| 2:29 | 0.7 |

Interpolated data with a 5-minute interpolation interval and a 5-minute interpolation radius, from 2:00 to 2:35:

| Time | 5 Min Interpolated Value | Explanation |
|------|--------------------------|-------------|
| 2:00 | 5.5 | Interpolated b/w 1:59 and 2:01; note a point outside the query interval is considered |
| 2:05 | 2.2 | Interpolated b/w 2:02 and 2:07 |
| 2:10 | 3 | Only one point within interpolation radius (2:07) |
| 2:15 | nan | No points within interpolation radius |
| 2:20 | 4 | Only one point within interpolation radius (2:21) |
| 2:25 | 2 | Exact point available |
| 2:30 | 0.7 | Only one point within interpolation radius (2:29) |
| 2:35 | nan | No points within interpolation radius |
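For reference, the values above follow standard linear interpolation between the two nearest raw points (my restatement of the arithmetic, not part of the original test case):

v(t) = v0 + (v1 - v0) * (t - t0) / (t1 - t0)

For example, at 2:00 between (1:59, 6) and (2:01, 5): v = 6 + (5 - 6) * (1 / 2) = 5.5.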
igabe79 commented 3 years ago

+1 much needed here as well!

konstk1 commented 3 years ago

+1

tillsc commented 3 years ago

I did a small workaround with a union/pivot (a sort of "outer join") of two queries: https://gist.github.com/tillsc/2d71f3e89409049833e75ed22689ad40

Maybe this helps someone in the meantime... Use "staircase" plotting mode (not the "Step" plotting mode of the Influx UI itself!).
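For anyone who doesn't want to follow the gist, the general shape of the union/pivot trick is something like this (a sketch with placeholder bucket, measurement, and field names; not the gist's exact code):

temp = from(bucket: "example")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "sensor" and r._field == "temperature")

hum = from(bucket: "example")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "sensor" and r._field == "humidity")

union(tables: [temp, hum])
  |> group()
  |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")

After the pivot, every timestamp from either series gets a row, and a series with no point at that timestamp gets null in its column, which behaves like an outer join; the nulls can then be filled or plotted in staircase mode.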

nathanielc commented 3 years ago

We have added a first implementation of an interpolation function here: https://github.com/influxdata/flux/blob/master/stdlib/interpolate/interpolate.flux It does simple linear interpolation on a regular time interval. We have plans to implement other types of interpolation as well.

For now, that is a link to the source code with a comment containing some docs. Public docs are in progress.
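A minimal usage sketch of the new function, for anyone landing here (bucket and filter values are placeholders):

import "interpolate"

from(bucket: "example")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "cpu" and r._field == "usage")
  |> interpolate.linear(every: 10s)

This inserts rows at 10s intervals where none exist and linearly interpolates _value between the surrounding points.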

empz commented 3 years ago

I'm also very interested in this. Both TimescaleDB and Azure Data Explorer (which is not even an actual time-series DB) can do this. It's extremely useful for systems that do delta-logging (storing values only on change to save bandwidth and storage costs).

Besides linear interpolation, I'd like to propose having the option to do backward and forward fill interpolation.

Looking forward to this being available on InfluxDB Cloud.
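In the meantime, forward fill can be roughly approximated with existing functions (a sketch; data stands for any filtered stream):

data
  |> aggregateWindow(every: 10s, fn: last, createEmpty: true)
  |> fill(column: "_value", usePrevious: true)

createEmpty: true emits null rows for empty windows and fill() carries the previous value forward. In principle, backward fill could be emulated by sorting descending on _time, applying the same fill, and sorting back.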

414501408 commented 3 years ago

This took me a lot of time, but eventually I managed to solve it. Some ideas to share with you:

Step 1: query the last data point in your database before your time range, then store it (e.g. in a hashmap), like this:

from(bucket: "dcs_data")
  |> range(start: -2000000h, stop: 1621526400)
  |> filter(fn: (r) => r["_measurement"] == "dcs_real")
  |> filter(fn: (r) => r["_field"] == "value")
  |> filter(fn: (r) => r["point"] == "TLLGHG1B051002402PSV" or r["point"] == " TLLGHG1B0101036010FQ")
  |> group(columns: ["point"])
  |> last()

In my example, I want to get the last value of two points before the stop timestamp (the stop here is the start time of the query in step 2).

Step 2: write your query with range(start: ..., stop: ...):

from(bucket: "dcs_data")
  |> range(start: 1621526400, stop: 1622533567)
  |> filter(fn: (r) => r["_measurement"] == "dcs_real")
  |> filter(fn: (r) => r["_field"] == "value")
  |> filter(fn: (r) => r["point"] == "TLLGHG1B051002402PSV" or r["point"] == " TLLGHG1B0101036010FQ")
  |> window(every: 60000s, createEmpty: true)
  |> mean()
  |> duplicate(column: "_stop", as: "_time")
  |> window(every: inf)
  |> fill(column: "_value", usePrevious: true)
  |> group(columns: ["point"])
  |> sort(columns: ["_time"], desc: false)
  |> limit(n: 99999999)

You may find that the fill option (usePrevious: true) is not working, because of a limitation mentioned in the documentation.

It is terrible! But you can take the data you got from step 1 and write some code to fill it yourself.

By the way: when there is no data in the time range, I often get nothing back from range(start: xxx, stop: xxx), even though I added createEmpty: true to my window() call. Not every time, but too often...

I think this may be a new bug I found.

After a lot of tests, I found another way to solve it: I use |> timeShift(duration: xxxx) instead of a stop in range().

So I rewrote the code in step 2 to:

from(bucket: "dcs_data")
  |> range(start: 1621526400)
  |> filter(fn: (r) => r["_measurement"] == "dcs_real")
  |> filter(fn: (r) => r["_field"] == "value")
  |> filter(fn: (r) => r["point"] == "TLLGHG1B051002402PSV" or r["point"] == " TLLGHG1B0101036010FQ")
  |> timeShift(duration: -1036800s)
  |> window(every: 60000s, createEmpty: true)
  |> mean()
  |> duplicate(column: "_stop", as: "_time")
  |> window(every: inf)
  |> fill(column: "_value", usePrevious: true)
  |> group(columns: ["point"])
  |> sort(columns: ["_time"], desc: false)
  |> limit(n: 99999999)

Everything seems to work for me.

pseudocoder commented 3 years ago

Unless I'm mistaken, the previous comment doesn't interpolate?

Repeating the same value across successive time windows is straightforward using fill(), but then you get a step function, not interpolation, which is what this topic is about.

Scrumplex commented 2 years ago

I use this:

from(bucket: "hass")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "W")
  |> filter(fn: (r) => r["entity_id"] == "smart_power_plug")
  |> filter(fn: (r) => r["_field"] == "value")
// let's pretend we are interpolating
  |> aggregateWindow(every: 1m, fn: last, createEmpty: true)
// fill nulls generated by aggregateWindow with previous
  |> fill(column: "_value", usePrevious: true)
  |> aggregateWindow(every: v.windowPeriod, fn: mean)
  |> yield(name: "mean")

It's slow with large queries AND inaccurate with smaller queries. So the worst of both worlds.

sanderson commented 2 years ago

@Scrumplex since this issue was created, the interpolate package was introduced and provides interpolate.linear() for linear interpolation.

Scrumplex commented 2 years ago

> @Scrumplex since this issue was created, the interpolate package was introduced and provides interpolate.linear() for linear interpolation.

Sadly that's not what I am looking for. I would rather have it interpolate nulls and deal with them via fill() instead.

vicent-dev commented 2 years ago

Is the issue still open, or can the main problem be fixed using interpolate.linear()? I don't know how I could fill nulls with that function because, as far as I can see, interpolate.linear() generates more rows in the measurement instead of filling null values. I can't imagine a way to combine those functions to achieve the desired result.

Thanks.

fidovo commented 2 years ago

Finally! We need this. +1

TicoM1 commented 2 years ago

Too bad the original proposal wasn't considered. Finding the correct timestamps is the issue, not filling the new rows with values. So I'd have loved to see interpolate.null() alongside interpolate.linear(), especially as the fill() documentation explicitly serves that issue up on a silver platter already.

If I'm not mistaken, even if I'm only missing data points due to "drop data points with non-changing values", I still have to fix my irregularly timed data to a regular interval (which I have to decide on manually) via aggregation (group by time(interval)) and then fill(previous).

There's a good chance I'm missing something here, but I expected this to be an everyday use case that deserves an easy one- or two-liner in a temp database.
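To make the ask concrete, a sketch of what such a function could look like in use (interpolate.null() is hypothetical; it does not exist in the interpolate package):

import "interpolate"

data
  |> interpolate.null(every: 1m)
  |> fill(column: "_value", usePrevious: true)

That is: insert a null row at every empty 1m interval, and leave the value strategy to fill().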

srittau commented 2 years ago

A null interpolation would be very welcome for working with Grafana time series graphs. Currently, when using solid line style, it will always connect the data points. The only way to suppress this is to use null data points. interpolate.null() would be ideal to fill these missing data points.

M0LTE commented 2 years ago

Yet another backwards step in Flux vs InfluxQL

Bostwickenator commented 1 year ago

I too would really like an interpolate.null() concept

andrus801 commented 1 year ago

+1 for "interpolate.null()"

DenisMir commented 1 year ago

Is there still no solution for that one here?

The linear function is not enough.

In the end we get graphs like:

[graph screenshot]

The first available data point is out of the time window. So linear interpolation doesn't help at all here.

This was no problem with InfluxDB1. WTF.

Danoxv commented 1 year ago

@DenisMir, how did you fill in the blank spaces between the values??

gabrielmocan commented 1 year ago

Any progress on that matter?

twosdai commented 1 year ago

Just wanted to post my workaround in case others find it useful. I use InfluxDB as a backend DB wrapped by an API. If you're looking to use Influx directly, this method probably isn't a good idea, since it requires an application client calling Influx and then returning that data to a different process. In my case I'm using the NodeJS client lib.

In-code solution: basically, if the response to your query doesn't contain any data, you need to do something like the following in your program afterwards:

// Basic client setup example (this fragment lives inside an async function)
import { InfluxDB } from '@influxdata/influxdb-client';

const client = new InfluxDB({ url: 'myInfluxURL', token: 'myToken', timeout: 60000 });
const query = ` some really cool query with aggregationIntervals `; // This would be your query
const queryApi = client.getQueryApi('myOrg');
const data = await queryApi.collectRows<any>(query); // collectRows returns a Promise

// Actual workaround logic
if (data.length === 0) {
    const start = new Date(startTime); // The startTime of the query
    const end = new Date(endTime); // The endTime of the query
    const interval = end.getTime() - start.getTime(); // The time in ms between them
    // Convert the chosen aggregation interval to ms. I used a static mapping since my
    // aggregation intervals were well known and only a subset of all the possible ones.
    const intervalInMilliseconds = aggregationIntervalInMS[argumentAgg];
    const numberOfIntervals = Math.ceil(interval / intervalInMilliseconds); // Number of empty intervals to generate
    const aggregatedResults = [];
    for (let i = 0; i < numberOfIntervals; i++) {
        // You may want to change this result to include any relevant metadata
        // you want assigned to each interval.
        aggregatedResults.push({
            value: '0',
            startTime: new Date(start.getTime() + i * intervalInMilliseconds).toISOString(),
            endTime: new Date(start.getTime() + (i + 1) * intervalInMilliseconds).toISOString(),
        });
    }
    return aggregatedResults;
}

The aggregationIntervalInMS mapping I have is simply the following:

const aggregationIntervalInMS = {
    hour: 3600000,
    day: 86400000,
    month: 2592000000,
};

I think in specific use cases this can help fill in data if you're looking to wrap influx.

SindreBrurberg commented 7 months ago

If it is of interest to anyone, this is the closest I managed to get with an update-on-change type data stream. The example uses data from Home Assistant.

from(bucket: "home_assistant")
  |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
  |> filter(fn: (r) => r["_measurement"] == "%")
  |> filter(fn: (r) => r["_field"] == "value")
  |> filter(fn: (r) => r["domain"] == "sensor")
  |> filter(fn: (r) => r["entity_id"] == "meter_plus_8fe5_humidity")
  |> aggregateWindow(every: 10s, fn: mean, createEmpty: true)
  |> fill(usePrevious: true)
  |> filter(fn: (r) => r._value > 0)
  |> yield(name: "mean")

It still can't fix the missing data before the time range; however, it does a good job of representing the inactivity / staircase of data that does not change linearly.

github-actions[bot] commented 4 months ago

This issue has had no recent activity and will be closed soon.

srittau commented 4 months ago

This is still an unsolved problem; closing it would be premature.

Zincr0 commented 3 months ago

Four years and counting. Commenting here so the bot doesn't close this (still relevant) issue.

github-actions[bot] commented 1 month ago

This issue has had no recent activity and will be closed soon.

srittau commented 1 month ago

This is still an unsolved problem; closing it would be premature.