prestodb / presto

ReorderJoins should handle projections #12155

Closed findepi closed 3 years ago

findepi commented 5 years ago

Query

presto:sf1>
SELECT custkey, totalprice FROM orders o JOIN customer c USING (custkey)
WHERE EXISTS (SELECT 1 FROM lineitem WHERE partkey = 638 AND orderkey = o.orderkey);

has a plan:

         - InnerJoin[("orderkey" = "orderkey_6")][$hashvalue_24, $hashvalue_25] => [custkey_5:bigint, totalprice:double]
                 Distribution: PARTITIONED
                 Cost: {rows: 30 (540B), cpu: 522496980.66, memory: 2700810.16, network: 2700540.11}
             - Project[] => [custkey_5:bigint, orderkey:bigint, totalprice:double, $hashvalue_24:bigint]
                     Cost: {rows: 1500000 (51.50MB), cpu: 252450000.00, memory: 2700000.00, network: 2700000.00}
                     custkey_5 := COALESCE("custkey", "custkey")
                     $hashvalue_24 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("orderkey"), 0))
                 - InnerJoin[("custkey" = "custkey_0")][$hashvalue, $hashvalue_21] => [orderkey:bigint, custkey:bigint, totalprice:double]
                         Distribution: REPLICATED
                         Cost: {rows: 1500000 (38.62MB), cpu: 198450000.00, memory: 2700000.00, network: 2700000.00}
                     - ScanProject[table = tpch:tpch:orders:sf1.0] => [orderkey:bigint, custkey:bigint, totalprice:double, $hashvalue:bigint]
                             Cost: {rows: 1500000 (38.62MB), cpu: 40500000.00, memory: 0.00, network: 0.00}/{rows: 1500000 (51.50MB), cpu: 94500000.00, memory: 0.00, network: 0.00}
                             $hashvalue := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("custkey"), 0))
                             orderkey := tpch:orderkey
                             totalprice := tpch:totalprice
                             custkey := tpch:custkey
                             tpch:orderstatus
                                 :: [[F], [O], [P]]
                     - LocalExchange[HASH][$hashvalue_21] ("custkey_0") => custkey_0:bigint, $hashvalue_21:bigint
                             Cost: {rows: 150000 (2.57MB), cpu: 6750000.00, memory: 0.00, network: 2700000.00}
                         - RemoteExchange[REPLICATE] => custkey_0:bigint, $hashvalue_22:bigint
                                 Cost: {rows: 150000 (2.57MB), cpu: 4050000.00, memory: 0.00, network: 2700000.00}
                             - ScanProject[table = tpch:tpch:customer:sf1.0] => [custkey_0:bigint, $hashvalue_23:bigint]
                                     Cost: {rows: 150000 (1.29MB), cpu: 1350000.00, memory: 0.00, network: 0.00}/{rows: 150000 (2.57MB), cpu: 4050000.00, memory: 0.00, network: 0.00}
                                     $hashvalue_23 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("custkey_0"), 0))
                                     custkey_0 := tpch:custkey
             - LocalExchange[HASH][$hashvalue_25] ("orderkey_6") => orderkey_6:bigint, $hashvalue_25:bigint
                     Cost: {rows: 30 (540B), cpu: 216045900.44, memory: 270.05, network: 540.11}
                 - RemoteExchange[REPARTITION] => orderkey_6:bigint, $hashvalue_26:bigint
                         Cost: {rows: 30 (540B), cpu: 216045360.33, memory: 270.05, network: 540.11}
                     - Project[] => [orderkey_6:bigint, $hashvalue_27:bigint]
                             Cost: {rows: 30 (540B), cpu: 216044820.22, memory: 270.05, network: 0.00}
                             $hashvalue_27 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("orderkey_6"), 0))
                         - Aggregate(STREAMING)[orderkey_6] => [orderkey_6:bigint]
                                 Cost: {rows: 30 (270B), cpu: 216044280.11, memory: 270.05, network: 0.00}
                             - ScanFilterProject[table = tpch:tpch:lineitem:sf1.0, filterPredicate = ("partkey" = BIGINT '638')] => [orderkey_6:bigint]
                                     Cost: {rows: 6001215 (103.02MB), cpu: 108021870.00, memory: 0.00, network: 0.00}/{rows: 30 (540B), cpu: 216043740.00, memory: 0.00, network: 0.00}/{rows: 30 (270B), cpu
                                     partkey := tpch:partkey
                                     orderkey_6 := tpch:orderkey

ReorderJoins cannot reorder the InnerJoins in this plan because they are separated by a Project node. At the time ReorderJoins runs, the projection is simpler (as below), but it still prevents ReorderJoins from rotating the plan and joining orders with the filtered lineitem first.

resolved = {com.facebook.presto.sql.planner.plan.ProjectNode} 
 source = ...
 assignments = {com.facebook.presto.sql.planner.plan.Assignments} 
  assignments =
   0 = {com.google.common.collect.ImmutableMapEntry@31173} "custkey_5" -> "COALESCE("custkey", "custkey")"
   1 = {com.google.common.collect.ImmutableMapEntry@31174} "orderkey" -> ""orderkey""
   2 = {com.google.common.collect.ImmutableMapEntry@31175} "totalprice" -> ""totalprice""

findepi commented 5 years ago

Of course, the projection "custkey_5" -> "COALESCE("custkey", "custkey")" is nonsensical; the coalesce should be pruned. That would solve the problem for my example query.
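To make the pruning idea concrete, here is a minimal sketch of such a simplification. It is not Presto code: the `Symbol` and `Coalesce` classes and the `simplify_coalesce` function are hypothetical stand-ins for the planner's expression tree, and the sketch assumes all operands are deterministic.

```python
from dataclasses import dataclass
from typing import Tuple, Union


# Hypothetical, simplified expression nodes (not Presto's classes).
@dataclass(frozen=True)
class Symbol:
    name: str


@dataclass(frozen=True)
class Coalesce:
    operands: Tuple["Expr", ...]


Expr = Union[Symbol, Coalesce]


def simplify_coalesce(expr: Expr) -> Expr:
    """Drop repeated COALESCE operands; unwrap if only one remains."""
    if isinstance(expr, Symbol):
        return expr
    unique = []
    for operand in (simplify_coalesce(op) for op in expr.operands):
        # For deterministic operands, a later repeat cannot change the result:
        # if the first occurrence was NULL, the repeat is NULL as well.
        if operand not in unique:
            unique.append(operand)
    return unique[0] if len(unique) == 1 else Coalesce(tuple(unique))


custkey = Symbol("custkey")
print(simplify_coalesce(Coalesce((custkey, custkey))))  # Symbol(name='custkey')
```

With the coalesce reduced to a plain symbol reference, the projection in the example becomes an identity projection.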

rschlussel commented 5 years ago

I think a good way to do this would be to have a rewrite that pulls projections above joins when you have a join on top of a project. Later rewrites can push them down again.

The limitation on reordering across projections (and pulling projections above joins) is that none of the columns in the join predicates can use any of the (non-identity) projected columns.
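As a rough illustration of that limitation, the check could look like the sketch below. Everything here is an assumption made for the example: expressions are plain strings, a projection counts as identity when its expression equals its output name, and `can_pull_project_above_join` is a hypothetical helper, not a Presto API.

```python
from typing import Dict, Iterable, Set


def non_identity_outputs(assignments: Dict[str, str]) -> Set[str]:
    """Output symbols whose expression is more than a reference to themselves."""
    return {out for out, expr in assignments.items() if expr != out}


def can_pull_project_above_join(
    assignments: Dict[str, str], join_predicate_symbols: Iterable[str]
) -> bool:
    """True if no join predicate symbol is a non-identity projected column."""
    return non_identity_outputs(assignments).isdisjoint(join_predicate_symbols)


# Projection from the issue, in string form.
assignments = {
    "custkey_5": 'COALESCE("custkey", "custkey")',
    "orderkey": "orderkey",
    "totalprice": "totalprice",
}

# The upper join is on orderkey = orderkey_6, so the projection can move up.
print(can_pull_project_above_join(assignments, {"orderkey", "orderkey_6"}))  # True
# A join on the computed column would block the rewrite.
print(can_pull_project_above_join(assignments, {"custkey_5", "orderkey_6"}))  # False
```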

sopel39 commented 5 years ago

> I think a good way to do this would be to have a rewrite that pulls projections above joins when you have a join on top of a project. Later rewrites can push them down again.

Actually, we could consume the projections and add them to allInference for the multi-join node. Then we could add a projection on top of the join tree that rewrites the global outputSymbols in terms of source symbols. When constructing joins (during enumeration) we would not need any special steps.

The problem with this approach is that if there is a projection c=a+b and multiple joins use c, then PredicatePushdown will later create a projection below each of those joins, so c=a+b will be evaluated multiple times. Maybe that's not a problem for now.
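The duplication concern can be illustrated with a toy substitution. This sketch does not model PredicatePushdown or any Presto class; `inline_projections` is a hypothetical helper that inlines projected symbols into join predicates written as strings, and the predicates `c = x` and `c = y` are made up for the example.

```python
import re
from typing import Dict


def inline_projections(predicate: str, projections: Dict[str, str]) -> str:
    """Replace each projected symbol in the predicate with its definition."""

    def replace(match: "re.Match[str]") -> str:
        name = match.group(0)
        return f"({projections[name]})" if name in projections else name

    # Match identifier-like tokens; anything else is copied through unchanged.
    return re.sub(r"[A-Za-z_][A-Za-z_0-9]*", replace, predicate)


projections = {"c": "a + b"}
join_predicates = ["c = x", "c = y"]  # two joins that both use c

rewritten = [inline_projections(p, projections) for p in join_predicates]
print(rewritten)  # ['(a + b) = x', '(a + b) = y']
```

Each rewritten predicate now carries its own copy of `a + b`, which is the repeated evaluation described above.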

sopel39 commented 5 years ago

Who adds those COALESCE("custkey", "custkey") expressions?

findepi commented 5 years ago

> Who adds those COALESCE("custkey", "custkey") expressions?

A join with USING? As in https://github.com/prestodb/presto/pull/12193?

stale[bot] commented 3 years ago

This issue has been automatically marked as stale because it has not had any activity in the last 2 years. If you feel that this issue is important, just comment and the stale tag will be removed; otherwise it will be closed in 7 days. This is an attempt to ensure that our open issues remain valuable and relevant so that we can keep track of what needs to be done and prioritize the right things.

aaneja commented 2 months ago

This has been addressed by https://github.com/prestodb/presto/pull/21153. With handle_complex_equi_joins=true, the plan is:

presto:sf1> explain SELECT custkey, totalprice FROM orders o JOIN customer c USING (custkey) WHERE EXISTS (SELECT 1 FROM lineitem WHERE partkey = 638 AND orderkey = o.orderkey);
                                                                                                                                                                                    Query Plan                                                                                          >
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------->
 - Output[PlanNodeId 26][custkey, totalprice] => [custkey:bigint, totalprice:double]                                                                                                                                                                                                    >
     - RemoteStreamingExchange[PlanNodeId 890][GATHER] => [custkey:bigint, totalprice:double]                                                                                                                                                                                           >
         - InnerJoin[PlanNodeId 813][("custkey_0" = "custkey")][$hashvalue, $hashvalue_21] => [custkey:bigint, totalprice:double]                                                                                                                                                       >
                 Distribution: REPLICATED                                                                                                                                                                                                                                               >
             - ScanFilterProject[PlanNodeId 1,886,1032][table = TableHandle {connectorId='tpch', connectorHandle='customer:sf1.0', layout='Optional[customer:sf1.0]'}, filterPredicate = not(IS_NULL(custkey_0)), projectLocality = LOCAL] => [custkey_0:bigint, $hashvalue:bigint]     >
                     Estimates: {source: CostBasedSourceInfo, rows: 150,000 (2.57MB), cpu: 1,350,000.00, memory: 0.00, network: 0.00}/{source: CostBasedSourceInfo, rows: 150,000 (2.57MB), cpu: 2,700,000.00, memory: 0.00, network: 0.00}/{source: CostBasedSourceInfo, rows: 150,000 >
                     $hashvalue := combine_hash(BIGINT'0', COALESCE($operator$hash_code(custkey_0), BIGINT'0')) (1:73)                                                                                                                                                                  >
                     custkey_0 := tpch:custkey (1:55)                                                                                                                                                                                                                                   >
             - LocalExchange[PlanNodeId 960][HASH][$hashvalue_21] (custkey) => [custkey:bigint, totalprice:double, $hashvalue_21:bigint]                                                                                                                                                >
                 - RemoteStreamingExchange[PlanNodeId 889][REPLICATE] => [custkey:bigint, totalprice:double, $hashvalue_22:bigint]                                                                                                                                                      >
                     - Project[PlanNodeId 1036][projectLocality = LOCAL] => [custkey:bigint, totalprice:double, $hashvalue_27:bigint]                                                                                                                                                   >
                             $hashvalue_27 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(custkey), BIGINT'0')) (1:41)                                                                                                                                                         >
                         - InnerJoin[PlanNodeId 812][("orderkey" = "orderkey_6")][$hashvalue_23, $hashvalue_26] => [custkey:bigint, totalprice:double]                                                                                                                                  >
                                 Distribution: PARTITIONED                                                                                                                                                                                                                              >
                             - ScanFilterProject[PlanNodeId 0,887,1033][table = TableHandle {connectorId='tpch', connectorHandle='orders:sf1.0', layout='Optional[orders:sf1.0]'}, filterPredicate = (not(IS_NULL(orderkey))) AND (not(IS_NULL(custkey))), projectLocality = LOCAL] => [>
                                     Estimates: {source: CostBasedSourceInfo, rows: 1,500,000 (25.75MB), cpu: 40,500,000.00, memory: 0.00, network: 0.00}/{source: CostBasedSourceInfo, rows: 1,500,000 (25.75MB), cpu: 81,000,000.00, memory: 0.00, network: 0.00}/{source: CostBasedSo>
                                     $hashvalue_23 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(orderkey), BIGINT'0')) (1:42)                                                                                                                                                >
                                     custkey := tpch:custkey (1:41)                                                                                                                                                                                                                     >
                                     totalprice := tpch:totalprice (1:41)                                                                                                                                                                                                               >
                                     orderkey := tpch:orderkey (1:41)                                                                                                                                                                                                                   >
                                     tpch:orderstatus                                                                                                                                                                                                                                   >
                                         :: [["F"], ["O"], ["P"]]                                                                                                                                                                                                                       >
                             - Project[PlanNodeId 1035][projectLocality = LOCAL] => [orderkey_6:bigint, $hashvalue_26:bigint]                                                                                                                                                           >
                                     $hashvalue_26 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(orderkey_6), BIGINT'0')) (1:111)                                                                                                                                             >
                                 - Aggregate(FINAL)[orderkey_6][PlanNodeId 324] => [orderkey_6:bigint]                                                                                                                                                                                  >
                                     - LocalExchange[PlanNodeId 981][HASH][$hashvalue_24] (orderkey_6) => [orderkey_6:bigint, $hashvalue_24:bigint]                                                                                                                                     >
                                         - Project[PlanNodeId 1034][projectLocality = LOCAL] => [orderkey_6:bigint, $hashvalue_25:bigint]                                                                                                                                               >
                                                 $hashvalue_25 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(orderkey_6), BIGINT'0')) (1:111)                                                                                                                                 >
                                             - Aggregate(PARTIAL)[orderkey_6][PlanNodeId 979] => [orderkey_6:bigint]                                                                                                                                                                    >
                                                 - ScanFilterProject[PlanNodeId 6,888,566][table = TableHandle {connectorId='tpch', connectorHandle='lineitem:sf1.0', layout='Optional[lineitem:sf1.0]'}, filterPredicate = ((partkey) = (BIGINT'638')) AND (not(IS_NULL(orderkey_6))), >
                                                         Estimates: {source: CostBasedSourceInfo, rows: 6,001,215 (103.02MB), cpu: 108,021,870.00, memory: 0.00, network: 0.00}/{source: CostBasedSourceInfo, rows: 30 (540B), cpu: 216,043,740.00, memory: 0.00, network: 0.00}/{source>
                                                         orderkey_6 := tpch:orderkey (1:110)                                                                                                                                                                                            >
                                                         partkey := tpch:partkey (1:110)