risingwavelabs / risingwave

Best-in-class stream processing, analytics, and management. Perform continuous analytics, or build event-driven applications, real-time ETL pipelines, and feature stores in minutes. Unified streaming and batch. PostgreSQL compatible.

discussion: nexmark q16 low performance #7271

Closed fuyufjh closed 1 year ago

fuyufjh commented 1 year ago

For Q16, RisingWave's throughput is much lower than Flink's.

CREATE MATERIALIZED VIEW nexmark_q16 AS
SELECT
    channel,
    to_char(date_time, 'YYYY-MM-DD') as "day",
    max(to_char(date_time, 'HH:mm')) as "minute",
    count(*) AS total_bids,
    count(*) filter (where price < 10000) AS rank1_bids,
    count(*) filter (where price >= 10000 and price < 1000000) AS rank2_bids,
    count(*) filter (where price >= 1000000) AS rank3_bids,
    count(distinct bidder) AS total_bidders,
    count(distinct bidder) filter (where price < 10000) AS rank1_bidders,
    count(distinct bidder) filter (where price >= 10000 and price < 1000000) AS rank2_bidders,
    count(distinct bidder) filter (where price >= 1000000) AS rank3_bidders,
    count(distinct auction) AS total_auctions,
    count(distinct auction) filter (where price < 10000) AS rank1_auctions,
    count(distinct auction) filter (where price >= 10000 and price < 1000000) AS rank2_auctions,
    count(distinct auction) filter (where price >= 1000000) AS rank3_auctions
FROM bid
GROUP BY channel, to_char(date_time, 'YYYY-MM-DD');

We are using 2-phase distinct aggregation (Expand -> partial agg -> final agg), which might be the cause...

 StreamMaterialize { columns: [channel, day, minute, total_bids, rank1_bids, rank2_bids, rank3_bids, total_bidders, rank1_bidders, rank2_bidders, rank3_bidders, total_auctions, rank1_auctions, rank2_auctions, rank3_auctions], pk_columns: [channel, day] }
 └─StreamProject { exprs: [Field(bid, 3:Int32), ToChar(Field(bid, 5:Int32), 'YYYY-MM-DD':Varchar), max(max(ToChar(Field(bid, 5:Int32), 'HH:mm':Varchar))) filter((flag = 0:Int64)), sum0(count) filter((flag = 0:Int64)), sum0(count filter((Field(bid, 2:Int32) < 10000:Int32))) filter((flag = 0:Int64)), sum0(count filter((Field(bid, 2:Int32) >= 10000:Int32) AND (Field(bid, 2:Int32) < 1000000:Int32))) filter((flag = 0:Int64)), sum0(count filter((Field(bid, 2:Int32) >= 1000000:Int32))) filter((flag = 0:Int64)), count(Field(bid, 1:Int32)) filter((flag = 1:Int64)), count(Field(bid, 1:Int32)) filter((count filter((Field(bid, 2:Int32) < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(Field(bid, 1:Int32)) filter((count filter((Field(bid, 2:Int32) >= 10000:Int32) AND (Field(bid, 2:Int32) < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(Field(bid, 1:Int32)) filter((count filter((Field(bid, 2:Int32) >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(Field(bid, 0:Int32)) filter((flag = 2:Int64)), count(Field(bid, 0:Int32)) filter((count filter((Field(bid, 2:Int32) < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(Field(bid, 0:Int32)) filter((count filter((Field(bid, 2:Int32) >= 10000:Int32) AND (Field(bid, 2:Int32) < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(Field(bid, 0:Int32)) filter((count filter((Field(bid, 2:Int32) >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64))] }
   └─StreamHashAgg { group_key: [Field(bid, 3:Int32), ToChar(Field(bid, 5:Int32), 'YYYY-MM-DD':Varchar)], aggs: [count, max(max(ToChar(Field(bid, 5:Int32), 'HH:mm':Varchar))) filter((flag = 0:Int64)), sum0(count) filter((flag = 0:Int64)), sum0(count filter((Field(bid, 2:Int32) < 10000:Int32))) filter((flag = 0:Int64)), sum0(count filter((Field(bid, 2:Int32) >= 10000:Int32) AND (Field(bid, 2:Int32) < 1000000:Int32))) filter((flag = 0:Int64)), sum0(count filter((Field(bid, 2:Int32) >= 1000000:Int32))) filter((flag = 0:Int64)), count(Field(bid, 1:Int32)) filter((flag = 1:Int64)), count(Field(bid, 1:Int32)) filter((count filter((Field(bid, 2:Int32) < 10000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(Field(bid, 1:Int32)) filter((count filter((Field(bid, 2:Int32) >= 10000:Int32) AND (Field(bid, 2:Int32) < 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(Field(bid, 1:Int32)) filter((count filter((Field(bid, 2:Int32) >= 1000000:Int32)) > 0:Int64) AND (flag = 1:Int64)), count(Field(bid, 0:Int32)) filter((flag = 2:Int64)), count(Field(bid, 0:Int32)) filter((count filter((Field(bid, 2:Int32) < 10000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(Field(bid, 0:Int32)) filter((count filter((Field(bid, 2:Int32) >= 10000:Int32) AND (Field(bid, 2:Int32) < 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64)), count(Field(bid, 0:Int32)) filter((count filter((Field(bid, 2:Int32) >= 1000000:Int32)) > 0:Int64) AND (flag = 2:Int64))] }
     └─StreamExchange { dist: HashShard(Field(bid, 3:Int32), ToChar(Field(bid, 5:Int32), 'YYYY-MM-DD':Varchar)) }
       └─StreamProject { exprs: [Field(bid, 3:Int32), ToChar(Field(bid, 5:Int32), 'YYYY-MM-DD':Varchar), Field(bid, 1:Int32), Field(bid, 1:Int32), Field(bid, 1:Int32), Field(bid, 1:Int32), Field(bid, 0:Int32), Field(bid, 0:Int32), Field(bid, 0:Int32), Field(bid, 0:Int32), flag, max(ToChar(Field(bid, 5:Int32), 'HH:mm':Varchar)), count, count filter((Field(bid, 2:Int32) < 10000:Int32)), count filter((Field(bid, 2:Int32) >= 10000:Int32) AND (Field(bid, 2:Int32) < 1000000:Int32)), count filter((Field(bid, 2:Int32) >= 1000000:Int32)), count filter((Field(bid, 2:Int32) < 10000:Int32)), count filter((Field(bid, 2:Int32) >= 10000:Int32) AND (Field(bid, 2:Int32) < 1000000:Int32)), count filter((Field(bid, 2:Int32) >= 1000000:Int32)), count filter((Field(bid, 2:Int32) < 10000:Int32)), count filter((Field(bid, 2:Int32) >= 10000:Int32) AND (Field(bid, 2:Int32) < 1000000:Int32)), count filter((Field(bid, 2:Int32) >= 1000000:Int32))] }
         └─StreamAppendOnlyHashAgg { group_key: [Field(bid, 3:Int32), ToChar(Field(bid, 5:Int32), 'YYYY-MM-DD':Varchar), Field(bid, 1:Int32), Field(bid, 0:Int32), flag], aggs: [count, max(ToChar(Field(bid, 5:Int32), 'HH:mm':Varchar)), count, count filter((Field(bid, 2:Int32) < 10000:Int32)), count filter((Field(bid, 2:Int32) >= 10000:Int32) AND (Field(bid, 2:Int32) < 1000000:Int32)), count filter((Field(bid, 2:Int32) >= 1000000:Int32)), count filter((Field(bid, 2:Int32) < 10000:Int32)), count filter((Field(bid, 2:Int32) >= 10000:Int32) AND (Field(bid, 2:Int32) < 1000000:Int32)), count filter((Field(bid, 2:Int32) >= 1000000:Int32)), count filter((Field(bid, 2:Int32) < 10000:Int32)), count filter((Field(bid, 2:Int32) >= 10000:Int32) AND (Field(bid, 2:Int32) < 1000000:Int32)), count filter((Field(bid, 2:Int32) >= 1000000:Int32))] }
           └─StreamExchange { dist: HashShard(Field(bid, 3:Int32), ToChar(Field(bid, 5:Int32), 'YYYY-MM-DD':Varchar), Field(bid, 1:Int32), Field(bid, 0:Int32), flag) }
             └─StreamExpand { column_subsets: [[Field(bid, 3:Int32), ToChar(Field(bid, 5:Int32), 'YYYY-MM-DD':Varchar), ToChar(Field(bid, 5:Int32), 'HH:mm':Varchar)], [Field(bid, 3:Int32), ToChar(Field(bid, 5:Int32), 'YYYY-MM-DD':Varchar), Field(bid, 1:Int32)], [Field(bid, 3:Int32), ToChar(Field(bid, 5:Int32), 'YYYY-MM-DD':Varchar), Field(bid, 0:Int32)]] }
               └─StreamProject { exprs: [Field(bid, 3:Int32), ToChar(Field(bid, 5:Int32), 'YYYY-MM-DD':Varchar), ToChar(Field(bid, 5:Int32), 'HH:mm':Varchar), Field(bid, 2:Int32), Field(bid, 1:Int32), Field(bid, 0:Int32), _row_id] }
                 └─StreamFilter { predicate: (event_type = 2:Int32) }
                   └─StreamRowIdGen { row_id_index: 4 }
                     └─StreamSource { source: "nexmark", columns: ["event_type", "person", "auction", "bid", "_row_id"] }
(12 rows)

Flink's plan:

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.discard_sink], fields=[channel, day, minute, total_bids, rank1_bids, rank2_bids, rank3_bids, total_bidders, rank1_bidders, rank2_bidders, rank3_bidders, total_auctions, rank1_auctions, rank2_auctions, rank3_auctions])
+- GroupAggregate(groupBy=[channel, day], select=[channel, day, MAX($f2) AS minute, COUNT(*) AS total_bids, COUNT(*) FILTER $f3 AS rank1_bids, COUNT(*) FILTER $f4 AS rank2_bids, COUNT(*) FILTER $f5 AS rank3_bids, COUNT(DISTINCT bidder) AS total_bidders, COUNT(DISTINCT bidder) FILTER $f3 AS rank1_bidders, COUNT(DISTINCT bidder) FILTER $f4 AS rank2_bidders, COUNT(DISTINCT bidder) FILTER $f5 AS rank3_bidders, COUNT(DISTINCT auction) AS total_auctions, COUNT(DISTINCT auction) FILTER $f3 AS rank1_auctions, COUNT(DISTINCT auction) FILTER $f4 AS rank2_auctions, COUNT(DISTINCT auction) FILTER $f5 AS rank3_auctions])
   +- Exchange(distribution=[hash[channel, day]])
      +- Calc(select=[bid.channel AS channel, DATE_FORMAT(CAST(dateTime AS TIMESTAMP(3)), _UTF-16LE'yyyy-MM-dd') AS day, DATE_FORMAT(CAST(dateTime AS TIMESTAMP(3)), _UTF-16LE'HH:mm') AS $f2, IS TRUE(<(bid.price, 10000)) AS $f3, IS TRUE(SEARCH(bid.price, Sarg[[10000..1000000)])) AS $f4, IS TRUE(>=(bid.price, 1000000)) AS $f5, bid.bidder AS bidder, bid.auction AS auction], where=[=(event_type, 2)])
         +- WatermarkAssigner(rowtime=[dateTime], watermark=[-(dateTime, 4000:INTERVAL SECOND)])
            +- Calc(select=[event_type, bid, CASE(=(event_type, 0), person.dateTime, =(event_type, 1), auction.dateTime, bid.dateTime) AS dateTime])
               +- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])

For this case, the cost of 2-phase distinct aggregation is:

  1. An extra Exchange and an extra final HashAgg
  2. The final HashAgg is not append-only and there is a max()

Perhaps we should not use 2-phase distinct aggregation in streaming, especially for queries with GROUP BY columns.
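One quick way to check this hypothesis would be to re-plan the query with 2-phase aggregation disabled and compare the plans and throughput. A minimal sketch, assuming the rw_enable_two_phase_agg session variable is available and controls this behavior; the MV name and the trimmed column list are only for illustration:

-- Assumption: rw_enable_two_phase_agg lets the session opt out of the
-- 2-phase (Expand -> partial agg -> final agg) plan.
SET rw_enable_two_phase_agg TO false;

-- Re-plan an abridged Q16 and check whether the Expand and the extra
-- Exchange/HashAgg disappear from the stream plan.
EXPLAIN CREATE MATERIALIZED VIEW nexmark_q16_single_phase AS
SELECT
    channel,
    to_char(date_time, 'YYYY-MM-DD') AS "day",
    count(*) AS total_bids,
    count(distinct bidder) AS total_bidders,
    count(distinct auction) AS total_auctions
FROM bid
GROUP BY channel, to_char(date_time, 'YYYY-MM-DD');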

BugenZhao commented 1 year ago

2. The final HashAgg is not append-only and there is a max()

By putting all aggregation calls in a single Agg operator, we can only assume the worst case when reasoning about the properties of the output or whether some optimization can apply. As we support DAG execution now, would it be better to aggregate them separately and join the results back? 🤔
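For illustration, a minimal sketch of that idea written as plain SQL, with hypothetical view names and only a subset of Q16's aggregates; the real benefit would come from the optimizer producing such a DAG internally rather than from hand-written views:

-- Append-only friendly part: max() plus the non-distinct counts.
CREATE MATERIALIZED VIEW q16_plain AS
SELECT
    channel,
    to_char(date_time, 'YYYY-MM-DD') AS "day",
    max(to_char(date_time, 'HH:mm')) AS "minute",
    count(*) AS total_bids
FROM bid
GROUP BY channel, to_char(date_time, 'YYYY-MM-DD');

-- Distinct part: each distinct aggregation computed separately.
CREATE MATERIALIZED VIEW q16_distinct_bidders AS
SELECT
    channel,
    to_char(date_time, 'YYYY-MM-DD') AS "day",
    count(distinct bidder) AS total_bidders
FROM bid
GROUP BY channel, to_char(date_time, 'YYYY-MM-DD');

-- Join the partial results back on the shared group key.
CREATE MATERIALIZED VIEW nexmark_q16_dag AS
SELECT p.channel, p."day", p."minute", p.total_bids, d.total_bidders
FROM q16_plain p
JOIN q16_distinct_bidders d ON p.channel = d.channel AND p."day" = d."day";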

st1page commented 1 year ago
  1. In this case, the number of distinct values in each group is small and the input is append-only. We can easily implement an APPEND_ONLY_DISTINCT_COUNT aggregation whose state just stores each distinct key: key: [group_key|distinct_key], value: [].
  2. For an updatable stream, if the NDV is small, we can do it similarly. The state key is key: [group_key|distinct_key], value: [count]. But it means we need to read and write the state store for each row from upstream, which seems expensive if we do not support ADD and SUB operations on the state store. Since we only batch writes to the state store when the barrier comes, maybe it is acceptable?
  3. As a more general, distributed, and parallel solution, we can use a join to combine the different distinct groups on the same join key (https://github.com/risingwavelabs/risingwave/issues/2915), as sketched below. I think we should use it when the number of groups is not large enough to parallelize the processing.
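A hand-written illustration of the idea in point 3 for a single distinct aggregate (only a sketch with hypothetical view names; the exact design is discussed in the linked issue): the inner GROUP BY de-duplicates the distinct key, so the outer aggregation becomes a plain count without distinct state.

-- Inner level: at most one row per (channel, day, bidder).
CREATE MATERIALIZED VIEW q16_bidder_dedup AS
SELECT
    channel,
    to_char(date_time, 'YYYY-MM-DD') AS "day",
    bidder
FROM bid
GROUP BY channel, to_char(date_time, 'YYYY-MM-DD'), bidder;

-- Outer level: a plain count over the de-duplicated rows
-- replaces count(distinct bidder).
CREATE MATERIALIZED VIEW q16_total_bidders AS
SELECT channel, "day", count(*) AS total_bidders
FROM q16_bidder_dedup
GROUP BY channel, "day";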

BTW, for 1 and 2, maybe we can use a bloom filter to help optimize?

fuyufjh commented 1 year ago
  2. The final HashAgg is not append-only and there is a max()

By putting all aggregation calls in a single Agg operator, we can only assume the worst case when reasoning about the properties of the output or whether some optimization can apply. As we support DAG execution now, would it be better to aggregate them separately and join the results back? 🤔

As a more general, distributed, and parallel solution, we can use a join to combine the different distinct groups on the same join key (https://github.com/risingwavelabs/risingwave/issues/2915). I think we should use it when the number of groups is not large enough to parallelize the processing.

The DAG approach only mitigates, but does not eliminate, the cost of 2-phase distinct aggregation. I don't expect it to make the performance satisfactory, because the underlying problem (2-phase) still exists.

Recall that when we discussed 2-phase aggregation, we noticed that 2-phase is not always a good thing: it improves parallelism but also brings the cost of an extra shuffle and an extra stateful agg, which is exactly the same problem here. In the end, we decided to use 2-phase aggregation for simple agg only (i.e. queries without GROUP BY), so why not follow the same philosophy this time? That is, only use 2-phase distinct aggregation for queries without GROUP BY.
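For reference, "simple agg" here means a single-group query with no GROUP BY, like the hypothetical one below; with only one group, 2-phase aggregation is what allows the partial aggregation to run in parallel, so it stays worthwhile there.

-- A single-group ("simple agg") query: without GROUP BY there is only one group,
-- so 2-phase aggregation is what spreads the partial work across parallel units
-- before the final merge.
CREATE MATERIALIZED VIEW bid_stats AS
SELECT
    count(*) AS total_bids,
    count(distinct bidder) AS total_bidders,
    count(distinct auction) AS total_auctions
FROM bid;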

lmatz commented 1 year ago

By looking at the flame graph generated in https://buildkite.com/risingwavelabs/main-cron/builds/458#0187c454-58bd-439a-bf57-6f774ea41b80:

(screenshot: flame graph, SCR-20230428-hxi)

Of course, the executor may genuinely need this much time, but it is still quite a lot, considering that the JSON parser takes the dominant share of time in other queries. Let's see if we can optimize it further.

(screenshot: SCR-20230428-iwc)

lmatz commented 1 year ago

https://github.com/risingwavelabs/risingwave/issues/9233 is probably not a big deal.

lmatz commented 1 year ago

get_row takes some time, and it only happens when the cache does not contain the row. Does the cache evict too much?

(screenshot: SCR-20230428-irq)

But there is no significant difference across the recent days.

lmatz commented 1 year ago

As found by @BugenZhao and @st1page, the current primary keys of the two distinct-agg state tables lead to poor performance, as shown below:

    Table 1 { columns: [ channel, $expr1, bidder, count_for_agg_call_5, count_for_agg_call_6, count_for_agg_call_7, count_for_agg_call_8 ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 3, 4, 5, 6 ], distribution key: [ 0, 1 ], read pk prefix len hint: 3 }

    Table 2 { columns: [ channel, $expr1, auction, count_for_agg_call_9, count_for_agg_call_10, count_for_agg_call_11, count_for_agg_call_12 ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 3, 4, 5, 6 ], distribution key: [ 0, 1 ], read pk prefix len hint: 3 }

Now switch to:

    SELECT
      channel,
      to_char(date_time, 'yyyy-MM-dd') AS day,
      max(to_char(date_time, 'HH:mm')) AS minute,
      count(*) AS total_bids,
      count(*) filter (where price < 10000) AS rank1_bids,
      count(*) filter (where price >= 10000 and price < 1000000) AS rank2_bids,
      count(*) filter (where price >= 1000000) AS rank3_bids,
      count(distinct bidder) AS total_bidders,
      count(distinct bidder) filter (where price < 10000) AS rank1_bidders,
      count(distinct bidder) filter (where price >= 10000 and price < 1000000) AS rank2_bidders,
      count(distinct bidder) filter (where price >= 1000000) AS rank3_bidders,
      count(distinct auction) AS total_auctions,
      count(distinct auction) filter (where price < 10000) AS rank1_auctions,
      count(distinct auction) filter (where price >= 10000 and price < 1000000) AS rank2_auctions,
      count(distinct auction) filter (where price >= 1000000) AS rank3_auctions
    FROM bid
    GROUP BY to_char(date_time, 'yyyy-MM-dd'), channel;

    Fragment 0
    StreamMaterialize { columns: [channel, day, minute, total_bids, rank1_bids, rank2_bids, rank3_bids, total_bidders, rank1_bidders, rank2_bidders, rank3_bidders, total_auctions, rank1_auctions, rank2_auctions, rank3_auctions], stream_key: [day, channel], pk_columns: [day, channel], pk_conflict: "NoCheck" } { materialized table: 4294967294 }
    └── StreamProject { exprs: [channel, $expr1, max($expr2), count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count(distinct bidder), count(distinct bidder) filter((price < 10000:Int32)), count(distinct bidder) filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count(distinct bidder) filter((price >= 1000000:Int32)), count(distinct auction), count(distinct auction) filter((price < 10000:Int32)), count(distinct auction) filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count(distinct auction) filter((price >= 1000000:Int32))] }
        └── StreamAppendOnlyHashAgg { group_key: [$expr1, channel], aggs: [max($expr2), count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count(distinct bidder), count(distinct bidder) filter((price < 10000:Int32)), count(distinct bidder) filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count(distinct bidder) filter((price >= 1000000:Int32)), count(distinct auction), count(distinct auction) filter((price < 10000:Int32)), count(distinct auction) filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count(distinct auction) filter((price >= 1000000:Int32))] }
            ├── result table: 0
            ├── state tables: []
            ├── distinct tables: [ (distinct key: bidder, table id: 1), (distinct key: auction, table id: 2) ]
            └──  StreamExchange Hash([0, 1]) from 1

    Fragment 1
    StreamProject { exprs: [ToChar(date_time, 'yyyy-MM-dd':Varchar) as $expr1, channel, ToChar(date_time, 'HH:mm':Varchar) as $expr2, price, bidder, auction, _row_id] }
    └── StreamRowIdGen { row_id_index: 7 }
        └──  StreamSource { source: "bid", columns: ["auction", "bidder", "price", "channel", "url", "date_time", "extra", "_row_id"] } { source state table: 3 }

    Table 0
    ├── columns: [ $expr1, channel, max($expr2), count, count filter((price < 10000:Int32)), count filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count filter((price >= 1000000:Int32)), count(distinct bidder), count(distinct bidder) filter((price < 10000:Int32)), count(distinct bidder) filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count(distinct bidder) filter((price >= 1000000:Int32)), count(distinct auction), count(distinct auction) filter((price < 10000:Int32)), count(distinct auction) filter((price >= 10000:Int32) AND (price < 1000000:Int32)), count(distinct auction) filter((price >= 1000000:Int32)) ]
    ├── primary key: [ $0 ASC, $1 ASC ]
    ├── value indices: [ 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14 ]
    ├── distribution key: [ 0, 1 ]
    └── read pk prefix len hint: 2

    Table 1 { columns: [ $expr1, channel, bidder, count_for_agg_call_5, count_for_agg_call_6, count_for_agg_call_7, count_for_agg_call_8 ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 3, 4, 5, 6 ], distribution key: [ 0, 1 ], read pk prefix len hint: 3 }

    Table 2 { columns: [ $expr1, channel, auction, count_for_agg_call_9, count_for_agg_call_10, count_for_agg_call_11, count_for_agg_call_12 ], primary key: [ $0 ASC, $1 ASC, $2 ASC ], value indices: [ 3, 4, 5, 6 ], distribution key: [ 0, 1 ], read pk prefix len hint: 3 }

    Table 3 { columns: [ partition_id, offset_info ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [], read pk prefix len hint: 1 }

    Table 4294967294 { columns: [ channel, day, minute, total_bids, rank1_bids, rank2_bids, rank3_bids, total_bidders, rank1_bidders, rank2_bidders, rank3_bidders, total_auctions, rank1_auctions, rank2_auctions, rank3_auctions ], primary key: [ $1 ASC, $0 ASC ], value indices: [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14 ], distribution key: [ 1, 0 ], read pk prefix len hint: 2 }

lmatz commented 1 year ago

== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.nexmark_q16], fields=[channel, day, $f2, $f3, $f4, $f5, $f6, $f7, $f8, $f9, $f10, $f11, $f12, $f13, $f14])
+- GroupAggregate(groupBy=[channel, day], partialFinalType=[FINAL], select=[channel, day, MAX($f5) AS $f2, $SUM0_RETRACT($f6) AS $f3, $SUM0_RETRACT($f7) AS $f4, $SUM0_RETRACT($f8_0) AS $f5, $SUM0_RETRACT($f9_0) AS $f6, $SUM0_RETRACT($f10_0) AS $f7, $SUM0_RETRACT($f11) AS $f8, $SUM0_RETRACT($f12) AS $f9, $SUM0_RETRACT($f13) AS $f10, $SUM0_RETRACT($f14) AS $f11, $SUM0_RETRACT($f15) AS $f12, $SUM0_RETRACT($f16) AS $f13, $SUM0_RETRACT($f17) AS $f14])
   +- Exchange(distribution=[hash[channel, day]])
      +- GroupAggregate(groupBy=[channel, day, $f8, $f9, $f10], partialFinalType=[PARTIAL], select=[channel, day, $f8, $f9, $f10, MAX($f2) FILTER $g_3 AS $f5, COUNT(*) FILTER $g_7 AS $f6, COUNT(*) FILTER $g_70 AS $f7, COUNT(*) FILTER $g_71 AS $f8_0, COUNT(*) FILTER $g_72 AS $f9_0, COUNT(DISTINCT bidder) FILTER $g_5 AS $f10_0, COUNT(DISTINCT bidder) FILTER $g_50 AS $f11, COUNT(DISTINCT bidder) FILTER $g_51 AS $f12, COUNT(DISTINCT bidder) FILTER $g_52 AS $f13, COUNT(DISTINCT auction) FILTER $g_6 AS $f14, COUNT(DISTINCT auction) FILTER $g_60 AS $f15, COUNT(DISTINCT auction) FILTER $g_61 AS $f16, COUNT(DISTINCT auction) FILTER $g_62 AS $f17])
         +- Exchange(distribution=[hash[channel, day, $f8, $f9, $f10]])
            +- Calc(select=[channel, day, $f2, $f3, $f4, $f5, bidder, auction, $f8, $f9, $f10, =($e, 3) AS $g_3, =($e, 7) AS $g_7, AND(=($e, 7), $f3) AS $g_70, AND(=($e, 7), $f4) AS $g_71, AND(=($e, 7), $f5) AS $g_72, =($e, 5) AS $g_5, AND(=($e, 5), $f3) AS $g_50, AND(=($e, 5), $f4) AS $g_51, AND(=($e, 5), $f5) AS $g_52, =($e, 6) AS $g_6, AND(=($e, 6), $f3) AS $g_60, AND(=($e, 6), $f4) AS $g_61, AND(=($e, 6), $f5) AS $g_62])
               +- Expand(projects=[{channel, day, $f2, $f3, $f4, $f5, bidder, auction, $f8, null AS $f9, null AS $f10, 3 AS $e}, {channel, day, $f2, $f3, $f4, $f5, bidder, auction, null AS $f8, $f9, null AS $f10, 5 AS $e}, {channel, day, $f2, $f3, $f4, $f5, bidder, auction, null AS $f8, null AS $f9, $f10, 6 AS $e}, {channel, day, $f2, $f3, $f4, $f5, bidder, auction, null AS $f8, null AS $f9, null AS $f10, 7 AS $e}])
                  +- Calc(select=[bid.channel AS channel, DATE_FORMAT(CAST(dateTime AS TIMESTAMP(3)), _UTF-16LE'yyyy-MM-dd') AS day, DATE_FORMAT(CAST(dateTime AS TIMESTAMP(3)), _UTF-16LE'HH:mm') AS $f2, IS TRUE(<(bid.price, 10000)) AS $f3, IS TRUE(SEARCH(bid.price, Sarg[[10000..1000000)])) AS $f4, IS TRUE(>=(bid.price, 1000000)) AS $f5, bid.bidder AS bidder, bid.auction AS auction, MOD(HASH_CODE(DATE_FORMAT(CAST(dateTime AS TIMESTAMP(3)), _UTF-16LE'HH:mm')), 1024) AS $f8, MOD(HASH_CODE(bid.bidder), 1024) AS $f9, MOD(HASH_CODE(bid.auction), 1024) AS $f10], where=[=(event_type, 2)])
                     +- WatermarkAssigner(rowtime=[dateTime], watermark=[-(dateTime, 4000:INTERVAL SECOND)])
                        +- Calc(select=[event_type, bid, CASE(=(event_type, 0), person.dateTime, =(event_type, 1), auction.dateTime, bid.dateTime) AS dateTime])
                           +- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])

== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.nexmark_q16], fields=[channel, day, $f2, $f3, $f4, $f5, $f6, $f7, $f8, $f9, $f10, $f11, $f12, $f13, $f14])
+- GroupAggregate(groupBy=[channel, day], partialFinalType=[FINAL], select=[channel, day, MAX($f5) AS $f2, $SUM0_RETRACT($f6) AS $f3, $SUM0_RETRACT($f7) AS $f4, $SUM0_RETRACT($f8_0) AS $f5, $SUM0_RETRACT($f9_0) AS $f6, $SUM0_RETRACT($f10_0) AS $f7, $SUM0_RETRACT($f11) AS $f8, $SUM0_RETRACT($f12) AS $f9, $SUM0_RETRACT($f13) AS $f10, $SUM0_RETRACT($f14) AS $f11, $SUM0_RETRACT($f15) AS $f12, $SUM0_RETRACT($f16) AS $f13, $SUM0_RETRACT($f17) AS $f14])
   +- Exchange(distribution=[hash[channel, day]])
      +- GroupAggregate(groupBy=[channel, day, $f8, $f9, $f10], partialFinalType=[PARTIAL], select=[channel, day, $f8, $f9, $f10, MAX($f2) FILTER $g_3 AS $f5, COUNT(*) FILTER $g_7 AS $f6, COUNT(*) FILTER $g_70 AS $f7, COUNT(*) FILTER $g_71 AS $f8_0, COUNT(*) FILTER $g_72 AS $f9_0, COUNT(DISTINCT bidder) FILTER $g_5 AS $f10_0, COUNT(DISTINCT bidder) FILTER $g_50 AS $f11, COUNT(DISTINCT bidder) FILTER $g_51 AS $f12, COUNT(DISTINCT bidder) FILTER $g_52 AS $f13, COUNT(DISTINCT auction) FILTER $g_6 AS $f14, COUNT(DISTINCT auction) FILTER $g_60 AS $f15, COUNT(DISTINCT auction) FILTER $g_61 AS $f16, COUNT(DISTINCT auction) FILTER $g_62 AS $f17])
         +- Exchange(distribution=[hash[channel, day, $f8, $f9, $f10]])
            +- Calc(select=[channel, day, $f2, $f3, $f4, $f5, bidder, auction, $f8, $f9, $f10, ($e = 3) AS $g_3, ($e = 7) AS $g_7, (($e = 7) AND $f3) AS $g_70, (($e = 7) AND $f4) AS $g_71, (($e = 7) AND $f5) AS $g_72, ($e = 5) AS $g_5, (($e = 5) AND $f3) AS $g_50, (($e = 5) AND $f4) AS $g_51, (($e = 5) AND $f5) AS $g_52, ($e = 6) AS $g_6, (($e = 6) AND $f3) AS $g_60, (($e = 6) AND $f4) AS $g_61, (($e = 6) AND $f5) AS $g_62])
               +- Expand(projects=[{channel, day, $f2, $f3, $f4, $f5, bidder, auction, $f8, null AS $f9, null AS $f10, 3 AS $e}, {channel, day, $f2, $f3, $f4, $f5, bidder, auction, null AS $f8, $f9, null AS $f10, 5 AS $e}, {channel, day, $f2, $f3, $f4, $f5, bidder, auction, null AS $f8, null AS $f9, $f10, 6 AS $e}, {channel, day, $f2, $f3, $f4, $f5, bidder, auction, null AS $f8, null AS $f9, null AS $f10, 7 AS $e}])
                  +- Calc(select=[bid.channel AS channel, DATE_FORMAT(CAST(dateTime AS TIMESTAMP(3)), 'yyyy-MM-dd') AS day, DATE_FORMAT(CAST(dateTime AS TIMESTAMP(3)), 'HH:mm') AS $f2, (bid.price < 10000) IS TRUE AS $f3, SEARCH(bid.price, Sarg[[10000..1000000)]) IS TRUE AS $f4, (bid.price >= 1000000) IS TRUE AS $f5, bid.bidder AS bidder, bid.auction AS auction, MOD(HASH_CODE(DATE_FORMAT(CAST(dateTime AS TIMESTAMP(3)), 'HH:mm')), 1024) AS $f8, MOD(HASH_CODE(bid.bidder), 1024) AS $f9, MOD(HASH_CODE(bid.auction), 1024) AS $f10], where=[(event_type = 2)])
                     +- WatermarkAssigner(rowtime=[dateTime], watermark=[(dateTime - 4000:INTERVAL SECOND)])
                        +- Calc(select=[event_type, bid, CASE((event_type = 0), person.dateTime, (event_type = 1), auction.dateTime, bid.dateTime) AS dateTime])
                           +- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])

The plans above are with table.optimizer.distinct-agg.split.enabled: true.

This configuration is set by Flink's Nexmark repo and is also used in our own testing pipeline.
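For reference, a minimal way to toggle this option from a Flink SQL client session (the Nexmark repo sets it in its configuration instead, per the above):

-- Flink SQL client syntax: set a table config option for the current session.
SET 'table.optimizer.distinct-agg.split.enabled' = 'true';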

fuyufjh commented 1 year ago

Since lots of progress has been made, let's move to https://github.com/risingwavelabs/risingwave/issues/10777 and close this one.