trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)
https://trino.io
Apache License 2.0
10.43k stars 3k forks source link

Unexpected planner behaviour with DISTINCT aggregations and UNION ALL as source #19130

Open losipiuk opened 1 year ago

losipiuk commented 1 year ago

When we run a query that computes several grouped distinct aggregations and reads data from a single table:

explain select 
       a,
       b,
       count(distinct d1),
       count(distinct d1),
       count(distinct d3)
from (
    select
        1 a,
        2 b,
        (orderkey * 7) d1,
        (orderkey * 13) d2,
        (orderkey * 93) d3
    from lineitem)
group by 1,2;

we get the expected output, where each MarkDistinct operator runs in a separate HASH-distributed stage partitioned on the tuple (a, b, dN):

 Fragment 0 [HASH]
     Output layout: [expr$gid, expr$gid_4, count, count, count_5]
     Output partitioning: SINGLE []
     Partition count: 4
     Output[columnNames = [a, b, _col2, _col3, _col4]]
     │   Layout: [expr$gid:integer, expr$gid_4:integer, count:bigint, count:bigint, count_5:bigint]
     │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
     │   a := expr$gid
     │   b := expr$gid_4
     │   _col2 := count
     │   _col3 := count
     │   _col4 := count_5
     └─ Aggregate[type = FINAL, keys = [expr$gid, expr$gid_4]]
        │   Layout: [expr$gid:integer, expr$gid_4:integer, count_5:bigint, count:bigint]
        │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
        │   count_5 := count("count_7")
        │   count := count("count_8")
        └─ LocalExchange[partitioning = HASH, arguments = ["expr$gid", "expr$gid_4"]]
           │   Layout: [expr$gid:integer, expr$gid_4:integer, count_7:bigint, count_8:bigint]
           │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
           └─ RemoteSource[sourceFragmentIds = [1]]
                  Layout: [expr$gid:integer, expr$gid_4:integer, count_7:bigint, count_8:bigint]

 Fragment 1 [HASH]
     Output layout: [expr$gid, expr$gid_4, count_7, count_8]
     Output partitioning: HASH [expr$gid, expr$gid_4]
     Partition count: 4
     Aggregate[type = PARTIAL, keys = [expr$gid, expr$gid_4]]
     │   Layout: [expr$gid:integer, expr$gid_4:integer, count_7:bigint, count_8:bigint]
     │   count_7 := count("expr_3") (mask = expr$distinct_6)
     │   count_8 := count("expr_1") (mask = expr$distinct)
     └─ MarkDistinct[distinct = [expr_3:bigint], marker = expr$distinct_6]
        │   Layout: [expr_3:bigint, expr$gid:integer, expr$gid_4:integer, expr_1:bigint, expr$distinct:boolean, expr$distinct_6:boolean]
        └─ LocalExchange[partitioning = HASH, arguments = ["expr$gid", "expr$gid_4", "expr_3"]]
           │   Layout: [expr_3:bigint, expr$gid:integer, expr$gid_4:integer, expr_1:bigint, expr$distinct:boolean]
           │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}
           └─ RemoteSource[sourceFragmentIds = [2]]
                  Layout: [expr_3:bigint, expr$gid:integer, expr$gid_4:integer, expr_1:bigint, expr$distinct:boolean]

 Fragment 2 [HASH]
     Output layout: [expr_3, expr$gid, expr$gid_4, expr_1, expr$distinct]
     Output partitioning: HASH [expr$gid, expr$gid_4, expr_3]
     Partition count: 4
     MarkDistinct[distinct = [expr_1:bigint], marker = expr$distinct]
     │   Layout: [expr_3:bigint, expr$gid:integer, expr$gid_4:integer, expr_1:bigint, expr$distinct:boolean]
     └─ LocalExchange[partitioning = HASH, arguments = ["expr$gid", "expr$gid_4", "expr_1"]]
        │   Layout: [expr_3:bigint, expr$gid:integer, expr$gid_4:integer, expr_1:bigint]
        │   Estimates: {rows: 60175 (1.61MB), cpu: 1.61M, memory: 0B, network: 0B}
        └─ RemoteSource[sourceFragmentIds = [3]]
               Layout: [expr_3:bigint, expr$gid:integer, expr$gid_4:integer, expr_1:bigint]

 Fragment 3 [SOURCE]
     Output layout: [expr_3, expr$gid, expr$gid_4, expr_1]
     Output partitioning: HASH [expr$gid, expr$gid_4, expr_1]
     ScanProject[table = memory:2]
         Layout: [expr_3:bigint, expr$gid:integer, expr$gid_4:integer, expr_1:bigint]
         Estimates: {rows: 60175 (1.61MB), cpu: 528.88k, memory: 0B, network: 0B}/{rows: 60175 (1.61MB), cpu: 1.61M, memory: 0B, network: 0B}
         expr_3 := ("orderkey" * BIGINT '93')
         expr$gid := 1
         expr$gid_4 := 2
         expr_1 := ("orderkey" * BIGINT '7')
         orderkey := 0

