risingwavelabs / risingwave

perf: improve nexmark q8(EOWC) (w/ and w/o scaling up) performance #14986

Open lmatz opened 9 months ago

lmatz commented 9 months ago

nightly-20240127

4X:

  1. https://buildkite.com/risingwave-test/nexmark-benchmark/builds/2962
  2. https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1706892320000&orgId=1&to=1706893643000&var-datasource=Prometheus:%20test-useast1-eks-a&var-namespace=nexmark-ht-4x-1cn-affinity-weekly

1X:

  1. https://buildkite.com/risingwave-test/nexmark-benchmark/builds/2961
  2. https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1706890825000&orgId=1&to=1706892628000&var-datasource=Prometheus:%20test-useast1-eks-a&var-namespace=nexmark-1cn-affinity-lmatz

RW 4X: 1659K
RW 1X: 592K (slower)
4X/1X ratio: 2.8

Flink: http://metabase.risingwave-cloud.xyz/question/9549-nexmark-rw-vs-flink-avg-source-throughput-all-testbeds?rw_tag=nightly-20240127&flink_tag=v1.16.0&flink_label=flink-medium-1tm-test-20230104,flink-4x-medium-1tm-test-20240104&flink_metrics=avg-job-throughput-per-second

Flink 4X/1X ratio: 3.16
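
A quick sanity check on scaling efficiency: 1659K / 592K ≈ 2.80, i.e. about 70% of the ideal 4×, while Flink's 3.16 corresponds to about 79%.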

RW:

CREATE SINK nexmark_q8
    AS
    SELECT P.id,
           P.name,
           P.starttime
    FROM (SELECT id,
                 name,
                 window_start AS starttime,
                 window_end   AS endtime
          FROM
              TUMBLE(person, date_time, INTERVAL '10' SECOND)
          GROUP BY id,
                   name,
                   window_start,
                   window_end) P
             JOIN (SELECT seller,
                          window_start AS starttime,
                          window_end   AS endtime
                   FROM
                       TUMBLE(auction, date_time, INTERVAL '10' SECOND)
                   GROUP BY seller,
                            window_start,
                            window_end) A ON P.id = A.seller
        AND P.starttime = A.starttime
        AND P.endtime = A.endtime
    EMIT ON WINDOW CLOSE
    WITH ( connector = 'blackhole', type = 'append-only' );
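
For context, EMIT ON WINDOW CLOSE makes the sink emit each window's result only once the watermark has passed the window's end, so it requires a watermark on the event-time column (here supplied by the source's 4-second watermark, visible as StreamWatermarkFilter in the plan below). A minimal self-contained sketch of the same mechanism; the table and column names here are illustrative, not from the benchmark setup:

CREATE TABLE events (
    v  INT,
    ts TIMESTAMP,
    WATERMARK FOR ts AS ts - INTERVAL '4' SECOND
) APPEND ONLY;

CREATE SINK counts_per_window AS
    SELECT window_start, COUNT(*) AS cnt
    FROM TUMBLE(events, ts, INTERVAL '10' SECOND)
    GROUP BY window_start
    EMIT ON WINDOW CLOSE
    WITH ( connector = 'blackhole', type = 'append-only' );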

Query Plan:

 StreamSink { type: append-only, columns: [id, name, starttime, $expr10217(hidden), $expr10224(hidden), $expr10223(hidden), $expr10225(hidden)] }
 └─StreamEowcSort { sort_column: $expr2 }
   └─StreamHashJoin [window, append_only] { type: Inner, predicate: $expr2 = $expr6 AND $expr5 = $expr8 AND $expr3 = $expr7, output_watermarks: [$expr2, $expr5, $expr6, $expr8] }
     ├─StreamExchange { dist: HashShard($expr3, $expr2, $expr5) }
     │ └─StreamAppendOnlyDedup { dedup_cols: [$expr3, $expr4, $expr2, $expr5] }
     │   └─StreamExchange { dist: HashShard($expr3, $expr4, $expr2, $expr5) }
     │     └─StreamProject { exprs: [Field(person, 0:Int32) as $expr3, Field(person, 1:Int32) as $expr4, $expr2, ($expr2 + '00:00:10':Interval) as $expr5], output_watermarks: [$expr2, $expr5] }
     │       └─StreamProject { exprs: [event_type, person, auction, $expr1, TumbleStart($expr1, '00:00:10':Interval) as $expr2, _row_id], output_watermarks: [$expr1, $expr2] }
     │         └─StreamFilter { predicate: (event_type = 0:Int32) }
     │           └─StreamShare { id: 6 }
     │             └─StreamProject { exprs: [event_type, person, auction, $expr1, _row_id], output_watermarks: [$expr1] }
     │               └─StreamFilter { predicate: ((event_type = 0:Int32) OR (event_type = 1:Int32)) }
     │                 └─StreamRowIdGen { row_id_index: 6 }
     │                   └─StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: ($expr1 - '00:00:04':Interval) }], output_watermarks: [$expr1] }
     │                     └─StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _rw_kafka_timestamp, _row_id] }
     │                       └─StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _rw_kafka_timestamp, _row_id] }
     └─StreamAppendOnlyDedup { dedup_cols: [$expr7, $expr6, $expr8] }
       └─StreamExchange { dist: HashShard($expr7, $expr6, $expr8) }
         └─StreamProject { exprs: [Field(auction, 7:Int32) as $expr7, $expr6, ($expr6 + '00:00:10':Interval) as $expr8], output_watermarks: [$expr6, $expr8] }
           └─StreamProject { exprs: [event_type, person, auction, $expr1, TumbleStart($expr1, '00:00:10':Interval) as $expr6, _row_id], output_watermarks: [$expr1, $expr6] }
             └─StreamFilter { predicate: (event_type = 1:Int32) }
               └─StreamShare { id: 6 }
                 └─StreamProject { exprs: [event_type, person, auction, $expr1, _row_id], output_watermarks: [$expr1] }
                   └─StreamFilter { predicate: ((event_type = 0:Int32) OR (event_type = 1:Int32)) }
                     └─StreamRowIdGen { row_id_index: 6 }
                       └─StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: ($expr1 - '00:00:04':Interval) }], output_watermarks: [$expr1] }
                         └─StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _rw_kafka_timestamp, _row_id] }
                           └─StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _rw_kafka_timestamp, _row_id] }
(28 rows)
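
Note the two StreamAppendOnlyDedup operators: because each GROUP BY subquery selects exactly its grouping columns and computes no aggregates, the planner turns it into deduplication instead of hash aggregation. For example, the person-side subquery is semantically equivalent to:

SELECT DISTINCT id,
                name,
                window_start AS starttime,
                window_end   AS endtime
FROM TUMBLE(person, date_time, INTERVAL '10' SECOND);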

Dist Query Plan:

 Fragment 0
 StreamSink { type: append-only, columns: [id, name, starttime, $expr10217(hidden), $expr10224(hidden), $expr10223(hidden), $expr10225(hidden)] } { tables: [ Sink: 0 ] }
 └── StreamEowcSort { sort_column: $expr2 } { tables: [ Sort: 1 ] }
     └── StreamHashJoin [window, append_only] { type: Inner, predicate: $expr2 = $expr6 AND $expr5 = $expr8 AND $expr3 = $expr7, output_watermarks: [$expr2, $expr5, $expr6, $expr8] }
         ├── tables: [ HashJoinLeft: 2, HashJoinDegreeLeft: 3, HashJoinRight: 4, HashJoinDegreeRight: 5 ]
         ├── StreamExchange Hash([0, 2, 3]) from 1
         └── StreamAppendOnlyDedup { dedup_cols: [$expr7, $expr6, $expr8] } { tables: [ AppendOnlyDedup: 9 ] }
             └── StreamExchange Hash([0, 1, 2]) from 4

 Fragment 1
 StreamAppendOnlyDedup { dedup_cols: [$expr3, $expr4, $expr2, $expr5] } { tables: [ AppendOnlyDedup: 6 ] }
 └── StreamExchange Hash([0, 1, 2, 3]) from 2

 Fragment 2
 StreamProject { exprs: [Field(person, 0:Int32) as $expr3, Field(person, 1:Int32) as $expr4, $expr2, ($expr2 + '00:00:10':Interval) as $expr5], output_watermarks: [$expr2, $expr5] }
 └── StreamProject { exprs: [event_type, person, auction, $expr1, TumbleStart($expr1, '00:00:10':Interval) as $expr2, _row_id], output_watermarks: [$expr1, $expr2] }
     └── StreamFilter { predicate: (event_type = 0:Int32) }
         └── StreamExchange NoShuffle from 3

 Fragment 3
 StreamProject { exprs: [event_type, person, auction, $expr1, _row_id], output_watermarks: [$expr1] }
 └── StreamFilter { predicate: ((event_type = 0:Int32) OR (event_type = 1:Int32)) }
     └── StreamRowIdGen { row_id_index: 6 }
         └── StreamWatermarkFilter { watermark_descs: [Desc { column: $expr1, expr: ($expr1 - '00:00:04':Interval) }], output_watermarks: [$expr1] } { tables: [ WatermarkFilter: 7 ] }
             └── StreamProject { exprs: [event_type, person, auction, bid, Case((event_type = 0:Int32), Field(person, 6:Int32), (event_type = 1:Int32), Field(auction, 5:Int32), Field(bid, 5:Int32)) as $expr1, _rw_kafka_timestamp, _row_id] }
                 └── StreamSource { source: nexmark, columns: [event_type, person, auction, bid, _rw_kafka_timestamp, _row_id] } { tables: [ Source: 8 ] }

 Fragment 4
 StreamProject { exprs: [Field(auction, 7:Int32) as $expr7, $expr6, ($expr6 + '00:00:10':Interval) as $expr8], output_watermarks: [$expr6, $expr8] }
 └── StreamProject { exprs: [event_type, person, auction, $expr1, TumbleStart($expr1, '00:00:10':Interval) as $expr6, _row_id], output_watermarks: [$expr1, $expr6] }
     └── StreamFilter { predicate: (event_type = 1:Int32) }
         └── StreamExchange NoShuffle from 3

 Table 0
 ├── columns: [ kv_log_store_epoch, kv_log_store_seq_id, kv_log_store_vnode, kv_log_store_row_op, id, name, starttime, $expr10217, $expr10224, $expr10223, $expr10225 ]
 ├── primary key: [ $0 ASC, $1 ASC, $2 ASC ]
 ├── value indices: [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 ]
 ├── distribution key: [ 4, 6, 7 ]
 ├── read pk prefix len hint: 3
 └── vnode column idx: 2

 Table 1 { columns: [ $expr3, $expr4, $expr2, $expr5, $expr7, $expr6, $expr8 ], primary key: [ $2 ASC, $0 ASC, $3 ASC, $1 ASC ], value indices: [ 0, 1, 2, 3, 4, 5, 6 ], distribution key: [ 0, 2, 3 ], read pk prefix len hint: 0 }

 Table 2 { columns: [ $expr3, $expr4, $expr2, $expr5 ], primary key: [ $2 ASC, $3 ASC, $0 ASC, $1 ASC ], value indices: [ 0, 1, 2, 3 ], distribution key: [ 0, 2, 3 ], read pk prefix len hint: 3 }

 Table 3 { columns: [ $expr2, $expr5, $expr3, $expr4, _degree ], primary key: [ $0 ASC, $1 ASC, $2 ASC, $3 ASC ], value indices: [ 4 ], distribution key: [ 2, 0, 1 ], read pk prefix len hint: 3 }

 Table 4 { columns: [ $expr7, $expr6, $expr8 ], primary key: [ $1 ASC, $2 ASC, $0 ASC ], value indices: [ 0, 1, 2 ], distribution key: [ 0, 1, 2 ], read pk prefix len hint: 3 }

 Table 5 { columns: [ $expr6, $expr8, $expr7, _degree ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 3 ], distribution key: [ 2, 0, 1 ], read pk prefix len hint: 3 }

 Table 6 { columns: [ $expr3, $expr4, $expr2, $expr5 ], primary key: [ $0 ASC, $1 ASC, $2 ASC, $3 ASC ], value indices: [ 0, 1, 2, 3 ], distribution key: [ 0, 1, 2, 3 ], read pk prefix len hint: 4 }

 Table 7 { columns: [ vnode, offset ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1, vnode column idx: 0 }

 Table 8 { columns: [ partition_id, offset_info ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [], read pk prefix len hint: 1 }

 Table 9 { columns: [ $expr7, $expr6, $expr8 ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 0, 1, 2 ], distribution key: [ 0, 1, 2 ], read pk prefix len hint: 3 }

(59 rows)
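
For reference, the two plans above can be regenerated with EXPLAIN against the same CREATE SINK statement (a sketch; the exact EXPLAIN option names may vary across RisingWave versions):

EXPLAIN CREATE SINK nexmark_q8 AS ...;           -- stream plan
EXPLAIN (DISTSQL) CREATE SINK nexmark_q8 AS ...; -- distributed plan with state tables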

Flink:

INSERT INTO nexmark_q8
    SELECT P.id, P.name, P.starttime
    FROM (
      SELECT P.id, P.name,
             TUMBLE_START(P.dateTime, INTERVAL '10' SECOND) AS starttime,
             TUMBLE_END(P.dateTime, INTERVAL '10' SECOND) AS endtime
      FROM person P
      GROUP BY P.id, P.name, TUMBLE(P.dateTime, INTERVAL '10' SECOND)
    ) P
    JOIN (
      SELECT A.seller,
             TUMBLE_START(A.dateTime, INTERVAL '10' SECOND) AS starttime,
             TUMBLE_END(A.dateTime, INTERVAL '10' SECOND) AS endtime
      FROM auction A
      GROUP BY A.seller, TUMBLE(A.dateTime, INTERVAL '10' SECOND)
    ) A
    ON P.id = A.seller AND P.starttime = A.starttime AND P.endtime = A.endtime;

Query Plan:

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.nexmark_q8], fields=[id, name, starttime])
+- Calc(select=[id, name, starttime])
   +- Join(joinType=[InnerJoin], where=[AND(=(id, seller), =(starttime, starttime0), =(endtime, endtime0))], select=[id, name, starttime, endtime, seller, starttime0, endtime0], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
      :- Exchange(distribution=[hash[id, starttime, endtime]])
      :  +- Calc(select=[$f0 AS id, $f1 AS name, w$start AS starttime, w$end AS endtime])
      :     +- GroupWindowAggregate(groupBy=[$f0, $f1], window=[TumblingGroupWindow('w$, dateTime, 10000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[$f0, $f1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
      :        +- Exchange(distribution=[hash[$f0, $f1]])
      :           +- Calc(select=[person.id AS $f0, person.name AS $f1, dateTime], where=[=(event_type, 0)])
      :              +- WatermarkAssigner(rowtime=[dateTime], watermark=[-(dateTime, 4000:INTERVAL SECOND)])
      :                 +- Calc(select=[event_type, person, CASE(=(event_type, 0), person.dateTime, =(event_type, 1), auction.dateTime, bid.dateTime) AS dateTime])
      :                    +- TableSourceScan(table=[[default_catalog, default_database, kafka]], fields=[event_type, person, auction, bid])
      +- Exchange(distribution=[hash[seller, starttime, endtime]])
         +- Calc(select=[$f0 AS seller, w$start AS starttime, w$end AS endtime])
            +- GroupWindowAggregate(groupBy=[$f0], window=[TumblingGroupWindow('w$, dateTime, 10000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[$f0, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
               +- Exchange(distribution=[hash[$f0]])
                  +- Calc(select=[auction.seller AS $f0, dateTime], where=[=(event_type, 1)])
                     +- WatermarkAssigner(rowtime=[dateTime], watermark=[-(dateTime, 4000:INTERVAL SECOND)])
                        +- Calc(select=[event_type, auction, CASE(=(event_type, 0), person.dateTime, =(event_type, 1), auction.dateTime, bid.dateTime) AS dateTime])
                           +- TableSourceScan(table=[[default_catalog, default_database, kafka]], fields=[event_type, person, auction, bid])

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.nexmark_q8], fields=[id, name, starttime])
+- Calc(select=[id, name, starttime])
   +- Join(joinType=[InnerJoin], where=[((id = seller) AND (starttime = starttime0) AND (endtime = endtime0))], select=[id, name, starttime, endtime, seller, starttime0, endtime0], leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
      :- Exchange(distribution=[hash[id, starttime, endtime]])
      :  +- Calc(select=[$f0 AS id, $f1 AS name, w$start AS starttime, w$end AS endtime])
      :     +- GroupWindowAggregate(groupBy=[$f0, $f1], window=[TumblingGroupWindow('w$, dateTime, 10000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[$f0, $f1, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
      :        +- Exchange(distribution=[hash[$f0, $f1]])
      :           +- Calc(select=[person.id AS $f0, person.name AS $f1, dateTime], where=[(event_type = 0)])
      :              +- WatermarkAssigner(rowtime=[dateTime], watermark=[(dateTime - 4000:INTERVAL SECOND)])
      :                 +- Calc(select=[event_type, person, CASE((event_type = 0), person.dateTime, (event_type = 1), auction.dateTime, bid.dateTime) AS dateTime])
      :                    +- TableSourceScan(table=[[default_catalog, default_database, kafka]], fields=[event_type, person, auction, bid])(reuse_id=[1])
      +- Exchange(distribution=[hash[seller, starttime, endtime]])
         +- Calc(select=[$f0 AS seller, w$start AS starttime, w$end AS endtime])
            +- GroupWindowAggregate(groupBy=[$f0], window=[TumblingGroupWindow('w$, dateTime, 10000)], properties=[w$start, w$end, w$rowtime, w$proctime], select=[$f0, start('w$) AS w$start, end('w$) AS w$end, rowtime('w$) AS w$rowtime, proctime('w$) AS w$proctime])
               +- Exchange(distribution=[hash[$f0]])
                  +- Calc(select=[auction.seller AS $f0, dateTime], where=[(event_type = 1)])
                     +- WatermarkAssigner(rowtime=[dateTime], watermark=[(dateTime - 4000:INTERVAL SECOND)])
                        +- Calc(select=[event_type, auction, CASE((event_type = 0), person.dateTime, (event_type = 1), auction.dateTime, bid.dateTime) AS dateTime])
                           +- Reused(reference_id=[1])
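
For comparison with RisingWave's table-valued TUMBLE, the person-side aggregation could also be written using Flink's windowing TVFs (available since Flink 1.13; an untested sketch, since the benchmark itself uses the legacy group-window syntax above):

SELECT id, name,
       window_start AS starttime,
       window_end   AS endtime
FROM TABLE(
    TUMBLE(TABLE person, DESCRIPTOR(dateTime), INTERVAL '10' SECOND))
GROUP BY id, name, window_start, window_end;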
lmatz commented 9 months ago

Some notable phenomena:

1. Why does the dashboard show executor cache usage for a materialized view when no materialized view has been created? And why does the cache exhibit such a pattern?

4X: (screenshot SCR-20240205-dm)

1X: (screenshot SCR-20240205-dp)

2. Why are the join table cache miss rates as high as 80% and 100%? At 100%, the cache is effectively useless.

4X: (screenshot SCR-20240205-fq)

1X: (screenshot SCR-20240205-fv)

3. Join Actor Input Blocking Time Ratio and Join Actor Match Duration Per Second are both about 4 times worse under 4X than under 1X.

4X: (screenshot SCR-20240205-ij)

1X: (screenshot SCR-20240205-in)

4. AppendOnlyDedup cannot keep up when scaling up.

4X: https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1706892320000&orgId=1&to=1706893643000&var-datasource=Prometheus:%20test-useast1-eks-a&var-namespace=nexmark-ht-4x-1cn-affinity-weekly&editPanel=63 (screenshot SCR-20240205-vr)

1X: https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1706890825000&orgId=1&to=1706892628000&var-datasource=Prometheus:%20test-useast1-eks-a&var-namespace=nexmark-1cn-affinity-lmatz&editPanel=63 (screenshot SCR-20240205-vn)

5. The HashJoin generally keeps up when scaling up.

4X: https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1706892320000&orgId=1&to=1706893643000&var-datasource=Prometheus:%20test-useast1-eks-a&var-namespace=nexmark-ht-4x-1cn-affinity-weekly&editPanel=63 (screenshot SCR-20240205-vr)

1X: https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1706890825000&orgId=1&to=1706892628000&var-datasource=Prometheus:%20test-useast1-eks-a&var-namespace=nexmark-1cn-affinity-lmatz&editPanel=63 (screenshot SCR-20240205-vn)

6. The Sort operator does not keep up when scaling up.

4X: https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1706892320000&orgId=1&to=1706893643000&var-datasource=Prometheus:%20test-useast1-eks-a&var-namespace=nexmark-ht-4x-1cn-affinity-weekly&editPanel=63 (screenshot SCR-20240205-vr)

1X: https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?editPanel=63&from=1706890825000&orgId=1&to=1706892628000&var-datasource=Prometheus:+test-useast1-eks-a&var-namespace=nexmark-1cn-affinity-lmatz (screenshot SCR-20240205-vn)

7. Project cannot keep up when scaling up (hard to believe).

4X: https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?from=1706892320000&orgId=1&to=1706893643000&var-datasource=Prometheus:%20test-useast1-eks-a&var-namespace=nexmark-ht-4x-1cn-affinity-weekly&editPanel=63

(screenshot SCR-20240205-1bd)

1X: https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?editPanel=63&from=1706890825000&orgId=1&to=1706892628000&var-datasource=Prometheus:+test-useast1-eks-a&var-namespace=nexmark-1cn-affinity-lmatz

(screenshot SCR-20240205-1bf)

github-actions[bot] commented 5 months ago

This issue has been open for 60 days with no activity. Could you please update the status? Feel free to continue discussion or close as not planned.