timescale / timescaledb-toolkit

Extension for more hyperfunctions, fully compatible with TimescaleDB and PostgreSQL 📈
https://www.timescale.com

Detect data gaps and track elapsed time in time_weight summaries #787

Open smyrman opened 11 months ago

smyrman commented 11 months ago

What's the functionality you would like to add

We would like better handling of explicit data gaps for time_weight() summaries and accessors. That is, we want to be able to exclude gaps in data within and between time buckets from the weighted sum, and let this affect the average, integral, interpolated_average and interpolated_integral accessors. For this exclusion to work, we must also track the elapsed time, as we can no longer assume it to be last_time - first_time.

Gaps can be detected in different ways. For us, we would like to declare a gap when there is more than a duration D between two samples. There are other possible ways to detect gaps, such as looking for explicit NULL values (which could e.g. be inserted by pre-processing the source time-series before aggregation). We don't mind which method is used, as long as there is one method to detect them. There are some details here in how to treat the last value before a gap. Is it valid for duration D? Is it valid for a 0 duration because it's followed by a gap? Is this choice affected by the choice of LOCF or LINEAR algorithm? Let's not get into that just yet, but a choice needs to be made.

What matters is that when you then calculate the TimeWeightSummary, we would like the detected gaps to affect the weighted sum field. We would also propose adding an additional field which accurately tracks the elapsed time, where the time spent in gaps is omitted.

Finally, let average, integral, interpolated_average and interpolated_integral use the elapsed time value when reporting their results. Also add an accessor for the elapsed_time value.
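To sketch the proposed semantics, here is a minimal Python illustration (not toolkit code; the function name and numbers are made up). Each inter-sample interval is clamped at the gap-detection duration D, and the average divides by the gap-excluded elapsed time instead of last_time - first_time:

```python
# Gap-aware LOCF time-weighted average: each sample is carried
# forward for at most `gap` seconds; any time beyond that is a
# gap and is excluded from both the weighted sum and the elapsed
# time, so average = weighted_sum / elapsed rather than
# weighted_sum / (last_time - first_time).
def locf_twa(samples, gap):
    """samples: list of (t_seconds, value) sorted by time."""
    ws = 0.0       # weighted sum
    elapsed = 0.0  # elapsed time with gaps excluded
    for (t0, v0), (t1, _) in zip(samples, samples[1:]):
        d = min(t1 - t0, gap)  # clamp each interval at the gap threshold
        ws += v0 * d
        elapsed += d
    return ws / elapsed, elapsed

# Samples at 0 h, 2 h and 2.5 h with a 30-minute gap threshold:
avg, elapsed = locf_twa([(0, 1.0), (7200, 2.0), (9000, 0.0)], gap=1800)
# avg == 1.5 over 3600 s of elapsed time; the naive elapsed time
# (last - first) would have been 9000 s, giving 0.6 instead.
```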

How would the function be used

Allow the same time-weight summaries to cover more cases; we could e.g. query for both the average and the elapsed time.

Why should this feature be added?

We have been struggling to find a consistent way of dealing with gaps (not filling them, but not counting them either) using the Timescale two-step aggregations, such as [compact_]state_agg and time_weight. Gaps within an evaluation bucket especially, but also gaps between buckets (with interpolate), must be handled correctly.

For us, a gap occurs when the distance between two points is larger than a "gap detection" interval D, where D is a configurable attribute per data source. I.e. something similar to heartbeat_agg, except we are interested in weighted sums and state aggregations, not the gaps themselves. Ideally, when using locf, we consider the last value as valid for duration D before a gap, while for linear, we would start the gap at the last point. At least this is what we thought makes the most sense.

To explain why we care, consider a time_weight solution that should work generically, whether the system logs a temperature where we want a weighted average, or a power consumption (W) that should be integrated into kWh. We cannot rely on inserting fake values into the source signal if we want the same time-weight summary to collect good data for both. That is, without elapsed time and gap detection for power consumption, if we consider the system to draw no power when it doesn't log data, we would need to insert 0 values before each gap in the source signal before aggregation. If we do the same for temperature, however, we drag the TWA towards 0, which makes the result invalid.

A much better solution, in our view, is to not pre-insert any fake data, but rather add explicit gap detection and tracking of elapsed time.
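To make the temperature-vs-power point concrete, here is a toy Python comparison (illustrative only; all names and numbers are made up). A sensor logs a constant 20 (think 20 °C or 20 W), is silent for ten hours, and the fake-value work-around inserts a 0 at the start of the gap:

```python
# Compare the fake-zero work-around against gap-aware elapsed time.
def twa_naive(samples):
    """Plain LOCF time-weighted average over last_time - first_time."""
    ws = sum(v0 * (t1 - t0) for (t0, v0), (t1, _) in zip(samples, samples[1:]))
    return ws / (samples[-1][0] - samples[0][0])

def twa_gap_aware(samples, gap):
    """LOCF average where each interval counts for at most `gap` seconds."""
    ws = elapsed = 0.0
    for (t0, v0), (t1, _) in zip(samples, samples[1:]):
        d = min(t1 - t0, gap)  # time beyond `gap` is excluded entirely
        ws += v0 * d
        elapsed += d
    return ws / elapsed

# Logged signal: one hour of data, ten hours of silence, one more hour.
logged = [(0, 20.0), (3600, 20.0), (39600, 20.0), (43200, 20.0)]
# Work-around: a fake 0 inserted just after the last pre-gap sample.
padded = [(0, 20.0), (3600, 20.0), (3601, 0.0), (39600, 20.0), (43200, 20.0)]

twa_naive(padded)            # ~3.33: acceptable for power, invalid for temperature
twa_gap_aware(logged, 3600)  # 20.0: correct for temperature; for power, the
                             # elapsed time lets you integrate only logged periods
```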

What scale is this useful at?

This is especially useful in general-purpose applications for time-series, rather than tailor-made systems where you can do what you want.

Drawbacks

Handling backwards compatibility may be a challenge, since the solution proposes changes to the TimeWeightSummary structure.

Open Questions

Are there any questions we'd need to address before releasing this feature?

One must decide how to detect gaps: e.g. either by looking for explicit NULL values, or by looking for durations longer than a specific period D between two timestamps.

For either alternative, one must decide how the last value before a gap should be counted. In our view, when using the locf algorithm, the last value before a gap should be considered valid for the gap detection duration. However, this comes with additional challenges for accessors when the query returns results in time buckets, as you would ideally want to split the weight of the final value of each bucket across one or more successive buckets if the next gap extends beyond the end of the bucket.
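A hypothetical illustration of that splitting problem (Python, invented names): with LOCF, the last sample before a gap stays valid for the gap-detection duration, so its weight can spill past the end of its own time bucket into the next one:

```python
# How much LOCF validity of a final pre-gap sample falls into each
# time bucket [0, bucket), [bucket, 2*bucket), ...
def locf_weight_per_bucket(sample_t, gap, bucket, n_buckets):
    """Return seconds of LOCF weight per bucket for a sample at
    sample_t that is valid for `gap` seconds."""
    start, end = sample_t, sample_t + gap
    weights = []
    for i in range(n_buckets):
        b0, b1 = i * bucket, (i + 1) * bucket
        overlap = max(0, min(end, b1) - max(start, b0))
        weights.append(overlap)
    return weights

# Last sample at t=3000 s (0:50), gap threshold 3600 s, 1-hour buckets:
locf_weight_per_bucket(3000, 3600, 3600, 3)  # -> [600, 3000, 0]
# 600 s of weight belongs to the sample's own bucket, while
# 3000 s spills into the following bucket.
```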

Alternatives

Use custom SQL aggregation functions and accessors.

smyrman commented 11 months ago

An SQL-based prototype for LOCF, assuming that the last value before a gap should be carried forward for the duration of the gapDetection setting. The sum does not count the last LOCF value carried forward; this must be handled by rollup and accessor methods.

An easier-to-use solution would store the method and gap detection setting in the TWS type, and refuse to combine results where the gap detection settings do not match, similar to what the toolkit does today for the method parameter.

--- time-weight experiment

CREATE TABLE signal (time TIMESTAMPTZ, value DOUBLE PRECISION);

INSERT INTO signal VALUES ('2022-01-01T00:00:00Z', 1),  ('2022-01-01T02:00:00Z', 2), ('2022-01-01T02:30:00Z', 0);

CREATE TYPE tws AS (
    -- weighted sum
    ws DOUBLE PRECISION,
    -- elapsed duration
    d  INTERVAL,
    -- first point
    ft TIMESTAMPTZ,
    fv DOUBLE PRECISION,
    -- last point
    lt TIMESTAMPTZ,
    lv DOUBLE PRECISION
);

CREATE OR REPLACE FUNCTION  tws_add_locf(s TWS, ts TIMESTAMPTZ, value DOUBLE PRECISION, gap INTERVAL) RETURNS TWS AS $$
    DECLARE
        d INTERVAL;
    BEGIN
        IF s IS NULL
            THEN
                RETURN ROW(CAST(0 AS DOUBLE PRECISION), CAST('PT0S' AS INTERVAL), ts, value, ts, value);
            ELSE
                d := ts - s.lt;
                IF d > gap
                    THEN
                        d := gap;
                END IF;
                RETURN ROW(
                    s.ws + (s.lv*EXTRACT(epoch FROM d)),
                    s.d + d,
                    s.ft,
                    s.fv,
                    ts,
                    value);
        END IF;
    END
$$ LANGUAGE plpgsql IMMUTABLE;

CREATE OR REPLACE AGGREGATE tws_locf (ts TIMESTAMPTZ, value DOUBLE PRECISION, gap INTERVAL) (
    sfunc = tws_add_locf,
    stype = tws
);

CREATE OR REPLACE FUNCTION  tws_combine_locf(a TWS, b TWS, gap INTERVAL) RETURNS TWS AS $$
    DECLARE
        d INTERVAL;
    BEGIN
        CASE
            WHEN a is NULL
                THEN
                    return b;
            WHEN b is NULL
                THEN
                    return a;
            ELSE
                d := b.ft - a.lt;
                IF d > gap
                    THEN
                        d := gap;
                END IF;
                RETURN ROW(
                    a.ws + b.ws + (a.lv*EXTRACT(epoch FROM d)),
                    a.d + b.d + d,
                    a.ft,
                    a.fv,
                    b.lt,
                    b.lv);
        END CASE;
    END
$$ LANGUAGE plpgsql IMMUTABLE;

CREATE OR REPLACE AGGREGATE rollup_locf(s tws, gap INTERVAL) (
    sfunc = tws_combine_locf,
    stype = tws
);

SELECT time_bucket('PT3H', time) AS start_time, tws_locf(time, value, 'PT30M'::INTERVAL) AS s FROM (SELECT time, value FROM signal ORDER BY time) GROUP BY start_time;

SELECT  time_bucket('PT3H', start_time) as start_time2, rollup_locf(s, 'PT30M'::INTERVAL) FROM (SELECT time_bucket('PT1H', time) AS start_time, tws_locf(time, value, 'PT30M'::INTERVAL) AS s FROM (SELECT time, value FROM signal ORDER BY time) GROUP BY start_time) GROUP BY start_time2;
smyrman commented 11 months ago

Second prototype, storing the gap detection setting and method in the summary, and also handling linear aggregation:

--- time-weight experiment

CREATE TABLE signal (time TIMESTAMPTZ, value DOUBLE PRECISION);

INSERT INTO signal VALUES ('2022-01-01T00:00:00Z', 1),  ('2022-01-01T02:00:00Z', 2), ('2022-01-01T02:30:00Z', 0);

CREATE TYPE tws_interpolation AS ENUM('linear', 'locf');
DROP TYPE tws CASCADE;
CREATE TYPE tws AS (
    -- Identify aggregation type.
    m tws_interpolation,
    gap DOUBLE PRECISION,
    -- Time-weighted sum.
    ws DOUBLE PRECISION,
    -- Aggregated duration.
    d  DOUBLE PRECISION,
    -- First point.
    ft TIMESTAMPTZ,
    fv DOUBLE PRECISION,
    -- Last point.
    lt TIMESTAMPTZ,
    lv DOUBLE PRECISION
);

CREATE OR REPLACE FUNCTION  add_to_tws(s TWS, ts TIMESTAMPTZ, value DOUBLE PRECISION, gap_seconds DOUBLE PRECISION, method tws_interpolation) RETURNS TWS AS $$
    DECLARE
        d DOUBLE PRECISION;
    BEGIN
        CASE
            WHEN s IS NULL
                THEN
                    RETURN ROW(method, gap_seconds, CAST(0 AS DOUBLE PRECISION), CAST(0 AS DOUBLE PRECISION), ts, value, ts, value);
            WHEN s.m != method
                THEN
                    RAISE EXCEPTION SQLSTATE '90001' USING MESSAGE = 'interpolation method must match aggregation state interpolation method';
            WHEN s.gap != gap_seconds
                THEN
                    RAISE EXCEPTION SQLSTATE '90001' USING MESSAGE = 'gap detection must match aggregation state';
            WHEN method = 'locf'
                THEN
                    d := EXTRACT(epoch FROM (ts - s.lt));
                    IF d > gap_seconds
                        THEN
                            d := gap_seconds;
                    END IF;
                    RETURN ROW(
                        method,
                        gap_seconds,
                        s.ws + (s.lv*d),
                        s.d + d,
                        s.ft,
                        s.fv,
                        ts,
                        value);
            WHEN method = 'linear'
                THEN
                    d := EXTRACT(epoch FROM (ts - s.lt));
                    IF d > gap_seconds
                        THEN
                            d := 0;
                    END IF;
                    RETURN ROW(
                        method,
                        gap_seconds,
                        s.ws + ((s.lv+value)*d/2),
                        s.d + d,
                        s.ft,
                        s.fv,
                        ts,
                        value);
        ELSE
            RAISE EXCEPTION SQLSTATE '90001' USING MESSAGE = 'invalid interpolation method';
        END CASE;
    END
$$ LANGUAGE plpgsql IMMUTABLE;

CREATE OR REPLACE AGGREGATE time_weight_gap(ts TIMESTAMPTZ, value DOUBLE PRECISION, gap_seconds DOUBLE PRECISION, method tws_interpolation) (
    sfunc = add_to_tws,
    stype = tws
);

CREATE OR REPLACE FUNCTION  tws_combine(a TWS, b TWS) RETURNS TWS AS $$
    DECLARE
        d DOUBLE PRECISION;
    BEGIN
        CASE
            WHEN a is NULL
                THEN
                    return b;
            WHEN b is NULL
                THEN
                    return a;
            WHEN a.m != b.m
                THEN
                    RAISE EXCEPTION SQLSTATE '90001' USING MESSAGE = 'summary interpolation method must match';
            WHEN a.gap != b.gap
                THEN
                    RAISE EXCEPTION SQLSTATE '90001' USING MESSAGE = 'summary gap detection must match';
            WHEN a.m = 'locf'
                THEN
                    d := EXTRACT(epoch FROM (b.ft - a.lt));
                    IF d > a.gap
                        THEN
                            d := a.gap;
                    END IF;
                    RETURN ROW(
                        a.m,
                        a.gap,
                        a.ws + b.ws + (a.lv*d),
                        a.d + b.d + d,
                        a.ft,
                        a.fv,
                        b.lt,
                        b.lv);
            WHEN a.m = 'linear'
                THEN
                    d := EXTRACT(epoch FROM (b.ft - a.lt));
                    IF d > a.gap
                        THEN
                            d := 0;
                    END IF;
                    RETURN ROW(
                        a.m,
                        a.gap,
                        a.ws + b.ws + ((a.lv+b.fv)*d/2),
                        a.d + b.d + d,
                        a.ft,
                        a.fv,
                        b.lt,
                        b.lv);
        ELSE
            RAISE EXCEPTION SQLSTATE '90001' USING MESSAGE = 'invalid interpolation method';
        END CASE;
    END
$$ LANGUAGE plpgsql IMMUTABLE;

CREATE OR REPLACE AGGREGATE rollup(s tws) (
    sfunc = tws_combine,
    stype = tws
);

SELECT time_bucket('PT3H', time) AS start_time, time_weight_gap(time, value, 1800.0, 'locf') AS s FROM (SELECT time, value FROM signal ORDER BY time) GROUP BY start_time;
SELECT time_bucket('PT3H', time) AS start_time, time_weight_gap(time, value, 1800.0, 'linear') AS s FROM (SELECT time, value FROM signal ORDER BY time) GROUP BY start_time;

SELECT time_bucket('PT3H', start_time) as start_time2, rollup(s) FROM (SELECT time_bucket('PT1H', time) AS start_time, time_weight_gap(time, value, 1800.0, 'locf') AS s FROM (SELECT time, value FROM signal ORDER BY time) GROUP BY start_time) GROUP BY start_time2;
SELECT time_bucket('PT3H', start_time) as start_time2, rollup(s) FROM (SELECT time_bucket('PT1H', time) AS start_time, time_weight_gap(time, value, 1800.0, 'linear') AS s FROM (SELECT time, value FROM signal ORDER BY time) GROUP BY start_time) GROUP BY start_time2;
smyrman commented 11 months ago

PS! Ideally we would like a similar solution for [compact_]state_agg with logic similar to locf interpolation. I.e. if there is a gap, the value is considered valid for the duration of the gap detection, and we are able to not count gap values as having a state.

smyrman commented 11 months ago

(image attachment)