When we change lineitem to (select * from lineitem union all select * from lineitem):

explain select 
       a,
       b,
       count(distinct d1),
       count(distinct d1),
       count(distinct d3)
from (
    select
        1 a,
        2 b,
        (orderkey * 7) d1,
        (orderkey * 13) d2,
        (orderkey * 93) d3
    from (select * from lineitem union all select * from lineitem))
group by 1,2;

We get a single HASH-distributed stage, using just (a, b) as the grouping key, performing all the MarkDistinct operations.

 Fragment 0 [HASH]
     Output layout: [expr, expr_32, count, count, count_37]
     Output partitioning: SINGLE []
     Partition count: 4
     Output[columnNames = [a, b, _col2, _col3, _col4]]
     │   Layout: [expr:integer, expr_32:integer, count:bigint, count:bigint, count_37:bigint]
     │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
     │   a := expr
     │   b := expr_32
     │   _col2 := count
     │   _col3 := count
     │   _col4 := count_37
     └─ Aggregate[type =  (STREAMING), keys = [expr, expr_32]]
        │   Layout: [expr:integer, expr_32:integer, count_37:bigint, count:bigint]
        │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}
        │   count_37 := count("expr_35") (mask = expr$distinct_46)
        │   count := count("expr_33") (mask = expr$distinct)
        └─ MarkDistinct[distinct = [expr_35:bigint], marker = expr$distinct_46]
           │   Layout: [expr_33:bigint, expr_32:integer, expr_35:bigint, expr:integer, expr$distinct:boolean, expr$distinct_46:boolean]
           └─ MarkDistinct[distinct = [expr_33:bigint], marker = expr$distinct]
              │   Layout: [expr_33:bigint, expr_32:integer, expr_35:bigint, expr:integer, expr$distinct:boolean]
              └─ LocalExchange[partitioning = HASH, arguments = ["expr", "expr_32"]]
                 │   Layout: [expr_33:bigint, expr_32:integer, expr_35:bigint, expr:integer]
                 │   Estimates: {rows: 120350 (3.21MB), cpu: 3.21M, memory: 0B, network: 0B}
                 ├─ RemoteSource[sourceFragmentIds = [1]]
                 │      Layout: [expr_38:bigint, expr_39:integer, expr_40:bigint, expr_41:integer]
                 └─ RemoteSource[sourceFragmentIds = [2]]
                        Layout: [expr_42:bigint, expr_43:integer, expr_44:bigint, expr_45:integer]

 Fragment 1 [SOURCE]
     Output layout: [expr_38, expr_39, expr_40, expr_41]
     Output partitioning: HASH [expr_41, expr_39]
     ScanProject[table = memory:2]
         Layout: [expr_38:bigint, expr_39:integer, expr_40:bigint, expr_41:integer]
         Estimates: {rows: 60175 (1.61MB), cpu: 528.88k, memory: 0B, network: 0B}/{rows: 60175 (1.61MB), cpu: 1.61M, memory: 0B, network: 0B}
         expr_38 := ("orderkey_0" * BIGINT '7')
         expr_39 := 2
         expr_40 := ("orderkey_0" * BIGINT '93')
         expr_41 := 1
         orderkey_0 := 0

 Fragment 2 [SOURCE]
     Output layout: [expr_42, expr_43, expr_44, expr_45]
     Output partitioning: HASH [expr_45, expr_43]
     ScanProject[table = memory:2]
         Layout: [expr_42:bigint, expr_43:integer, expr_44:bigint, expr_45:integer]
         Estimates: {rows: 60175 (1.61MB), cpu: 528.88k, memory: 0B, network: 0B}/{rows: 60175 (1.61MB), cpu: 1.61M, memory: 0B, network: 0B}
         expr_42 := ("orderkey_16" * BIGINT '7')
         expr_43 := 2
         expr_44 := ("orderkey_16" * BIGINT '93')
         expr_45 := 1
         orderkey_16 := 0

