risingwavelabs / risingwave

Best-in-class stream processing, analytics, and management. Perform continuous analytics, or build event-driven applications, real-time ETL pipelines, and feature stores in minutes. Unified streaming and batch. PostgreSQL compatible.
https://go.risingwave.com/slack
Apache License 2.0

bug: panic on mv-on-mv #2759

Closed: neverchanje closed this issue 2 years ago.

neverchanje commented 2 years ago

Describe the bug

I created three materialized views. Creating mv1 and mv2 succeeded, but creating mv3, a join of mv1 and mv2, always panicked:

thread 'tokio-runtime-worker' panicked at 'not yet implemented: handle finish report after meta recovery', /Users/wutao/git/risingwave-dev/src/meta/src/barrier/notifier.rs:123:33
stack backtrace:
   0: rust_begin_unwind
             at /rustc/cb121987158d69bb894ba1bcc21dc45d1e0a488f/library/std/src/panicking.rs:584:5
   1: core::panicking::panic_fmt
             at /rustc/cb121987158d69bb894ba1bcc21dc45d1e0a488f/library/core/src/panicking.rs:142:14
   2: risingwave_meta::barrier::notifier::UnfinishedNotifiers::finish_actors
             at ./src/meta/src/barrier/notifier.rs:123:33
   3: risingwave_meta::barrier::GlobalBarrierManager<S>::run::{{closure}}
             at ./src/meta/src/barrier/mod.rs:279:25
   4: <core::future::from_generator::GenFuture<T> as core::future::future::Future>::poll
             at /rustc/cb121987158d69bb894ba1bcc21dc45d1e0a488f/library/core/src/future/mod.rs:91:19
   5: risingwave_meta::barrier::GlobalBarrierManager<S>::start::{{closure}}::{{closure}}
             at ./src/meta/src/barrier/mod.rs:202:45

To Reproduce

First, use the workload generator at https://github.com/singularity-data/risingwave-demo/tree/main/workload-generator to generate the input streams. Then create the following sources and materialized views:

create materialized source ad_click (
    bid_id bigint,
    click_timestamp timestamp
) with (
    'connector' = 'kafka',
    'kafka.topic' = 'ad_click',
    'kafka.brokers' = '127.0.0.1:29092',
    'kafka.scan.startup.mode' = 'earliest'
) row format json;

create materialized source ad_impression (
    bid_id bigint,
    ad_id bigint,
    impression_timestamp timestamp
) with (
    'connector' = 'kafka',
    'kafka.topic' = 'ad_impression',
    'kafka.brokers' = '127.0.0.1:29092',
    'kafka.scan.startup.mode' = 'earliest'
) row format json;

CREATE MATERIALIZED VIEW ad_impressions_5min AS
SELECT
    ad_id,
    COUNT(*) AS impressions_count,
    window_end
FROM
    TUMBLE(
        ad_impression,
        impression_timestamp,
        INTERVAL '1' MINUTE
    )
GROUP BY
    ad_id,
    window_end;

CREATE MATERIALIZED VIEW ad_clicks_5min AS
SELECT
    ai.ad_id,
    COUNT(*) AS clicks_count,
    ai.window_end as window_end
FROM
    TUMBLE(
        ad_click,
        click_timestamp,
        INTERVAL '1' MINUTE
    ) AS ac
    INNER JOIN TUMBLE(
        ad_impression,
        impression_timestamp,
        INTERVAL '1' MINUTE
    ) AS ai ON ai.bid_id = ac.bid_id
    AND ai.window_end = ac.window_end
GROUP BY
    ad_id,
    ai.window_end;

CREATE MATERIALIZED VIEW ad_ctr_5min AS
SELECT
    ac.ad_id AS ad_id,
    ac.clicks_count :: NUMERIC / ai.impressions_count AS ctr,
    ai.window_end AS window
FROM
    ad_impressions_5min AS ai
    join ad_clicks_5min AS ac ON ai.ad_id = ac.ad_id
    AND ai.window_end = ac.window_end;

ad_impressions_5min and ad_clicks_5min are created successfully. Creating ad_ctr_5min, which joins ad_impressions_5min and ad_clicks_5min, always fails with the panic above.
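Both upstream views bucket events with `TUMBLE(..., INTERVAL '1' MINUTE)`, and ad_ctr_5min matches rows on `window_end`. A minimal sketch of how a tumbling window assigns `window_end`, assuming windows are aligned to the Unix epoch and timestamps are plain seconds; `tumble_window_end` is a hypothetical helper, not a RisingWave function.

```rust
/// Returns the `window_end` that a tumbling window of `size_secs` seconds
/// assigns to timestamp `ts_secs` (seconds since the Unix epoch).
/// Hypothetical helper: windows are assumed to be [start, start + size)
/// with `start` a multiple of `size_secs` counted from the epoch.
fn tumble_window_end(ts_secs: i64, size_secs: i64) -> i64 {
    assert!(size_secs > 0, "window size must be positive");
    // `rem_euclid` keeps the window start correct for pre-epoch (negative) timestamps too.
    let window_start = ts_secs - ts_secs.rem_euclid(size_secs);
    window_start + size_secs
}
```

For example, an event at 06:23:30 lands in the window ending at 06:24:00, so it can only be matched against rows whose `window_end` is also 06:24:00.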

Running the same join over ad_impressions_5min and ad_clicks_5min as a batch query succeeds:

dev=> SELECT
dev->     ac.ad_id AS ad_id,
dev->     ac.clicks_count :: NUMERIC / ai.impressions_count AS ctr,
dev->     ai.window_end AS window
dev-> FROM
dev->     ad_impressions_5min AS ai
dev->     join ad_clicks_5min AS ac ON ai.ad_id = ac.ad_id
dev->     AND ai.window_end = ac.window_end;
 ad_id |              ctr               |       window
-------+--------------------------------+---------------------
     1 | 0.50                           | 2022-05-24 06:24:00
     2 | 1                              | 2022-05-24 06:25:00
     4 | 0.25                           | 2022-05-24 06:24:00
     5 | 1                              | 2022-05-24 06:24:00
     5 | 0.75                           | 2022-05-24 06:25:00
     6 | 0.6666666666666666666666666667 | 2022-05-24 06:24:00
     6 | 0.6666666666666666666666666667 | 2022-05-24 06:25:00
     7 | 0.50                           | 2022-05-24 06:24:00
     9 | 0.25                           | 2022-05-24 06:24:00
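The batch query is an inner equi-join on `(ad_id, window_end)` followed by a division. A minimal in-memory sketch of that logic as a build/probe hash join, assuming each view holds at most one row per `(ad_id, window_end)` (its GROUP BY key); `f64` stands in for NUMERIC, and the key encoding and function name are hypothetical.

```rust
use std::collections::HashMap;

/// (ad_id, window_end in seconds): hypothetical key shared by both views.
type Key = (i64, i64);

/// Inner-joins impression counts with click counts on (ad_id, window_end)
/// and returns (ad_id, window_end, ctr) rows sorted by key.
/// `f64` replaces PostgreSQL NUMERIC; COUNT(*) groups guarantee impressions >= 1.
fn ctr_join(impressions: &[(Key, i64)], clicks: &[(Key, i64)]) -> Vec<(i64, i64, f64)> {
    // Build side: impressions, keyed like the GROUP BY of ad_impressions_5min.
    let build: HashMap<Key, i64> = impressions.iter().copied().collect();
    let mut out: Vec<(i64, i64, f64)> = clicks
        .iter()
        // Probe side: keep only clicks whose key also exists on the build side.
        .filter_map(|&(key, clicks_count)| {
            build.get(&key).map(|&impressions_count| {
                (key.0, key.1, clicks_count as f64 / impressions_count as f64)
            })
        })
        .collect();
    out.sort_by(|a, b| (a.0, a.1).cmp(&(b.0, b.1)));
    out
}
```

Rows present in only one of the two views are dropped, matching the semantics of the SQL inner `join`.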

Expected behavior

No panic: creating ad_ctr_5min should succeed.


yezizp2012 commented 2 years ago

Cc @BugenZhao. It seems that while ad_ctr_5min was being created, the barrier manager received a finished epoch that should be impossible. I will try to reproduce and fix it.

yezizp2012 commented 2 years ago

After some more investigation: the FinishCreateMviewNotifier is notified twice in rearranged_chain. This is quite weird and seems impossible.

Will check deeper. 🤔
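To illustrate why a duplicate notification is fatal, here is a simplified, hypothetical model of the bookkeeping: the meta node tracks the actors that still have to report that their chain finished, and a report from an actor that is no longer pending falls into an unimplemented branch. This is not the real `UnfinishedNotifiers::finish_actors` code; the type and method names are assumptions, and the `todo!` message only mirrors the panic text.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for the per-mview bookkeeping on the meta node:
/// actors whose chain executors have not yet reported "finished".
struct UnfinishedActors {
    remaining: HashSet<u32>,
}

impl UnfinishedActors {
    fn new(actors: impl IntoIterator<Item = u32>) -> Self {
        Self { remaining: actors.into_iter().collect() }
    }

    /// Records a finish report from `actor_id`; returns true once every actor has finished.
    /// A report from an actor that is not (or no longer) pending is unexpected, so,
    /// like the panic in this issue, it hits `todo!`.
    fn finish_actor(&mut self, actor_id: u32) -> bool {
        if !self.remaining.remove(&actor_id) {
            todo!("handle finish report after meta recovery");
        }
        self.remaining.is_empty()
    }
}
```

With a single pending actor, the first report completes the mview and a second report from the same actor panics, matching the "notified twice" observation.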

yezizp2012 commented 2 years ago

The plan for ad_ctr_5min is as follows:

                                                                 QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------
 StreamMaterialize { columns: [ad_id, ctr, window, ad_id#1(hidden), window_end(hidden)], pk_columns: [ad_id#1, window, ad_id, window_end] }
   StreamProject { exprs: [$3, ($4::Decimal / $1), $2, $0, $5] }
     StreamHashJoin { type: Inner, predicate: $0 = $3 AND $2 = $5 }
       StreamTableScan { table: ad_impressions_5min, columns: [ad_id, impressions_count, window_end], pk_indices: [0, 2] }
       StreamTableScan { table: ad_clicks_5min, columns: [ad_id, clicks_count, window_end], pk_indices: [0, 2] }
(5 rows)
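The `$n` references in `StreamHashJoin` and `StreamProject` index into the join's output. A small sketch that resolves them, assuming that output is the left input's columns followed by the right input's; the column labels are taken by hand from the two `StreamTableScan` lines, and `resolve` is a hypothetical helper.

```rust
/// Resolves a `$n` input reference against a join output schema, assuming the
/// output is the left input's columns followed by the right input's.
/// Returns None when the index is out of range.
fn resolve<'a>(left: &[&'a str], right: &[&'a str], idx: usize) -> Option<&'a str> {
    left.iter().chain(right.iter()).nth(idx).copied()
}
```

With left = [ad_id, impressions_count, window_end] from ad_impressions_5min and right = [ad_id, clicks_count, window_end] from ad_clicks_5min, the predicate `$0 = $3 AND $2 = $5` resolves to `ai.ad_id = ac.ad_id AND ai.window_end = ac.window_end`, and `($4::Decimal / $1)` to `clicks_count / impressions_count`, both matching the SQL.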

Further investigation shows that the chain executor is built twice in this mview, both times with the same actor id (32):

2022-05-24T09:19:09.020321Z  INFO risingwave_stream::task::stream_manager: create executor: 137438953472, actor: 32, fragment: 11, node: "MergeExecutor"
2022-05-24T09:19:09.020358Z  INFO risingwave_stream::task::stream_manager: create executor: 137438953509, actor: 32, fragment: 11, node: "BatchPlanNode"
2022-05-24T09:19:09.021187Z  INFO risingwave_stream::task::stream_manager: create executor: 137438953510, actor: 32, fragment: 11, node: "ChainExecutor"
2022-05-24T09:19:09.021228Z  INFO risingwave_stream::task::stream_manager: create executor: 137438953472, actor: 32, fragment: 11, node: "MergeExecutor"
2022-05-24T09:19:09.021256Z  INFO risingwave_stream::task::stream_manager: create executor: 137438953507, actor: 32, fragment: 11, node: "BatchPlanNode"
2022-05-24T09:19:09.022018Z  INFO risingwave_stream::task::stream_manager: create executor: 137438953508, actor: 32, fragment: 11, node: "ChainExecutor"
2022-05-24T09:19:09.022055Z  INFO risingwave_stream::task::stream_manager: create executor: 137438953516, actor: 32, fragment: 11, node: "StreamHashJoin { type: Inner, predicate: $0 = $3 AND $2 = $5 }"
2022-05-24T09:19:09.022127Z  INFO risingwave_stream::task::stream_manager: create executor: 137438953518, actor: 32, fragment: 11, node: "StreamProject { exprs: [$3, ($4::Decimal / $1), $2, $0, $5] }"
2022-05-24T09:19:09.022169Z  INFO risingwave_stream::task::stream_manager: create executor: 137438953519, actor: 32, fragment: 11, node: "StreamMaterialize { columns: [ad_id, ctr, window, ad_id#1(hidden), window_end(hidden)], pk_columns: [ad_id#1, window, ad_id, window_end] }"
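The duplication can be spotted mechanically by grouping the `create executor` log lines by actor and counting `ChainExecutor` nodes. A sketch with deliberately naive parsing that assumes the exact layout of these lines (`actor: <id>,` followed later by `node: ...`); `chains_per_actor` is a hypothetical helper, not part of RisingWave.

```rust
use std::collections::BTreeMap;

/// Counts `ChainExecutor` creations per actor id in stream-manager log lines like
/// `... create executor: 137438953510, actor: 32, fragment: 11, node: "ChainExecutor"`.
/// Naive parsing: assumes the `actor: <id>,` ... `node: ...` layout shown in the issue.
fn chains_per_actor<'a>(lines: impl IntoIterator<Item = &'a str>) -> BTreeMap<u32, usize> {
    let mut counts = BTreeMap::new();
    for line in lines {
        // Actor id: the text between "actor: " and the next comma.
        let actor = line
            .split("actor: ")
            .nth(1)
            .and_then(|rest| rest.split(',').next())
            .and_then(|id| id.trim().parse::<u32>().ok());
        // Only the text after "node: " names the executor type.
        let is_chain = line
            .split("node: ")
            .nth(1)
            .map_or(false, |node| node.contains("ChainExecutor"));
        if let (Some(id), true) = (actor, is_chain) {
            *counts.entry(id).or_insert(0) += 1;
        }
    }
    counts
}
```

Applied to the log above, it counts two `ChainExecutor` entries for actor 32.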

The fragmenter does not seem to handle StreamHashJoin correctly: this plan should be split into three fragments, but instead both chain executors end up in a single fragment (fragment 11 in the log above).
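A toy model of the expected split: walk the plan tree and open a new fragment at the root and at every `StreamTableScan`, so that materialize, project and join stay together while each scan gets a fragment of its own. This is a hypothetical simplification assuming the cut rule the issue expects; the `Node` type and the rule are illustrative, not RisingWave's fragmenter.

```rust
/// Toy stream plan node: an operator name plus its inputs (hypothetical).
struct Node {
    name: &'static str,
    inputs: Vec<Node>,
}

fn node(name: &'static str, inputs: Vec<Node>) -> Node {
    Node { name, inputs }
}

/// Depth-first walk that places each node into a fragment.
fn visit(node: &Node, parent: Option<usize>, fragments: &mut Vec<Vec<&'static str>>) {
    let id = match parent {
        // Non-scan nodes stay in their parent's fragment.
        Some(p) if node.name != "StreamTableScan" => p,
        // The root and every StreamTableScan open a new fragment
        // (in a real plan an exchange would connect it to its parent).
        _ => {
            fragments.push(Vec::new());
            fragments.len() - 1
        }
    };
    fragments[id].push(node.name);
    for input in &node.inputs {
        visit(input, Some(id), fragments);
    }
}

/// Returns the fragments in discovery order, each as the operator names it contains.
fn fragment(root: &Node) -> Vec<Vec<&'static str>> {
    let mut fragments = Vec::new();
    visit(root, None, &mut fragments);
    fragments
}
```

Under this rule the ad_ctr_5min plan yields three fragments: one with StreamMaterialize, StreamProject and StreamHashJoin, and one for each StreamTableScan, rather than a single fragment holding both chains.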