timescale / timescaledb-toolkit

Extension for more hyperfunctions, fully compatible with TimescaleDB and PostgreSQL 📈
https://www.timescale.com

[Enhancement]: Missing data in time-weighted averages #323

Open miguelteixeiragomes opened 2 years ago

miguelteixeiragomes commented 2 years ago

What type of enhancement is this?

API improvement

What subsystems and features will be improved?

Continuous aggregate

What does the enhancement do?

When you have timeseries (time, value) pairs that need to be seen as a continuous value with LOCF interpolation, an aggregate based on time_weight does not produce the desired result, since it does not fetch the value from before the beginning of the bucket. This is especially noticeable when the aggregation interval is relatively close to the larger intervals between samples.
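To make the gap concrete, here is a minimal Rust sketch of the two calculations. It is a simplified model and not the toolkit's implementation: the names `locf_integral`, `inner_average` and `bucket_average` are hypothetical, timestamps are plain seconds, and samples are assumed to be sorted and to lie inside the bucket.

```rust
/// A sample: (timestamp in seconds, value).
type Point = (i64, f64);

/// Integral of the LOCF step function through `points`, with the last
/// sample held until `end`.
fn locf_integral(points: &[Point], end: i64) -> f64 {
    let mut acc = 0.0;
    for (i, &(ts, val)) in points.iter().enumerate() {
        // Each value is held until the next sample, or until `end` for the last one.
        let next_ts = points.get(i + 1).map_or(end, |p| p.0);
        acc += val * (next_ts - ts) as f64;
    }
    acc
}

/// Average over only the span between the first and last sample in the
/// bucket, i.e. without any knowledge of what came before the bucket.
fn inner_average(points: &[Point]) -> Option<f64> {
    let (first, last) = (points.first()?, points.last()?);
    if last.0 == first.0 {
        return None; // a single instant has no duration to weight by
    }
    Some(locf_integral(points, last.0) / (last.0 - first.0) as f64)
}

/// Average over the whole bucket [start, end), with `carried` being the
/// last value seen before the bucket started.
fn bucket_average(carried: f64, points: &[Point], start: i64, end: i64) -> f64 {
    let mut all = vec![(start, carried)];
    all.extend_from_slice(points);
    locf_integral(&all, end) / (end - start) as f64
}
```

For a one-hour bucket with samples (1800, 20.0) and (2700, 40.0) and a carried-in value of 4.0, `inner_average` gives 20.0 while `bucket_average` gives 17.0; the difference grows as the first sample sits later in the bucket.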

Implementation challenges

The only workaround I can think of is to use the continuous aggregates to produce the "nearly correct" result and then create a view on top of it that fetches the necessary data point from before the bucket. However, although I've not tested it, I suspect this will be a bit slow, since for every aggregate point I still have to go through a non-aggregate index and heap to find one more row.

epgts commented 2 years ago

Hi,

Can you offer an example of what you think this query ought to look like and what kind of output you'd expect? And also an example of your workaround? I think I understand what you're after, but making it concrete would be a big help.

Thanks!

miguelteixeiragomes commented 2 years ago

My workaround would be something like this:

create table tsdata_raw (
    device_id int8,
    time timestamptz,
    value float8
);

select create_distributed_hypertable('tsdata_raw', 'time', 'device_id');

create view tsdata_daily_almost with (timescaledb.continuous)
    as select 
            device_id,
            time_bucket('1 day', time) as day,
            min(time) as first_ts,
            max(time) as last_ts,
            argmin(time, value) as first_val, -- custom aggregation functions
            argmax(time, value) as last_val,
            average(time_weight(value, time, 'locf')) as value
       from tsdata_raw
       group by device_id, day;

create view tsdata_daily as select
        device_id,
        day,
        value*(last_ts - first_ts) + last_val*(day + '1 day' - last_ts) + (first_ts - day)*(
            select tsr.value -- last raw value before this bucket's first sample
            from tsdata_raw tsr
            where tsr.device_id = tsd.device_id and tsr.time < tsd.first_ts
            order by tsr.time desc
            limit 1
        ) as value
    from tsdata_daily_almost tsd;

I've omitted some extract(epoch from ...) calls and type conversions, but that is the idea. The problem I see with it is that you still have to fetch a lot of pages from the raw data/indices, and that data might even be compressed (I'd like to use a compression policy on the raw data).

Ideally - I don't know whether it would be possible - time_weight should have an option to handle this missing data. Alternatively, a different workaround that does not require access to the raw data could also work.

I think this example also clarifies what output I'm looking for (to answer your first question), but if it's not clear, please tell me and I can maybe make some plots with the samples and bins to explain better.

miguelteixeiragomes commented 2 years ago

Just a few minutes after posting the previous comment, I realized that a much better workaround would be to apply the same concept of making a view but do it all on the aggregate data, since you can still find the previous value there.

The difference is that you may have to fetch several values until you get non-null data, since some buckets may end up with no data at all. I can write up that workaround later, but I have to test it first.
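As a rough illustration of that lookup, the Rust sketch below carries the last known value across empty buckets. It assumes the aggregate exposes, per bucket and in time order, the last raw value seen in that bucket (or nothing for an empty bucket); the function name `carried_values` is hypothetical. It uses a single forward scan rather than a backwards search per bucket, which gives the same result.

```rust
/// For each bucket, return the value to carry in from before it: the last
/// value of the nearest earlier bucket that had data, or None if there is
/// no such bucket. `last_vals` holds each bucket's last raw value in time
/// order, with None for buckets that contain no data.
fn carried_values(last_vals: &[Option<f64>]) -> Vec<Option<f64>> {
    let mut carry = None;
    let mut out = Vec::with_capacity(last_vals.len());
    for &last in last_vals {
        // The carry for this bucket is whatever was known before it started.
        out.push(carry);
        // Empty buckets leave the carry untouched, so it skips over gaps.
        if last.is_some() {
            carry = last;
        }
    }
    out
}
```

With buckets whose last values are 1.0, empty, empty, 5.0, empty, the carried-in values are none, 1.0, 1.0, 1.0, 5.0.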

In any case, I would still welcome some other ideas or feedback on these ones.

WireBaron commented 2 years ago

We have a proposed workaround that should work for this in #440.

I realize it's been a while, but we'd appreciate it if you could take a look at the issue and let us know if that solution would have solved this issue.

jerryxwu commented 2 years ago

@miguelteixeiragomes if you haven't noticed, we've just included a solution (see #440) in the latest 1.8.0 release to address the kind of issues that you were experiencing. We'd love to have you try it out and give us feedback 🙏

milgner commented 1 year ago

I'm currently working on a very similar issue and the query structure outlined in https://github.com/timescale/timescaledb-toolkit/issues/548#issuecomment-1284392569 seems to work very well.

Unfortunately in the data I'm processing there is a special case where null values mark times when no data was available. As such, the intervals should be shortened accordingly.

Example: the bucket size is 1h. Based on LOCF, the value 10 from the previous bucket is carried. At 0:15 in the current bucket there is a null value and at 0:30 the value is 20; the duration with data is now 45 minutes and the interpolated time-weighted average is (10 * 15 + 20 * 30) / 45, or about 16.67.

After some reading of the toolkit source code, it seems to me that this isn't really possible with the current data structures. The LOCF function, when invoked on its own, already has a treat_null_as_missing argument, which sounds related. But the TimeWeightSummary structure only holds TSPoint, whose value must always be f64 rather than the Option<f64> that would probably be required here. I have a rough idea of how it could be implemented: extend the TimeWeightSummary type to support Option<f64> and add an attribute for the duration for which there was data. This is (in a nutshell) how I implemented it in my own aggregate for testing purposes:

    fn state_transition(&mut self, elem: (i64, Option<f64>)) {
        self.update_state(&elem.0);
        self.last = elem;
    }

    fn update_state(&mut self, ts: &i64) {
        if let Some(prev_val) = self.last.1 {
            let interval = duration_secs(&self.last.0, ts);
            self.duration += interval;
            self.acc += prev_val * interval;
        }
    }

Then, when calculating the integral, only acc is used, and when calculating the average, I can decide whether to divide by the duration of the whole range or just by the duration for which there was data. If possible, though, I'd like to avoid having to install my own aggregates in addition to the TimescaleDB-provided ones, so I wonder whether you'd be interested in discussing an enhancement there?
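For readers who want to try the idea, here is a self-contained Rust sketch built around the state-transition snippet above. The struct `GapAwareState`, its constructor and the `finish` method are hypothetical additions for illustration and are not toolkit types; timestamps are plain seconds rather than the toolkit's representation.

```rust
/// Hypothetical aggregate state that tolerates null samples.
struct GapAwareState {
    /// Most recent sample: (timestamp in seconds, value or None for "no data").
    last: (i64, Option<f64>),
    /// Integral of the value over the time for which data was present.
    acc: f64,
    /// Seconds for which data was present.
    duration: f64,
}

impl GapAwareState {
    /// Start from the first sample, e.g. the value carried in at the bucket start.
    fn new(first: (i64, Option<f64>)) -> Self {
        Self { last: first, acc: 0.0, duration: 0.0 }
    }

    /// Account for the time since the previous sample, then remember `elem`.
    fn state_transition(&mut self, elem: (i64, Option<f64>)) {
        self.update_state(elem.0);
        self.last = elem;
    }

    /// Add the span from the previous sample to `ts`, but only if the
    /// previous sample actually had a value.
    fn update_state(&mut self, ts: i64) {
        if let Some(prev_val) = self.last.1 {
            let interval = (ts - self.last.0) as f64;
            self.duration += interval;
            self.acc += prev_val * interval;
        }
    }

    /// Close the state at `end` and return (integral, seconds with data).
    fn finish(mut self, end: i64) -> (f64, f64) {
        self.update_state(end);
        (self.acc, self.duration)
    }
}
```

Feeding it the one-hour example from above (10 carried in at 0:00, null at 0:15, 20 at 0:30, closed at 1:00) yields an integral of 45000 over 2700 seconds with data. Dividing by the 2700 seconds gives about 16.67, while dividing by the full 3600 seconds gives 12.5, which is the choice between the two averages described above.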

Maybe as an inspiration to @miguelteixeiragomes :

I'm using a UNION ALL construct to pull in the data from before the processed timeframe:

WITH regular AS (SELECT ts, value
                 FROM "data"
                 WHERE "data"."ts" > '2022-08-19 11:00:00'
                   AND "data"."ts" <= '2022-08-20 21:00:00')
   , starting AS (SELECT ts, value
                  FROM "data"
                  WHERE "data"."ts" <= '2022-08-19 11:00:00'
                  ORDER BY "data"."ts" DESC
                  LIMIT 1)
   , relevant_data AS (SELECT * FROM starting UNION ALL SELECT * FROM regular)

(the rest of the query then follows the same pattern as in #548)

I'm not sure whether null-handling in interpolated aggregates is really the focus of this issue, so let me know if I should open a separate issue or discussion for this.

jerryxwu commented 1 year ago

@milgner, thank you for the great feedback! It would be very helpful if you could open a separate issue for this as it would be an enhancement to what we've already developed.