risingwavelabs / risingwave

Best-in-class stream processing, analytics, and management. Perform continuous analytics, or build event-driven applications, real-time ETL pipelines, and feature stores in minutes. Unified streaming and batch. PostgreSQL compatible.
https://go.risingwave.com/slack
Apache License 2.0
7.06k stars 581 forks source link

Bug(source): datagen source generate data skew #19469

Open Li0k opened 2 days ago

Li0k commented 2 days ago

Test case 1:

create table t_large_ckpt_with_pk (k bigint, v varchar, primary key(k)) with (
        connector = 'datagen',
        fields.v.length = 100000,
        datagen.rows.per.second='100000',
        datagen.split.num = '24',
        fields.k.kind = 'random',
        fields.k.min = 1,
        fields.k.max = 50000000,
    );
create materialized view mv_large_ckpt_with_pk as select * from t_large_ckpt_with_pk;

checkpoint_frequency = 60

image

The metrics show that the actual data written to the mv is less than 10% of the data written to the table, which means that there are a lot of duplicates in the generated pk(k), and the duplicates are cleaned up in the shared buffer compact phase.

Test case2 :

test by @tabVersion

dev=> create source  t_large_ckpt_with_pk (k bigint, v varchar) with (
        connector = 'datagen',
        fields.v.length = 100000,
        datagen.rows.per.second='100000',
        datagen.split.num = '24',
        fields.k.kind = 'random',
        fields.k.min = 1,
        fields.k.max = 50000000,
    );
CREATE_SOURCE
dev=> create materialized view mv as ( select k, count(*) from t_large_ckpt_with_pk group by k ) ;
CREATE_MATERIALIZED_VIEW
dev=> create table t as select * from mv ;
INSERT 0 3072
dev=>
dev=> create table t1 as select * from mv ;
INSERT 0 4184
select * from t1 order by count  desc limit 10 ;
    k    | count
---------+-------
 1626013 |    12
  153605 |    12
  957843 |    12
   15215 |    12
 1689808 |    12
 1885161 |    12
  623498 |    12
  592407 |    12
 1572091 |    12
  384375 |    12
(10 rows)
select * from t join t1 on t.k = t1.k order by t1.count desc limit 10 ;
    k    | count |    k    | count
---------+-------+---------+-------
 9199828 |    12 | 9199828 |    12
 2322857 |    12 | 2322857 |    12
 8216541 |    12 | 8216541 |    12
 1157410 |    12 | 1157410 |    12
 9581087 |    12 | 9581087 |    12
 9753606 |    12 | 9753606 |    12
 5211605 |    12 | 5211605 |    12
 3506222 |    12 | 3506222 |    12
 8834964 |    12 | 8834964 |    12
 3109476 |    12 | 3109476 |    12
(10 rows)
dev=> select sum(count) from t;
  sum
-------
 36864
(1 row)

dev=> select sum(count) from t1;
  sum
-------
 50002
(1 row)