timescale / timescaledb-toolkit

Extension for more hyperfunctions, fully compatible with TimescaleDB and PostgreSQL 📈
https://www.timescale.com

Derive statistics from frequency of data with an agg. #771

Open ollz272 opened 1 year ago

ollz272 commented 1 year ago

Is your feature request related to a problem? Please describe. We have a use case where we'd like to be able to collect statistics on the frequency of data received by a sensor (avg/stddev). At the moment we've had to do this in a rather hacky way, but this feels like an easy agg to build into timescaledb-toolkit.

Describe the solution you'd like I think we could have a two-step rollup agg, which could be used like:

with frequencies as (
    SELECT
        sensor_uuid,
        frequency_agg(time) as agg
    FROM
        datapoint
    WHERE
        time >= NOW() - '1 MONTH'::INTERVAL
    GROUP BY
        sensor_uuid
)
select
    sensor_uuid,
    average(agg),
    stddev(agg)
from
    frequencies

This would hopefully also support rollups, etc. It's a bit like a heartbeat agg, but:

  1. We don't care about whether it's dead or not, just about the frequency of the heartbeat.
  2. We don't need to supply a time, that's g

Describe alternatives you've considered We've got an implementation of this, but it's hacky. We basically had to:

Additional context I thought what we needed was https://github.com/timescale/timescaledb-toolkit/pull/770, but actually what we need to do is calculate the average frequency and then use that in a heartbeat agg. While that PR is a nice feature on its own, it creates a chicken-and-egg problem here, so having some kind of frequency agg would be great.

I'd be interested in helping out with development of this if needed

ollz272 commented 1 year ago

To update here: at our org we've created an aggregate that does roughly what's described here. It would be great, however, to get this into timescaledb-toolkit:

/*
         We need a new aggregate to help us track time changes. This agg will:
         - Aggregate the times when we get a new row within the group
         - Aggregate the times when we get a new row with a new value within the group

         Let's first start off by defining the component parts.
         */

        /*
         We need a convenience type to help track the values from each row we care about.
         */
        CREATE TYPE time_tracker_state_entry AS (
            ts timestamp,
            v numeric
        );
        /*
         We need a type to store the final result in.
         */
        CREATE TYPE time_tracker_final AS (
            raw_values time_tracker_state_entry[],
            starts timestamp[],
            ends timestamp[],
            cov_starts timestamp[],
            cov_ends timestamp[]
        );

         /*
          This function is the internal state manager. It runs for every row in the group as the aggregate
          is processed.
          For this it's very simple: we just append the time and value of the row to an array.
          */
        CREATE OR REPLACE function time_tracker_state_func(tts time_tracker_state_entry[], ts timestamp, value double precision)
        RETURNS time_tracker_state_entry[] AS
        $$
            BEGIN
                RETURN array_append((tts), (ts, value)::public.time_tracker_state_entry);
            END;
        $$
        LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE;

        /*
          This function converts the last state we have from the aggregate into a usable type.
          This is the bulk of the work.
        */
        CREATE OR REPLACE function time_tracker_ffunc(tt time_tracker_state_entry[])
        RETURNS time_tracker_final AS
        $$
            DECLARE
                entries public.time_tracker_state_entry[] := ARRAY[]::public.time_tracker_state_entry[];
                starts timestamp[] := ARRAY[]::timestamp[];
                ends timestamp[] := ARRAY[]::timestamp[];
                cov_starts timestamp[] := ARRAY[]::timestamp[];
                cov_ends timestamp[] := ARRAY[]::timestamp[];
                current_value numeric := null;  -- numeric, not int, so fractional values compare correctly
                index int;
            BEGIN
                -- We first order everything in the aggregate by the time. This helps us get an ordered list.
                entries := (SELECT ARRAY(SELECT e from unnest(tt) e ORDER BY e.ts));
                FOR index IN SELECT generate_series(1, array_upper(entries, 1) - 1)
                LOOP

                    -- Then for every row (minus the last), we track it and the next row as the start and end.
                    -- This tracks the row time changes.
                    starts := array_append(starts, entries[index].ts);
                    ends := array_append(ends, entries[index+1].ts);
                    -- We then need to consider the change-of-value times.
                    -- If null then it's the first value, so we set the value and start.
                    if current_value is null THEN
                        current_value = entries[index].v;
                        cov_starts := array_append(cov_starts, entries[index].ts);
                    -- If it doesn't equal the current value then there's been a change, so make a new end and start.
                    elsif current_value != entries[index].v then
                        current_value = entries[index].v;
                        cov_ends := array_append(cov_ends, entries[index].ts);
                        cov_starts := array_append(cov_starts, entries[index].ts);
                    end if;
                END LOOP;
                -- For the last value in the series, create the final end.
                cov_ends := array_append(cov_ends, entries[array_upper(entries, 1)].ts);

                -- We finally return a type with all these values in.
                -- Note we keep the raw entries that were processed as well.
                -- This is kind of overkill and there will be smarter ways to do it,
                -- but it helps with a naive implementation of being able to roll up
                -- these results.
                RETURN (tt, starts, ends, cov_starts, cov_ends)::public.time_tracker_final;
            END;
        $$
        LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE;

        /*
         Finally we put together our aggregate, using the definitions above.
         */
        CREATE OR REPLACE AGGREGATE time_tracker_agg(timestamp, double precision) (
            -- SFUNC is our state func which runs on every row
            SFUNC = time_tracker_state_func,
            -- STYPE is the state type that's passed to our state func.
            STYPE = time_tracker_state_entry[],
            -- FINALFUNC is the function used to process the state once all rows are processed.
            FINALFUNC = time_tracker_ffunc
        );
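
A minimal usage sketch of the aggregate, assuming a hypothetical `datapoint` table with `sensor_uuid`, `time` (timestamp), and `value` (double precision) columns:

```sql
-- Hypothetical schema; adjust the table and column names to yours.
SELECT
    sensor_uuid,
    time_tracker_agg(time, value) AS tt
FROM datapoint
WHERE time >= NOW() - '1 MONTH'::INTERVAL
GROUP BY sensor_uuid;
```

Each group produces one `time_tracker_final` value, which the accessor functions below then unpack.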

        /*
         Now we have the aggregate, we define a bunch of accessor functions that can extract data from the agg.
         */
        CREATE OR REPLACE function cov_periods(tt time_tracker_final)
        -- Returns a table with the start and end times for any change in value period.
        RETURNS TABLE(
            start_ts timestamp,  -- The start time of the period
            end_ts timestamp  -- The end time of the period
        ) AS
        $$
        BEGIN
            RETURN QUERY
            SELECT
                unnest(tt.cov_starts),
                unnest(tt.cov_ends)
            ;
        END;
        $$
        LANGUAGE plpgsql IMMUTABLE;

        CREATE OR REPLACE function frequency_periods(tt time_tracker_final)
        -- Returns a table with the start and end times of each consecutive-row (frequency) period.
        RETURNS TABLE(
            start_ts timestamp,
            end_ts timestamp
        ) AS
        $$
        BEGIN
            RETURN QUERY
            SELECT
                unnest(tt.starts),
                unnest(tt.ends)
            ;
        END;
        $$
        LANGUAGE plpgsql IMMUTABLE;

        CREATE OR REPLACE function avg_frequency_time_diff(tt time_tracker_final)
        -- Returns the average time between consecutive rows, in seconds.
        RETURNS numeric AS
        $$
        BEGIN
            RETURN (
                WITH frequency_periods AS (
                    SELECT
                       unnest(tt.starts) AS start_ts,
                       unnest(tt.ends) AS end_ts
                )
                SELECT avg(EXTRACT(EPOCH FROM end_ts - start_ts)) FROM frequency_periods
            );
        END;
        $$
        LANGUAGE plpgsql IMMUTABLE;

        CREATE OR REPLACE function max_frequency_time_diff(tt time_tracker_final)
        -- Returns the maximum time between consecutive rows, in seconds.
        RETURNS numeric AS
        $$
        BEGIN
            RETURN (
                WITH frequency_periods AS (
                    SELECT
                        unnest(tt.starts) AS start_ts,
                        unnest(tt.ends) AS end_ts
                )
                SELECT max(EXTRACT(EPOCH FROM end_ts - start_ts)) FROM frequency_periods
            );
        END;
        $$
        LANGUAGE plpgsql IMMUTABLE;

        CREATE OR REPLACE function min_frequency_time_diff(tt time_tracker_final)
        -- Returns the minimum time between consecutive rows, in seconds.
        RETURNS numeric AS
        $$
        BEGIN
            RETURN (
                WITH frequency_periods AS (
                    SELECT
                        unnest(tt.starts) AS start_ts,
                        unnest(tt.ends) AS end_ts
                )
                SELECT min(EXTRACT(EPOCH FROM end_ts - start_ts)) FROM frequency_periods
            );
        END;
        $$
        LANGUAGE plpgsql IMMUTABLE;

       CREATE OR REPLACE function stddev_frequency_time_diff(tt time_tracker_final)
       -- Returns the standard deviation of the time between consecutive rows, in seconds.
       RETURNS numeric AS
       $$
       BEGIN
            RETURN (
                WITH frequency_periods AS (
                    SELECT
                        unnest(tt.starts) AS start_ts,
                        unnest(tt.ends) AS end_ts
                )
                SELECT stddev(EXTRACT(EPOCH FROM end_ts - start_ts)) FROM frequency_periods
            );
       END;
       $$
       LANGUAGE plpgsql IMMUTABLE;
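
Putting the frequency accessors together, a sketch of the kind of query this enables (again assuming the hypothetical `datapoint` table from above):

```sql
-- Per-sensor statistics on the seconds between consecutive readings.
WITH aggs AS (
    SELECT
        sensor_uuid,
        time_tracker_agg(time, value) AS tt
    FROM datapoint
    WHERE time >= NOW() - '1 MONTH'::INTERVAL
    GROUP BY sensor_uuid
)
SELECT
    sensor_uuid,
    avg_frequency_time_diff(tt)    AS avg_seconds,
    min_frequency_time_diff(tt)    AS min_seconds,
    max_frequency_time_diff(tt)    AS max_seconds,
    stddev_frequency_time_diff(tt) AS stddev_seconds
FROM aggs;
```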

       CREATE OR REPLACE function avg_cov_time_diff(tt time_tracker_final)
       -- Returns the average time between changes in value, in seconds.
       RETURNS numeric AS
       $$
       BEGIN
           RETURN (
               WITH cov_periods AS (
                   SELECT
                       unnest(tt.cov_starts) AS start_ts,
                       unnest(tt.cov_ends) AS end_ts
               )
               SELECT avg(EXTRACT(EPOCH FROM end_ts - start_ts)) FROM cov_periods
           );
       END;
       $$
       LANGUAGE plpgsql IMMUTABLE;

       CREATE OR REPLACE function max_cov_time_diff(tt time_tracker_final)
       -- Returns the maximum time between changes in value, in seconds.
       RETURNS numeric AS
       $$
       BEGIN
           RETURN (
               WITH cov_periods AS (
                   SELECT
                       unnest(tt.cov_starts) AS start_ts,
                       unnest(tt.cov_ends) AS end_ts
               )
               SELECT max(EXTRACT(EPOCH FROM end_ts - start_ts)) FROM cov_periods
           );
       END;
       $$
       LANGUAGE plpgsql IMMUTABLE;

       CREATE OR REPLACE function min_cov_time_diff(tt time_tracker_final)
       -- Returns the minimum time between changes in value, in seconds.
       RETURNS numeric AS
       $$
       BEGIN
           RETURN (
               WITH cov_periods AS (
                   SELECT
                       unnest(tt.cov_starts) AS start_ts,
                       unnest(tt.cov_ends) AS end_ts
               )
               SELECT min(EXTRACT(EPOCH FROM end_ts - start_ts)) FROM cov_periods
           );
       END;
       $$
       LANGUAGE plpgsql IMMUTABLE;

       CREATE OR REPLACE function stddev_cov_time_diff(tt time_tracker_final)
       -- Returns the standard deviation of the time between changes in value, in seconds.
       RETURNS numeric AS
       $$
       BEGIN
           RETURN (
               WITH cov_periods AS (
                   SELECT
                       unnest(tt.cov_starts) AS start_ts,
                       unnest(tt.cov_ends) AS end_ts
               )
               SELECT stddev(EXTRACT(EPOCH FROM end_ts - start_ts)) FROM cov_periods
           );
       END;
       $$
       LANGUAGE plpgsql IMMUTABLE;

       /*
        Now we've got our accessors, we need to define our rollup aggregate.
        This is important to us as we want to pre-calculate these aggs on a daily basis,
        and then combine them later on to get a full set of data.
        */

       /*
        As before, we start by defining our state func.
        This just takes the raw entries from each aggregate and concatenates them
        into a new list.
        */
       CREATE OR REPLACE function time_tracker_rollup_state_func(ttrs time_tracker_state_entry[], ttf time_tracker_final)
       RETURNS time_tracker_state_entry[] AS
       $$
           BEGIN
               RETURN (array_cat(ttrs, ttf.raw_values));
           END;
       $$
       LANGUAGE plpgsql IMMUTABLE;

       /*
         We now have a list of entries from all the aggregates in the rollup, so
         we do the same as before and turn this into our final state
         for further analysis.
         This is essentially the same algorithm.
        */
       CREATE OR REPLACE function time_tracker_rollup_ffunc(ttrs time_tracker_state_entry[])
       RETURNS time_tracker_final AS
       $$
           DECLARE
               entries public.time_tracker_state_entry[] := ARRAY[]::public.time_tracker_state_entry[];
               starts timestamp[] := ARRAY[]::timestamp[];
               ends timestamp[] := ARRAY[]::timestamp[];
               cov_starts timestamp[] := ARRAY[]::timestamp[];
               cov_ends timestamp[] := ARRAY[]::timestamp[];
                current_value numeric := null;  -- numeric, not int, so fractional values compare correctly
               index int;
           BEGIN
               entries := (SELECT ARRAY(SELECT e from unnest(ttrs) e ORDER BY e.ts));
               FOR index IN SELECT generate_series(1, array_upper(entries, 1) - 1)
               LOOP
                   starts := array_append(starts, entries[index].ts);
                   ends := array_append(ends, entries[index+1].ts);
                    -- If null then it's the first value, so we set the value and start.
                    IF current_value IS null THEN
                        current_value = entries[index].v;
                        cov_starts := array_append(cov_starts, entries[index].ts);
                    -- If it doesn't equal the current value then there's been a change, so make a new end and start.
                    ELSIF current_value != entries[index].v THEN
                        current_value = entries[index].v;
                        cov_ends := array_append(cov_ends, entries[index].ts);
                        cov_starts := array_append(cov_starts, entries[index].ts);
                    END IF;
                END LOOP;
                -- For the last value in the series, create the final end.
                cov_ends := array_append(cov_ends, entries[array_upper(entries, 1)].ts);

               RETURN (ttrs, starts, ends, cov_starts, cov_ends)::time_tracker_final;
            END;
        $$
        LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE;

       /*
        Finally we define the aggregate using the functions above.
        */
       CREATE OR REPLACE AGGREGATE rollup(time_tracker_final) (
           SFUNC = time_tracker_rollup_state_func,
           STYPE = time_tracker_state_entry[],
           FINALFUNC = time_tracker_rollup_ffunc
       );
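
A sketch of the rollup in use, pre-aggregating per day and combining later (the `datapoint` table is the same hypothetical schema as above; `time_bucket` is TimescaleDB's bucketing function):

```sql
-- Build one time_tracker_final per sensor per day, then roll the days
-- up into a single monthly aggregate per sensor.
WITH daily AS (
    SELECT
        sensor_uuid,
        time_bucket('1 day', time) AS day,
        time_tracker_agg(time, value) AS tt
    FROM datapoint
    WHERE time >= NOW() - '1 MONTH'::INTERVAL
    GROUP BY sensor_uuid, time_bucket('1 day', time)
)
SELECT
    sensor_uuid,
    avg_frequency_time_diff(rollup(tt)) AS avg_seconds
FROM daily
GROUP BY sensor_uuid;
```

In practice the `daily` CTE would be a continuous aggregate or a materialized table, so only the final rollup runs at query time.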