timescale / timescaledb

An open-source time-series SQL database optimized for fast ingest and complex queries. Packaged as a PostgreSQL extension.
https://www.timescale.com/

[Bug]: Continuous Aggregate - Missing rows in hypertable #6690

Open · afpgit opened this issue 8 months ago

afpgit commented 8 months ago

What type of bug is this?

Incorrect result

What subsystems and features are affected?

Continuous aggregate

What happened?

Problem: OHLC (candlestick) bars are missing for financial data whenever a bucket contains no transactions.

I expect that even if tick data is missing for a period, a new bar is still produced with Open = High = Low = Close = previous close.

I have a tick table in TimescaleDB, defined below, from which I create a materialized view of 1-minute OHLC bars together with a corresponding refresh policy. However, the tick_1min view does not end up with the correct set of OHLC bars; some bars are missing, as can be seen below. I have also created a trigger so that whenever a new OHLC bar appears in the table, a client is notified and acts on the new data (a minimal sketch of such a trigger follows the SQL below).

CREATE TABLE tick
(
    tstamp         TIMESTAMP WITH TIME ZONE NOT NULL,
    tstamp_micro   INT                      NOT NULL,
    exchange       VARCHAR(100)             NOT NULL,
    symbol         VARCHAR(100)             NOT NULL,
    price          DOUBLE PRECISION         NOT NULL,
    vwap           DOUBLE PRECISION         NOT NULL,
    size           INT                      NOT NULL,
    volume_bought  INT                      NOT NULL,
    volume_sold    INT                      NOT NULL,
    side           SMALLINT                 NOT NULL,
    context        VARCHAR(200) DEFAULT NULL
);
SELECT create_hypertable('tick', 'tstamp');

-- Materialized view for 1-minute OHLC bars

CREATE MATERIALIZED VIEW tick_1min
    WITH (timescaledb.continuous) AS
SELECT time_bucket('1 minute', tstamp) AS bucket,
       symbol,
       first(tick.price, tick."tstamp")  AS open,
       max(tick.price)                   AS high,
       min(tick.price)                   AS low,
       last(tick.price, tick."tstamp")   AS close
FROM tick
GROUP BY symbol, bucket;

-- Refresh policy
SELECT add_continuous_aggregate_policy('tick_1min',
                                       start_offset => INTERVAL '1 day',
                                       end_offset => INTERVAL '1 minute',
                                       schedule_interval => INTERVAL '1 minute');

-- Manual refresh
CALL refresh_continuous_aggregate('tick_1min', now() - interval '1 week', now());
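
For reference, here is a minimal sketch of the notification trigger described above; the actual trigger was not included in the report, so the function, trigger, and channel names below are assumptions. Note that a row trigger cannot be attached to the continuous aggregate view itself, only to its materialization hypertable, whose name can be looked up first:

-- Find the materialization hypertable behind the continuous aggregate.
SELECT materialization_hypertable_schema, materialization_hypertable_name
FROM timescaledb_information.continuous_aggregates
WHERE view_name = 'tick_1min';

-- Hypothetical notification trigger (names and the hypertable below are
-- placeholders; substitute the name returned by the query above).
CREATE OR REPLACE FUNCTION notify_new_bar() RETURNS trigger AS $$
BEGIN
    -- Publish the symbol and bucket of the freshly materialized bar.
    PERFORM pg_notify('new_bar', NEW.symbol || ' ' || NEW.bucket::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER tick_1min_notify
    AFTER INSERT ON _timescaledb_internal._materialized_hypertable_2
    FOR EACH ROW EXECUTE FUNCTION notify_new_bar();

A client can then run LISTEN new_bar; and react to each notification payload.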

Rows are missing from tick_1min, e.g., the buckets from 22:25:00 to 22:28:00 are not added (expected: Open = High = Low = Close = previous close):

bucket                          symbol  open    high    low     close
2024-02-21 22:23:00.000 -0800   MESH4   5008.25 5008.25 5008.25 5008.25
2024-02-21 22:24:00.000 -0800   MESH4   5008.0  5008.25 5008.0  5008.25
2024-02-21 22:29:00.000 -0800   MESH4   5008.0  5008.0  5008.0  5008.0
2024-02-21 22:31:00.000 -0800   MESH4   5008.25 5008.25 5008.25 5008.25
2024-02-21 22:32:00.000 -0800   MESH4   5008.25 5008.25 5008.25 5008.25
2024-02-21 22:34:00.000 -0800   MESH4   5008.0  5008.0  5008.0  5008.0
2024-02-21 22:38:00.000 -0800   MESH4   5008.25 5008.25 5008.25 5008.25
2024-02-21 22:41:00.000 -0800   MESH4   5008.25 5008.25 5008.25 5008.25
2024-02-21 22:44:00.000 -0800   MESH4   5008.0  5008.25 5008.0  5008.25
2024-02-21 22:45:00.000 -0800   MESH4   5008.25 5008.25 5008.25 5008.25
2024-02-21 22:49:00.000 -0800   MESH4   5008.0  5008.0  5008.0  5008.0

TimescaleDB version affected

2.10.2

PostgreSQL version used

14.7

What operating system did you use?

Ubuntu 14.7-1.pgdg22.04+1

What installation method did you use?

Docker

What platform did you run on?

Other

Relevant log output and stack trace

No response

How can we reproduce the bug?

# -------------------------------
# Random tick data generator
# -------------------------------

import psycopg2
import random
import time
from datetime import datetime, timedelta

# Connection parameters
conn_params = {
    "dbname": "postgres",
    "user": "postgres",
    "password": "password",
    "host": "127.0.0.1",
    "port": "5432"
}

# Connect to the database
conn = psycopg2.connect(**conn_params)
cursor = conn.cursor()

# Simulation duration
simulation_duration = 15 * 60  # 15 minutes in seconds

# Start and end time for the simulation
start_time = datetime.now()
end_time = start_time + timedelta(seconds=simulation_duration)

# Symbols and exchanges
symbols = ['AAPL']
exchanges = ['NASDAQ']

# Simulate writing data to the table at random times
while datetime.now() < end_time:
    timestamp = datetime.now()

    # Generate random data (columns match the tick table schema above)
    data = {
        "tstamp": timestamp,
        "tstamp_micro": int(timestamp.microsecond),
        "exchange": random.choice(exchanges),
        "symbol": random.choice(symbols),
        "price": round(random.uniform(100, 1000), 2),
        "vwap": round(random.uniform(100, 1000), 2),
        "size": random.randint(100, 1000),
        "volume_bought": random.randint(100, 1000),
        "volume_sold": random.randint(100, 1000),
        "side": random.randint(0, 1),
        "context": None
    }

    # Insert data into the table
    insert_query = """
    INSERT INTO tick (tstamp, tstamp_micro, exchange, symbol, price, vwap,
                      size, volume_bought, volume_sold, side, context)
    VALUES (%(tstamp)s, %(tstamp_micro)s, %(exchange)s, %(symbol)s, %(price)s,
            %(vwap)s, %(size)s, %(volume_bought)s, %(volume_sold)s, %(side)s,
            %(context)s)
    """
    cursor.execute(insert_query, data)
    conn.commit()

    # Sleep for a random amount of time (simulate random intervals between writes)
    time.sleep(random.uniform(0.1, 90))

# Close connection
cursor.close()
conn.close()

jnidzwetzki commented 8 months ago

Hello @afpgit,

Thanks for reaching out. The reported behavior is the expected behavior of a continuous aggregate that uses time_bucket: when no data exists in the raw hypertable for a bucket, no row is produced in the continuous aggregate.

However, TimescaleDB provides two different bucketing functions: time_bucket and time_bucket_gapfill. time_bucket_gapfill generates rows for the missing buckets and, in combination with locf(), can carry the previous values forward.

So, time_bucket_gapfill is intended to handle use cases like yours. Unfortunately, time_bucket_gapfill is currently not supported in continuous aggregates (see #1324). I will keep the issue open but re-label it from bug to enhancement.
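
Until that lands, one workaround is to apply the gapfill at query time, on top of the continuous aggregate instead of inside it. Below is a minimal sketch under a few assumptions: the one-day window is an arbitrary choice (time_bucket_gapfill requires a bounded time range in the WHERE clause), and locf() carries each column's own previous value forward, so a gap bar repeats the prior bar's open/high/low/close rather than setting all four fields to the prior close as requested above.

-- Query-time gapfill over the continuous aggregate (sketch).
-- Each 1-minute group holds at most one materialized row, so the
-- aggregates below are pass-throughs; locf() fills the gap rows
-- per (symbol, minute) series with the previous bar's values.
SELECT time_bucket_gapfill('1 minute', bucket) AS minute,
       symbol,
       locf(first(open, bucket)) AS open,
       locf(max(high))           AS high,
       locf(min(low))            AS low,
       locf(last(close, bucket)) AS close
FROM tick_1min
WHERE bucket >= now() - INTERVAL '1 day'
  AND bucket < now()
GROUP BY symbol, minute
ORDER BY symbol, minute;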