timescale / timescaledb-toolkit

Extension for more hyperfunctions, fully compatible with TimescaleDB and PostgreSQL 📈
https://www.timescale.com

interpolated_average can't pull values through empty buckets #548

Open emanuel-joos opened 1 year ago

emanuel-joos commented 1 year ago

Relevant system information:

Describe the bug: The time-weighted average fails if buckets without values exist. Buckets without data are simply dropped (the output has fewer rows than expected), and for buckets with data that are adjacent to buckets without data, the computed averages are wrong.

To Reproduce

ts, signal_id, value
('2019-12-31T23:10:00', 8, 10);
('2020-01-01T03:10:00', 8, 20);
('2020-01-01T03:50:00', 8, 40);
('2020-01-01T00:00:00', 9, 10);
('2020-01-01T03:10:00', 9, 20);
('2020-01-01T03:50:00', 9, 40);

Workaround with:

with t1 as (
    SELECT signal_id, ts, measurement_value
    From measurements
    Where ts >= '{datetime_from}' AND ts < '{datetime_to}' And signal_id in ({signals})
),
t2 as (SELECT
    time_bucket_gapfill('{resampled_interval} s', ts) AS time_value_gap_fill,
    signal_id,
    locf(last(measurement_value,ts)) as locf_value,
    last(measurement_value, ts) as indicator_empty_bucket
    From measurements
    Where ts BETWEEN '{fall_back_time}' AND '{datetime_to}' and signal_id in ({signals})
    GROUP BY signal_id, time_value_gap_fill),
t3 as (
    select signal_id, time_value_gap_fill as ts, locf_value as measurement_value
    from t2 where indicator_empty_bucket is Null
    and time_value_gap_fill >= '{datetime_from-datetime.timedelta(seconds=resampled_interval)}'
),
t4 as (
select * from t3 union all select * from t1 order by ts),
t5 as (
    SELECT signal_id, time_bucket('{resampled_interval} s', ts) AS interval_label ,
     time_weight('LOCF', ts, measurement_value) AS tws
     FROM t4
    GROUP BY signal_id, interval_label
),
t6 as (
    SELECT interval_label, signal_id,
        toolkit_experimental.interpolated_average(
            tws,
            interval_label,
            '{resampled_interval} s',
            LAG(tws) OVER (Partition By signal_id ORDER BY interval_label),
            LEAD(tws) OVER (Partition By signal_id ORDER BY interval_label)) as time_weighted_average
            from t5
)
Select interval_label +'{resampled_interval} s' ,
 signal_id,
 time_weighted_average from t6 where interval_label < '{datetime_to}'
  and interval_label > '{datetime_from-datetime.timedelta(seconds=resampled_interval)}';

This should work out of the box. The workaround query also works for multiple signals.
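For reference, here is a minimal Python sketch of the result one would expect under LOCF semantics for signal 9 from the sample data, assuming hourly buckets from 00:00 to 04:00. The helper `locf_bucket_averages` is a hypothetical name used only for illustration; it is not part of the toolkit and only mimics the intended behavior of carrying the last value through empty buckets.

```python
from datetime import datetime, timedelta


def locf_bucket_averages(points, start, end, width):
    """Time-weighted LOCF average per bucket; empty buckets inherit the last seen value."""
    points = sorted(points)
    results = []
    idx = 0
    current = None  # last observed value, carried forward across buckets
    # Points at or before the first bucket start only seed the carried value.
    while idx < len(points) and points[idx][0] <= start:
        current = points[idx][1]
        idx += 1
    bucket_start = start
    while bucket_start < end:
        bucket_end = bucket_start + width
        cursor = bucket_start
        weighted = 0.0
        covered = 0.0
        # Each point closes the segment held by the previous value.
        while idx < len(points) and points[idx][0] < bucket_end:
            ts, value = points[idx]
            if current is not None:
                seconds = (ts - cursor).total_seconds()
                weighted += current * seconds
                covered += seconds
            cursor, current = ts, value
            idx += 1
        # The last known value holds until the end of the bucket.
        if current is not None:
            seconds = (bucket_end - cursor).total_seconds()
            weighted += current * seconds
            covered += seconds
        results.append((bucket_start, weighted / covered if covered else None))
        bucket_start = bucket_end
    return results


# Signal 9 from the reproduction data above.
signal_9 = [
    (datetime(2020, 1, 1, 0, 0), 10),
    (datetime(2020, 1, 1, 3, 10), 20),
    (datetime(2020, 1, 1, 3, 50), 40),
]
averages = locf_bucket_averages(
    signal_9, datetime(2020, 1, 1, 0, 0), datetime(2020, 1, 1, 4, 0), timedelta(hours=1)
)
for bucket, avg in averages:
    print(bucket.isoformat(), avg)
```

In this sketch the buckets starting at 01:00 and 02:00 contain no rows but still produce an average of 10, and the 03:00 bucket starts from the carried-forward 10 instead of ignoring the first ten minutes.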

davidkohn88 commented 1 year ago

So what you'd like, I suppose, is to be able to do something like:

SELECT
    signal_id,
    time_value,
    toolkit_experimental.interpolated_average(
        tws,
        time_value,
        '3600 s',
        LAG_IGNORE_NULLS(tws) OVER (Partition By signal_id ORDER BY time_value),
        LEAD_IGNORE_NULLS(tws) OVER (Partition By signal_id ORDER BY time_value)
    )
    FROM (
    SELECT
        signal_id,
        time_bucket_gapfill('3600 s', ts) AS time_value,
        time_weight('LOCF', ts, measurement_value) AS tws
    FROM measurements
    Where ts BETWEEN '2020-01-01T00:00:00' AND '2020-01-01T05:00:00'
    And signal_id = 9    
    GROUP BY signal_id, time_value
    ) t

In the gapfill query, you'd end up with null values where there was no data, and then when you use the window functions, they'd go back to the last populated value in order to calculate the result and roll it forward. So your way works for now, but this would definitely be easier and is something we should work on at some point, as the 7-step approach feels a bit harder :joy:. (But I'm glad it worked as a workaround!) Did I capture that correctly?
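To make the intended window behavior concrete, here is a small Python sketch of what a lag or lead that skips nulls would return for each row. The function names are illustrative only and mirror the hypothetical LAG_IGNORE_NULLS and LEAD_IGNORE_NULLS used in the query above; `None` stands in for the NULL summaries of empty gapfilled buckets.

```python
def lag_ignore_nulls(values):
    """For each position, return the nearest earlier non-None value (None if there is none)."""
    result = []
    last_seen = None
    for value in values:
        result.append(last_seen)
        if value is not None:
            last_seen = value
    return result


def lead_ignore_nulls(values):
    """For each position, return the nearest later non-None value (None if there is none)."""
    return lag_ignore_nulls(values[::-1])[::-1]


# One populated bucket, two empty buckets, then another populated bucket.
summaries = ["tws_a", None, None, "tws_b"]
print(lag_ignore_nulls(summaries))
print(lead_ignore_nulls(summaries))
```

A plain LAG would hand the third and fourth rows a NULL neighbor, which is why the empty buckets break the interpolation today.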

Another, possibly better option would be to get a new interpolation function into gapfill so you could do it directly there. That might be the simplest way from a UX perspective, i.e.:

 SELECT
        signal_id,
        time_bucket_gapfill('3600 s', ts) AS time_value,
        interpolated_average(time_weight('LOCF', ts, measurement_value), '3600 s')
    FROM measurements
    Where ts BETWEEN '2020-01-01T00:00:00' AND '2020-01-01T05:00:00'
    And signal_id = 9    
    GROUP BY signal_id, time_value

However, that will be more difficult to implement and, at least as of now, would only work with time_bucket_gapfill.

emanuel-joos commented 1 year ago

Thank you for your answer.

Do I understand correctly that the functions LAG_IGNORE_NULLS and LEAD_IGNORE_NULLS do not exist yet?

Yes, so far it looks like our approach captures the desired behavior.

Yes, something like your last query would be very useful :-).

jmcarter17 commented 1 year ago

I have the same problem. My current workaround is a 3-step approach: use time_bucket_gapfill first, and then use time_weight on the gapfilled data.

Example:

with t1 as (
SELECT
    time_bucket_gapfill('60 s', time) AS dt, 
    locf(last(value, time)) as value
FROM {storename}.sensor_data
WHERE master_key = {sensor_id}
AND time >= '{first_day}'
AND time <= '{today}'
GROUP BY dt
ORDER BY dt
), t2 as (
SELECT 
    time_bucket('15 minutes'::INTERVAL, dt, '{today}'::TIMESTAMPTZ) as ts,
    time_weight('locf', dt, value) as tw
FROM t1
GROUP BY ts
)
SELECT ts, average(tw)
FROM t2 ORDER BY ts

davidkohn88 commented 1 year ago

Do I understand correctly that the functions LAG_IGNORE_NULLS and LEAD_IGNORE_NULLS do not exist yet?

Yes, something like your last query would be very useful :-).

Yes, those don't exist; they're something we'd think about introducing in order to do this. Or we'd go with the other option (interpolation directly in gapfill). We'll have to see where the tradeoff falls for how we'd do that.

davidkohn88 commented 1 year ago

I have the same problem. My current workaround is a 3-step approach: use time_bucket_gapfill first, and then use time_weight on the gapfilled data.

Example:

with t1 as (
SELECT
    time_bucket_gapfill('60 s', time) AS dt, 
    locf(last(value, time)) as value
FROM {storename}.sensor_data
WHERE master_key = {sensor_id}
AND time >= '{first_day}'
AND time <= '{today}'
GROUP BY dt
ORDER BY dt
), t2 as (
SELECT 
    time_bucket('15 minutes'::INTERVAL, dt, '{today}'::TIMESTAMPTZ) as ts,
    time_weight('locf', dt, value) as tw
FROM t1
GROUP BY ts
)
SELECT ts, average(tw)
FROM t2 ORDER BY ts

Unfortunately, this does lose some information, because you're downsampling with last first. It is also probably a bit less efficient: to compensate for that loss you use a smaller bucket with gapfill, which means you may need to process more data. But it could very well work for some folks! We'll see what we can do so that neither of these workarounds is necessary.
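A small numeric sketch of the information loss, assuming time is measured in plain seconds and that the downsampled value is stamped at the start of its bucket (as time_bucket_gapfill with last would do). Both helpers are hypothetical and only illustrate the arithmetic:

```python
def locf_average(points, start, end):
    """Exact LOCF time-weighted average of (t, value) points over [start, end).

    Assumes the first point sits at `start`.
    """
    points = sorted(points)
    total = 0.0
    for i, (t, value) in enumerate(points):
        next_t = points[i + 1][0] if i + 1 < len(points) else end
        total += value * (next_t - t)
    return total / (end - start)


def downsample_last(points, width):
    """Keep only the last value per bucket, stamped at the bucket start."""
    buckets = {}
    for t, value in sorted(points):
        buckets[t - t % width] = value  # later points overwrite earlier ones
    return sorted(buckets.items())


# The value is 0 for the first 50 seconds and 100 afterwards.
raw = [(0, 0.0), (50, 100.0)]

exact = locf_average(raw, 0, 120)
approx = locf_average(downsample_last(raw, 60), 0, 120)
print(exact, approx)
```

Here the 60-second downsample keeps only the 100 and stamps it at second 0, so the 50 seconds spent at 0 vanish from the average. Smaller gapfill buckets shrink that error at the cost of more rows to process.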

emanuel-joos commented 1 year ago

@davidkohn88 Thanks a lot for your answer. Please keep me updated if you have any news regarding this.

davidkohn88 commented 1 year ago

Looking at this again, I think I found a more efficient workaround:


with
t1 as (
    -- Gapfill once, keeping both the LOCF value and the time_weight summary per bucket.
    SELECT
        time_bucket_gapfill('{resampled_interval} s', ts) AS bucket,
        signal_id,
        locf(last(measurement_value, ts)) AS locf_value,
        time_weight('LOCF', ts, measurement_value) AS tws
    FROM measurements
    WHERE ts BETWEEN '{fall_back_time}' AND '{datetime_to}' AND signal_id IN ({signals})
    GROUP BY signal_id, bucket),
t2 as (
    -- Empty buckets have a NULL summary: build one from the carried-forward value.
    SELECT signal_id, bucket, time_weight('LOCF', bucket, locf_value) AS tws
    FROM t1 WHERE tws IS NULL
    AND bucket >= '{datetime_from-datetime.timedelta(seconds=resampled_interval)}'
    GROUP BY signal_id, bucket
),
t3 as (
    SELECT signal_id, bucket, tws FROM t1 WHERE tws IS NOT NULL
    UNION ALL
    SELECT signal_id, bucket, tws FROM t2),
t4 as (
    SELECT bucket, signal_id,
        toolkit_experimental.interpolated_average(
            tws,
            bucket,
            '{resampled_interval} s',
            LAG(tws) OVER (PARTITION BY signal_id ORDER BY bucket),
            LEAD(tws) OVER (PARTITION BY signal_id ORDER BY bucket)) AS time_weighted_average
    FROM t3
)
SELECT bucket + '{resampled_interval} s',
    signal_id,
    time_weighted_average FROM t4 WHERE bucket < '{datetime_to}'
    AND bucket > '{datetime_from-datetime.timedelta(seconds=resampled_interval)}';

I was working on this with Tara, and I think she's going to post a slightly simpler version here as well, but I wanted to share this one and see if it works for you, @emanuel-joos.

tlarrue commented 1 year ago

I had the same issue. I have customer cellular data session info that streams in as irregularly sampled byte rates, and I often want to pull the total number of bytes used within a regular interval.

I was able to write a super inefficient query using just time_bucket_gapfill and locf that essentially upsampled to 1-second intervals, only to downsample again to larger intervals:

WITH  

-- transform to rate data uniform at 1-second intervals without gaps
rate_seconds AS (
SELECT
    device_id,
    time_bucket_gapfill('1 s', time) AS time, 
    locf(LAST(bytes_per_second, time)) AS bytes
FROM public.sample_usage
GROUP BY 1,2
ORDER BY 1,2
)

-- downsample to intervals over 1-second & sum per-second rates 
SELECT
    device_id, 
    time_bucket('10 minute', time) AS time,
    SUM(bytes) AS bytes
FROM rate_seconds
GROUP BY 1,2
ORDER BY 1,2;

I then worked on this with @davidkohn88, and he came up with the following workaround, which uses interpolated_integral and separates out the "non-value" rows. It is MUCH faster to run: the old query took about 46 minutes, and this one takes seconds at most:

WITH  t1 as(

SELECT
   device_id,
   time_bucket_gapfill('10 minute', time) AS time,
   locf(last(bytes_per_second, time)) as locf_value,
   last(bytes_per_second, time) as indicator_empty_bucket, 
   time_weight('locf', time, bytes_per_second) AS bps
FROM public.sample_usage
GROUP BY 1,2), 

t2 as(

SELECT 
    device_id, 
    time, 
    time_weight('locf', time, locf_value) as bps
FROM t1 
WHERE indicator_empty_bucket IS NULL
GROUP BY 1, 2),

t3 as(
SELECT 
   device_id, 
   time, 
   bps 
FROM t1 
WHERE indicator_empty_bucket IS NOT NULL
UNION ALL
SELECT device_id, time, bps FROM t2)

SELECT
    device_id,
    time,
    toolkit_experimental.interpolated_integral(
        bps,
        time,
        '10 minute',
        LAG(bps) OVER (Partition by device_id ORDER BY time),
        LEAD(bps) OVER (Partition by device_id ORDER BY time),
        'seconds'
    )
FROM t3;
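As a sanity check on why the two approaches should agree, here is a minimal Python sketch. It assumes timestamps fall on whole seconds and at most one sample per second; both helpers are hypothetical and are not toolkit functions. Integrating the LOCF step function over a bucket gives the same total as upsampling to one row per second and summing, without materializing every second:

```python
def integral_locf(points, start, end):
    """Integrate a LOCF step function of (second, rate) points over [start, end).

    Assumes the first point sits at `start`.
    """
    points = sorted(points)
    total = 0.0
    for i, (t, rate) in enumerate(points):
        next_t = points[i + 1][0] if i + 1 < len(points) else end
        total += rate * (next_t - t)
    return total


def per_second_sum(points, start, end):
    """Upsample to one LOCF value per second, then sum (the slow approach)."""
    points = sorted(points)
    total, current, idx = 0.0, 0.0, 0
    for second in range(start, end):
        # Advance to the latest sample at or before this second.
        while idx < len(points) and points[idx][0] <= second:
            current = points[idx][1]
            idx += 1
        total += current
    return total


# 5 bytes/s for the first 7 seconds, then 2 bytes/s until second 10.
samples = [(0, 5.0), (7, 2.0)]
print(integral_locf(samples, 0, 10), per_second_sum(samples, 0, 10))
```

The integral touches one term per sample while the per-second sum touches one term per second, which is consistent with the large runtime difference reported above.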