Status: Open — opened by lmatz 10 months ago.
RW:
-- TPC-H Q17 variant, run in RisingWave as a sink so its streaming plan can be compared with Flink's.
-- Differences from the standard TPC-H Q17 text:
--   * the p_brand = 'Brand#13' and p_container = 'JUMBO PKG' predicates are disabled;
--   * the correlated subquery multiplies the per-part average quantity by 220.2 (the spec uses 0.2).
-- NOTE(review): the Flink counterpart of this query uses 20.2 instead of 220.2 -- confirm which
-- multiplier is intended before comparing results or throughput between the two systems.
create sink tpch_q17 as
select
    sum(l.l_extendedprice) / 7.0 as avg_yearly
from lineitem as l
inner join part as p
    on p.p_partkey = l.l_partkey
where
    -- keep only line items whose quantity is below the scaled average quantity of the same part
    l.l_quantity < (
        select 220.2 * avg(li.l_quantity)
        from lineitem as li
        where li.l_partkey = p.p_partkey
    )
-- blackhole connector discards output; force_append_only lets the aggregate's updates be emitted
-- into an append-only sink
with (
    connector = 'blackhole',
    type = 'append-only',
    force_append_only = 'true'
);
Plan:
StreamSink { type: append-only, columns: [avg_yearly] }
└─StreamProject { exprs: [(sum(sum($expr3)) / 7.0:Decimal) as $expr6] }
└─StreamSimpleAgg { aggs: [sum(sum($expr3)), count] }
└─StreamExchange { dist: Single }
└─StreamStatelessSimpleAgg { aggs: [sum($expr3)] }
└─StreamProject { exprs: [$expr3, _row_id, _row_id, $expr1, $expr4, $expr4] }
└─StreamFilter { predicate: ($expr2 < $expr5) }
└─StreamHashJoin { type: Inner, predicate: $expr4 IS NOT DISTINCT FROM $expr4 }
├─StreamExchange { dist: HashShard($expr4) }
│ └─StreamShare { id: 13 }
│ └─StreamHashJoin [append_only] { type: Inner, predicate: $expr1 = $expr4 }
│ ├─StreamExchange { dist: HashShard($expr1) }
│ │ └─StreamShare { id: 7 }
│ │ └─StreamProject { exprs: [Field(lineitem, 1:Int32) as $expr1, Field(lineitem, 4:Int32) as $expr2, Field(lineitem, 5:Int32) as $expr3, _row_id] }
│ │ └─StreamFilter { predicate: (eventType = 'lineitem':Varchar) }
│ │ └─StreamShare { id: 4 }
│ │ └─StreamProject { exprs: [eventType, lineitem, part, _row_id] }
│ │ └─StreamFilter { predicate: ((eventType = 'part':Varchar) OR (eventType = 'lineitem':Varchar)) }
│ │ └─StreamRowIdGen { row_id_index: 10 }
│ │ └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
│ └─StreamExchange { dist: HashShard($expr4) }
│ └─StreamProject { exprs: [Field(part, 0:Int32) as $expr4, _row_id] }
│ └─StreamFilter { predicate: (eventType = 'part':Varchar) }
│ └─StreamShare { id: 4 }
│ └─StreamProject { exprs: [eventType, lineitem, part, _row_id] }
│ └─StreamFilter { predicate: ((eventType = 'part':Varchar) OR (eventType = 'lineitem':Varchar)) }
│ └─StreamRowIdGen { row_id_index: 10 }
│ └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
└─StreamProject { exprs: [$expr4, (220.2:Decimal * (sum($expr2) / count($expr2)::Decimal)) as $expr5] }
└─StreamHashAgg { group_key: [$expr4], aggs: [sum($expr2), count($expr2), count] }
└─StreamHashJoin { type: LeftOuter, predicate: $expr4 IS NOT DISTINCT FROM $expr1 }
├─StreamAppendOnlyDedup { dedup_cols: [$expr4] }
│ └─StreamExchange { dist: HashShard($expr4) }
│ └─StreamProject { exprs: [$expr4] }
│ └─StreamShare { id: 13 }
│ └─StreamHashJoin [append_only] { type: Inner, predicate: $expr1 = $expr4 }
│ ├─StreamExchange { dist: HashShard($expr1) }
│ │ └─StreamShare { id: 7 }
│ │ └─StreamProject { exprs: [Field(lineitem, 1:Int32) as $expr1, Field(lineitem, 4:Int32) as $expr2, Field(lineitem, 5:Int32) as $expr3, _row_id] }
│ │ └─StreamFilter { predicate: (eventType = 'lineitem':Varchar) }
│ │ └─StreamShare { id: 4 }
│ │ └─StreamProject { exprs: [eventType, lineitem, part, _row_id] }
│ │ └─StreamFilter { predicate: ((eventType = 'part':Varchar) OR (eventType = 'lineitem':Varchar)) }
│ │ └─StreamRowIdGen { row_id_index: 10 }
│ │ └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
│ └─StreamExchange { dist: HashShard($expr4) }
│ └─StreamProject { exprs: [Field(part, 0:Int32) as $expr4, _row_id] }
│ └─StreamFilter { predicate: (eventType = 'part':Varchar) }
│ └─StreamShare { id: 4 }
│ └─StreamProject { exprs: [eventType, lineitem, part, _row_id] }
│ └─StreamFilter { predicate: ((eventType = 'part':Varchar) OR (eventType = 'lineitem':Varchar)) }
│ └─StreamRowIdGen { row_id_index: 10 }
│ └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
└─StreamExchange { dist: HashShard($expr1) }
└─StreamProject { exprs: [$expr1, $expr2, _row_id] }
└─StreamFilter { predicate: IsNotNull($expr1) }
└─StreamShare { id: 7 }
└─StreamProject { exprs: [Field(lineitem, 1:Int32) as $expr1, Field(lineitem, 4:Int32) as $expr2, Field(lineitem, 5:Int32) as $expr3, _row_id] }
└─StreamFilter { predicate: (eventType = 'lineitem':Varchar) }
└─StreamShare { id: 4 }
└─StreamProject { exprs: [eventType, lineitem, part, _row_id] }
└─StreamFilter { predicate: ((eventType = 'part':Varchar) OR (eventType = 'lineitem':Varchar)) }
└─StreamRowIdGen { row_id_index: 10 }
└─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
(64 rows)
Dist Plan:
Fragment 0
StreamSink { type: append-only, columns: [avg_yearly] } { tables: [ Sink: 0 ] }
└── StreamProject { exprs: [(sum(sum($expr3)) / 7.0:Decimal) as $expr6] }
└── StreamSimpleAgg { aggs: [sum(sum($expr3)), count] }
├── tables: [ SimpleAggState: 1 ]
└── StreamExchange Single from 1
Fragment 1
StreamStatelessSimpleAgg { aggs: [sum($expr3)] }
└── StreamProject { exprs: [$expr3, _row_id, _row_id, $expr1, $expr4, $expr4] }
└── StreamFilter { predicate: ($expr2 < $expr5) }
└── StreamHashJoin { type: Inner, predicate: $expr4 IS NOT DISTINCT FROM $expr4 }
├── tables: [ HashJoinLeft: 2, HashJoinDegreeLeft: 3, HashJoinRight: 4, HashJoinDegreeRight: 5 ]
├── StreamExchange Hash([2]) from 2
└── StreamProject { exprs: [$expr4, (220.2:Decimal * (sum($expr2) / count($expr2)::Decimal)) as $expr5] }
└── StreamHashAgg { group_key: [$expr4], aggs: [sum($expr2), count($expr2), count] }
├── tables: [ HashAggState: 11 ]
└── StreamHashJoin { type: LeftOuter, predicate: $expr4 IS NOT DISTINCT FROM $expr1 }
├── tables:
│ ┌── HashJoinLeft: 12
│ ├── HashJoinDegreeLeft: 13
│ ├── HashJoinRight: 14
│ └── HashJoinDegreeRight: 15
├── StreamAppendOnlyDedup { dedup_cols: [$expr4] } { tables: [ AppendOnlyDedup: 16 ] }
│ └── StreamExchange Hash([0]) from 8
└── StreamExchange Hash([0]) from 9
Fragment 2
StreamNoOp
└── StreamExchange NoShuffle from 3
Fragment 3
StreamHashJoin [append_only] { type: Inner, predicate: $expr1 = $expr4 }
├── tables: [ HashJoinLeft: 6, HashJoinDegreeLeft: 7, HashJoinRight: 8, HashJoinDegreeRight: 9 ]
├── StreamExchange Hash([0]) from 4
└── StreamExchange Hash([0]) from 7
Fragment 4
StreamNoOp
└── StreamExchange NoShuffle from 5
Fragment 5
StreamProject { exprs: [Field(lineitem, 1:Int32) as $expr1, Field(lineitem, 4:Int32) as $expr2, Field(lineitem, 5:Int32) as $expr3, _row_id] }
└── StreamFilter { predicate: (eventType = 'lineitem':Varchar) }
└── StreamExchange NoShuffle from 6
Fragment 6
StreamProject { exprs: [eventType, lineitem, part, _row_id] }
└── StreamFilter { predicate: ((eventType = 'part':Varchar) OR (eventType = 'lineitem':Varchar)) }
└── StreamRowIdGen { row_id_index: 10 }
└── StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
└── tables: [ Source: 10 ]
Fragment 7
StreamProject { exprs: [Field(part, 0:Int32) as $expr4, _row_id] }
└── StreamFilter { predicate: (eventType = 'part':Varchar) }
└── StreamExchange NoShuffle from 6
Fragment 8
StreamProject { exprs: [$expr4] }
└── StreamExchange NoShuffle from 3
Fragment 9
StreamProject { exprs: [$expr1, $expr2, _row_id] }
└── StreamFilter { predicate: IsNotNull($expr1) }
└── StreamExchange NoShuffle from 5
Table 0
├── columns: [ kv_log_store_epoch, kv_log_store_seq_id, kv_log_store_row_op, avg_yearly ]
├── primary key: [ $0 ASC, $1 ASC ]
├── value indices: [ 0, 1, 2, 3 ]
├── distribution key: []
└── read pk prefix len hint: 2
Table 1 { columns: [ sum(sum($expr3)), count ], primary key: [], value indices: [ 0, 1 ], distribution key: [], read pk prefix len hint: 0 }
Table 2
├── columns: [ $expr2, $expr3, $expr4, _row_id, $expr1, _row_id_0 ]
├── primary key: [ $2 ASC, $3 ASC, $5 ASC, $4 ASC ]
├── value indices: [ 0, 1, 2, 3, 4, 5 ]
├── distribution key: [ 2 ]
└── read pk prefix len hint: 1
Table 3
├── columns: [ $expr4, _row_id, _row_id_0, $expr1, _degree ]
├── primary key: [ $0 ASC, $1 ASC, $2 ASC, $3 ASC ]
├── value indices: [ 4 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 1
Table 4 { columns: [ $expr4, $expr5 ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 5 { columns: [ $expr4, _degree ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 6
├── columns: [ $expr1, $expr2, $expr3, _row_id ]
├── primary key: [ $0 ASC, $3 ASC ]
├── value indices: [ 0, 1, 2, 3 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 1
Table 7 { columns: [ $expr1, _row_id, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 8 { columns: [ $expr4, _row_id ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 0, 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 9 { columns: [ $expr4, _row_id, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 10 { columns: [ partition_id, offset_info ], primary key: [ $0 ASC ], value indices: [ 0, 1 ], distribution key: [], read pk prefix len hint: 1 }
Table 11
├── columns: [ $expr4, sum($expr2), count($expr2), count ]
├── primary key: [ $0 ASC ]
├── value indices: [ 1, 2, 3 ]
├── distribution key: [ 0 ]
└── read pk prefix len hint: 1
Table 12 { columns: [ $expr4 ], primary key: [ $0 ASC ], value indices: [ 0 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 13 { columns: [ $expr4, _degree ], primary key: [ $0 ASC ], value indices: [ 1 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 14 { columns: [ $expr1, $expr2, _row_id ], primary key: [ $0 ASC, $2 ASC ], value indices: [ 0, 1, 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 15 { columns: [ $expr1, _row_id, _degree ], primary key: [ $0 ASC, $1 ASC ], value indices: [ 2 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
Table 16 { columns: [ $expr4 ], primary key: [ $0 ASC ], value indices: [ 0 ], distribution key: [ 0 ], read pk prefix len hint: 1 }
(126 rows)
Flink:
-- Flink SQL version of the TPC-H Q17 variant, writing into the tpch_q17 sink table.
-- Differences from the standard TPC-H Q17 text:
--   * the p_brand = 'Brand#13' and p_container = 'JUMBO PKG' predicates are disabled;
--   * the correlated subquery multiplies the per-part average quantity by 20.2 (the spec uses 0.2).
-- NOTE(review): the RisingWave counterpart of this query uses 220.2 instead of 20.2 -- confirm
-- which multiplier is intended before comparing results or throughput between the two systems.
INSERT INTO tpch_q17
select
    sum(l.l_extendedprice) / 7.0 as avg_yearly
from lineitem as l
inner join part as p
    on p.p_partkey = l.l_partkey
where
    -- keep only line items whose quantity is below the scaled average quantity of the same part
    l.l_quantity < (
        select 20.2 * avg(li.l_quantity)
        from lineitem as li
        where li.l_partkey = p.p_partkey
    );
Plan:
== Optimized Physical Plan ==
Sink(table=[default_catalog.default_database.tpch_q17], fields=[avg_yearly])
+- Calc(select=[CAST(/($f0, 7.0:DECIMAL(2, 1)) AS DECIMAL(10, 0)) AS avg_yearly])
+- GroupAggregate(select=[SUM_RETRACT(l_extendedprice) AS $f0])
+- Exchange(distribution=[single])
+- Calc(select=[l_extendedprice])
+- Join(joinType=[InnerJoin], where=[AND(=(p_partkey, l_partkey), <(l_quantity, *(20.2:DECIMAL(3, 1), $f1)))], select=[l_quantity, l_extendedprice, p_partkey, l_partkey, $f1], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
:- Exchange(distribution=[hash[p_partkey]])
: +- Calc(select=[l_quantity, l_extendedprice, p_partkey])
: +- Join(joinType=[InnerJoin], where=[=(p_partkey, l_partkey)], select=[l_partkey, l_quantity, l_extendedprice, p_partkey], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
: :- Exchange(distribution=[hash[l_partkey]])
: : +- Calc(select=[lineitem.l_partkey AS l_partkey, lineitem.l_quantity AS l_quantity, lineitem.l_extendedprice AS l_extendedprice], where=[=(eventType, _UTF-16LE'lineitem':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")])
: : +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])
: +- Exchange(distribution=[hash[p_partkey]])
: +- Calc(select=[part.p_partkey AS p_partkey], where=[=(eventType, _UTF-16LE'part':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")])
: +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])
+- Exchange(distribution=[hash[l_partkey]])
+- GroupAggregate(groupBy=[l_partkey], select=[l_partkey, AVG(l_quantity) AS $f1])
+- Exchange(distribution=[hash[l_partkey]])
+- Calc(select=[lineitem.l_partkey AS l_partkey, lineitem.l_quantity AS l_quantity], where=[AND(=(eventType, _UTF-16LE'lineitem':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), IS NOT NULL(lineitem.l_partkey))])
+- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])
== Optimized Execution Plan ==
Sink(table=[default_catalog.default_database.tpch_q17], fields=[avg_yearly])
+- Calc(select=[CAST(($f0 / 7.0) AS DECIMAL(10, 0)) AS avg_yearly])
+- GroupAggregate(select=[SUM_RETRACT(l_extendedprice) AS $f0])
+- Exchange(distribution=[single])
+- Calc(select=[l_extendedprice])
+- Join(joinType=[InnerJoin], where=[((p_partkey = l_partkey) AND (l_quantity < (20.2 * $f1)))], select=[l_quantity, l_extendedprice, p_partkey, l_partkey, $f1], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
:- Exchange(distribution=[hash[p_partkey]])
: +- Calc(select=[l_quantity, l_extendedprice, p_partkey])
: +- Join(joinType=[InnerJoin], where=[(p_partkey = l_partkey)], select=[l_partkey, l_quantity, l_extendedprice, p_partkey], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
: :- Exchange(distribution=[hash[l_partkey]])
: : +- Calc(select=[lineitem.l_partkey AS l_partkey, lineitem.l_quantity AS l_quantity, lineitem.l_extendedprice AS l_extendedprice], where=[(eventType = 'lineitem')])
: : +- TableSourceScan(table=[[default_catalog, default_database, tpch]], fields=[eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region])(reuse_id=[1])
: +- Exchange(distribution=[hash[p_partkey]])
: +- Calc(select=[part.p_partkey AS p_partkey], where=[(eventType = 'part')])
: +- Reused(reference_id=[1])
+- Exchange(distribution=[hash[l_partkey]])
+- GroupAggregate(groupBy=[l_partkey], select=[l_partkey, AVG(l_quantity) AS $f1])
+- Exchange(distribution=[hash[l_partkey]])
+- Calc(select=[lineitem.l_partkey AS l_partkey, lineitem.l_quantity AS l_quantity], where=[((eventType = 'lineitem') AND lineitem.l_partkey IS NOT NULL)])
+- Reused(reference_id=[1])
We notice that there are 4 `StreamHashJoin`s in RisingWave's query plan.
Two of them sit under `StreamShare { id: 13 }`. They are the same shared operator, so they count only once. Even after discounting them, we still get 4 - (2 - 1) = 3 `StreamHashJoin`s:
- 2 `Inner`
- 1 `LeftOuter`

By contrast, Flink's query plan has only 2 `Join`s, both `InnerJoin`.
After #15247 (i.e., since nightly-20240306), RW's plan becomes:
StreamSink { type: append-only, columns: [avg_yearly] }
└─StreamProject { exprs: [(sum(sum($expr3)) / 7.0:Decimal) as $expr6] }
└─StreamSimpleAgg { aggs: [sum(sum($expr3)), count] }
└─StreamExchange { dist: Single }
└─StreamStatelessSimpleAgg { aggs: [sum($expr3)] }
└─StreamProject { exprs: [$expr3, _row_id, _row_id, $expr1, $expr4, $expr1] }
└─StreamFilter { predicate: ($expr2 < $expr5) }
└─StreamHashJoin { type: Inner, predicate: $expr4 = $expr1 }
├─StreamExchange { dist: HashShard($expr4) }
│ └─StreamHashJoin [append_only] { type: Inner, predicate: $expr1 = $expr4 }
│ ├─StreamExchange { dist: HashShard($expr1) }
│ │ └─StreamShare { id: 7 }
│ │ └─StreamProject { exprs: [Field(lineitem, 1:Int32) as $expr1, Field(lineitem, 4:Int32) as $expr2, Field(lineitem, 5:Int32) as $expr3, _row_id] }
│ │ └─StreamFilter { predicate: (eventType = 'lineitem':Varchar) }
│ │ └─StreamShare { id: 4 }
│ │ └─StreamProject { exprs: [eventType, lineitem, part, _row_id] }
│ │ └─StreamFilter { predicate: ((eventType = 'part':Varchar) OR (eventType = 'lineitem':Varchar)) }
│ │ └─StreamRowIdGen { row_id_index: 10 }
│ │ └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
│ └─StreamExchange { dist: HashShard($expr4) }
│ └─StreamProject { exprs: [Field(part, 0:Int32) as $expr4, _row_id] }
│ └─StreamFilter { predicate: (eventType = 'part':Varchar) }
│ └─StreamShare { id: 4 }
│ └─StreamProject { exprs: [eventType, lineitem, part, _row_id] }
│ └─StreamFilter { predicate: ((eventType = 'part':Varchar) OR (eventType = 'lineitem':Varchar)) }
│ └─StreamRowIdGen { row_id_index: 10 }
│ └─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
└─StreamProject { exprs: [(220.2:Decimal * (sum($expr2) / count($expr2)::Decimal)) as $expr5, $expr1] }
└─StreamHashAgg [append_only] { group_key: [$expr1], aggs: [sum($expr2), count($expr2), count] }
└─StreamExchange { dist: HashShard($expr1) }
└─StreamShare { id: 7 }
└─StreamProject { exprs: [Field(lineitem, 1:Int32) as $expr1, Field(lineitem, 4:Int32) as $expr2, Field(lineitem, 5:Int32) as $expr3, _row_id] }
└─StreamFilter { predicate: (eventType = 'lineitem':Varchar) }
└─StreamShare { id: 4 }
└─StreamProject { exprs: [eventType, lineitem, part, _row_id] }
└─StreamFilter { predicate: ((eventType = 'part':Varchar) OR (eventType = 'lineitem':Varchar)) }
└─StreamRowIdGen { row_id_index: 10 }
└─StreamSource { source: tpch, columns: [eventType, lineitem, supplier, part, partsupp, customer, orders, nation, region, _rw_kafka_timestamp, _row_id] }
(38 rows)
This looks like #15305 again — the same phenomenon as in TPC-H Q20 (#q20).
This issue has been open for 60 days with no activity. Could you please update the status? Feel free to continue discussion or close as not planned.
See performance numbers at https://www.notion.so/risingwave-labs/TPCH-Performance-Numbers-Table-e098ef82884546949333409f0513ada7?pvs=4#8de0bf4bda51444c8381f3b0c10ddfe1