risingwavelabs / risingwave

Best-in-class stream processing, analytics, and management. Perform continuous analytics, or build event-driven applications, real-time ETL pipelines, and feature stores in minutes. Unified streaming and batch. PostgreSQL compatible.
https://go.risingwave.com/slack
Apache License 2.0
6.78k stars 561 forks source link

perf: nexmark q3 #8157

Open lmatz opened 1 year ago

lmatz commented 1 year ago

Query:

-- Nexmark q3: for every auction in category 10, emit the auction id together
-- with the seller's name, city and state, restricted to sellers whose state
-- is 'or', 'id' or 'ca'.
CREATE MATERIALIZED VIEW nexmark_q3 AS
SELECT
    p.name,
    p.city,
    p.state,
    a.id
FROM auction AS a
INNER JOIN person AS p
    ON a.seller = p.id
WHERE
    a.category = 10
    -- Kept as an explicit OR chain (not IN) so the predicate matches the
    -- query plans reported for this statement.
    AND (p.state = 'or' OR p.state = 'id' OR p.state = 'ca');

RW:

 StreamMaterialize { columns: [name, city, state, id, _row_id(hidden), $expr350(hidden), _row_id#1(hidden), $expr351(hidden)], pk_columns: [_row_id, _row_id#1, $expr350, $expr351], pk_conflict: "no check" }
 └─StreamAppendOnlyHashJoin { type: Inner, predicate: $expr2 = $expr3 }
   ├─StreamExchange { dist: HashShard($expr2) }
   | └─StreamProject { exprs: [Field(auction, 0:Int32) as $expr1, Field(auction, 7:Int32) as $expr2, _row_id] }
   |   └─StreamFilter { predicate: (Field(auction, 8:Int32) = 10:Int32) AND (event_type = 1:Int32) }
   |     └─StreamProject { exprs: [event_type, person, auction, _row_id] }
   |       └─StreamShare { id = 401 }
   |         └─StreamProject { exprs: [event_type, person, auction, _row_id] }
   |           └─StreamFilter { predicate: (((Field(auction, 8:Int32) = 10:Int32) AND (event_type = 1:Int32)) OR ((((Field(person, 5:Int32) = 'or':Varchar) OR (Field(person, 5:Int32) = 'id':Varchar)) OR (Field(person, 5:Int32) = 'ca':Varchar)) AND (event_type = 0:Int32))) }
   |             └─StreamRowIdGen { row_id_index: 4 }
   |               └─StreamSource { source: "nexmark", columns: ["event_type", "person", "auction", "bid", "_row_id"] }
   └─StreamExchange { dist: HashShard($expr3) }
     └─StreamProject { exprs: [Field(person, 0:Int32) as $expr3, Field(person, 1:Int32) as $expr4, Field(person, 4:Int32) as $expr5, Field(person, 5:Int32) as $expr6, _row_id] }
       └─StreamFilter { predicate: (((Field(person, 5:Int32) = 'or':Varchar) OR (Field(person, 5:Int32) = 'id':Varchar)) OR (Field(person, 5:Int32) = 'ca':Varchar)) AND (event_type = 0:Int32) }
         └─StreamProject { exprs: [event_type, person, auction, _row_id] }
           └─StreamShare { id = 401 }
             └─StreamProject { exprs: [event_type, person, auction, _row_id] }
               └─StreamFilter { predicate: (((Field(auction, 8:Int32) = 10:Int32) AND (event_type = 1:Int32)) OR ((((Field(person, 5:Int32) = 'or':Varchar) OR (Field(person, 5:Int32) = 'id':Varchar)) OR (Field(person, 5:Int32) = 'ca':Varchar)) AND (event_type = 0:Int32))) }
                 └─StreamRowIdGen { row_id_index: 4 }
                   └─StreamSource { source: "nexmark", columns: ["event_type", "person", "auction", "bid", "_row_id"] }
(20 rows)

 Fragment 0
   StreamMaterialize { columns: [name, city, state, id, _row_id(hidden), $expr350(hidden), _row_id#1(hidden), $expr351(hidden)], pk_columns: [_row_id, _row_id#1, $expr350, $expr351], pk_conflict: "no check" }
       materialized table: 4294967294
     StreamAppendOnlyHashJoin { type: Inner, predicate: $expr350 = $expr351 }
         left table: 0, right table 2, left degree table: 1, right degree table: 3,
       StreamExchange Hash([1]) from 1
       StreamExchange Hash([0]) from 3

 Fragment 1
   StreamProject { exprs: [Field(auction, 0:Int32) as $expr349, Field(auction, 7:Int32) as $expr350, _row_id] }
     StreamFilter { predicate: (Field(auction, 8:Int32) = 10:Int32) AND (event_type = 1:Int32) }
       StreamProject { exprs: [event_type, person, auction, _row_id] }
         StreamExchange Hash([3]) from 2

 Fragment 2
   StreamProject { exprs: [event_type, person, auction, _row_id] }
     StreamFilter { predicate: (((Field(auction, 8:Int32) = 10:Int32) AND (event_type = 1:Int32)) OR ((((Field(person, 5:Int32) = 'or':Varchar) OR (Field(person, 5:Int32) = 'id':Varchar)) OR (Field(person, 5:Int32) = 'ca':Varchar)) AND (event_type = 0:Int32))) }
       StreamRowIdGen { row_id_index: 4 }
         StreamSource { source: "nexmark", columns: ["event_type", "person", "auction", "bid", "_row_id"] }
             source state table: 4

 Fragment 3
   StreamProject { exprs: [Field(person, 0:Int32) as $expr351, Field(person, 1:Int32) as $expr352, Field(person, 4:Int32) as $expr353, Field(person, 5:Int32) as $expr354, _row_id] }
     StreamFilter { predicate: (((Field(person, 5:Int32) = 'or':Varchar) OR (Field(person, 5:Int32) = 'id':Varchar)) OR (Field(person, 5:Int32) = 'ca':Varchar)) AND (event_type = 0:Int32) }
       StreamProject { exprs: [event_type, person, auction, _row_id] }
         StreamExchange Hash([3]) from 2

  Table 0 { columns: [$expr349, $expr350, _row_id], primary key: [$1 ASC, $2 ASC], value indices: [0, 1, 2], distribution key: [1] }
  Table 1 { columns: [$expr350, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] }
  Table 2 { columns: [$expr351, $expr352, $expr353, $expr354, _row_id], primary key: [$0 ASC, $4 ASC], value indices: [0, 1, 2, 3, 4], distribution key: [0] }
  Table 3 { columns: [$expr351, _row_id, _degree], primary key: [$0 ASC, $1 ASC], value indices: [2], distribution key: [0] }
  Table 4 { columns: [partition_id, offset], primary key: [$0 ASC], value indices: [0, 1], distribution key: [] }
  Table 4294967294 { columns: [name, city, state, id, _row_id, $expr350, _row_id#1, $expr351], primary key: [$4 ASC, $6 ASC, $5 ASC, $7 ASC], value indices: [0, 1, 2, 3, 4, 5, 6, 7], distribution key: [5] }
(33 rows)
lmatz commented 1 year ago

Flink:

== Optimized Physical Plan ==
Calc(select=[name, city, state, id])
+- Join(joinType=[InnerJoin], where=[=(seller, id0)], select=[id, seller, id0, name, city, state], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[seller]])
   :  +- Calc(select=[auction.id AS id, auction.seller AS seller], where=[AND(=(event_type, 1), SEARCH(auction.category, Sarg[10L:BIGINT]:BIGINT))])
   :     +- WatermarkAssigner(rowtime=[dateTime], watermark=[-(dateTime, 4000:INTERVAL SECOND)])
   :        +- Calc(select=[event_type, person, auction, bid, CASE(=(event_type, 0), person.dateTime, =(event_type, 1), auction.dateTime, bid.dateTime) AS dateTime])
   :           +- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])
   +- Exchange(distribution=[hash[id]])
      +- Calc(select=[person.id AS id, person.name AS name, person.city AS city, person.state AS state], where=[AND(SEARCH(event_type, Sarg[0]), SEARCH(person.state, Sarg[_UTF-16LE'CA':VARCHAR(2147483647) CHARACTER SET "UTF-16LE", _UTF-16LE'ID':VARCHAR(2147483647) CHARACTER SET "UTF-16LE", _UTF-16LE'OR':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE"))])
         +- WatermarkAssigner(rowtime=[dateTime], watermark=[-(dateTime, 4000:INTERVAL SECOND)])
            +- Calc(select=[event_type, person, auction, bid, CASE(=(event_type, 0), person.dateTime, =(event_type, 1), auction.dateTime, bid.dateTime) AS dateTime])
               +- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])

== Optimized Execution Plan ==
Calc(select=[name, city, state, id])
+- Join(joinType=[InnerJoin], where=[(seller = id0)], select=[id, seller, id0, name, city, state], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
   :- Exchange(distribution=[hash[seller]])
   :  +- Calc(select=[auction.id AS id, auction.seller AS seller], where=[((event_type = 1) AND SEARCH(auction.category, Sarg[10]))])
   :     +- WatermarkAssigner(rowtime=[dateTime], watermark=[(dateTime - 4000:INTERVAL SECOND)])(reuse_id=[1])
   :        +- Calc(select=[event_type, person, auction, bid, CASE((event_type = 0), person.dateTime, (event_type = 1), auction.dateTime, bid.dateTime) AS dateTime])
   :           +- TableSourceScan(table=[[default_catalog, default_database, datagen]], fields=[event_type, person, auction, bid])
   +- Exchange(distribution=[hash[id]])
      +- Calc(select=[person.id AS id, person.name AS name, person.city AS city, person.state AS state], where=[(SEARCH(event_type, Sarg[0]) AND SEARCH(person.state, Sarg[_UTF-16LE'CA', _UTF-16LE'ID', _UTF-16LE'OR']))])
         +- Reused(reference_id=[1])
lmatz commented 1 year ago

Sarg: https://calcite.apache.org/javadocAggregate/org/apache/calcite/util/Sarg.html

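Flink evaluates the state filter with SEARCH over a Sarg, whereas RisingWave evaluates it as a chain of ORs. Which is better here: Sarg or OR?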

xxchan commented 1 year ago

In Flink, SEARCH is translated into either a HashSet lookup or a concatenation of ORs: https://github.com/apache/flink/blob/d9102ddd755ec26d14256bcb733d814e063acd2b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/SearchOperatorGen.scala#L45