If the cardinality of the grouping key is low, we will get very skewed execution, where most of the work is done by one node in the cluster. Also, I think (not sure about that) having many MarkDistinct operators in a single stage bumps the memory requirements of the query.

cc: @lukasz-stec @martint @findepi @kasiafi @sopel39

findepi commented 1 year ago

> Also I think (not sure about that) having many MarkDistinct operators in single stage bumps memory requirements of the query.

True for batch mode (FTE).

lukasz-stec commented 1 year ago

With use_exact_partitioning=true, the plan for the UNION query uses a remote MarkDistinct, although it adds a redundant stage to do this (Fragment 3 [HASH] below). Also, the UNION plan with local MarkDistinct would be better when there is enough cardinality in the group-by keys to provide sufficient parallelism, because the data is sent over the network only once.

trino:tpch> set session use_exact_partitioning=true;
SET SESSION
trino:tpch> explain select 
         ->        a,
         ->        b,
         ->        count(distinct d1),
         ->        count(distinct d1),
         ->        count(distinct d3)
         -> from (
         ->     select
         ->         1 a,
         ->         2 b,
         ->         (orderkey * 7) d1,
         ->         (orderkey * 13) d2,
         ->         (orderkey * 93) d3
         ->     from (select * from lineitem union all select * from lineitem))
         -> group by 1,2;
                                                                                                         Query Plan                                                                                                         
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Trino version: testversion                                                                                                                                                                                                 
 Fragment 0 [HASH]                                                                                                                                                                                                          
     Output layout: [expr, expr_35, count, count, count_40]                                                                                                                                                                 
     Output partitioning: SINGLE []                                                                                                                                                                                         
     Partition count: 11                                                                                                                                                                                                    
     Output[columnNames = [a, b, _col2, _col3, _col4]]                                                                                                                                                                      
     │   Layout: [expr:integer, expr_35:integer, count:bigint, count:bigint, count_40:bigint]                                                                                                                               
     │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}                                                                                                                                                          
     │   a := expr                                                                                                                                                                                                          
     │   b := expr_35                                                                                                                                                                                                       
     │   _col2 := count                                                                                                                                                                                                     
     │   _col3 := count                                                                                                                                                                                                     
     │   _col4 := count_40                                                                                                                                                                                                  
     └─ Project[]                                                                                                                                                                                                           
        │   Layout: [expr:integer, expr_35:integer, count:bigint, count_40:bigint]                                                                                                                                          
        │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}                                                                                                                                                       
        └─ Aggregate[type = FINAL, keys = [expr, expr_35], hash = [$hashvalue]]                                                                                                                                             
           │   Layout: [expr:integer, expr_35:integer, $hashvalue:bigint, count:bigint, count_40:bigint]                                                                                                                    
           │   Estimates: {rows: ? (?), cpu: ?, memory: ?, network: 0B}                                                                                                                                                     
           │   count := count("count_50")                                                                                                                                                                                   
           │   count_40 := count("count_51")                                                                                                                                                                                
           └─ LocalExchange[partitioning = HASH, hashColumn = [$hashvalue], arguments = ["expr", "expr_35"]]                                                                                                                
              │   Layout: [expr:integer, expr_35:integer, count_50:bigint, count_51:bigint, $hashvalue:bigint]                                                                                                              
              │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}                                                                                                                                                 
              └─ RemoteSource[sourceFragmentIds = [1]]                                                                                                                                                                      
                     Layout: [expr:integer, expr_35:integer, count_50:bigint, count_51:bigint, $hashvalue_52:bigint]                                                                                                        

 Fragment 1 [HASH]                                                                                                                                                                                                          
     Output layout: [expr, expr_35, count_50, count_51, $hashvalue_53]                                                                                                                                                      
     Output partitioning: HASH [expr, expr_35][$hashvalue_53]                                                                                                                                                               
     Partition count: 11                                                                                                                                                                                                    
     Aggregate[type = PARTIAL, keys = [expr, expr_35], hash = [$hashvalue_53]]                                                                                                                                              
     │   Layout: [expr:integer, expr_35:integer, $hashvalue_53:bigint, count_50:bigint, count_51:bigint]                                                                                                                    
     │   count_50 := count("expr_36") (mask = expr$distinct)                                                                                                                                                                
     │   count_51 := count("expr_38") (mask = expr$distinct_49)                                                                                                                                                             
     └─ MarkDistinct[distinct = [expr_38:bigint], marker = expr$distinct_49]                                                                                                                                                
        │   Layout: [expr_38:bigint, expr_35:integer, expr_36:bigint, expr:integer, expr$distinct:boolean, $hashvalue_53:bigint, $hashvalue_54:bigint, expr$distinct_49:boolean]                                            
        └─ LocalExchange[partitioning = HASH, hashColumn = [$hashvalue_54], arguments = ["expr", "expr_35", "expr_38"]]                                                                                                     
           │   Layout: [expr_38:bigint, expr_35:integer, expr_36:bigint, expr:integer, expr$distinct:boolean, $hashvalue_53:bigint, $hashvalue_54:bigint]                                                                   
           │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}                                                                                                                                                    
           └─ RemoteSource[sourceFragmentIds = [2]]                                                                                                                                                                         
                  Layout: [expr_38:bigint, expr_35:integer, expr_36:bigint, expr:integer, expr$distinct:boolean, $hashvalue_55:bigint, $hashvalue_56:bigint]                                                                

 Fragment 2 [HASH]                                                                                                                                                                                                          
     Output layout: [expr_38, expr_35, expr_36, expr, expr$distinct, $hashvalue_57, $hashvalue_58]                                                                                                                          
     Output partitioning: HASH [expr, expr_35, expr_38][$hashvalue_58]                                                                                                                                                      
     Partition count: 11                                                                                                                                                                                                    
     Project[]                                                                                                                                                                                                              
     │   Layout: [expr_38:bigint, expr_35:integer, expr_36:bigint, expr:integer, $hashvalue_57:bigint, $hashvalue_58:bigint, expr$distinct:boolean]                                                                         
     │   Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}                                                                                                                                                          
     └─ MarkDistinct[distinct = [expr_36:bigint], marker = expr$distinct]                                                                                                                                                   
        │   Layout: [expr_38:bigint, expr_35:integer, expr_36:bigint, expr:integer, $hashvalue_57:bigint, $hashvalue_58:bigint, $hashvalue_59:bigint, expr$distinct:boolean]                                                
        └─ LocalExchange[partitioning = HASH, hashColumn = [$hashvalue_59], arguments = ["expr", "expr_35", "expr_36"]]                                                                                                     
           │   Layout: [expr_38:bigint, expr_35:integer, expr_36:bigint, expr:integer, $hashvalue_57:bigint, $hashvalue_58:bigint, $hashvalue_59:bigint]                                                                    
           │   Estimates: {rows: 119972104 (6.15GB), cpu: 6.15G, memory: 0B, network: 0B}                                                                                                                                   
           └─ RemoteSource[sourceFragmentIds = [3]]                                                                                                                                                                         
                  Layout: [expr_38:bigint, expr_35:integer, expr_36:bigint, expr:integer, $hashvalue_60:bigint, $hashvalue_61:bigint, $hashvalue_62:bigint]                                                                 

 Fragment 3 [HASH]                                                                                                                                                                                                          
     Output layout: [expr_38, expr_35, expr_36, expr, $hashvalue_63, $hashvalue_64, $hashvalue_65]                                                                                                                          
     Output partitioning: HASH [expr, expr_35, expr_36][$hashvalue_65]                                                                                                                                                      
     Partition count: 11                                                                                                                                                                                                    
     LocalExchange[partitioning = ROUND_ROBIN]                                                                                                                                                                              
     │   Layout: [expr_38:bigint, expr_35:integer, expr_36:bigint, expr:integer, $hashvalue_63:bigint, $hashvalue_64:bigint, $hashvalue_65:bigint]                                                                          
     │   Estimates: {rows: 119972104 (6.15GB), cpu: 6.15G, memory: 0B, network: 0B}                                                                                                                                         
     ├─ RemoteSource[sourceFragmentIds = [4]]                                                                                                                                                                               
     │      Layout: [expr_41:bigint, expr_42:integer, expr_43:bigint, expr_44:integer, $hashvalue_66:bigint, $hashvalue_67:bigint, $hashvalue_68:bigint]                                                                    
     └─ RemoteSource[sourceFragmentIds = [5]]                                                                                                                                                                               
            Layout: [expr_45:bigint, expr_46:integer, expr_47:bigint, expr_48:integer, $hashvalue_72:bigint, $hashvalue_73:bigint, $hashvalue_74:bigint]                                                                    

 Fragment 4 [SOURCE]                                                                                                                                                                                                        
     Output layout: [expr_41, expr_42, expr_43, expr_44, $hashvalue_69, $hashvalue_70, $hashvalue_71]                                                                                                                       
     Output partitioning: HASH [expr_44, expr_42][$hashvalue_69]                                                                                                                                                            
     Project[]                                                                                                                                                                                                              
     │   Layout: [expr_41:bigint, expr_42:integer, expr_43:bigint, expr_44:integer, $hashvalue_69:bigint, $hashvalue_70:bigint, $hashvalue_71:bigint]                                                                       
     │   Estimates: {rows: 59986052 (3.07GB), cpu: 3.07G, memory: 0B, network: 0B}                                                                                                                                          
     │   $hashvalue_69 := combine_hash(combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr_44"), 0)), COALESCE("$operator$hash_code"("expr_42"), 0))                                                              
     │   $hashvalue_70 := combine_hash(combine_hash(combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr_44"), 0)), COALESCE("$operator$hash_code"("expr_42"), 0)), COALESCE("$operator$hash_code"("expr_41"), 0)) 
     │   $hashvalue_71 := combine_hash(combine_hash(combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr_44"), 0)), COALESCE("$operator$hash_code"("expr_42"), 0)), COALESCE("$operator$hash_code"("expr_43"), 0)) 
     └─ ScanProject[table = hive:tpch:lineitem]                                                                                                                                                                             
            Layout: [expr_41:bigint, expr_42:integer, expr_43:bigint, expr_44:integer]                                                                                                                                      
            Estimates: {rows: 59986052 (1.56GB), cpu: 514.86M, memory: 0B, network: 0B}/{rows: 59986052 (1.56GB), cpu: 1.56G, memory: 0B, network: 0B}                                                                      
            expr_41 := ("orderkey_0" * BIGINT '93')                                                                                                                                                                         
            expr_42 := 2                                                                                                                                                                                                    
            expr_43 := ("orderkey_0" * BIGINT '7')                                                                                                                                                                          
            expr_44 := 1                                                                                                                                                                                                    
            orderkey_0 := orderkey:bigint:REGULAR                                                                                                                                                                           

 Fragment 5 [SOURCE]                                                                                                                                                                                                        
     Output layout: [expr_45, expr_46, expr_47, expr_48, $hashvalue_75, $hashvalue_76, $hashvalue_77]                                                                                                                       
     Output partitioning: HASH [expr_48, expr_46][$hashvalue_75]                                                                                                                                                            
     Project[]                                                                                                                                                                                                              
     │   Layout: [expr_45:bigint, expr_46:integer, expr_47:bigint, expr_48:integer, $hashvalue_75:bigint, $hashvalue_76:bigint, $hashvalue_77:bigint]                                                                       
     │   Estimates: {rows: 59986052 (3.07GB), cpu: 3.07G, memory: 0B, network: 0B}                                                                                                                                          
     │   $hashvalue_75 := combine_hash(combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr_48"), 0)), COALESCE("$operator$hash_code"("expr_46"), 0))                                                              
     │   $hashvalue_76 := combine_hash(combine_hash(combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr_48"), 0)), COALESCE("$operator$hash_code"("expr_46"), 0)), COALESCE("$operator$hash_code"("expr_45"), 0)) 
     │   $hashvalue_77 := combine_hash(combine_hash(combine_hash(bigint '0', COALESCE("$operator$hash_code"("expr_48"), 0)), COALESCE("$operator$hash_code"("expr_46"), 0)), COALESCE("$operator$hash_code"("expr_47"), 0)) 
     └─ ScanProject[table = hive:tpch:lineitem]                                                                                                                                                                             
            Layout: [expr_45:bigint, expr_46:integer, expr_47:bigint, expr_48:integer]                                                                                                                                      
            Estimates: {rows: 59986052 (1.56GB), cpu: 514.86M, memory: 0B, network: 0B}/{rows: 59986052 (1.56GB), cpu: 1.56G, memory: 0B, network: 0B}                                                                      
            expr_45 := ("orderkey_16" * BIGINT '93')                                                                                                                                                                        
            expr_46 := 2                                                                                                                                                                                                    
            expr_47 := ("orderkey_16" * BIGINT '7')                                                                                                                                                                         
            expr_48 := 1                                                                                                                                                                                                    
            orderkey_16 := orderkey:bigint:REGULAR