risingwavelabs / risingwave

Best-in-class stream processing, analytics, and management. Perform continuous analytics, or build event-driven applications, real-time ETL pipelines, and feature stores in minutes. Unified streaming and batch. PostgreSQL compatible.
Apache License 2.0

Tracking: time-series related functions #19343

Open lmatz opened 1 week ago

lmatz commented 1 week ago

Downsampling (with gap-filling)

https://questdb.io/docs/reference/sql/sample-by/#fill-options https://docs.timescale.com/api/latest/hyperfunctions/downsampling/ https://www.timescale.com/blog/how-to-proactively-manage-long-term-data-storage-with-downsampling/ https://docs.timescale.com/api/latest/hyperfunctions/gapfilling/time_bucket_gapfill/

Users would love to have this in materialized views. https://github.com/timescale/timescaledb/issues/1324
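For comparison, TimescaleDB's gap-filling API (from the links above) looks like the following. Table and column names are illustrative:

```sql
-- TimescaleDB-style gap-filling (sketch based on the linked docs).
-- time_bucket_gapfill() emits a row for every 5-minute bucket in the queried
-- range (the WHERE clause must bound the time range), locf() carries the last
-- observed value forward, and interpolate() fills linearly between neighbors.
SELECT
    time_bucket_gapfill('5 minutes', ts) AS bucket,
    locf(avg(temperature))               AS temp_locf,
    interpolate(avg(temperature))        AS temp_linear
FROM sensor_readings
WHERE ts > now() - INTERVAL '1 day' AND ts < now()
GROUP BY bucket
ORDER BY bucket;
```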

Three common filling strategies: NULL, previous value (LOCF), and linear interpolation.

Agg Helper Functions

Mathematics and Statistics

fuyufjh commented 1 week ago

These features have been completed.

fuyufjh commented 1 week ago

The remaining work is:

  1. Gap-filling aggregation. Can be done either by
    • (Preferred) Extend GROUP BY, e.g. GROUP BY ... FILL(NULL), which means GROUP BY with a gap-filling method: NONE/NULL/PREV/LINEAR/... (ref)
    • Extend TVF e.g. TUMBLE -> tumble_with_gap_filling
    • The former is preferred because it makes interpolation easier to support.
  2. Aggregation functions
    • See Mathematics and Statistics
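A sketch of what the preferred GROUP BY extension might look like. This syntax is hypothetical and not implemented; it only illustrates the idea of declaring the gap-filling method on the grouping clause:

```sql
-- Hypothetical syntax sketch for option 1 (not implemented in RisingWave).
-- FILL(...) declares how empty windows are populated per group.
CREATE MATERIALIZED VIEW downsampled AS
SELECT window_start, name, avg(value) AS value
FROM TUMBLE(signals_raw, timestamp, INTERVAL '1 minute')
GROUP BY window_start, name FILL(PREV);  -- NONE / NULL / PREV / LINEAR / ...
```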

Related: Some old discussions about gap-filling https://github.com/risingwavelabs/risingwave/issues/15830

riaan53 commented 1 week ago

I'm glad this is getting some attention. I can share some of the things I'm trying to do that might help with direction. I'm working with a lot of IoT data that needs to be downsampled to specific intervals with aggregation functions, gap-filled only up to a maximum duration, then filled with nulls or other values to indicate the sensor is offline. Sampled timestamps should always be continuous so sensors can be compared, correlated, pulled into ML sequences, etc. Interpolation functions can vary—LOCF and linear are common but don’t cover all IoT sensor types.

Note that generate_series does not work well on historical data, especially for streaming: with a small step like 1s, the cluster falls over on more than a few hours of data. I had to use it only where a timestamp is actually missing. This far-from-ideal query also cannot fill forward in real time when the last point is unavailable, but it's the best I could come up with in RW that plays well with MVs. I hope that with the new time-series support I can simplify it a lot.

Also see: https://www.timescale.com/blog/sql-functions-for-time-series-analysis/

CREATE MATERIALIZED VIEW signals AS
WITH userQuery AS (SELECT name, value, timestamp FROM signals_raw),
        hop_data AS (
            SELECT
                userQuery.name,
                window_start,
                window_end,
                AVG(userQuery.value) AS value,
                COUNT(userQuery.value) AS datapoint_count
            FROM HOP(userQuery, userQuery.timestamp, INTERVAL '1 minute', INTERVAL '1 minute')
            GROUP BY userQuery.name, window_start, window_end
        ),
        global_max_timestamp AS (
            SELECT MAX(window_end) AS max_timestamp FROM hop_data
        ),
        lead_data AS (
            SELECT
                hd.name,
                hd.window_start,
                hd.window_end,
                hd.value,
                hd.datapoint_count,
                COALESCE(
                    LEAD(hd.window_start) OVER (PARTITION BY hd.name ORDER BY hd.window_start),
                    (SELECT max_timestamp FROM global_max_timestamp)
                ) AS next_window_start
            FROM hop_data hd
        ),
        missing_intervals AS (
            SELECT
                name,
                NULL::double precision AS value,
                0 AS datapoint_count,
                generate_series(
                    (window_start + INTERVAL '1 minute')::timestamp,
                    (next_window_start)::timestamp - INTERVAL '1 minute',
                    INTERVAL '1 minute'
                ) AS missing_timestamp
            FROM lead_data
            WHERE next_window_start - window_start >= INTERVAL '1 minute'
        ),
        combined_data AS (
            SELECT
                name,
                value,
                datapoint_count,
                window_start,
                window_end,
                TRUE AS is_observed  -- Observed data
            FROM hop_data
            UNION ALL
            SELECT
                name,
                NULL::double precision AS value,
                0 AS datapoint_count,
                missing_timestamp AS window_start,
                missing_timestamp + INTERVAL '1 minute' AS window_end,
                FALSE AS is_observed  -- Missing data
            FROM missing_intervals
        ),
        data_with_last_observed AS (
            SELECT
                name,
                window_start,
                window_end,
                value,
                datapoint_count,
                is_observed,
                MAX(CASE WHEN is_observed THEN window_end ELSE NULL END) OVER (
                    PARTITION BY name ORDER BY window_end
                    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
                ) AS last_observed_time,
                MAX(CASE WHEN is_observed THEN value ELSE NULL END) OVER (
                    PARTITION BY name ORDER BY window_end
                    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
                ) AS last_observed_value
            FROM combined_data
        )
        SELECT
            name,
            window_end AS timestamp,
            CASE
                WHEN last_observed_time IS NOT NULL
                    AND window_end - last_observed_time <= INTERVAL '15 minutes'
                THEN last_observed_value
                ELSE 0  -- Fallback value for gaps beyond max interpolation interval
            END AS value,
            datapoint_count,
            CASE
                WHEN is_observed THEN FALSE  -- Observed data
                WHEN last_observed_time IS NOT NULL
                    AND window_end - last_observed_time <= INTERVAL '15 minutes'
                THEN TRUE  -- Interpolated via LOCF
                ELSE FALSE  -- Fallback value applied
            END AS interpolated
        FROM data_with_last_observed
        ORDER BY name, window_start;

I have not tested the performance of this, but I assume it would not be the best. This is just an example; in code I swap out parts for different aggregation functions and interpolations. But I can't get it to fill gaps when there is no next value, i.e. for real-time data up until now(). Hope it helps, and let me know if you need me to test anything.
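For what it's worth, if gap-filling lands as a windowing TVF (the tumble_with_gap_filling idea mentioned above), the first half of the query could collapse considerably. This is a hypothetical sketch; the function name comes from the discussion above, but its arguments and behavior are invented for illustration:

```sql
-- Hypothetical gap-filling TVF (arguments invented for illustration).
-- Assumed behavior: emits every 1-minute window per name, even when no input
-- rows fall inside it, so gap windows surface naturally as NULL aggregates.
-- The bounded 15-minute LOCF pass would still need the window functions above.
CREATE MATERIALIZED VIEW signals_downsampled AS
SELECT
    name,
    window_end   AS timestamp,
    avg(value)   AS value,            -- NULL in gap windows
    count(value) AS datapoint_count   -- 0 in gap windows
FROM tumble_with_gap_filling(signals_raw, timestamp, INTERVAL '1 minute')
GROUP BY name, window_start, window_end;
```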