prestodb / presto

The official home of the Presto distributed SQL query engine for big data
http://prestodb.io
Apache License 2.0

Add a new scheduler + exchange rules to support multiple table scans in a single stage #23476

Open aaneja opened 2 months ago

aaneja commented 2 months ago

For use cases like a JOIN after a UNION ALL, it can be beneficial to have the UNION ALL run in a single stage.

Existing behavior

For example, consider this simplified fragment of the TPC-DS Q5 query -

select 1
from (
        select cs_catalog_page_sk as page_sk,
               cs_sold_date_sk as date_sk
        from catalog_sales
        union all
        select cr_catalog_page_sk as page_sk,
               cr_returned_date_sk as date_sk
        from catalog_returns
     ) salesreturns,
     date_dim,
     catalog_page
where date_sk = d_date_sk
  and d_date between cast('2001-08-04' as date)
                 and date_add('day', 14, cast('2001-08-04' as date))
  and page_sk = cp_catalog_page_sk

The distributed Presto plan for this is -

 Fragment 0 [SINGLE]                                                                                                                                                                                                                                                                    >
     Output layout: [expr_25]                                                                                                                                                                                                                                                           >
     Output partitioning: SINGLE []                                                                                                                                                                                                                                                     >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                                                                                      >
     - Output[PlanNodeId 18][_col0] => [expr_25:integer]                                                                                                                                                                                                                                >
             Estimates: {source: CostBasedSourceInfo, rows: 10,450,222 (49.83MB), cpu: 91,476,243,213.87, memory: 420,196.00, network: 29,768,258,357.28}                                                                                                                               >
             _col0 := expr_25 (1:35)                                                                                                                                                                                                                                                    >
         - Project[PlanNodeId 470][projectLocality = LOCAL] => [expr_25:integer]                                                                                                                                                                                                        >
                 Estimates: {source: CostBasedSourceInfo, rows: 10,450,222 (49.83MB), cpu: 91,476,243,213.87, memory: 420,196.00, network: 29,768,258,357.28}                                                                                                                           >
                 expr_25 := INTEGER'1'                                                                                                                                                                                                                                                  >
             - InnerJoin[PlanNodeId 768][("cs_catalog_page_sk_12" = "cp_catalog_page_sk")][$hashvalue_38, $hashvalue_39] => []                                                                                                                                                          >
                     Estimates: {source: CostBasedSourceInfo, rows: 10,450,222 (49.83MB), cpu: 91,423,992,105.86, memory: 420,196.00, network: 29,768,258,357.28}                                                                                                                       >
                     Distribution: REPLICATED                                                                                                                                                                                                                                           >
                 - Project[PlanNodeId 1033][projectLocality = LOCAL] => [cs_catalog_page_sk_12:integer, $hashvalue_38:bigint]                                                                                                                                                           >
                         Estimates: {source: CostBasedSourceInfo, rows: 10,425,141 (49.71MB), cpu: 91,276,480,130.88, memory: 196.00, network: 29,767,838,357.28}                                                                                                                       >
                         $hashvalue_38 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(cs_catalog_page_sk_12), BIGINT'0')) (1:52)                                                                                                                                               >
                     - InnerJoin[PlanNodeId 767][("cs_sold_date_sk_13" = "d_date_sk")][$hashvalue, $hashvalue_35] => [cs_catalog_page_sk_12:integer]                                                                                                                                    >
                             Estimates: {source: CostBasedSourceInfo, rows: 10,425,141 (49.71MB), cpu: 91,130,528,155.90, memory: 196.00, network: 29,767,838,357.28}                                                                                                                   >
                             Distribution: REPLICATED                                                                                                                                                                                                                                   >
                         - RemoteSource[1,2] => [cs_catalog_page_sk_12:integer, cs_sold_date_sk_13:integer, $hashvalue:bigint]                                                                                                                                                          >
                         - LocalExchange[PlanNodeId 961][HASH][$hashvalue_35] (d_date_sk) => [d_date_sk:integer, $hashvalue_35:bigint]                                                                                                                                                  >
                                 Estimates: {source: CostBasedSourceInfo, rows: 14 (70B), cpu: 1,461,372.01, memory: 0.00, network: 196.00}                                                                                                                                             >
                             - RemoteSource[3] => [d_date_sk:integer, $hashvalue_36:bigint]                                                                                                                                                                                             >
                 - LocalExchange[PlanNodeId 962][HASH][$hashvalue_39] (cp_catalog_page_sk) => [cp_catalog_page_sk:integer, $hashvalue_39:bigint]                                                                                                                                        >
                         Estimates: {source: CostBasedSourceInfo, rows: 30,000 (146.48kB), cpu: 1,140,000.00, memory: 0.00, network: 420,000.00}                                                                                                                                        >
                     - RemoteSource[4] => [cp_catalog_page_sk:integer, $hashvalue_40:bigint]                                                                                                                                                                                            >
                                                                                                                                                                                                                                                                                        >
 Fragment 1 [SOURCE]                                                                                                                                                                                                                                                                    >
     Output layout: [cs_catalog_page_sk, cs_sold_date_sk, $hashvalue_33]                                                                                                                                                                                                                >
     Output partitioning: SINGLE []                                                                                                                                                                                                                                                     >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                                                                                      >
     - ScanFilterProject[PlanNodeId 0,865,1031][table = TableHandle {connectorId='local_hms', connectorHandle='HiveTableHandle{schemaName=tpcds_sf1000_parquet_v2, tableName=catalog_sales, analyzePartitionValues=Optional.empty}', layout='Optional[tpcds_sf1000_parquet_v2.catalog_sa>
             Estimates: {source: CostBasedSourceInfo, rows: 1,439,980,416 (25.43GB), cpu: 14,342,194,728.00, memory: 0.00, network: 0.00}/{source: CostBasedSourceInfo, rows: 1,425,614,070 (25.23GB), cpu: 28,684,389,456.00, memory: 0.00, network: 0.00}/{source: CostBasedSourceInfo>
             $hashvalue_33 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(cs_sold_date_sk), BIGINT'0')) (1:82)                                                                                                                                                                 >
             LAYOUT: tpcds_sf1000_parquet_v2.catalog_sales{domains={cs_sold_date_sk=[ [(<min>, <max>)] ], cs_catalog_page_sk=[ [(<min>, <max>)] ]}}                                                                                                                                     >
             cs_catalog_page_sk := cs_catalog_page_sk:int:12:REGULAR (1:114)                                                                                                                                                                                                            >
             cs_sold_date_sk := cs_sold_date_sk:int:0:REGULAR (1:114)                                                                                                                                                                                                                   >
                                                                                                                                                                                                                                                                                        >
 Fragment 2 [SOURCE]                                                                                                                                                                                                                                                                    >
     Output layout: [cr_catalog_page_sk, cr_returned_date_sk, $hashvalue_34]                                                                                                                                                                                                            >
     Output partitioning: SINGLE []                                                                                                                                                                                                                                                     >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                                                                                      >
     - ScanFilterProject[PlanNodeId 3,866,1032][table = TableHandle {connectorId='local_hms', connectorHandle='HiveTableHandle{schemaName=tpcds_sf1000_parquet_v2, tableName=catalog_returns, analyzePartitionValues=Optional.empty}', layout='Optional[tpcds_sf1000_parquet_v2.catalog_>
             Estimates: {source: CostBasedSourceInfo, rows: 143,996,756 (2.54GB), cpu: 1,428,437,552.00, memory: 0.00, network: 0.00}/{source: CostBasedSourceInfo, rows: 141,114,254 (2.50GB), cpu: 2,856,875,104.00, memory: 0.00, network: 0.00}/{source: CostBasedSourceInfo, rows: >
             $hashvalue_34 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(cr_returned_date_sk), BIGINT'0')) (1:176)                                                                                                                                                            >
             LAYOUT: tpcds_sf1000_parquet_v2.catalog_returns{domains={cr_catalog_page_sk=[ [(<min>, <max>)] ], cr_returned_date_sk=[ [(<min>, <max>)] ]}}                                                                                                                               >
             cr_catalog_page_sk := cr_catalog_page_sk:int:12:REGULAR (1:212)                                                                                                                                                                                                            >
             cr_returned_date_sk := cr_returned_date_sk:int:0:REGULAR (1:212)   
...

This plan transfers large amounts of data from Fragments 1 and 2 to Fragment 0 to perform the UNION ALL, only for the very selective JOIN to execute next and discard most of the rows.

If we could instead execute the table scans of catalog_sales and catalog_returns in a single stage, we would save the network cost of that data transfer.
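To put a rough number on "discards most rows", here is a small back-of-the-envelope sketch in Python. It only uses row estimates that are visible in the Presto plan above. The third estimate on each ScanFilterProject line is truncated in the paste, so the second (post-filter) estimate is used as an approximation of what each fragment ships. The names `SCAN_ROWS` and `surviving_fraction` are made up for this illustration.

```python
# Row estimates copied from the Presto plan above. These are optimizer
# estimates, not measured values. The second estimate on each
# ScanFilterProject line is used because the third one is cut off.
SCAN_ROWS = {
    "catalog_sales": 1_425_614_070,    # Fragment 1
    "catalog_returns": 141_114_254,    # Fragment 2
}
ROWS_AFTER_DATE_JOIN = 10_425_141      # InnerJoin on d_date_sk in Fragment 0


def surviving_fraction(scan_rows, rows_after_join):
    """Fraction of the exchanged rows that survive the first join."""
    shipped = sum(scan_rows.values())
    return rows_after_join / shipped


shipped_rows = sum(SCAN_ROWS.values())
fraction = surviving_fraction(SCAN_ROWS, ROWS_AFTER_DATE_JOIN)
print(f"rows shipped to Fragment 0: {shipped_rows:,}")
print(f"rows kept by the date_dim join: {fraction:.2%}")
```

By these estimates, well under 1% of the rows sent over the network survive the join on date_dim, which is the transfer a single-stage plan would avoid.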

Proposed behavior

Trino added a MultiSourcePartitionedScheduler in https://github.com/trinodb/trino/pull/17265 that allows the scheduler to run multiple source-partitioned table scans in a single stage. This, along with changes to how exchanges are added, improved the performance of TPC-DS Q02 and Q05 significantly.
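As a toy illustration of the idea only (this is not Trino's actual algorithm or API), the Python sketch below assigns splits from several table scans to the workers of one stage. The function name `schedule_multi_source`, the split names, and the round-robin policy are all assumptions made for this example.

```python
from collections import defaultdict
from itertools import cycle


def schedule_multi_source(splits_by_source, workers):
    """Assign splits from several table scans to the workers of ONE stage.

    Hypothetical policy: sources are drained in turn and each split goes to
    the next worker in round-robin order, so any worker may end up scanning
    splits from more than one table.
    """
    assignments = defaultdict(list)
    next_worker = cycle(workers)
    # Copy the split lists so the caller's input is not mutated.
    pending = {source: list(splits) for source, splits in splits_by_source.items()}
    while any(pending.values()):
        for source, splits in pending.items():
            if splits:
                assignments[next(next_worker)].append((source, splits.pop(0)))
    return dict(assignments)


plan = schedule_multi_source(
    {"catalog_sales": ["cs-0", "cs-1", "cs-2"], "catalog_returns": ["cr-0"]},
    workers=["worker-a", "worker-b"],
)
for worker, splits in plan.items():
    print(worker, splits)
```

The point of the sketch is that both scans feed the same set of tasks, so the UNION ALL needs only a local exchange before the join rather than a remote one.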

The Trino distributed plan for the above query is -

Trino version: 453                                                                                                                                                                            
 Fragment 0 [SOURCE]                                                                                                                                                                           
     Output layout: [expr]                                                                                                                                                                     
     Output partitioning: SINGLE []                                                                                                                                                            
     Output[columnNames = [_col0]]                                                                                                                                                             
     │   Layout: [expr:integer]                                                                                                                                                                
     │   Estimates: {rows: 10450287 (49.83MB), cpu: 0, memory: 0B, network: 0B}                                                                                                                
     │   _col0 := expr                                                                                                                                                                         
     └─ Project[]                                                                                                                                                                              
        │   Layout: [expr:integer]                                                                                                                                                             
        │   Estimates: {rows: 10450287 (49.83MB), cpu: 49.83M, memory: 0B, network: 0B}                                                                                                        
        │   expr := integer '1'                                                                                                                                                                
        └─ InnerJoin[criteria = (page_sk = cp_catalog_page_sk), distribution = REPLICATED]                                                                                                     
           │   Layout: []                                                                                                                                                                      
           │   Estimates: {rows: 10450287 (0B), cpu: 49.92M, memory: 146.48kB, network: 0B}                                                                                                    
           │   Distribution: REPLICATED                                                                                                                                                        
           │   dynamicFilterAssignments = {cp_catalog_page_sk -> #df_705}                                                                                                                      
           ├─ InnerJoin[criteria = (date_sk = d_date_sk), distribution = REPLICATED]                                                                                                           
           │  │   Layout: [page_sk:integer]                                                                                                                                                    
           │  │   Estimates: {rows: 10491985 (49.77MB), cpu: 14.74G, memory: 70B, network: 0B}                                                                                                 
           │  │   Distribution: REPLICATED                                                                                                                                                     
           │  │   dynamicFilterAssignments = {d_date_sk -> #df_706}                                                                                                                            
           │  ├─ LocalExchange[partitioning = ROUND_ROBIN]                                                                                                                                     
           │  │  │   Layout: [page_sk:integer, date_sk:integer]                                                                                                                                
           │  │  │   Estimates: {rows: 1583977172 (14.69GB), cpu: 14.69G, memory: 0B, network: 0B}                                                                                             
           │  │  ├─ ScanFilter[table = local_hms:tpcds_sf1000_parquet_v2:catalog_sales, dynamicFilters = {cs_catalog_page_sk = #df_705, cs_sold_date_sk = #df_706}]                            
           │  │  │      Layout: [cs_sold_date_sk:integer, cs_catalog_page_sk:integer]                                                                                                          
           │  │  │      Estimates: {rows: 1439980416 (13.36GB), cpu: 13.36G, memory: 0B, network: 0B}/{rows: 1439980416 (13.36GB), cpu: 13.36G, memory: 0B, network: 0B}                       
           │  │  │      cs_sold_date_sk := cs_sold_date_sk:int:REGULAR                                                                                                                         
           │  │  │      cs_catalog_page_sk := cs_catalog_page_sk:int:REGULAR                                                                                                                   
           │  │  └─ ScanFilter[table = local_hms:tpcds_sf1000_parquet_v2:catalog_returns, dynamicFilters = {cr_catalog_page_sk = #df_705, cr_returned_date_sk = #df_706}]                      
           │  │         Layout: [cr_returned_date_sk:integer, cr_catalog_page_sk:integer]                                                                                                      
           │  │         Estimates: {rows: 143996756 (1.33GB), cpu: 1.33G, memory: 0B, network: 0B}/{rows: 143996756 (1.33GB), cpu: 1.33G, memory: 0B, network: 0B}                             
           │  │         cr_catalog_page_sk := cr_catalog_page_sk:int:REGULAR                                                                                                                   
           │  │         cr_returned_date_sk := cr_returned_date_sk:int:REGULAR                                                                                                                 
           │  └─ LocalExchange[partitioning = SINGLE]                                                                                                                                          
           │     │   Layout: [d_date_sk:integer]                                                                                                                                               
           │     │   Estimates: {rows: 14 (70B), cpu: 0, memory: 0B, network: 0B}                                                                                                              
           │     └─ RemoteSource[sourceFragmentIds = [1]]                                                                                                                                      
           │            Layout: [d_date_sk:integer]                                                                                                                                            
           └─ LocalExchange[partitioning = SINGLE]                                                                                                                                             
              │   Layout: [cp_catalog_page_sk:integer]                                                                                                                                         
              │   Estimates: {rows: 30000 (146.48kB), cpu: 0, memory: 0B, network: 0B}                                                                                                         
              └─ RemoteSource[sourceFragmentIds = [2]]                                                                                                                                         
                     Layout: [cp_catalog_page_sk:integer] 
...

We can experiment with this to see whether it is feasible to implement in Presto as well.

jaystarshot commented 2 months ago

This can also help CTE materialization.

jaystarshot commented 2 months ago

However, I am not sure this performs the same at scale, since the same nodes will do all of the scanning work. I am also not sure how soft affinity, etc. will be handled.