timescale / timescaledb

An open-source time-series SQL database optimized for fast ingest and complex queries. Packaged as a PostgreSQL extension.
https://www.timescale.com/

[Bug]: Real-time aggregates not working as expected #5775

Open Streamlinesx opened 1 year ago

Streamlinesx commented 1 year ago

What type of bug is this?

Other

What subsystems and features are affected?

Continuous aggregate

What happened?

After creating a data table that contains time-series data from a sensor, I get two different behaviors depending on whether I insert data into the table before or after creating an hourly continuous aggregate on it.

If I insert data before creating the continuous aggregate, the first bucket will not have real-time aggregates on data that is inserted after the creation of the cagg. Only the second bucket will have real-time aggregates on new inserts.

If I create the continuous aggregate first, and only then start inserting data, the first bucket will have real-time aggregates, as expected and required.

In both cases, the current bucket should not be materialized yet, since the policy's end_offset is set to one hour.

For my application, a workaround is to rename the base table, create a new table with the original name, create the continuous aggregate on the new table, and only then insert the data from the old table into the new one. However, this is not ideal, since the table on which I first encountered this behavior is rather large (60M+ entries).

TimescaleDB version affected

2.9.3

PostgreSQL version used

12

What operating system did you use?

Windows 10 x64

What installation method did you use?

Docker

What platform did you run on?

Other, Not applicable

Relevant log output and stack trace

No response

How can we reproduce the bug?

-- Before performing this test, make sure to not be on the cusp of a new hour, or have a new hour begin during the test. The test wants to show behavior of the first bucket that comes up after the creation of a new continuous aggregate.

-- Create a docker container with: 
-- FROM timescale/timescaledb:2.9.3-pg12
-- After creating a database, create the timescaledb extension:

CREATE EXTENSION IF NOT EXISTS timescaledb;

-- Create the first data table

CREATE TABLE IF NOT EXISTS data_telegram
(
    dongle_id                integer      not null,
    meter_id                 varchar(255),
    timestamp                timestamp(0) not null,
    exported_1 integer,
    exported_2 integer,
    exported_3 integer,
    constraint data_telegram_pkey primary key (dongle_id, timestamp)
);

-- Create hypertable
SELECT create_hypertable('data_telegram', 'timestamp', partitioning_column := 'dongle_id', number_partitions := 2, chunk_time_interval := INTERVAL '1 day');

-- Create function to insert data into the data table:

CREATE OR REPLACE FUNCTION insert_data()
    RETURNS VOID AS $$
DECLARE
    next_value_1 INTEGER;
    next_value_2 INTEGER;
    next_value_3 INTEGER;
BEGIN
    SELECT COALESCE(MAX(exported_1), 0) + 1 INTO next_value_1 FROM data_telegram;
    SELECT COALESCE(MAX(exported_2), 0) + 1 INTO next_value_2 FROM data_telegram;
    SELECT COALESCE(MAX(exported_3), 0) + 1 INTO next_value_3 FROM data_telegram;

    INSERT INTO data_telegram (dongle_id, meter_id, timestamp, exported_1, exported_2, exported_3) VALUES (1, 'a_meter', CURRENT_TIMESTAMP, next_value_1, next_value_2, next_value_3);

END;
$$ LANGUAGE plpgsql;

-- insert data into the table by calling this function a few times:

SELECT insert_data();

-- Now create the continuous aggregate:

CREATE MATERIALIZED VIEW data_aggregation_hour_min_max WITH (timescaledb.continuous, timescaledb.materialized_only= false) AS
SELECT dongle_id,
       meter_id,
       time_bucket(INTERVAL '1 hour', timestamp) AS timestamp,
       min(timestamp)                              AS first_data_received,
       max(timestamp)                              AS last_data_received,
       min(exported_1)                             AS exported_1_min,
       max(exported_1)                             AS exported_1_max,
       min(exported_2)                             AS exported_2_min,
       max(exported_2)                             AS exported_2_max,
       min(exported_3)                             AS exported_3_min,
       max(exported_3)                             AS exported_3_max
FROM data_telegram
GROUP BY dongle_id, meter_id, time_bucket(INTERVAL '1 hour', timestamp)
WITH DATA;

SELECT add_continuous_aggregate_policy('data_aggregation_hour_min_max', start_offset => INTERVAL '3 hours',
                                       end_offset => INTERVAL '1 hour', schedule_interval => INTERVAL '1 minute');

SELECT add_retention_policy('data_aggregation_hour_min_max', INTERVAL '62 days', false);
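
-- Optional check, not part of the original report: a rough sketch of the window
-- that the refresh policy above covers on each run.
-- Assumption: this ignores the alignment to bucket boundaries that TimescaleDB
-- applies internally, so the values are only an approximation.
-- With end_offset = 1 hour, the window ends before the start of the current hourly
-- bucket, so the policy itself should not materialize that bucket.

SELECT localtimestamp - INTERVAL '3 hours' AS approx_window_start,
       localtimestamp - INTERVAL '1 hour'  AS approx_window_end,
       date_trunc('hour', localtimestamp)  AS current_bucket_start;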

-- When you query the cagg now, you should see one row for the current hour, with the timestamp of the last insert into the data table in the 'last_data_received' column
-- Now call the insert_data function again:

SELECT insert_data();

-- You can now observe that the 'last_data_received' column in the cagg does not reflect the latest row in the base table, which is not the expected behavior
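
-- A minimal way to make this comparison, assuming the table and view names defined above:
-- the first query returns the newest raw row, the second returns what the cagg
-- reports for the most recent bucket. With working real-time aggregation,
-- both timestamps should match.

SELECT max(timestamp) AS latest_raw_row
FROM data_telegram;

SELECT timestamp AS bucket, last_data_received
FROM data_aggregation_hour_min_max
ORDER BY timestamp DESC
LIMIT 1;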

-- Now create a new table data_telegram_2:

CREATE TABLE IF NOT EXISTS data_telegram_2
(
    dongle_id                integer      not null,
    meter_id                 varchar(255),
    timestamp                timestamp(0) not null,
    exported_1 integer,
    exported_2 integer,
    exported_3 integer,
    constraint data_telegram_2_pkey primary key (dongle_id, timestamp)
);

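-- Optional: assign ownership to the application role; skip this statement if the role does not exist in your database
ALTER TABLE data_telegram_2
    OWNER TO smartbirdsuser_test;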

SELECT create_hypertable('data_telegram_2', 'timestamp', partitioning_column := 'dongle_id', number_partitions := 2,
                         chunk_time_interval := INTERVAL '1 day');

-- Create a new insert_data function to insert data in both tables simultaneously

CREATE OR REPLACE FUNCTION insert_data_2()
    RETURNS VOID AS $$
DECLARE
    next_value_1 INTEGER;
    next_value_2 INTEGER;
    next_value_3 INTEGER;
BEGIN
    SELECT COALESCE(MAX(exported_1), 0) + 1 INTO next_value_1 FROM data_telegram;
    SELECT COALESCE(MAX(exported_2), 0) + 1 INTO next_value_2 FROM data_telegram;
    SELECT COALESCE(MAX(exported_3), 0) + 1 INTO next_value_3 FROM data_telegram;

    INSERT INTO data_telegram (dongle_id, meter_id, timestamp, exported_1, exported_2, exported_3) VALUES (1, 'a_meter', CURRENT_TIMESTAMP, next_value_1, next_value_2, next_value_3);
    INSERT INTO data_telegram_2 (dongle_id, meter_id, timestamp, exported_1, exported_2, exported_3) VALUES (1, 'a_meter', CURRENT_TIMESTAMP, next_value_1, next_value_2, next_value_3);

END;
$$ LANGUAGE plpgsql;

-- Create a cagg on data_telegram_2, identical to the one on data_telegram:

CREATE MATERIALIZED VIEW data_aggregation_hour_min_max_2 WITH (timescaledb.continuous, timescaledb.materialized_only= false) AS
SELECT dongle_id,
       meter_id,
       time_bucket(INTERVAL '1 hour', timestamp) AS timestamp,
       min(timestamp)                              AS first_data_received,
       max(timestamp)                              AS last_data_received,
       min(exported_1)                             AS exported_1_min,
       max(exported_1)                             AS exported_1_max,
       min(exported_2)                             AS exported_2_min,
       max(exported_2)                             AS exported_2_max,
       min(exported_3)                             AS exported_3_min,
       max(exported_3)                             AS exported_3_max
FROM data_telegram_2
GROUP BY dongle_id, meter_id, time_bucket(INTERVAL '1 hour', timestamp)
WITH DATA;

SELECT add_continuous_aggregate_policy('data_aggregation_hour_min_max_2', start_offset => INTERVAL '3 hours', end_offset => INTERVAL '1 hour', schedule_interval => INTERVAL '1 minute');

SELECT add_retention_policy('data_aggregation_hour_min_max_2', INTERVAL '62 days', false);

-- Insert data from data_telegram into data_telegram_2:

INSERT INTO data_telegram_2 (select * from data_telegram);

-- Now insert new data into both data tables by calling insert_data_2:

SELECT insert_data_2();

-- You should observe that the 'last_data_received' column in the new view updates on each insert, while the one in the old view does not. The old view only catches up after the manual refresh below, and it still does not update in real time when new data is inserted afterwards

CALL refresh_continuous_aggregate('data_aggregation_hour_min_max', null, null);

jnidzwetzki commented 1 year ago

Hello @Streamlinesx,

Thanks for the detailed reproduction steps. I was able to reproduce the issue with the current development version (9c7ae3e8a983ff1a19645c3d2dc0508ae8c69550) of TimescaleDB. The problem seems to be related to #5379.

For the first CAGG (data_aggregation_hour_min_max), the materialization watermark is set to the end of the current time bucket. For the second CAGG (data_aggregation_hour_min_max_2), the watermark is set to the min value. It appears that the more recent watermark for the first CAGG is disabling the real-time aggregation for this CAGG bucket.

test2=# select user_view_name,
    _timescaledb_internal.to_timestamp(
        _timescaledb_internal.cagg_watermark(mat_hypertable_id)
    )
from _timescaledb_catalog.continuous_agg;
         user_view_name          |          to_timestamp           
---------------------------------+---------------------------------
 data_aggregation_hour_min_max   | 2023-06-13 14:00:00+02
 data_aggregation_hour_min_max_2 | 4714-11-24 00:53:28+00:53:28 BC
(2 rows)
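
To see how the watermark enters the real-time query, one option is to print the view definition that TimescaleDB generated. This is a minimal sketch using PostgreSQL's `pg_get_viewdef`. The exact text of the definition differs between TimescaleDB versions, so the shape described in the comments is an expectation, not a guarantee:

```sql
-- Print the definition of the real-time view.
-- With materialized_only = false it is expected to be a UNION ALL of two branches:
--   1. already materialized buckets below the watermark, and
--   2. an on-the-fly aggregation of data_telegram rows at or above the watermark.
SELECT pg_get_viewdef('data_aggregation_hour_min_max'::regclass, true);
```

If the watermark sits at the end of the current bucket, as it does for the first CAGG above, rows inserted during the current hour fall below it and would only be read from the materialization. That would be consistent with the stale `last_data_received` value in the report.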
Streamlinesx commented 1 year ago

@jnidzwetzki

Thank you for looking into this.

I have investigated a bit further and it seems related to this troubleshooting step:

continuous-aggregate-watermark-is-in-the-future

When the first cagg is created WITH NO DATA, real-time aggregation works as expected, but the existing data then has to be materialized with an explicit call to refresh_continuous_aggregate().
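
A minimal sketch of that workaround, assuming the `data_telegram` table from the reproduction. The view name `data_aggregation_hour_min_max_no_data` is hypothetical, the column list is shortened for brevity, and the choice of refresh window is my assumption, not something confirmed by the maintainers:

```sql
-- Create the cagg WITH NO DATA so that nothing is materialized at creation time.
CREATE MATERIALIZED VIEW data_aggregation_hour_min_max_no_data  -- hypothetical name
WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS
SELECT dongle_id,
       meter_id,
       time_bucket(INTERVAL '1 hour', timestamp) AS timestamp,
       min(timestamp)                              AS first_data_received,
       max(timestamp)                              AS last_data_received
FROM data_telegram
GROUP BY dongle_id, meter_id, time_bucket(INTERVAL '1 hour', timestamp)
WITH NO DATA;

-- Assumption: refresh only up to the start of the current hour, so the
-- still-open bucket is left to real-time aggregation.
CALL refresh_continuous_aggregate('data_aggregation_hour_min_max_no_data',
                                  NULL,
                                  date_trunc('hour', localtimestamp));
```

Ending the window at the start of the current hour is meant to keep the current bucket out of the materialization. Whether this also keeps the watermark below that bucket in every version would need to be checked with the watermark query shown earlier in this thread.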