ArroyoSystems / arroyo

Distributed stream processing engine in Rust
https://arroyo.dev
Apache License 2.0
3.8k stars 220 forks source link

Does watermark work with events from the past? #756

Open ahassany opened 1 month ago

ahassany commented 1 month ago

I'm trying to run streaming analytics on historical data pushed to Kafka. The Kafka message timestamp is recent, but the actual event time in the payload is much older (over a year).

-- Kafka source table: JSON records read from 'input-topic', starting at the
-- earliest available offset.
CREATE TABLE input_table (
    -- Timestamp carried in the record payload; referenced by event_time_field below.
    ts TIMESTAMP NOT NULL,
    sourceIPv6Address VARCHAR,
    destinationIPv6Address VARCHAR,
    octetDeltaCount BIGINT,
    -- Generated column that always trails ts by 30 seconds; referenced by
    -- watermark_field below.
    watermark TIMESTAMP GENERATED ALWAYS AS (ts - INTERVAL '30' SECOND) STORED
) WITH (
  connector = 'kafka',
  format = 'json',
  bootstrap_servers = 'kafka-1:19091',
  topic = 'input-topic',
  type = 'source',
  'source.offset' = 'earliest',
  'source.read_mode' = 'read_uncommitted',
  -- NOTE(review): presumably these two options make event time and watermarks
  -- come from the payload columns rather than the Kafka message timestamp;
  -- confirm against the Arroyo documentation.
  event_time_field = 'ts',
  watermark_field = 'watermark'
);

-- Per-minute octet totals for each (source, destination) address pair.
-- The inner query buckets input_table rows into 1-minute tumbling windows and
-- sums octetDeltaCount per pair; the outer query exposes the window start
-- together with the grouping keys and the sum.
CREATE VIEW AGG AS
SELECT
    window.start AS window_start,
    "sourceIPv6Address",
    "destinationIPv6Address",
    octetDeltaCount
FROM (
    SELECT
        TUMBLE(INTERVAL '1 minute') AS window,
        "sourceIPv6Address",
        "destinationIPv6Address",
        SUM("octetDeltaCount") AS octetDeltaCount
    FROM input_table
    GROUP BY
        window,
        "sourceIPv6Address",
        "destinationIPv6Address"
);

-- Kafka sink table: rows are written as JSON to the 'arroyo-agg' topic.
-- Its columns match, in order, the columns selected from the AGG view by the
-- INSERT statement that feeds this table.
CREATE TABLE agg_sink (
    -- Start of the aggregation window (window.start in the AGG view).
    window_start TIMESTAMP,
    sourceIPv6Address VARCHAR,
    destinationIPv6Address VARCHAR,
    -- Sum of octetDeltaCount over the window for this address pair.
    octetDeltaCount BIGINT
) WITH (
    'connector' = 'kafka',
    'bootstrap_servers' = 'kafka-1:19091',
    'type' = 'sink',
    'topic' = 'arroyo-agg',
    'format' = 'json'
);

-- Stream every row produced by the AGG view into the agg_sink table.
INSERT INTO agg_sink
SELECT
    window_start,
    "sourceIPv6Address",
    "destinationIPv6Address",
    octetDeltaCount
FROM AGG;

However, nothing is ever written to the output Kafka topic.