
perf: improve nexmark q15 scaling up performance #15251

lmatz opened this issue 7 months ago

lmatz commented 7 months ago

nightly-20240224

RW Config:

RW_CONFIG="{'system':{'data_directory':'hummock_001','barrier_interval_ms':10000},'server':{'telemetry_enabled':false},'meta': {'level0_tier_compact_file_number':6,'level0_overlapping_sub_level_compact_level_count':6}}"

RW:

4X:

metabase: http://metabase.risingwave-cloud.xyz/question/9236-nexmark-q15-blackhole-4x-medium-1cn-affinity-avg-source-output-rows-per-second-rows-s-history-thtb-2763?start_date=2024-01-04

grafana: https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1708900035000&to=1708901778000&var-namespace=nexmark-ht-4x-1cn-affinity-10s

buildkite: https://buildkite.com/risingwave-test/nexmark-benchmark/builds/3156

1X:

metabase: http://metabase.risingwave-cloud.xyz/question/603-nexmark-q15-blackhole-medium-1cn-affinity-avg-source-output-rows-per-second-rows-s-history-thtb-178?start_date=2023-11-17

grafana: https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1708899363000&to=1708901166000&var-namespace=nexmark-1cn-affinity-10s

buildkite: https://buildkite.com/risingwave-test/nexmark-benchmark/builds/3155

RW 4X: 1.15M RW 1X: 546K

4X/1X Ratio: 2.1

However, I forgot to tune the barrier (checkpoint) interval of Flink to 10s instead of 60s, so let's try it again; I will post the new results at the end of this page.

Flink:

4X: http://metabase.risingwave-cloud.xyz/question/9712-flink-nexmark-q15-flink-4x-medium-1tm-avg-job-throughput-per-second-records-s-history-thtb-2920?start_date=2023-12-05

https://grafana.test.risingwave-cloud.xyz/d/Q4ayNkfVz/flink-hits?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1704381755000&to=1704382897000&var-namespace=flink-4x-medium-1tm-test-20240104

https://buildkite.com/risingwave-test/flink-nexmark-bench/builds/130

1X: http://metabase.risingwave-cloud.xyz/question/2336-flink-nexmark-q15-flink-medium-1tm-avg-job-throughput-per-second-records-s-history-thtb-146?start_date=2023-07-08

https://grafana.test.risingwave-cloud.xyz/d/Q4ayNkfVz/flink-hits?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1704454873000&to=1704456616000&var-namespace=flink-medium-1tm-test-20230104

https://buildkite.com/risingwave-test/flink-nexmark-bench/builds/131

4X/1X Ratio: 3.125

Under the 1X setting, RW and Flink do not differ much, but their scaling-up behavior differs a lot.

Setting https://github.com/risingwavelabs/kube-bench/blob/main/env.toml#L113-L114:

Q15_RW_FORCE_TWO_PHASE_AGG = true
Q15_RW_FORCE_SPLIT_DISTINCT_AGG = true
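
These benchmark options presumably end up as session variables in the session that creates the sink; a minimal sketch, assuming the rw_force_two_phase_agg and rw_force_split_distinct_agg session variables exist in this build:

    -- force two-phase aggregation and split each distinct aggregation,
    -- which is what the env.toml flags above request
    SET rw_force_two_phase_agg = true;
    SET rw_force_split_distinct_agg = true;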

RW:

CREATE SINK nexmark_q15 AS
    SELECT to_char(date_time, 'YYYY-MM-DD')                                          as "day",
           count(*)                                                                  AS total_bids,
           count(*) filter (where price < 10000)                                     AS rank1_bids,
           count(*) filter (where price >= 10000 and price < 1000000)                AS rank2_bids,
           count(*) filter (where price >= 1000000)                                  AS rank3_bids,
           count(distinct bidder)                                                    AS total_bidders,
           count(distinct bidder) filter (where price < 10000)                       AS rank1_bidders,
           count(distinct bidder) filter (where price >= 10000 and price < 1000000)  AS rank2_bidders,
           count(distinct bidder) filter (where price >= 1000000)                    AS rank3_bidders,
           count(distinct auction)                                                   AS total_auctions,
           count(distinct auction) filter (where price < 10000)                      AS rank1_auctions,
           count(distinct auction) filter (where price >= 10000 and price < 1000000) AS rank2_auctions,
           count(distinct auction) filter (where price >= 1000000)                   AS rank3_auctions
    FROM bid
    GROUP BY to_char(date_time, 'YYYY-MM-DD')
    WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');

Plan:

 StreamSink { type: append-only, columns: [day, total_bids, rank1_bids, rank2_bids, rank3_bids, total_bidders, rank1_bidders, rank2_bidders, rank3_bidders, total_auctions, rank1_auctions, rank2_auctions, rank3_auctions] }
 └─StreamProject { exprs: [$expr2_expanded, sum0(sum0(count) filter((flag = 0:Int64))), sum0(sum0(count filter(($expr3 < 10000:Int32))) filter((flag = 0:Int64))), sum0(sum0(count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))) filter((flag = 0:Int64))), sum0(sum0(count filter(($expr3 >= 1000000:Int32))) filter((flag = 0:Int64))), sum0(count($expr4_expanded) filter((flag = 1:Int64))), sum0(count($expr4_expanded) filter((count filter(($expr3 < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64))), sum0(count($expr4_expanded) filter((count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64))), sum0(count($expr4_expanded) filter((count filter(($expr3 >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64))), sum0(count($expr5_expanded) filter((flag = 2:Int64))), sum0(count($expr5_expanded) filter((count filter(($expr3 < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64))), sum0(count($expr5_expanded) filter((count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64))), sum0(count($expr5_expanded) filter((count filter(($expr3 >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)))] }
   └─StreamHashAgg { group_key: [$expr2_expanded], aggs: [sum0(sum0(count) filter((flag = 0:Int64))), sum0(sum0(count filter(($expr3 < 10000:Int32))) filter((flag = 0:Int64))), sum0(sum0(count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))) filter((flag = 0:Int64))), sum0(sum0(count filter(($expr3 >= 1000000:Int32))) filter((flag = 0:Int64))), sum0(count($expr4_expanded) filter((flag = 1:Int64))), sum0(count($expr4_expanded) filter((count filter(($expr3 < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64))), sum0(count($expr4_expanded) filter((count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64))), sum0(count($expr4_expanded) filter((count filter(($expr3 >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64))), sum0(count($expr5_expanded) filter((flag = 2:Int64))), sum0(count($expr5_expanded) filter((count filter(($expr3 < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64))), sum0(count($expr5_expanded) filter((count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64))), sum0(count($expr5_expanded) filter((count filter(($expr3 >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64))), count] }
     └─StreamExchange { dist: HashShard($expr2_expanded) }
       └─StreamHashAgg { group_key: [$expr2_expanded, $expr6], aggs: [sum0(count) filter((flag = 0:Int64)), sum0(count filter(($expr3 < 10000:Int32))) filter((flag = 0:Int64)), sum0(count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))) filter((flag = 0:Int64)), sum0(count filter(($expr3 >= 1000000:Int32))) filter((flag = 0:Int64)), count($expr4_expanded) filter((flag = 1:Int64)), count($expr4_expanded) filter((count filter(($expr3 < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count($expr4_expanded) filter((count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count($expr4_expanded) filter((count filter(($expr3 >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count($expr5_expanded) filter((flag = 2:Int64)), count($expr5_expanded) filter((count filter(($expr3 < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count($expr5_expanded) filter((count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count($expr5_expanded) filter((count filter(($expr3 >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count] }
         └─StreamProject { exprs: [$expr2_expanded, $expr4_expanded, $expr5_expanded, flag, count, count filter(($expr3 < 10000:Int32)), count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)), count filter(($expr3 >= 1000000:Int32)), Vnode($expr2_expanded, $expr4_expanded, $expr5_expanded, flag) as $expr6] }
           └─StreamHashAgg [append_only] { group_key: [$expr2_expanded, $expr4_expanded, $expr5_expanded, flag], aggs: [count, count filter(($expr3 < 10000:Int32)), count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)), count filter(($expr3 >= 1000000:Int32))] }
             └─StreamExchange { dist: HashShard($expr2_expanded, $expr4_expanded, $expr5_expanded, flag) }
               └─StreamExpand { column_subsets: [[$expr2], [$expr2, $expr4], [$expr2, $expr5]] }
                 └─StreamProject { exprs: [ToChar($expr1, 'YYYY-MM-DD':Varchar) as $expr2, Field(bid, 2:Int32) as $expr3, Field(bid, 1:Int32) as $expr4, Field(bid, 0:Int32) as $expr5, _row_id] }
                   └─StreamFilter { predicate: (event_type = 2:Int32) }
                     └─StreamRowIdGen { row_id_index: 6 }
                       └─StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: ($expr1 - '00:00:04':Interval) }], output_watermarks: [$expr1] }
                         └─StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _rw_kafka_timestamp, _row_id] }
                           └─StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _rw_kafka_timestamp, _row_id] }
(15 rows)
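
A note on the plan shape: the StreamExpand plus the two levels of StreamHashAgg below the final one are the split-distinct-agg rewrite requested above. Conceptually (a sketch only, with projected_bid standing in for the filtered and projected bid stream; this is not how the executor is implemented), Expand with column_subsets [[day], [day, bidder], [day, auction]] emits every bid three times with a flag, roughly like:

    SELECT day, price, NULL AS bidder, NULL AS auction, 0 AS flag FROM projected_bid  -- feeds the plain count(*) aggregates
    UNION ALL
    SELECT day, price, bidder, NULL AS auction, 1 AS flag FROM projected_bid          -- feeds the count(distinct bidder) aggregates
    UNION ALL
    SELECT day, price, NULL AS bidder, auction, 2 AS flag FROM projected_bid;         -- feeds the count(distinct auction) aggregates

The first (append-only) HashAgg then groups by (day, bidder, auction, flag), so each distinct (day, bidder) or (day, auction) pair is counted only once, and the upper aggregations merely sum the partial counts per day.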

As the throughput of the two systems is very close under the 1X setting, it seems unreasonable that the 4X setting shows such a big difference.

lmatz commented 7 months ago

Let's compare the metrics between RW 1X and RW 4X.

Memory

1X: SCR-20240228-o0o

4X: SCR-20240228-o0k

7.43 * 4 = 29.72 < 33, so the 4X setup is using even more memory than four times the 1X setup.

Agg's Cache Miss Rate

1X: SCR-20240228-o4d

4X: SCR-20240228-o4g

1X's Agg's cache miss rate is even higher than 4X's.

Agg's Cached Keys

1X: SCR-20240228-o6g

4X: SCR-20240228-o6a

Hummock Read

1X: SCR-20240228-o7r

4X: SCR-20240228-o7x

4X's block cache miss rate is even lower than 1X's, although both cache miss rates fluctuate periodically.

lmatz commented 7 months ago

q15 contains distinct aggregation, a complex operator that may become the bottleneck.

We therefore remove the distinct keyword from all the aggregations and introduce q15-no-distinct: https://github.com/risingwavelabs/kube-bench/blob/main/manifests/nexmark/nexmark-sinks.template.yaml#L597-L617

CREATE SINK nexmark_q15_no_distinct AS
    SELECT to_char(date_time, 'YYYY-MM-DD')                                          as "day",
           count(*)                                                                  AS total_bids,
           count(*) filter (where price < 10000)                                     AS rank1_bids,
           count(*) filter (where price >= 10000 and price < 1000000)                AS rank2_bids,
           count(*) filter (where price >= 1000000)                                  AS rank3_bids,
           count(bidder)                                                    AS total_bidders,
           count(bidder) filter (where price < 10000)                       AS rank1_bidders,
           count(bidder) filter (where price >= 10000 and price < 1000000)  AS rank2_bidders,
           count(bidder) filter (where price >= 1000000)                    AS rank3_bidders,
           count(auction)                                                   AS total_auctions,
           count(auction) filter (where price < 10000)                      AS rank1_auctions,
           count(auction) filter (where price >= 10000 and price < 1000000) AS rank2_auctions,
           count(auction) filter (where price >= 1000000)                   AS rank3_auctions
    FROM bid
    GROUP BY to_char(date_time, 'YYYY-MM-DD')
    WITH ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');

Since the group-by key is a single time column, two-phase aggregation is used by default: https://github.com/risingwavelabs/kube-bench/blob/main/env.toml#L115

Therefore, the plan:

 StreamSink { type: append-only, columns: [day, total_bids, rank1_bids, rank2_bids, rank3_bids, total_bidders, rank1_bidders, rank2_bidders, rank3_bidders, total_auctions, rank1_auctions, rank2_auctions, rank3_auctions] }
 └─StreamProject { exprs: [$expr2, sum0(count), sum0(count filter(($expr3 < 10000:Int32))), sum0(count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))), sum0(count filter(($expr3 >= 1000000:Int32))), sum0(count($expr4)), sum0(count($expr4) filter(($expr3 < 10000:Int32))), sum0(count($expr4) filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))), sum0(count($expr4) filter(($expr3 >= 1000000:Int32))), sum0(count($expr5)), sum0(count($expr5) filter(($expr3 < 10000:Int32))), sum0(count($expr5) filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))), sum0(count($expr5) filter(($expr3 >= 1000000:Int32)))] }
   └─StreamHashAgg { group_key: [$expr2], aggs: [sum0(count), sum0(count filter(($expr3 < 10000:Int32))), sum0(count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))), sum0(count filter(($expr3 >= 1000000:Int32))), sum0(count($expr4)), sum0(count($expr4) filter(($expr3 < 10000:Int32))), sum0(count($expr4) filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))), sum0(count($expr4) filter(($expr3 >= 1000000:Int32))), sum0(count($expr5)), sum0(count($expr5) filter(($expr3 < 10000:Int32))), sum0(count($expr5) filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32))), sum0(count($expr5) filter(($expr3 >= 1000000:Int32))), count] }
     └─StreamExchange { dist: HashShard($expr2) }
       └─StreamHashAgg [append_only] { group_key: [$expr2, $expr6], aggs: [count, count filter(($expr3 < 10000:Int32)), count filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)), count filter(($expr3 >= 1000000:Int32)), count($expr4), count($expr4) filter(($expr3 < 10000:Int32)), count($expr4) filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)), count($expr4) filter(($expr3 >= 1000000:Int32)), count($expr5), count($expr5) filter(($expr3 < 10000:Int32)), count($expr5) filter(($expr3 >= 10000:Int32) AND ($expr3 < 1000000:Int32)), count($expr5) filter(($expr3 >= 1000000:Int32))] }
         └─StreamProject { exprs: [ToChar($expr1, 'YYYY-MM-DD':Varchar) as $expr2, Field(bid, 2:Int32) as $expr3, Field(bid, 1:Int32) as $expr4, Field(bid, 0:Int32) as $expr5, _row_id, Vnode(_row_id) as $expr6] }
           └─StreamFilter { predicate: (event_type = 2:Int32) }
             └─StreamRowIdGen { row_id_index: 6 }
               └─StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: ($expr1 - '00:00:04':Interval) }], output_watermarks: [$expr1] }
                 └─StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _rw_kafka_timestamp, _row_id] }
                   └─StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _rw_kafka_timestamp, _row_id] }
(11 rows)

RW 1X: http://metabase.risingwave-cloud.xyz/question/12510-nexmark-q15-no-distinct-blackhole-medium-1cn-affinity-avg-source-output-rows-per-second-rows-s-history-thtb-3087?start_date=2024-01-23

https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1708901385000&to=1708902468000&var-namespace=nexmark-1cn-affinity-10s

https://buildkite.com/risingwave-test/nexmark-benchmark/builds/3155

RW 4X: http://metabase.risingwave-cloud.xyz/question/12653-nexmark-q15-no-distinct-blackhole-4x-medium-1cn-affinity-avg-source-output-rows-per-second-rows-s-history-thtb-3089?start_date=2024-01-24

https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1708902057000&to=1708902659000&var-namespace=nexmark-ht-4x-1cn-affinity-10s

https://buildkite.com/risingwave-test/nexmark-benchmark/builds/3156

RW 4X: 3.3M RW 1X: 923K

Both are almost at the throughput of a stateless query.

4X/1X Ratio: 3.57

Both the scalability and the absolute throughput of RW are much better when there is no distinct aggregation.

Flink 1X: http://metabase.risingwave-cloud.xyz/question/13468-flink-nexmark-q15-no-distinct-flink-medium-1tm-avg-job-throughput-per-second-records-s-history-thtb-3060?start_date=2024-01-27

https://grafana.test.risingwave-cloud.xyz/d/Q4ayNkfVz/flink-hits?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1709111642000&to=1709112604000&var-namespace=flink-medium-1tm-ckpt-10s

https://buildkite.com/risingwave-test/flink-nexmark-bench/builds/148

Flink 4X: http://metabase.risingwave-cloud.xyz/question/13478-flink-nexmark-q15-no-distinct-flink-4x-medium-1tm-avg-job-throughput-per-second-records-s-history-thtb-3061?start_date=2024-01-27

https://grafana.test.risingwave-cloud.xyz/d/Q4ayNkfVz/flink-hits?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1708961698000&to=1708962300000&var-namespace=flink-4x-medium-1tm-20240226

https://buildkite.com/risingwave-test/flink-nexmark-bench/builds/144

Flink 4X: 3.3M Flink 1X: 1M

We can conclude that q15-no-distinct is a scalable query for both RW and Flink.

lmatz commented 7 months ago

This is the Flink plan after enabling table.optimizer.distinct-agg.split.enabled: true
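
For reference, in the Flink SQL client this option would typically be set as follows (a sketch; the benchmark harness may instead set it in the Flink configuration):

    SET 'table.optimizer.distinct-agg.split.enabled' = 'true';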

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.nexmark_q15], fields=[day, $f1, $f2, $f3, $f4, $f5, $f6, $f7, $f8, $f9, $f10, $f11, $f12])
+- GroupAggregate(groupBy=[day], partialFinalType=[FINAL], select=[day, $SUM0_RETRACT($f3) AS $f1, $SUM0_RETRACT($f4) AS $f2, $SUM0_RETRACT($f5) AS $f3, $SUM0_RETRACT($f6_0) AS $f4, $SUM0_RETRACT($f7_0) AS $f5, $SUM0_RETRACT($f8) AS $f6, $SUM0_RETRACT($f9) AS $f7, $SUM0_RETRACT($f10) AS $f8, $SUM0_RETRACT($f11) AS $f9, $SUM0_RETRACT($f12) AS $f10, $SUM0_RETRACT($f13) AS $f11, $SUM0_RETRACT($f14) AS $f12])
   +- Exchange(distribution=[hash[day]])
      +- GroupAggregate(groupBy=[day, $f6, $f7], partialFinalType=[PARTIAL], select=[day, $f6, $f7, COUNT(*) FILTER $g_3 AS $f3, COUNT(*) FILTER $g_30 AS $f4, COUNT(*) FILTER $g_31 AS $f5, COUNT(*) FILTER $g_32 AS $f6_0, COUNT(DISTINCT bidder) FILTER $g_1 AS $f7_0, COUNT(DISTINCT bidder) FILTER $g_10 AS $f8, COUNT(DISTINCT bidder) FILTER $g_11 AS $f9, COUNT(DISTINCT bidder) FILTER $g_12 AS $f10, COUNT(DISTINCT auction) FILTER $g_2 AS $f11, COUNT(DISTINCT auction) FILTER $g_20 AS $f12, COUNT(DISTINCT auction) FILTER $g_21 AS $f13, COUNT(DISTINCT auction) FILTER $g_22 AS $f14])
         +- Exchange(distribution=[hash[day, $f6, $f7]])
            +- Calc(select=[day, $f1, $f2, $f3, bidder, auction, $f6, $f7, =($e, 3) AS $g_3, AND(=($e, 3), $f1) AS $g_30, AND(=($e, 3), $f2) AS $g_31, AND(=($e, 3), $f3) AS $g_32, =($e, 1) AS $g_1, AND(=($e, 1), $f1) AS $g_10, AND(=($e, 1), $f2) AS $g_11, AND(=($e, 1), $f3) AS $g_12, =($e, 2) AS $g_2, AND(=($e, 2), $f1) AS $g_20, AND(=($e, 2), $f2) AS $g_21, AND(=($e, 2), $f3) AS $g_22])
               +- Expand(projects=[{day, $f1, $f2, $f3, bidder, auction, $f6, null AS $f7, 1 AS $e}, {day, $f1, $f2, $f3, bidder, auction, null AS $f6, $f7, 2 AS $e}, {day, $f1, $f2, $f3, bidder, auction, null AS $f6, null AS $f7, 3 AS $e}])
                  +- Calc(select=[DATE_FORMAT(CAST(dateTime AS TIMESTAMP(3)), _UTF-16LE'yyyy-MM-dd') AS day, IS TRUE(<(bid.price, 10000)) AS $f1, IS TRUE(SEARCH(bid.price, Sarg[[10000..1000000)])) AS $f2, IS TRUE(>=(bid.price, 1000000)) AS $f3, bid.bidder AS bidder, bid.auction AS auction, MOD(HASH_CODE(bid.bidder), 1024) AS $f6, MOD(HASH_CODE(bid.auction), 1024) AS $f7], where=[=(event_type, 2)])
                     +- WatermarkAssigner(rowtime=[dateTime], watermark=[-(dateTime, 4000:INTERVAL SECOND)])
                        +- Calc(select=[event_type, bid, CASE(=(event_type, 0), person.dateTime, =(event_type, 1), auction.dateTime, bid.dateTime) AS dateTime])
                           +- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.nexmark_q15], fields=[day, $f1, $f2, $f3, $f4, $f5, $f6, $f7, $f8, $f9, $f10, $f11, $f12])
+- GroupAggregate(groupBy=[day], partialFinalType=[FINAL], select=[day, $SUM0_RETRACT($f3) AS $f1, $SUM0_RETRACT($f4) AS $f2, $SUM0_RETRACT($f5) AS $f3, $SUM0_RETRACT($f6_0) AS $f4, $SUM0_RETRACT($f7_0) AS $f5, $SUM0_RETRACT($f8) AS $f6, $SUM0_RETRACT($f9) AS $f7, $SUM0_RETRACT($f10) AS $f8, $SUM0_RETRACT($f11) AS $f9, $SUM0_RETRACT($f12) AS $f10, $SUM0_RETRACT($f13) AS $f11, $SUM0_RETRACT($f14) AS $f12])
   +- Exchange(distribution=[hash[day]])
      +- GroupAggregate(groupBy=[day, $f6, $f7], partialFinalType=[PARTIAL], select=[day, $f6, $f7, COUNT(*) FILTER $g_3 AS $f3, COUNT(*) FILTER $g_30 AS $f4, COUNT(*) FILTER $g_31 AS $f5, COUNT(*) FILTER $g_32 AS $f6_0, COUNT(DISTINCT bidder) FILTER $g_1 AS $f7_0, COUNT(DISTINCT bidder) FILTER $g_10 AS $f8, COUNT(DISTINCT bidder) FILTER $g_11 AS $f9, COUNT(DISTINCT bidder) FILTER $g_12 AS $f10, COUNT(DISTINCT auction) FILTER $g_2 AS $f11, COUNT(DISTINCT auction) FILTER $g_20 AS $f12, COUNT(DISTINCT auction) FILTER $g_21 AS $f13, COUNT(DISTINCT auction) FILTER $g_22 AS $f14])
         +- Exchange(distribution=[hash[day, $f6, $f7]])
            +- Calc(select=[day, $f1, $f2, $f3, bidder, auction, $f6, $f7, ($e = 3) AS $g_3, (($e = 3) AND $f1) AS $g_30, (($e = 3) AND $f2) AS $g_31, (($e = 3) AND $f3) AS $g_32, ($e = 1) AS $g_1, (($e = 1) AND $f1) AS $g_10, (($e = 1) AND $f2) AS $g_11, (($e = 1) AND $f3) AS $g_12, ($e = 2) AS $g_2, (($e = 2) AND $f1) AS $g_20, (($e = 2) AND $f2) AS $g_21, (($e = 2) AND $f3) AS $g_22])
               +- Expand(projects=[{day, $f1, $f2, $f3, bidder, auction, $f6, null AS $f7, 1 AS $e}, {day, $f1, $f2, $f3, bidder, auction, null AS $f6, $f7, 2 AS $e}, {day, $f1, $f2, $f3, bidder, auction, null AS $f6, null AS $f7, 3 AS $e}])
                  +- Calc(select=[DATE_FORMAT(CAST(dateTime AS TIMESTAMP(3)), 'yyyy-MM-dd') AS day, (bid.price < 10000) IS TRUE AS $f1, SEARCH(bid.price, Sarg[[10000..1000000)]) IS TRUE AS $f2, (bid.price >= 1000000) IS TRUE AS $f3, bid.bidder AS bidder, bid.auction AS auction, MOD(HASH_CODE(bid.bidder), 1024) AS $f6, MOD(HASH_CODE(bid.auction), 1024) AS $f7], where=[(event_type = 2)])
                     +- WatermarkAssigner(rowtime=[dateTime], watermark=[(dateTime - 4000:INTERVAL SECOND)])
                        +- Calc(select=[event_type, bid, CASE((event_type = 0), person.dateTime, (event_type = 1), auction.dateTime, bid.dateTime) AS dateTime])
                           +- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])

link #11964

lmatz commented 7 months ago

With @st1page and @Little-Wallace,

we suspected that the aggregation dirty heap size (64MB by default) is limiting the throughput. Because of the group-by columns (day, bidder, and auction) and the data pattern, the skew is strong: at any given time all the data goes to a single partial aggregation operator. SCR-20240306-o8l

We found that the aggregation dirty heap size fluctuates around 64MB: SCR-20240307-glo

The figure above comes from: https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1709731899000&to=1709733586000&var-namespace=nexmark-ht-4x-1cn-affinity-10s

Flink has a similar knob to tune. https://github.com/risingwavelabs/kube-bench/blob/main/env.toml#L658-L663 https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/tuning/
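
The similar Flink knob referenced above is presumably the mini-batch aggregation configuration from that tuning page; in the Flink SQL client it would look roughly like this (values are illustrative, not necessarily what env.toml#L658-L663 uses):

    SET 'table.exec.mini-batch.enabled' = 'true';
    SET 'table.exec.mini-batch.allow-latency' = '5 s';
    SET 'table.exec.mini-batch.size' = '5000';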

However, after we set stream_hash_agg_max_dirty_groups_heap_size to 268435456, i.e. 256MB, it does not change much:

https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1709736387000&orgId=1&to=1709738065000&var-datasource=Prometheus:+test-useast1-eks-a&var-namespace=nexmark-ht-4x-1cn-affinity-10s SCR-20240307-gub

The throughput does not change.

The two executions mentioned in this thread can be found at metabase: http://metabase.risingwave-cloud.xyz/question/9236-nexmark-q15-blackhole-4x-medium-1cn-affinity-avg-source-output-rows-per-second-rows-s-history-thtb-2763?start_date=2024-01-04

lmatz commented 7 months ago

#15696 has improved it by about 20%.

github-actions[bot] commented 4 months ago

This issue has been open for 60 days with no activity. Could you please update the status? Feel free to continue discussion or close as not planned.