risingwavelabs / risingwave


perf: improve tpc-h q1 performance (single-topic) #15034

Open lmatz opened 7 months ago

lmatz commented 7 months ago

See performance numbers at https://www.notion.so/risingwave-labs/TPCH-Performance-Numbers-Table-e098ef82884546949333409f0513ada7?pvs=4#8de0bf4bda51444c8381f3b0c10ddfe1

lmatz commented 7 months ago

RW Query:

    create sink tpch_q1 as
    select
        l_returnflag,
        l_linestatus,
        sum(l_quantity) as sum_qty,
        sum(l_extendedprice) as sum_base_price,
        sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
        sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
        round(avg(l_quantity), 4) as avg_qty,
        round(avg(l_extendedprice), 4) as avg_price,
        round(avg(l_discount), 4) as avg_disc,
        count(*) as count_order
    from 
        lineitem 
    where 
        l_shipdate <= date '2099-12-01' - interval '71' day
    group by 
        l_returnflag, 
        l_linestatus
    with ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');

Query Plan:

 StreamSink { type: append-only, columns: [l_returnflag, l_linestatus, sum_qty, sum_base_price, sum_disc_price, sum_charge, avg_qty, avg_price, avg_disc, count_order] }
 └─StreamProject { exprs: [$expr5, $expr6, sum($expr1), sum($expr2), sum($expr7), sum($expr8), RoundDigit((sum($expr1) / count($expr1)::Decimal), 4:Int32) as $expr9, RoundDigit((sum($expr2) / count($expr2)::Decimal), 4:Int32) as $expr10, RoundDigit((sum($expr3) / count($expr3)::Decimal), 4:Int32) as $expr11, count] }
   └─StreamHashAgg [append_only] { group_key: [$expr5, $expr6], aggs: [sum($expr1), sum($expr2), sum($expr7), sum($expr8), count($expr1), count($expr2), sum($expr3), count($expr3), count] }
     └─StreamExchange { dist: HashShard($expr5, $expr6) }
       └─StreamProject { exprs: [$expr5, $expr6, $expr1, $expr2, $expr7, ($expr7 * (1:Decimal + $expr4)) as $expr8, $expr3, _row_id] }
         └─StreamProject { exprs: [$expr1, $expr2, $expr3, $expr4, $expr5, $expr6, ($expr2 * (1:Decimal - $expr3)) as $expr7, _row_id] }
           └─StreamProject { exprs: [Field(lineitem, 4:Int32) as $expr1, Field(lineitem, 5:Int32) as $expr2, Field(lineitem, 6:Int32) as $expr3, Field(lineitem, 7:Int32) as $expr4, Field(lineitem, 8:Int32) as $expr5, Field(lineitem, 9:Int32) as $expr6, _row_id] }
             └─StreamFilter { predicate: (Field(lineitem, 10:Int32) <= '2099-09-21 00:00:00':Timestamp) AND (eventType = 'lineitem':Varchar) }
               └─StreamRowIdGen { row_id_index: 10 }
                 └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
(10 rows)

Flink:

INSERT INTO tpch_q1
    select
        l_returnflag,
        l_linestatus,
        sum(l_quantity) as sum_qty,
        sum(l_extendedprice) as sum_base_price,
        sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
        sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
        round(avg(l_quantity), 4) as avg_qty,
        round(avg(l_extendedprice), 4) as avg_price,
        round(avg(l_discount), 4) as avg_disc,
        count(*) as count_order
    from
        lineitem
    where 
        l_shipdate <= date '2099-12-01' - interval '71' day
    group by
        l_returnflag,
        l_linestatus;

Query Plan:

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.tpch_q1], fields=[l_returnflag, l_linestatus, sum_qty, sum_base_price, sum_disc_price, sum_charge, avg_qty, avg_price, avg_disc, count_order])
+- Calc(select=[l_returnflag, l_linestatus, CAST(sum_qty AS DECIMAL(10, 0)) AS sum_qty, CAST(sum_base_price AS DECIMAL(10, 0)) AS sum_base_price, CAST(sum_disc_price AS DECIMAL(10, 0)) AS sum_disc_price, CAST(sum_charge AS DECIMAL(10, 0)) AS sum_charge, CAST(ROUND($f6, 4) AS DECIMAL(10, 0)) AS avg_qty, CAST(ROUND($f7, 4) AS DECIMAL(10, 0)) AS avg_price, CAST(ROUND($f8, 4) AS DECIMAL(10, 0)) AS avg_disc, CAST(count_order AS BIGINT) AS count_order])
   +- GroupAggregate(groupBy=[l_returnflag, l_linestatus], select=[l_returnflag, l_linestatus, SUM(l_quantity) AS sum_qty, SUM(l_extendedprice) AS sum_base_price, SUM($f4) AS sum_disc_price, SUM($f5) AS sum_charge, AVG(l_quantity) AS $f6, AVG(l_extendedprice) AS $f7, AVG(l_discount) AS $f8, COUNT(*) AS count_order])
      +- Exchange(distribution=[hash[l_returnflag, l_linestatus]])
         +- Calc(select=[lineitem.l_returnflag AS l_returnflag, lineitem.l_linestatus AS l_linestatus, lineitem.l_quantity AS l_quantity, lineitem.l_extendedprice AS l_extendedprice, *(lineitem.l_extendedprice, -(1, lineitem.l_discount)) AS $f4, *(*(lineitem.l_extendedprice, -(1, lineitem.l_discount)), +(1, lineitem.l_tax)) AS $f5, lineitem.l_discount AS l_discount], where=[AND(=(eventType, _UTF-16LE'lineitem':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), <=(lineitem.l_shipdate, 2099-09-21))])
            +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.tpch_q1], fields=[l_returnflag, l_linestatus, sum_qty, sum_base_price, sum_disc_price, sum_charge, avg_qty, avg_price, avg_disc, count_order])
+- Calc(select=[l_returnflag, l_linestatus, CAST(sum_qty AS DECIMAL(10, 0)) AS sum_qty, CAST(sum_base_price AS DECIMAL(10, 0)) AS sum_base_price, CAST(sum_disc_price AS DECIMAL(10, 0)) AS sum_disc_price, CAST(sum_charge AS DECIMAL(10, 0)) AS sum_charge, CAST(ROUND($f6, 4) AS DECIMAL(10, 0)) AS avg_qty, CAST(ROUND($f7, 4) AS DECIMAL(10, 0)) AS avg_price, CAST(ROUND($f8, 4) AS DECIMAL(10, 0)) AS avg_disc, CAST(count_order AS BIGINT) AS count_order])
   +- GroupAggregate(groupBy=[l_returnflag, l_linestatus], select=[l_returnflag, l_linestatus, SUM(l_quantity) AS sum_qty, SUM(l_extendedprice) AS sum_base_price, SUM($f4) AS sum_disc_price, SUM($f5) AS sum_charge, AVG(l_quantity) AS $f6, AVG(l_extendedprice) AS $f7, AVG(l_discount) AS $f8, COUNT(*) AS count_order])
      +- Exchange(distribution=[hash[l_returnflag, l_linestatus]])
         +- Calc(select=[lineitem.l_returnflag AS l_returnflag, lineitem.l_linestatus AS l_linestatus, lineitem.l_quantity AS l_quantity, lineitem.l_extendedprice AS l_extendedprice, (lineitem.l_extendedprice * (1 - lineitem.l_discount)) AS $f4, ((lineitem.l_extendedprice * (1 - lineitem.l_discount)) * (1 + lineitem.l_tax)) AS $f5, lineitem.l_discount AS l_discount], where=[((eventType = 'lineitem') AND (lineitem.l_shipdate <= 2099-09-21))])
            +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])
lmatz commented 7 months ago

Why are there three separate, consecutive StreamProject operators? Is this expected?

Edit: @st1page:

Yes, this is to reuse the same subexpression's result and avoid duplicated computation.
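
For illustration, a minimal sketch of how this shows up in the plan (hypothetical query; assumes EXPLAIN can be applied to CREATE SINK, as in the statements above):

    -- Both aggregate inputs share l_extendedprice * (1 - l_discount). The
    -- planner evaluates it once in an inner StreamProject (as $expr7 in the
    -- plan above), and an outer StreamProject reuses it to compute
    -- $expr7 * (1 + l_tax) (i.e., $expr8) instead of recomputing the product.
    explain create sink t as
    select
        sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
        sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge
    from lineitem
    with ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');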

lmatz commented 7 months ago

RW: https://grafana.test.risingwave-cloud.xyz/d/EpkBw5W4k/risingwave-dev-dashboard?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1706005919000&to=1706006521000&var-namespace=tpch-1cn-affinity-lmatz-good

CPU is not really the bottleneck, so the StreamProject operators are irrelevant @chenzl25 @st1page: (screenshot: SCR-20240207-j0h)

There are also nearly zero aggregation cache misses, so it is not slowed down by I/O from aggregation: (screenshot: SCR-20240207-j0v)

However, there is clear skewness: (screenshots: SCR-20240207-j0k, SCR-20240207-j38)

Flink: https://grafana.test.risingwave-cloud.xyz/d/Q4ayNkfVz/flink-hits?orgId=1&var-datasource=Prometheus:%20test-useast1-eks-a&from=1706008339000&to=1706008941000&var-namespace=tpch-flink-good-order (screenshots: SCR-20240207-j1m, SCR-20240207-j4e)

st1page commented 7 months ago

However, there is clear skewness:

Could it be the same issue as https://github.com/risingwavelabs/risingwave/issues/5214?

lmatz commented 7 months ago

However, there is clear skewness:

Could it be the same issue as #5214?

Could be.

Three possibilities in total:

  1. The issue described in #5214.
  2. The merge executor throughput indicates skewness in the data itself (after the group by?).
  3. Different partitions may contain drastically different numbers of events, since we have a single topic here that carries the events from all 8 TPC-H tables (see the sketch after this list).
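
To check possibility 3, a minimal sketch (hypothetical diagnostic; assumes the tpch source from the plan above) that materializes the per-eventType distribution of the shared topic:

    -- Hypothetical diagnostic MV: count how many events of each TPC-H table
    -- flow through the single shared topic. A heavily skewed distribution
    -- here would support possibility 3.
    create materialized view tpch_event_dist as
    select eventType, count(*) as cnt
    from tpch
    group by eventType;

If the source were recreated with Kafka partition metadata included (e.g., via INCLUDE partition), the same count could be grouped by partition to measure per-partition imbalance directly.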
lmatz commented 6 months ago

So I added a simple stateless query called q0 that does essentially nothing, and evaluated it for both RW and Flink: https://github.com/risingwavelabs/kube-bench/blob/main/manifests/tpch/tpch-modified-sinks.template.yaml#L7-L33
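
For reference, a minimal sketch of what such a pass-through baseline can look like (hypothetical; the actual q0 definition is in the linked template):

    -- Hypothetical pass-through sink: no filter, no aggregation, so its
    -- throughput approximates the raw ingestion rate of the source.
    create sink tpch_q0 as
    select eventType, lineitem
    from tpch
    with ( connector = 'blackhole', type = 'append-only', force_append_only = 'true');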

It turns out that the performance is almost exactly the same as q1 for both RW and Flink (as shown in the Notion page linked in the first comment):

RW: http://metabase.risingwave-cloud.xyz/question/10801-tpch-q0-bs-medium-1cn-affinity-avg-source-output-rows-per-second-rows-s-history-thtb-2971?start_date=2024-01-09
Flink: http://metabase.risingwave-cloud.xyz/question/10770-flink-tpch-q0-flink-medium-1tm-avg-job-throughput-per-second-records-s-history-thtb-2968?start_date=2024-01-08

So we can conclude that the computation is very light and that the bottleneck should be on the source.

As there is an outstanding issue tracking the source's performance regression, we will wait for #14815 to be fixed first and then decide the next step.

github-actions[bot] commented 3 months ago

This issue has been open for 60 days with no activity. Could you please update the status? Feel free to continue discussion or close as not planned.