risingwavelabs / risingwave

Best-in-class stream processing, analytics, and management. Perform continuous analytics, or build event-driven applications, real-time ETL pipelines, and feature stores in minutes. Unified streaming and batch. PostgreSQL compatible.
https://go.risingwave.com/slack
Apache License 2.0

Discussion: create source without consuming data until a start command #13103

Status: Open (opened by chenzl25 11 months ago)

chenzl25 commented 11 months ago

Currently, when a user creates a source and then creates some MVs on it, the source immediately starts consuming data. In some circumstances, however, it would be better to defer consumption.

Case 1: Users may want to rebuild the entire streaming job without waiting for each individual MV to be backfilled. To achieve this, they could create a source that does not consume data yet. Once the whole pipeline is built, they would then start consuming the source data.

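Case 2: I found that a user relies on a temporal join to enforce an invariant on the output data: the b_seq that each row of a_b is joined with must be smaller than its a_seq. For the invariant to hold for all output, no data may be consumed between the creation of MV latest_b_per_kind and MV a_b. Otherwise, when a_b backfills the rows already stored in a, the temporal join looks them up against the current state of latest_b_per_kind, which may already contain b rows that arrived later; that is, the temporal join may look up future data.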

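SET streaming_parallelism = 1;

-- events: seq is an increasing sequence, event_type is a random value in [1, 2], kind is a random 2-character string
CREATE TABLE events (seq bigint, event_type int, kind varchar) APPEND ONLY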
WITH (
    connector = 'datagen',
    fields.seq.kind = 'sequence',
    fields.seq.end = '9223372036854775807',
    fields.event_type.kind = 'random',
    fields.event_type.min = '1',
    fields.event_type.max = '2',
    fields.event_type.seed = 1,
    fields.kind.kind = 'random',
    fields.kind.length = 2,
    fields.kind.seed = 1,
    datagen.rows.per.second = 10000
) FORMAT PLAIN ENCODE JSON;

-- a: one row per seq for events of type 1
CREATE MATERIALIZED VIEW IF NOT EXISTS a AS (
    SELECT
        DISTINCT ON (seq) seq,
        kind
    FROM
        events
    WHERE
        event_type = 1
);

-- b: one row per seq for events of type 2
CREATE MATERIALIZED VIEW IF NOT EXISTS b AS (
    SELECT
        DISTINCT ON (seq) seq,
        kind
    FROM
        events
    WHERE
        event_type = 2
);

-- latest_b_per_kind: for each kind, the b row with the largest seq (top-1 by row_number)
CREATE MATERIALIZED VIEW IF NOT EXISTS latest_b_per_kind AS (
    SELECT
        kind,
        seq
    FROM
        (
            SELECT
                kind,
                seq,
                row_number() OVER (
                    PARTITION BY kind
                    ORDER BY
                        seq DESC
                ) as rank
            FROM
                b
        )
    WHERE
        rank = 1
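);

-- a_b: for each a row, look up the current latest b row of the same kind (process-time temporal join)
CREATE MATERIALIZED VIEW IF NOT EXISTS a_b AS (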
    SELECT
        a.seq as a_seq,
        a.kind as kind,
        latest_b_per_kind.seq as b_seq
    FROM
        a
        LEFT JOIN latest_b_per_kind FOR SYSTEM_TIME AS OF PROCTIME() ON a.kind = latest_b_per_kind.kind
);

Check the invariant (a count of 0 means it holds):

SELECT count(1) FROM a_b WHERE b_seq IS NOT NULL AND a_seq < b_seq;
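To make the Case 2 reasoning concrete, here is a toy Python model of the pipeline above. It is not RisingWave's execution engine: it assumes events are processed one at a time in seq order, and that the temporal join looks up the current contents of latest_b_per_kind when an a row reaches it. The event data and the names `make_events`, `run` and `violations` are hypothetical. The model compares a pipeline where nothing is consumed before a_b exists with one where some events are consumed first, so that a_b must backfill the existing rows of a.

```python
from typing import Dict, List, Optional, Tuple

# (seq, event_type, kind): a hypothetical, deterministic stand-in for the datagen-backed `events` table.
Event = Tuple[int, int, str]
# (a_seq, kind, b_seq): one output row of MV a_b.
Row = Tuple[int, str, Optional[int]]


def make_events(n: int) -> List[Event]:
    kinds = ["k1", "k2"]
    # Even seqs are type 1 (go to `a`), odd seqs are type 2 (go to `b`).
    return [(seq, 1 if seq % 2 == 0 else 2, kinds[(seq // 2) % 2]) for seq in range(n)]


def run(events: List[Event], consumed_before_a_b: int) -> List[Row]:
    """Simulate a_b when `consumed_before_a_b` events were ingested before a_b was created."""
    latest_b: Dict[str, int] = {}  # latest_b_per_kind: kind -> largest b seq seen so far
    a_snapshot: List[Tuple[int, str]] = []  # rows already stored in MV a
    output: List[Row] = []

    # Phase 1: the source is consuming, but a_b does not exist yet.
    for seq, event_type, kind in events[:consumed_before_a_b]:
        if event_type == 1:
            a_snapshot.append((seq, kind))
        else:
            latest_b[kind] = max(latest_b.get(kind, seq), seq)

    # Creating a_b: backfill the existing rows of a against the *current* latest_b.
    for a_seq, kind in a_snapshot:
        output.append((a_seq, kind, latest_b.get(kind)))

    # Phase 2: steady-state streaming; each new a row only sees b rows with a smaller seq.
    for seq, event_type, kind in events[consumed_before_a_b:]:
        if event_type == 1:
            output.append((seq, kind, latest_b.get(kind)))
        else:
            latest_b[kind] = max(latest_b.get(kind, seq), seq)

    return output


def violations(rows: List[Row]) -> int:
    # Same predicate as the invariant check: b_seq IS NOT NULL AND a_seq < b_seq.
    return sum(1 for a_seq, _, b_seq in rows if b_seq is not None and a_seq < b_seq)


events = make_events(20)
print(violations(run(events, consumed_before_a_b=0)))   # prints 0
print(violations(run(events, consumed_before_a_b=12)))  # prints 6
```

In this model, all six backfilled a rows (seq 0 to 10) are joined with a b row that arrived after them, while every row produced by steady-state streaming satisfies the invariant. This mirrors why the thread asks for a way to build the whole pipeline before the source starts consuming.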
hzxa21 commented 11 months ago

This could also be a very useful admin operation for troubleshooting and issue mitigation. Related: https://github.com/risingwavelabs/risingwave/issues/12997

BugenZhao commented 11 months ago

There was a similar issue a long time ago: https://github.com/risingwavelabs/risingwave/issues/3073

StrikeW commented 11 months ago

For case 1, I think transactional DDL makes more sense.

st1page commented 11 months ago

Some discussion about case 1: https://github.com/risingwavelabs/risingwave/issues/12771

github-actions[bot] commented 8 months ago

This issue has been open for 60 days with no activity. Could you please update the status? Feel free to continue discussion or close as not planned.