b-slim opened this issue 2 years ago
What version are you running? I'm not able to reproduce the problem with the query you provided above:
explain
select l.orderkey from (select orderkey from tpch.tiny.lineitem union all select * from (values 1) v(orderkey) ) l
join
tpch.tiny.orders o on l.orderkey = o.orderkey;
Query Plan
-------------------------------------------------------------------------------------------------------------------------------------------------------
Fragment 0 [SINGLE]
Output layout: [orderkey]
Output partitioning: SINGLE []
Stage Execution Strategy: UNGROUPED_EXECUTION
Output[orderkey]
│ Layout: [orderkey:bigint]
│ Estimates: {rows: 60176 (528.89kB), cpu: 5.81M, memory: 263.67kB, network: 792.58kB}
└─ RemoteSource[1]
Layout: [orderkey:bigint]
Fragment 1 [tpch:orders:15000]
Output layout: [orderkey]
Output partitioning: SINGLE []
Stage Execution Strategy: UNGROUPED_EXECUTION
InnerJoin[("orderkey" = "orderkey_3")][$hashvalue, $hashvalue_8]
│ Layout: [orderkey:bigint]
│ Estimates: {rows: 60176 (528.89kB), cpu: 5.81M, memory: 263.67kB, network: 263.69kB}
│ Distribution: PARTITIONED
│ dynamicFilterAssignments = {orderkey_3 -> #df_395}
├─ LocalExchange[ROUND_ROBIN] ()
│ │ Layout: [orderkey:bigint, $hashvalue:bigint]
│ │ Estimates: {rows: 60176 (1.03MB), cpu: 3.10M, memory: 0B, network: 18B}
│ ├─ ScanFilterProject[table = tpch:lineitem:sf0.01, grouped = false, filterPredicate = true, dynamicFilters = {"orderkey_0" = #df_395}]
│ │ Layout: [orderkey_0:bigint, $hashvalue_5:bigint]
│ │ Estimates: {rows: 60175 (1.03MB), cpu: 528.88k, memory: 0B, network: 0B}/{rows: 60175 (1.03MB), cpu: 1.03M, memory: 0B, network: 0B}/{ro
│ │ $hashvalue_5 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("orderkey_0"), 0))
│ │ orderkey_0 := tpch:orderkey
│ └─ RemoteSource[2]
│ Layout: [field_1:bigint, $hashvalue_6:bigint]
└─ LocalExchange[HASH][$hashvalue_8] ("orderkey_3")
│ Layout: [orderkey_3:bigint, $hashvalue_8:bigint]
│ Estimates: {rows: 15000 (263.67kB), cpu: 922.85k, memory: 0B, network: 263.67kB}
└─ RemoteSource[3]
Fragment 2 [SINGLE]
Output layout: [field_1, $hashvalue_7]
Output partitioning: tpch:orders:15000 [field_1]
Stage Execution Strategy: UNGROUPED_EXECUTION
Project[]
│ Layout: [field_1:bigint, $hashvalue_7:bigint]
│ Estimates: {rows: 1 (18B), cpu: 18, memory: 0B, network: 0B}
│ $hashvalue_7 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("field_1"), 0))
└─ Values
Layout: [field_1:bigint]
Estimates: {rows: 1 (9B), cpu: 0, memory: 0B, network: 0B}
(BIGINT '1')
Fragment 3 [tpch:orders:15000]
Output layout: [orderkey_3, $hashvalue_10]
Output partitioning: tpch:orders:15000 [orderkey_3]
Stage Execution Strategy: UNGROUPED_EXECUTION
ScanProject[table = tpch:orders:sf0.01, grouped = false]
Layout: [orderkey_3:bigint, $hashvalue_10:bigint]
Estimates: {rows: 15000 (263.67kB), cpu: 131.84k, memory: 0B, network: 0B}/{rows: 15000 (263.67kB), cpu: 395.51k, memory: 0B, network: 0B}
$hashvalue_10 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("orderkey_3"), 0))
orderkey_3 := tpch:orderkey
tpch:orderstatus
:: [[F], [O], [P]]
@martint I am running 352 with some patches, including "Simplify set operations involving empty sets"
(sha 7491329a155b76890a27e8dec3cfa1df9b3c2861).
trino> explain
-> select l.orderkey from (select orderkey from tpch.tiny.lineitem union all select * from (values 1) v(orderkey) ) l
-> join
-> tpch.tiny.orders o on l.orderkey = o.orderkey;
Query Plan
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Fragment 0 [SINGLE]
Output layout: [orderkey]
Output partitioning: SINGLE []
Stage Execution Strategy: UNGROUPED_EXECUTION
Output[orderkey]
│ Layout: [orderkey:bigint]
│ Estimates: {rows: 60176 (528.89kB), cpu: 5.55M, memory: 263.67kB, network: 1.29MB}
└─ InnerJoin[("orderkey" = "orderkey_2")][$hashvalue, $hashvalue_9]
│ Layout: [orderkey:bigint]
│ Estimates: {rows: 60176 (528.89kB), cpu: 5.55M, memory: 263.67kB, network: 1.29MB}
│ Distribution: REPLICATED
│ maySkipOutputDuplicates = false
│ dynamicFilterAssignments = {orderkey_2 -> #df_422}
├─ LocalExchange[ROUND_ROBIN] ()
│ │ Layout: [orderkey:bigint, $hashvalue:bigint]
│ │ Estimates: {rows: 60176 (1.03MB), cpu: 3.10M, memory: 0B, network: 1.03MB}
│ ├─ Project[]
│ │ │ Layout: [field_1:bigint, $hashvalue_6:bigint]
│ │ │ Estimates: {rows: 1 (18B), cpu: 32, memory: 0B, network: 0B}
│ │ │ $hashvalue_6 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("field_1"), 0))
│ │ └─ Project[]
│ │ │ Layout: [field_1:bigint]
│ │ │ Estimates: {rows: 1 (9B), cpu: 14, memory: 0B, network: 0B}
│ │ │ field_1 := CAST("field" AS bigint)
│ │ └─ LocalExchange[ROUND_ROBIN] ()
│ │ │ Layout: [field:integer]
│ │ │ Estimates: {rows: 1 (5B), cpu: 5, memory: 0B, network: 0B}
│ │ └─ Values
│ │ Layout: [field:integer]
│ │ Estimates: {rows: 1 (5B), cpu: 0, memory: 0B, network: 0B}
│ │ (1)
│ └─ RemoteSource[1]
│ Layout: [orderkey_0:bigint, $hashvalue_7:bigint]
└─ LocalExchange[HASH][$hashvalue_9] ("orderkey_2")
│ Layout: [orderkey_2:bigint, $hashvalue_9:bigint]
│ Estimates: {rows: 15000 (263.67kB), cpu: 659.18k, memory: 0B, network: 263.67kB}
└─ RemoteSource[2]
Layout: [orderkey_2:bigint, $hashvalue_10:bigint]
Fragment 1 [tpch:orders:15000]
Output layout: [orderkey_0, $hashvalue_8]
Output partitioning: SINGLE []
Stage Execution Strategy: UNGROUPED_EXECUTION
ScanFilterProject[table = tpch:lineitem:sf0.01, grouped = false, filterPredicate = true, dynamicFilter = {"orderkey_0" = #df_422}]
Layout: [orderkey_0:bigint, $hashvalue_8:bigint]
Estimates: {rows: 60175 (1.03MB), cpu: 528.88k, memory: 0B, network: 0B}/{rows: 60175 (1.03MB), cpu: 1.03M, memory: 0B, network: 0B}/{rows: 60175 (1.03MB), cpu: 2.07M, memory: 0B, network: 0B}
$hashvalue_8 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("orderkey_0"), 0))
orderkey_0 := tpch:orderkey
:
I think either the join distribution cost selection has changed, since you get a plan with a shuffle, or maybe it is picking a shuffle because it sees that the join key is the same as the partition key. What version are you running?
I'm trying with current master.
@martint I can reproduce it with master by making the join equality an expression:
trino> explain
-> select l.orderkey from (select orderkey from tpch.tiny.lineitem union all select * from (values 1) v(orderkey) ) l
-> join
-> tpch.tiny.orders o on l.orderkey + 1 = o.orderkey - 2;
Query Plan
-------------------------------------------------------------------------------------------------------------------------------------------------------
Fragment 0 [SINGLE]
Output layout: [orderkey]
Output partitioning: SINGLE []
Stage Execution Strategy: UNGROUPED_EXECUTION
Output[orderkey]
│ Layout: [orderkey:bigint]
│ Estimates: {rows: 60176 (528.89kB), cpu: 7.74M, memory: 263.67kB, network: 1.81MB}
└─ InnerJoin[("expr" = "expr_5")][$hashvalue, $hashvalue_15]
│ Layout: [orderkey:bigint]
│ Estimates: {rows: 60176 (528.89kB), cpu: 7.74M, memory: 263.67kB, network: 1.81MB}
│ Distribution: REPLICATED
├─ LocalExchange[ROUND_ROBIN] ()
│ │ Layout: [orderkey:bigint, expr:bigint, $hashvalue:bigint]
│ │ Estimates: {rows: 60176 (1.55MB), cpu: 4.65M, memory: 0B, network: 1.55MB}
│ ├─ Project[]
│ │ │ Layout: [field_1:bigint, expr_9:bigint, $hashvalue_12:bigint]
│ │ │ Estimates: {rows: 1 (27B), cpu: 27, memory: 0B, network: 0B}
│ │ │ $hashvalue_12 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr_9"), 0))
│ │ └─ Values
│ │ Layout: [field_1:bigint, expr_9:bigint]
│ │ Estimates: {rows: 1 (18B), cpu: 0, memory: 0B, network: 0B}
│ │ (BIGINT '1', BIGINT '2')
│ └─ RemoteSource[1]
│ Layout: [orderkey_0:bigint, expr_7:bigint, $hashvalue_13:bigint]
└─ LocalExchange[HASH][$hashvalue_15] ("expr_5")
│ Layout: [expr_5:bigint, $hashvalue_15:bigint]
│ Estimates: {rows: 15000 (263.67kB), cpu: 791.02k, memory: 0B, network: 263.67kB}
└─ RemoteSource[2]
Layout: [expr_5:bigint, $hashvalue_16:bigint]
Fragment 1 [tpch:orders:15000]
Output layout: [orderkey_0, expr_7, $hashvalue_14]
Output partitioning: SINGLE []
Stage Execution Strategy: UNGROUPED_EXECUTION
Project[]
│ Layout: [orderkey_0:bigint, expr_7:bigint, $hashvalue_14:bigint]
│ Estimates: {rows: 60175 (1.55MB), cpu: 3.10M, memory: 0B, network: 0B}
│ $hashvalue_14 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr_7"), 0))
└─ ScanProject[table = tpch:lineitem:sf0.01, grouped = false]
Layout: [orderkey_0:bigint, expr_7:bigint]
Estimates: {rows: 60175 (1.03MB), cpu: 528.88k, memory: 0B, network: 0B}/{rows: 60175 (1.03MB), cpu: 1.55M, memory: 0B, network: 0B}
expr_7 := ("orderkey_0" + BIGINT '1')
orderkey_0 := tpch:orderkey
Fragment 2 [tpch:orders:15000]
Output layout: [expr_5, $hashvalue_17]
Output partitioning: SINGLE []
Stage Execution Strategy: UNGROUPED_EXECUTION
Project[]
│ Layout: [expr_5:bigint, $hashvalue_17:bigint]
│ Estimates: {rows: 15000 (263.67kB), cpu: 527.34k, memory: 0B, network: 0B}
│ $hashvalue_17 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr_5"), 0))
└─ ScanProject[table = tpch:orders:sf0.01, grouped = false]
Layout: [expr_5:bigint]
Estimates: {rows: 15000 (131.84kB), cpu: 131.84k, memory: 0B, network: 0B}/{rows: 15000 (131.84kB), cpu: 263.67k, memory: 0B, network: 0B}
expr_5 := ("orderkey_3" - BIGINT '2')
orderkey_3 := tpch:orderkey
tpch:orderstatus
:: [[F], [O], [P]]
Anyhow, I hope you get the idea. In the short term, the CBO needs an overall plan cost to avoid this kind of single-node join when we have a broadcast join. I'm not sure whether this only affects unions, or whether there are other cases where a join is planned on a single node.
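To make the "overall cost" point concrete, here is a toy sketch in Python (not Trino code; all node names and costs are invented): a per-join decision may pick "broadcast" because the build side is small, even though the resulting plan collapses the probe side onto a single node. Comparing the total cost of each complete plan avoids that trap.

```python
# Toy sketch (hypothetical costs, not Trino internals) of whole-plan
# cost comparison between two alternatives for the same query.

def plan_cost(plan):
    """Total plan cost = sum of the per-node costs."""
    return sum(cost for _node, cost in plan)

# Alternative 1: broadcast join that ends up running on a single node.
single_node_broadcast = [
    ("gather union/probe side to one node", 1_000),
    ("broadcast small build side", 10),
    ("join on a single node", 5_000),
]

# Alternative 2: partitioned join that stays distributed.
partitioned = [
    ("hash-repartition probe side", 300),
    ("hash-repartition build side", 50),
    ("join across all workers", 500),
]

# Comparing complete plans picks the distributed join, even though the
# broadcast step alone looked cheaper than a repartition.
best = min([single_node_broadcast, partitioned], key=plan_cost)
assert best is partitioned
```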
I see what's happening. This is unrelated to empty values nodes (which should be removed, regardless), or cost-based optimizations. It's a limitation in AddExchanges related to how Unions are planned in the presence of a replicated join.
Given a plan like this one:
Join
┌────── ───────────┐
▼ │
Union │
┌──── ────┐ │
▼ ▼ ▼
TableScan Values TableScan
When planning a partitioned join, we get the following plan:
Hash
┌───────────────────────────────────┐
│ │
│ Join │
│ ┌────── ───────────┐ │
│ ▼ │ │
│ Union │ │
│ ┌───── ─────┐ │ │
│ ▼ ▼ ▼ │
│ X(Hash) X(Hash) X(Hash) │
└───┬─────────────┬────────────┬────┘
│ │ │
│ │ │
│ │ │
┌──────────┘ │ └──────────────┐
│ │ │
│ │ │
│ │ │
│ Source │ Source │ Source
┌───────────┴──────────┐ ┌─────────┴───────────┐ ┌───────────┴──────────┐
│ ▼ │ │ ▼ │ │ ▼ │
│ TableScan │ │ Values │ │ TableScan │
│ │ │ │ │ │
└──────────────────────┘ └─────────────────────┘ └──────────────────────┘
But for a replicated join, we get:
Single
┌───────────────────────────────────┐
│ │
│ Join │
│ ┌────── ───────────┐ │
│ ▼ │ │
│ Union │ │
│ ┌───── ─────┐ │ │
│ ▼ ▼ ▼ │
│ X(Gather) Values X(Broadcast)│
└───┬──────────────────────────┬────┘
│ │
│ │
│ │
│ │
│ │
│ │
│ │
│ Source │ Source
┌───────────┴──────────┐ ┌───────────┴──────────┐
│ ▼ │ │ ▼ │
│ TableScan │ │ TableScan │
│ │ │ │
└──────────────────────┘ └──────────────────────┘
Ideally, we should be generating:
Source
┌──────────────────────────────────────────┐
│ │
│ Join │
│ ┌────────── ─────────┐ │
│ ▼ │ │
│ Union │ │
│ ┌───── ────┐ │ │
│ ▼ ▼ ▼ │
│ TableScan X(Arbitrary) X(Broadcast) │
│ │ │ │
└───────────────────┼──────────────┼───────┘
│ │
│ └───┐
│ │
│ Single │ Source
┌───────┴───────┐ ┌───────┴───────┐
│ ▼ │ │ ▼ │
│ Values │ │ TableScan │
│ │ │ │
└───────────────┘ └───────────────┘
@martint yes, now we are on the same page.

About X(Arbitrary): in the case of a union, don't we need to remove duplicate rows, and thus the single node has to be distributed by the same partition key?

X(Arbitrary) is an exchange with arbitrary redistribution. Since the other fragment is source-distributed and we're doing a replicated join, it doesn't matter where the rows land - they just need to be sent to some node.
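The difference between a hash exchange and an arbitrary exchange can be sketched with a toy model (Python, hypothetical names - this is not Trino code): X(Hash) must route each row to the worker that owns its join key, while X(Arbitrary) may send a row anywhere, e.g. round-robin. With a replicated join the build side is broadcast to every worker, so the probe rows only need *some* placement.

```python
from itertools import cycle

# Toy model of two exchange kinds (invented worker/row names).
workers = ["w1", "w2", "w3"]
rows = [{"orderkey": k} for k in range(9)]

def hash_exchange(rows, key, workers):
    """X(Hash): route each row by hash of its key so that equal join
    keys end up on the same worker - required for a partitioned join."""
    placement = {w: [] for w in workers}
    for row in rows:
        placement[workers[hash(row[key]) % len(workers)]].append(row)
    return placement

def arbitrary_exchange(rows, workers):
    """X(Arbitrary): send each row to *some* worker (here round-robin).
    Enough for a replicated join, where the build side is everywhere."""
    placement = {w: [] for w in workers}
    for w, row in zip(cycle(workers), rows):
        placement[w].append(row)
    return placement

# Both exchanges deliver every row exactly once; they differ only in
# which worker a given row is allowed to land on.
assert sum(len(v) for v in arbitrary_exchange(rows, workers).values()) == len(rows)
assert sum(len(v) for v in hash_exchange(rows, "orderkey", workers).values()) == len(rows)
```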
I used https://asciiflow.com/ for the diagrams.
Issue:
The CBO produces a sub-optimal plan (deadly in our case) that schedules the join on a single node, so the query fails with resource exhaustion (a join of billions of rows against a small table).
How does this happen?
The AddExchanges (old) optimizer rewrites a union node between a single node with empty values and a set of partitioned splits into a single local union node. This is by design; not sure why, TBH.
How to reproduce it:
We can reproduce the issue using the following statement.
Now you can push the CBO to pick a better plan in the absence of stats, i.e. avoid doing a broadcast join.
Possible fixes for the issue:
Consider adding a cost per explored plan and choosing the best one based on the overall plan cost. I cannot find how the join costing/selection does this; perhaps this fix is not possible, since the Trino CBO does not keep track of the cost (memo) of other explored plans while searching. Adding such a fix to Trino seems like major work, so this option is not explored for now. I would appreciate any pointers.
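What "keeping a memo of explored plans" could look like, very roughly (an assumed, highly simplified sketch - not Trino's actual data structures): every explored alternative for a plan group is remembered with its cost, so the search can later pick the globally cheapest one instead of committing to the first locally attractive choice.

```python
# Hypothetical memo structure; the group ids, plan descriptions, and
# costs below are invented for illustration.

class Memo:
    def __init__(self):
        # group id -> list of (cost, plan description) alternatives
        self.groups = {}

    def explore(self, group, plan, cost):
        """Record one explored alternative for this plan group."""
        self.groups.setdefault(group, []).append((cost, plan))

    def best(self, group):
        """Return the cheapest (cost, plan) alternative seen so far."""
        return min(self.groups[group])

memo = Memo()
memo.explore("join#1", "broadcast join collapsed to a single node", 5_000)
memo.explore("join#1", "partitioned join", 800)
cost, plan = memo.best("join#1")
assert plan == "partitioned join"
```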
Adding a minor patch to skip empty unions when AddExchanges fires on the union node. FYI, if you wonder why RemoveEmptyUnionBranches does not prune such an empty single node, it is because the empty values are produced later on, when the filter predicate is pushed into the table scan. This will fix our current issue when the union has an empty values node, but will not fix the general case, as shown by the query above.
Adding an arbitrary distribution when we have a mix of un-partitioned and partitioned union nodes. This seems to be avoided by design; I am missing some context, please fill me in if possible.
Firing the iterative optimizer again; this seems like it could be overkill, see this thread.
Testing:
Plan Join with Single node