prestodb / presto

The official home of the Presto distributed SQL query engine for big data
http://prestodb.io
Apache License 2.0

TPC-DS SF-1K 13 Prestissimo query results mismatch when data is partitioned #22149

Closed ethanyzhang closed 4 months ago

ethanyzhang commented 7 months ago

On this dashboard view, filter on failed = 0 and mismatch = 1

Query output

s3://presto-workloads-output/tpc-ds/c0w1_native_oss_save_output_power_ds_sf1k_par_240311-031741/

Query output baseline

s3://presto-workloads-output/tpc-ds/c0w1_java_oss_save_output_power_ds_sf1k_240311-001500/

aaneja commented 7 months ago

Plan diffs

I noticed that the stats for the partitioned tables below do not match those of their unpartitioned counterparts. This is one reason for the join order diff I observed in Q4. It may also be the cause of the incorrect results, since some plan sub-trees could have been eliminated because of out-of-date stats.

Next steps -

catalog_sales

presto:tpcds_sf1000_parquet_varchar_part> show stats for hive.tpcds_sf1000_parquet_varchar.catalog_sales;
       column_name        | data_size | distinct_values_count |    nulls_fraction     |   row_count   | low_value | high_value 
--------------------------+-----------+-----------------------+-----------------------+---------------+-----------+------------
 cs_sold_date_sk          | NULL      |                1828.0 |  0.005002377754559684 | NULL          | 2450815   | 2452654    
 cs_sold_time_sk          | NULL      |               89157.0 |  0.005000990930143317 | NULL          | 0         | 86399      
 cs_ship_date_sk          | NULL      |                1950.0 |  0.005000558979824348 | NULL          | 2450817   | 2452744    
 cs_bill_customer_sk      | NULL      |           1.2236563E7 |  0.005001400657937837 | NULL          | 1         | 12000000   
 cs_bill_cdemo_sk         | NULL      |             1890006.0 |  0.005001549965523976 | NULL          | 1         | 1920800    
 cs_bill_hdemo_sk         | NULL      |                7082.0 |  0.004999260351051886 | NULL          | 1         | 7200       
 cs_bill_addr_sk          | NULL      |             5947530.0 | 0.0049997478576819755 | NULL          | 1         | 6000000    
 cs_ship_customer_sk      | NULL      |           1.2236563E7 |  0.005001114542935562 | NULL          | 1         | 12000000   
 cs_ship_cdemo_sk         | NULL      |             1890006.0 |  0.005000172863462054 | NULL          | 1         | 1920800    
 cs_ship_hdemo_sk         | NULL      |                7082.0 | 0.0050011388488216775 | NULL          | 1         | 7200       
 cs_ship_addr_sk          | NULL      |             5947530.0 |   0.00499884020644903 | NULL          | 1         | 6000000    
 cs_call_center_sk        | NULL      |                  42.0 |  0.004999867303750886 | NULL          | 1         | 42         
 cs_catalog_page_sk       | NULL      |               16739.0 |  0.004999395769560244 | NULL          | 1         | 25207      
 cs_ship_mode_sk          | NULL      |                  20.0 |  0.005000820094486619 | NULL          | 1         | 20         
 cs_warehouse_sk          | NULL      |                  20.0 |  0.005000545785200456 | NULL          | 1         | 20         
 cs_item_sk               | NULL      |              297612.0 |                   0.0 | NULL          | 1         | 300000     
 cs_promo_sk              | NULL      |                1483.0 |  0.005002043027785178 | NULL          | 1         | 1500       
 cs_order_number          | NULL      |          1.65467466E8 |                   0.0 | NULL          | 1         | 160000000  
 cs_quantity              | NULL      |                 100.0 |  0.005002071500394628 | NULL          | 1         | 100        
 cs_wholesale_cost        | NULL      |               10091.0 |  0.005000830511294954 | NULL          | 1.0       | 100.0      
 cs_list_price            | NULL      |               29733.0 |    0.0050011437099989 | NULL          | 1.0       | 300.0      
 cs_sales_price           | NULL      |               29691.0 |  0.005000259670198182 | NULL          | 0.0       | 300.0      
 cs_ext_discount_amt      | NULL      |             1070707.0 |  0.005000799955323837 | NULL          | 0.0       | 29982.0    
 cs_ext_sales_price       | NULL      |             1067497.0 |  0.004999807580716431 | NULL          | 0.0       | 29943.0    
 cs_ext_wholesale_cost    | NULL      |              388752.0 |  0.004999981888642574 | NULL          | 1.0       | 10000.0    
 cs_ext_list_price        | NULL      |             1115360.0 |  0.004999749941043643 | NULL          | 1.0       | 30000.0    
 cs_ext_tax               | NULL      |              216236.0 |  0.005000354115927088 | NULL          | 0.0       | 2673.27    
 cs_coupon_amt            | NULL      |             1568468.0 |  0.004998759649797904 | NULL          | 0.0       | 28730.0    
 cs_ext_ship_cost         | NULL      |              553991.0 |   0.00500182983044125 | NULL          | 0.0       | 14994.0    
 cs_net_paid              | NULL      |             1810472.0 |  0.004998448534455624 | NULL          | 0.0       | 29943.0    
 cs_net_paid_inc_tax      | NULL      |             2432400.0 |  0.005000555507554903 | NULL          | 0.0       | 32376.27   
 cs_net_paid_inc_ship     | NULL      |             2468889.0 |                   0.0 | NULL          | 0.0       | 43956.0    
 cs_net_paid_inc_ship_tax | NULL      |             3332491.0 |                   0.0 | NULL          | 0.0       | 46593.36   
 cs_net_profit            | NULL      |             2073314.0 |                   0.0 | NULL          | -10000.0  | 19962.0    
 NULL                     | NULL      | NULL                  | NULL                  | 1.439980416E9 | NULL      | NULL       
(35 rows)
presto:tpcds_sf1000_parquet_varchar_part> show stats for hive.tpcds_sf1000_parquet_varchar_part.catalog_sales;
       column_name        | data_size | distinct_values_count |    nulls_fraction    |      row_count       | low_value | high_value 
--------------------------+-----------+-----------------------+----------------------+----------------------+-----------+------------
 cs_sold_time_sk          | NULL      |               89157.0 | 0.045610661459515325 | NULL                 | 0         | 86399      
 cs_ship_date_sk          | NULL      |                1950.0 |  0.04561997254672852 | NULL                 | 2450817   | 2452744    
 cs_bill_customer_sk      | NULL      |             3108233.0 |  0.04561836054971745 | NULL                 | 1         | 11999999   
 cs_bill_cdemo_sk         | NULL      |             1558270.0 |  0.04565392072258849 | NULL                 | 1         | 1920800    
 cs_bill_hdemo_sk         | NULL      |                7082.0 |  0.04560612621419314 | NULL                 | 1         | 7200       
 cs_bill_addr_sk          | NULL      |             2662298.0 |  0.04559979852443327 | NULL                 | 2         | 5999999    
 cs_ship_customer_sk      | NULL      |             3090168.0 |  0.04564642613946986 | NULL                 | 1         | 11999997   
 cs_ship_cdemo_sk         | NULL      |             1621418.0 |  0.04563854660184113 | NULL                 | 1         | 1920800    
 cs_ship_hdemo_sk         | NULL      |                7082.0 |  0.04563251965779228 | NULL                 | 1         | 7200       
 cs_ship_addr_sk          | NULL      |             2644798.0 |  0.04562125973837168 | NULL                 | 1         | 5999999    
 cs_call_center_sk        | NULL      |                  42.0 |  0.04561572601728891 | NULL                 | 1         | 42         
 cs_catalog_page_sk       | NULL      |               16739.0 |  0.04563116028717847 | NULL                 | 1         | 25207      
 cs_ship_mode_sk          | NULL      |                  20.0 |  0.04563911200377785 | NULL                 | 1         | 20         
 cs_warehouse_sk          | NULL      |                  20.0 |   0.0456171094475596 | NULL                 | 1         | 20         
 cs_item_sk               | NULL      |              297612.0 |                  0.0 | NULL                 | 1         | 300000     
 cs_promo_sk              | NULL      |                1483.0 | 0.045644284830007396 | NULL                 | 1         | 1500       
 cs_order_number          | NULL      |             6946746.0 |                  0.0 | NULL                 | 1         | 159999980  
 cs_quantity              | NULL      |                 100.0 |  0.04564774942059835 | NULL                 | 1         | 100        
 cs_wholesale_cost        | NULL      |               10091.0 | 0.045630643004555514 | NULL                 | 1.0       | 100.0      
 cs_list_price            | NULL      |               29642.0 |  0.04564127737289719 | NULL                 | 1.0       | 300.0      
 cs_sales_price           | NULL      |               27223.0 | 0.045607437465493186 | NULL                 | 0.0       | 299.97     
 cs_ext_discount_amt      | NULL      |              484281.0 |  0.04562892273908848 | NULL                 | 0.0       | 29767.0    
 cs_ext_sales_price       | NULL      |              475123.0 | 0.045629283633941704 | NULL                 | 0.0       | 29888.0    
 cs_ext_wholesale_cost    | NULL      |              374031.0 |  0.04560830361314093 | NULL                 | 1.0       | 10000.0    
 cs_ext_list_price        | NULL      |              689602.0 |  0.04561425837821913 | NULL                 | 1.01      | 29985.0    
 cs_ext_tax               | NULL      |              101017.0 |  0.04562485665707548 | NULL                 | 0.0       | 2598.06    
 cs_coupon_amt            | NULL      |              230056.0 | 0.045601831565439774 | NULL                 | 0.0       | 28693.0    
 cs_ext_ship_cost         | NULL      |              302470.0 |  0.04563207455413997 | NULL                 | 0.0       | 14992.0    
 cs_net_paid              | NULL      |              521799.0 |  0.04559549184585146 | NULL                 | 0.0       | 29651.0    
 cs_net_paid_inc_tax      | NULL      |              691944.0 |  0.04561026447517678 | NULL                 | 0.0       | 31726.57   
 cs_net_paid_inc_ship     | NULL      |              889736.0 |                  0.0 | NULL                 | 0.0       | 43777.0    
 cs_net_paid_inc_ship_tax | NULL      |             1221198.0 |                  0.0 | NULL                 | 0.0       | 46367.65   
 cs_net_profit            | NULL      |              831051.0 |                  0.0 | NULL                 | -10000.0  | 19701.0    
 cs_sold_date_sk          | NULL      |                1836.0 | 0.004717189765010423 | NULL                 | 2450815   | 2452654    
 NULL                     | NULL      | NULL                  | NULL                 | 1.5270375708500001E9 | NULL      | NULL       
(35 rows)

presto:tpcds_sf1000_parquet_varchar_part> select count(*) from hive.tpcds_sf1000_parquet_varchar_part.catalog_sales;
   _col0    
------------
 1439980416 
(1 row)

Query 20240311_121051_00095_ivzjq, FINISHED, 8 nodes
Splits: 1,848 total, 1,848 done (100.00%)
[Latency: client-side: 0:09, server-side: 0:09] [1.44B rows, 1.8GB] [162M rows/s, 207MB/s]

presto:tpcds_sf1000_parquet_varchar_part> select count(*) from hive.tpcds_sf1000_parquet_varchar.catalog_sales;
   _col0    
------------
 1439980416 
(1 row)

Query 20240311_121115_00096_ivzjq, FINISHED, 7 nodes
Splits: 844 total, 844 done (100.00%)
[Latency: client-side: 0:01, server-side: 0:01] [1.44B rows, 838MB] [2.13B rows/s, 1.21GB/s]

Note that the actual row counts are the same (1,439,980,416 in both tables), but the stats for the partitioned table seem to be out of date: they report a row_count of about 1.527E9.
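To put a number on that gap, here is a minimal Python sketch that compares the `row_count` estimate from `SHOW STATS` with the `count(*)` result. The two values are copied from the output above; the function name `relative_error` is only illustrative.

```python
# Values copied from the SHOW STATS and count(*) output for catalog_sales.
ESTIMATED_ROWS_PARTITIONED = 1.5270375708500001e9  # row_count in the partitioned table's stats
ACTUAL_ROWS = 1_439_980_416                        # count(*), identical for both tables


def relative_error(estimate: float, actual: float) -> float:
    """Signed relative error of a stats estimate against the true value."""
    return (estimate - actual) / actual


error = relative_error(ESTIMATED_ROWS_PARTITIONED, ACTUAL_ROWS)
print(f"partitioned catalog_sales row_count is off by {error:+.2%}")
```

The estimate comes out roughly 6% too high, while the unpartitioned table's stats (1.439980416E9) match the real count exactly.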

inventory

presto> show stats for hive.tpcds_sf1000_parquet_varchar_part.inventory;
     column_name      | data_size | distinct_values_count | nulls_fraction | row_count | low_value | high_value 
----------------------+-----------+-----------------------+----------------+-----------+-----------+------------
 inv_item_sk          | NULL      |              151228.0 |            0.0 | NULL      | 1         | 300000     
 inv_warehouse_sk     | NULL      |                  20.0 |            0.0 | NULL      | 1         | 20         
 inv_quantity_on_hand | NULL      |                1010.0 |     0.05001737 | NULL      | 0         | 1000       
 inv_date_sk          | NULL      |                 261.0 |            0.0 | NULL      | 2450815   | 2452635    
 NULL                 | NULL      | NULL                  | NULL           |    7.83E8 | NULL      | NULL       
(5 rows)
presto> show stats for hive.tpcds_sf1000_parquet_varchar.inventory;
     column_name      | data_size | distinct_values_count |   nulls_fraction    | row_count | low_value | high_value 
----------------------+-----------+-----------------------+---------------------+-----------+-----------+------------
 inv_date_sk          | NULL      |                 261.0 |                 0.0 | NULL      | 2450815   | 2452635    
 inv_item_sk          | NULL      |              297612.0 |                 0.0 | NULL      | 1         | 300000     
 inv_warehouse_sk     | NULL      |                  20.0 |                 0.0 | NULL      | 1         | 20         
 inv_quantity_on_hand | NULL      |                1010.0 | 0.05000479948914432 | NULL      | 0         | 1000       
 NULL                 | NULL      | NULL                  | NULL                |    7.83E8 | NULL      | NULL       
(5 rows)

store_sales

Partitioned on ss_sold_date_sk

presto> show stats for hive.tpcds_sf1000_parquet_varchar_part.store_sales;
      column_name      | data_size | distinct_values_count |    nulls_fraction    |      row_count      | low_value | high_value 
-----------------------+-----------+-----------------------+----------------------+---------------------+-----------+------------
 ss_sold_time_sk       | NULL      |               47961.0 |  0.23922516057106902 | NULL                | 28800     | 75599      
 ss_item_sk            | NULL      |              297612.0 |                  0.0 | NULL                | 1         | 300000     
 ss_customer_sk        | NULL      |           1.2124495E7 |  0.23924228921378904 | NULL                | 1         | 12000000   
 ss_cdemo_sk           | NULL      |             1890006.0 |  0.23931612089546253 | NULL                | 1         | 1920800    
 ss_hdemo_sk           | NULL      |                7082.0 |  0.23927428620311655 | NULL                | 1         | 7200       
 ss_addr_sk            | NULL      |             5947530.0 |  0.23927779717312136 | NULL                | 1         | 6000000    
 ss_store_sk           | NULL      |                 513.0 |  0.23923616161041744 | NULL                | 1         | 1000       
 ss_promo_sk           | NULL      |                1483.0 |   0.2392977450723527 | NULL                | 1         | 1500       
 ss_ticket_number      | NULL      |           1.0071624E8 |                  0.0 | NULL                | 2         | 240000000  
 ss_quantity           | NULL      |                 100.0 |  0.23922890211223832 | NULL                | 1         | 100        
 ss_wholesale_cost     | NULL      |               10091.0 |   0.2392495696729831 | NULL                | 1.0       | 100.0      
 ss_list_price         | NULL      |               19495.0 |  0.23928140945469847 | NULL                | 1.0       | 200.0      
 ss_sales_price        | NULL      |               19348.0 |   0.2392575802741981 | NULL                | 0.0       | 200.0      
 ss_ext_discount_amt   | NULL      |              529048.0 |  0.23929047160016365 | NULL                | 0.0       | 19336.68   
 ss_ext_sales_price    | NULL      |              600147.0 |  0.23928937114687854 | NULL                | 0.0       | 19800.0    
 ss_ext_wholesale_cost | NULL      |              388752.0 |  0.23927765743302168 | NULL                | 1.0       | 10000.0    
 ss_ext_list_price     | NULL      |              731384.0 |   0.2392458316253163 | NULL                | 1.0       | 19998.0    
 ss_ext_tax            | NULL      |              115731.0 |  0.23925412520023315 | NULL                | 0.0       | 1758.24    
 ss_coupon_amt         | NULL      |              529048.0 |  0.23929047160016365 | NULL                | 0.0       | 19336.68   
 ss_net_paid           | NULL      |              786675.0 |  0.23930163683412922 | NULL                | 0.0       | 19800.0    
 ss_net_paid_inc_tax   | NULL      |             1098165.0 |   0.2393256861052866 | NULL                | 0.0       | 21294.24   
 ss_net_profit         | NULL      |             1023699.0 |   0.2392519068261505 | NULL                | -10000.0  | 9900.0     
 ss_sold_date_sk       | NULL      |                1823.0 | 0.024822416010531874 | NULL                | 2450816   | 2452642    
 NULL                  | NULL      | NULL                  | NULL                 | 5.221121221440001E9 | NULL      | NULL       
(24 rows)
presto> show stats for hive.tpcds_sf1000_parquet_varchar.store_sales;
      column_name      | data_size | distinct_values_count |    nulls_fraction    |   row_count   | low_value | high_value 
-----------------------+-----------+-----------------------+----------------------+---------------+-----------+------------
 ss_sold_date_sk       | NULL      |                1820.0 |  0.04500048022595944 | NULL          | 2450816   | 2452642    
 ss_sold_time_sk       | NULL      |               47961.0 |  0.04499776111740666 | NULL          | 28800     | 75599      
 ss_item_sk            | NULL      |              297612.0 |                  0.0 | NULL          | 1         | 300000     
 ss_customer_sk        | NULL      |           1.2236563E7 |  0.04499698125304584 | NULL          | 1         | 12000000   
 ss_cdemo_sk           | NULL      |             1890006.0 |  0.04500093578341331 | NULL          | 1         | 1920800    
 ss_hdemo_sk           | NULL      |                7082.0 | 0.044998298272422764 | NULL          | 1         | 7200       
 ss_addr_sk            | NULL      |             5947530.0 | 0.044996645487757815 | NULL          | 1         | 6000000    
 ss_store_sk           | NULL      |                 513.0 |  0.04499048261485481 | NULL          | 1         | 1000       
 ss_promo_sk           | NULL      |                1483.0 |  0.04499917917887129 | NULL          | 1         | 1500       
 ss_ticket_number      | NULL      |          2.43256717E8 |                  0.0 | NULL          | 1         | 240000000  
 ss_quantity           | NULL      |                 100.0 |  0.04499472152140729 | NULL          | 1         | 100        
 ss_wholesale_cost     | NULL      |               10091.0 |  0.04499681007177697 | NULL          | 1.0       | 100.0      
 ss_list_price         | NULL      |               19495.0 |  0.04499915278987244 | NULL          | 1.0       | 200.0      
 ss_sales_price        | NULL      |               19536.0 | 0.044999514249711985 | NULL          | 0.0       | 200.0      
 ss_ext_discount_amt   | NULL      |             1149239.0 |  0.04500334759901894 | NULL          | 0.0       | 19778.0    
 ss_ext_sales_price    | NULL      |              738120.0 |  0.04499642951463563 | NULL          | 0.0       | 19972.0    
 ss_ext_wholesale_cost | NULL      |              388752.0 |  0.04499845764808689 | NULL          | 1.0       | 10000.0    
 ss_ext_list_price     | NULL      |              752801.0 |  0.04499803472965792 | NULL          | 1.0       | 20000.0    
 ss_ext_tax            | NULL      |              150267.0 |  0.04499627500010287 | NULL          | 0.0       | 1797.48    
 ss_coupon_amt         | NULL      |             1149239.0 |  0.04500334759901894 | NULL          | 0.0       | 19778.0    
 ss_net_paid           | NULL      |             1274594.0 |   0.0449999816127706 | NULL          | 0.0       | 19972.0    
 ss_net_paid_inc_tax   | NULL      |             1697156.0 |  0.04500332989061181 | NULL          | 0.0       | 21769.48   
 ss_net_profit         | NULL      |             1499246.0 | 0.044990789213354636 | NULL          | -10000.0  | 9986.0     
 NULL                  | NULL      | NULL                  | NULL                 | 2.879987999E9 | NULL      | NULL       
(24 rows)
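
For store_sales the two sets of stats disagree on more than distinct counts. The sketch below, with values copied from the two outputs above (the `ss_customer_sk` row is used for `nulls_fraction`), shows how far apart the estimates are. The dictionary layout and the `ratio` helper are assumptions made for illustration.

```python
# row_count and the ss_customer_sk nulls_fraction, copied from the SHOW STATS outputs.
PARTITIONED = {"row_count": 5.221121221440001e9, "nulls_fraction": 0.23924228921378904}
UNPARTITIONED = {"row_count": 2.879987999e9, "nulls_fraction": 0.04499698125304584}


def ratio(stat: str) -> float:
    """How many times larger the partitioned estimate is than the unpartitioned one."""
    return PARTITIONED[stat] / UNPARTITIONED[stat]


row_count_ratio = ratio("row_count")
nulls_ratio = ratio("nulls_fraction")
print(f"row_count: {row_count_ratio:.2f}x, nulls_fraction: {nulls_ratio:.2f}x")
```

The partitioned stats put the row count at about 1.8 times the unpartitioned estimate and the null fraction at more than 5 times, which is the kind of difference that could plausibly change a cost-based join order.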

web_sales

Partitioned on ws_sold_date_sk

presto> show stats for hive.tpcds_sf1000_parquet_varchar.web_sales;
       column_name        | data_size | distinct_values_count |    nulls_fraction     |  row_count   | low_value | high_value 
--------------------------+-----------+-----------------------+-----------------------+--------------+-----------+------------
 ws_sold_date_sk          | NULL      |                1820.0 | 2.4989014727958974E-4 | NULL         | 2450816   | 2452642    
 ws_sold_time_sk          | NULL      |               89157.0 |   2.49972091681241E-4 | NULL         | 0         | 86399      
 ws_ship_date_sk          | NULL      |                1962.0 |   2.50015147214312E-4 | NULL         | 2450817   | 2452762    
 ws_item_sk               | NULL      |              297612.0 |                   0.0 | NULL         | 1         | 300000     
 ws_bill_customer_sk      | NULL      |           1.2188957E7 | 2.4974570291057735E-4 | NULL         | 1         | 12000000   
 ws_bill_cdemo_sk         | NULL      |             1890006.0 | 2.4970542515383355E-4 | NULL         | 1         | 1920800    
 ws_bill_hdemo_sk         | NULL      |                7082.0 | 2.5019292489925033E-4 | NULL         | 1         | 7200       
 ws_bill_addr_sk          | NULL      |             5947530.0 | 2.4951098081093224E-4 | NULL         | 1         | 6000000    
 ws_ship_customer_sk      | NULL      |           1.2132428E7 | 2.4995264724695087E-4 | NULL         | 1         | 12000000   
 ws_ship_cdemo_sk         | NULL      |             1890006.0 |   2.50402647011951E-4 | NULL         | 1         | 1920800    
 ws_ship_hdemo_sk         | NULL      |                7082.0 |  2.497554251277224E-4 | NULL         | 1         | 7200       
 ws_ship_addr_sk          | NULL      |             5947530.0 | 2.4978875844364835E-4 | NULL         | 1         | 6000000    
 ws_web_page_sk           | NULL      |                3006.0 |   2.49627647416673E-4 | NULL         | 1         | 3000       
 ws_web_site_sk           | NULL      |                  54.0 |   2.49902647273062E-4 | NULL         | 1         | 54         
 ws_ship_mode_sk          | NULL      |                  20.0 |  2.500234805432935E-4 | NULL         | 1         | 20         
 ws_warehouse_sk          | NULL      |                  20.0 | 2.5014570270168857E-4 | NULL         | 1         | 20         
 ws_promo_sk              | NULL      |                1483.0 |  2.500220916551299E-4 | NULL         | 1         | 1500       
 ws_order_number          | NULL      |           5.9619264E7 |                   0.0 | NULL         | 1         | 60000000   
 ws_quantity              | NULL      |                 100.0 |  2.496957029366885E-4 | NULL         | 1         | 100        
 ws_wholesale_cost        | NULL      |               10091.0 | 2.4976931400935823E-4 | NULL         | 1.0       | 100.0      
 ws_list_price            | NULL      |               29733.0 |  2.500734805171824E-4 | NULL         | 1.0       | 300.0      
 ws_sales_price           | NULL      |               29691.0 | 2.5000681388533054E-4 | NULL         | 0.0       | 300.0      
 ws_ext_discount_amt      | NULL      |             1059688.0 |  2.497929251081391E-4 | NULL         | 0.0       | 29982.0    
 ws_ext_sales_price       | NULL      |             1059915.0 | 2.5003181387227497E-4 | NULL         | 0.0       | 29810.0    
 ws_ext_wholesale_cost    | NULL      |              388752.0 | 2.5008320273432744E-4 | NULL         | 1.0       | 10000.0    
 ws_ext_list_price        | NULL      |             1115360.0 | 2.4981375843059284E-4 | NULL         | 1.0       | 30000.0    
 ws_ext_tax               | NULL      |              211496.0 | 2.4957625855462053E-4 | NULL         | 0.0       | 2682.9     
 ws_coupon_amt            | NULL      |             1509736.0 |  2.499068139375527E-4 | NULL         | 0.0       | 28824.0    
 ws_ext_ship_cost         | NULL      |              552520.0 |  2.501165360502534E-4 | NULL         | 0.0       | 14950.0    
 ws_net_paid              | NULL      |             1750832.0 | 2.4995820279960523E-4 | NULL         | 0.0       | 29810.0    
 ws_net_paid_inc_tax      | NULL      |             2360578.0 |  2.499609805759324E-4 | NULL         | 0.0       | 32492.9    
 ws_net_paid_inc_ship     | NULL      |             2436710.0 |                   0.0 | NULL         | 0.0       | 44263.0    
 ws_net_paid_inc_ship_tax | NULL      |             3163161.0 |                   0.0 | NULL         | 0.0       | 46004.19   
 ws_net_profit            | NULL      |             1994307.0 |                   0.0 | NULL         | -10000.0  | 19840.0    
 NULL                     | NULL      | NULL                  | NULL                  | 7.20000376E8 | NULL      | NULL       
(35 rows)
presto> show stats for hive.tpcds_sf1000_parquet_varchar_part.web_sales;
       column_name        | data_size | distinct_values_count |    nulls_fraction     |      row_count      | low_value | high_value 
--------------------------+-----------+-----------------------+-----------------------+---------------------+-----------+------------
 ws_sold_time_sk          | NULL      |               52647.0 |  0.002432408205134582 | NULL                | 0         | 86399      
 ws_ship_date_sk          | NULL      |                1959.0 | 0.0024310978895414745 | NULL                | 2450817   | 2452762    
 ws_item_sk               | NULL      |              150788.0 |                   0.0 | NULL                | 1         | 300000     
 ws_bill_customer_sk      | NULL      |               89977.0 |   0.00243346159610159 | NULL                | 4         | 12000000   
 ws_bill_cdemo_sk         | NULL      |               86256.0 | 0.0024247775437394262 | NULL                | 1         | 1920800    
 ws_bill_hdemo_sk         | NULL      |                7082.0 | 0.0024371356182548132 | NULL                | 1         | 7200       
 ws_bill_addr_sk          | NULL      |               89878.0 | 0.0024310721970788648 | NULL                | 2         | 6000000    
 ws_ship_customer_sk      | NULL      |               88279.0 | 0.0024286314131309193 | NULL                | 1         | 12000000   
 ws_ship_cdemo_sk         | NULL      |               86578.0 | 0.0024394993248149288 | NULL                | 1         | 1920800    
 ws_ship_hdemo_sk         | NULL      |                7082.0 | 0.0024355683780356062 | NULL                | 1         | 7200       
 ws_ship_addr_sk          | NULL      |               89481.0 |  0.002441606106748945 | NULL                | 4         | 5999994    
 ws_web_page_sk           | NULL      |                3006.0 | 0.0024332817488633206 | NULL                | 1         | 3000       
 ws_web_site_sk           | NULL      |                  54.0 | 0.0024285286432804793 | NULL                | 1         | 54         
 ws_ship_mode_sk          | NULL      |                  20.0 |  0.002435645455423436 | NULL                | 1         | 20         
 ws_warehouse_sk          | NULL      |                  20.0 |  0.002437264080567863 | NULL                | 1         | 20         
 ws_promo_sk              | NULL      |                1483.0 | 0.0024348746815451377 | NULL                | 1         | 1500       
 ws_order_number          | NULL      |              179921.0 |                   0.0 | NULL                | 17        | 59999994   
 ws_quantity              | NULL      |                 100.0 | 0.0024364933066895644 | NULL                | 1         | 100        
 ws_wholesale_cost        | NULL      |               10091.0 | 0.0024274752523134715 | NULL                | 1.0       | 100.0      
 ws_list_price            | NULL      |               29468.0 |  0.002436801616240884 | NULL                | 1.0       | 300.0      
 ws_sales_price           | NULL      |               25189.0 | 0.0024397819419036382 | NULL                | 0.0       | 299.94     
 ws_ext_discount_amt      | NULL      |              301396.0 | 0.0024311492744666947 | NULL                | 0.0       | 29506.0    
 ws_ext_sales_price       | NULL      |              301559.0 | 0.0024307638875275452 | NULL                | 0.0       | 29625.0    
 ws_ext_wholesale_cost    | NULL      |              287750.0 |  0.002432973439312001 | NULL                | 1.0       | 10000.0    
 ws_ext_list_price        | NULL      |              417339.0 | 0.0024357482252738757 | NULL                | 1.03      | 30000.0    
 ws_ext_tax               | NULL      |               74442.0 | 0.0024322026654337026 | NULL                | 0.0       | 2648.76    
 ws_coupon_amt            | NULL      |              105656.0 |  0.002434720526769478 | NULL                | 0.0       | 27717.34   
 ws_ext_ship_cost         | NULL      |              209696.0 |  0.002433050516699831 | NULL                | 0.0       | 14704.47   
 ws_net_paid              | NULL      |              316442.0 | 0.0024370328484043737 | NULL                | 0.0       | 29625.0    
 ws_net_paid_inc_tax      | NULL      |              396610.0 |  0.002429736189023147 | NULL                | 0.0       | 32079.48   
 ws_net_paid_inc_ship     | NULL      |              404497.0 |                   0.0 | NULL                | 0.0       | 43567.0    
 ws_net_paid_inc_ship_tax | NULL      |              500594.0 |                   0.0 | NULL                | 0.0       | 44597.91   
 ws_net_profit            | NULL      |              361861.0 |                   0.0 | NULL                | -10000.0  | 19750.0    
 ws_sold_date_sk          | NULL      |                1823.0 |  2.534327612524752E-4 | NULL                | 2450816   | 2452642    
 NULL                     | NULL      | NULL                  | NULL                  | 7.099358390400001E8 | NULL      | NULL       
(35 rows)
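
The distinct-value counts for web_sales diverge even more sharply than the row counts. As a rough way to spot the affected columns, the sketch below compares a handful of NDV values copied from the two outputs above. The 1.5x threshold and the helper name `diverging_columns` are hypothetical choices, not something used in the investigation.

```python
# distinct_values_count for a few web_sales columns, copied from the outputs above:
# column -> (unpartitioned, partitioned)
NDV = {
    "ws_order_number": (5.9619264e7, 179921.0),
    "ws_bill_customer_sk": (1.2188957e7, 89977.0),
    "ws_item_sk": (297612.0, 150788.0),
    "ws_bill_hdemo_sk": (7082.0, 7082.0),
    "ws_sold_date_sk": (1820.0, 1823.0),
}

THRESHOLD = 1.5  # hypothetical cut-off: flag NDVs that differ by more than 1.5x


def diverging_columns(ndv, threshold):
    """Return the columns whose two NDV estimates differ by more than `threshold` times."""
    flagged = {}
    for column, (unpartitioned, partitioned) in ndv.items():
        factor = max(unpartitioned, partitioned) / min(unpartitioned, partitioned)
        if factor > threshold:
            flagged[column] = factor
    return flagged


flagged = diverging_columns(NDV, THRESHOLD)
for column, factor in sorted(flagged.items(), key=lambda kv: -kv[1]):
    print(f"{column}: NDV differs by {factor:.1f}x")
```

With these inputs the high-cardinality keys (`ws_order_number`, `ws_bill_customer_sk`) are off by two orders of magnitude, while low-cardinality columns such as `ws_bill_hdemo_sk` agree.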

aaneja commented 7 months ago

Q4 plan diff

The join order is different, but the sub-plans are the same.

Unpartitioned plan

Output <- TopN <- LocalExchange <- RemoteSource <- TopNPartial <- InnerJoin[PARTITIONED] :
    ├── InnerJoin[PARTITIONED] :
    │    ├── InnerJoin[PARTITIONED] :
    │    │    ├── RemoteSource <- Project <- Agg() <- LocalExchange <- RemoteSource <- Agg() <- Project <- InnerJoin[PARTITIONED] :
    │    │    │    ├── RemoteSource <- InnerJoin[REPLICATED] :
    │    │    │    │    ├── Scan(catalog_sales)
    │    │    │    │    └── LocalExchange <- RemoteSource <- Scan(date_dim)
    │    │    │    └── LocalExchange <- RemoteSource <- Scan(customer)
    │    │    └── LocalExchange <- RemoteSource <- Project <- Agg() <- LocalExchange <- RemoteSource <- Agg() <- Project <- InnerJoin[PARTITIONED] :
    │    │         ├── RemoteSource <- InnerJoin[REPLICATED] :
    │    │         │    ├── Scan(web_sales)
    │    │         │    └── LocalExchange <- RemoteSource <- Scan(date_dim)
    │    │         └── LocalExchange <- RemoteSource <- Scan(customer)
    │    └── LocalExchange <- InnerJoin[PARTITIONED] :
    │         ├── RemoteSource <- Filter <- Agg() <- LocalExchange <- RemoteSource <- Agg() <- Project <- InnerJoin[PARTITIONED] :
    │         │    ├── RemoteSource <- InnerJoin[REPLICATED] :
    │         │    │    ├── Scan(catalog_sales)
    │         │    │    └── LocalExchange <- RemoteSource <- Scan(date_dim)
    │         │    └── LocalExchange <- RemoteSource <- Scan(customer)
    │         └── LocalExchange <- RemoteSource <- Filter <- Agg() <- LocalExchange <- RemoteSource <- Agg() <- Project <- InnerJoin[PARTITIONED] :
    │              ├── RemoteSource <- InnerJoin[REPLICATED] :
    │              │    ├── Scan(web_sales)
    │              │    └── LocalExchange <- RemoteSource <- Scan(date_dim)
    │              └── LocalExchange <- RemoteSource <- Scan(customer)
    └── LocalExchange <- InnerJoin[PARTITIONED] :
         ├── RemoteSource <- Project <- Agg() <- LocalExchange <- RemoteSource <- Agg() <- Project <- InnerJoin[PARTITIONED] :
         │    ├── RemoteSource <- InnerJoin[REPLICATED] :
         │    │    ├── Scan(store_sales)
         │    │    └── LocalExchange <- RemoteSource <- Scan(date_dim)
         │    └── LocalExchange <- RemoteSource <- Scan(customer)
         └── LocalExchange <- RemoteSource <- Filter <- Agg() <- LocalExchange <- RemoteSource <- Agg() <- Project <- InnerJoin[PARTITIONED] :
              ├── RemoteSource <- InnerJoin[REPLICATED] :
              │    ├── Scan(store_sales)
              │    └── LocalExchange <- RemoteSource <- Scan(date_dim)
              └── LocalExchange <- RemoteSource <- Scan(customer)

Partitioned plan

Output <- TopN <- LocalExchange <- RemoteSource <- TopNPartial <- InnerJoin[PARTITIONED] :
    ├── RemoteSource <- Project <- Agg() <- LocalExchange <- RemoteSource <- Agg() <- Project <- InnerJoin[PARTITIONED] :
    │    ├── RemoteSource <- Scan(customer)
    │    └── LocalExchange <- RemoteSource <- InnerJoin[REPLICATED] :
    │         ├── Scan(store_sales)
    │         └── LocalExchange <- RemoteSource <- Filter <- Scan(date_dim)
    └── LocalExchange <- RemoteSource <- InnerJoin[PARTITIONED] :
         ├── RemoteSource <- Filter <- Agg() <- LocalExchange <- RemoteSource <- Agg() <- Project <- InnerJoin[PARTITIONED] :
         │    ├── RemoteSource <- Scan(customer)
         │    └── LocalExchange <- RemoteSource <- InnerJoin[REPLICATED] :
         │         ├── Scan(store_sales)
         │         └── LocalExchange <- RemoteSource <- Filter <- Scan(date_dim)
         └── LocalExchange <- InnerJoin[PARTITIONED] :
              ├── InnerJoin[PARTITIONED] :
              │    ├── RemoteSource <- Project <- Agg() <- LocalExchange <- RemoteSource <- Agg() <- Project <- InnerJoin[PARTITIONED] :
              │    │    ├── RemoteSource <- Scan(customer)
              │    │    └── LocalExchange <- RemoteSource <- InnerJoin[REPLICATED] :
              │    │         ├── Scan(catalog_sales)
              │    │         └── LocalExchange <- RemoteSource <- Scan(date_dim)
              │    └── LocalExchange <- RemoteSource <- Project <- Agg() <- LocalExchange <- RemoteSource <- Agg() <- Project <- InnerJoin[PARTITIONED] :
              │         ├── RemoteSource <- Scan(customer)
              │         └── LocalExchange <- RemoteSource <- InnerJoin[REPLICATED] :
              │              ├── Scan(web_sales)
              │              └── LocalExchange <- RemoteSource <- Scan(date_dim)
              └── LocalExchange <- InnerJoin[PARTITIONED] :
                   ├── RemoteSource <- Filter <- Agg() <- LocalExchange <- RemoteSource <- Agg() <- Project <- InnerJoin[PARTITIONED] :
                   │    ├── RemoteSource <- Scan(customer)
                   │    └── LocalExchange <- RemoteSource <- InnerJoin[REPLICATED] :
                   │         ├── Scan(catalog_sales)
                   │         └── LocalExchange <- RemoteSource <- Scan(date_dim)
                   └── LocalExchange <- RemoteSource <- Filter <- Agg() <- LocalExchange <- RemoteSource <- Agg() <- Project <- InnerJoin[PARTITIONED] :
                        ├── RemoteSource <- Scan(customer)
                        └── LocalExchange <- RemoteSource <- InnerJoin[REPLICATED] :
                             ├── Scan(web_sales)
                             └── LocalExchange <- RemoteSource <- Scan(date_dim)
aaneja commented 7 months ago

Possible Bug in filter pushdown

While debugging, I narrowed the issue down to the TableScan + filter pushdown (or possibly the Join) behaving incorrectly with the partitioned schema.

Consider a simple two-table join between store_sales and date_dim: select 1 from store_sales , date_dim where ss_sold_date_sk = d_date_sk. We use the flags below to turn on filter pushdown into the scan:

- hive.parquet_pushdown_filter_enabled=true
- hive.pushdown_filter_enabled=true

With this we expect all rows from store_sales with a NOT NULL ss_sold_date_sk to appear in the output
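
The expectation above follows from standard SQL equi-join semantics: a NULL key never compares equal to anything, so rows with a NULL ss_sold_date_sk cannot appear in the inner join output. A minimal sketch using Python's built-in sqlite3 module, with made-up toy rows (not the SF-1K data) and SQLite standing in for Presto:

```python
import sqlite3

# Toy tables that only borrow the column names from the issue; the rows are invented.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE store_sales (ss_sold_date_sk INTEGER);
    CREATE TABLE date_dim (d_date_sk INTEGER);
    INSERT INTO store_sales VALUES (2450816), (2450817), (NULL), (NULL);
    INSERT INTO date_dim VALUES (2450816), (2450817), (2450818);
""")

# Same join shape as the repro query.
joined = conn.execute(
    "SELECT count(*) FROM store_sales, date_dim WHERE ss_sold_date_sk = d_date_sk"
).fetchone()[0]

# Rows that have a non-null join key, and all rows.
not_null = conn.execute(
    "SELECT count(*) FROM store_sales WHERE ss_sold_date_sk IS NOT NULL"
).fetchone()[0]
total = conn.execute("SELECT count(*) FROM store_sales").fetchone()[0]

print(joined, not_null, total)
```

In this toy data every non-null key has a match in date_dim, so the join count equals the non-null row count and is smaller than the total row count.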

Unpartitioned

This is what we see with the unpartitioned table: the final output is 2,750,387,156 rows, because the rows with a NULL ss_sold_date_sk were filtered out in the scan (Input: 2,750,387,156 rows (26.34GB), Filtered: 0.00%; Raw input: 2,879,987,999 rows (2.13GB), Filtered: 4.50%).

presto:tpcds_sf1000_parquet_varchar> explain analyze select 1 from store_sales , date_dim where ss_sold_date_sk = d_date_sk;
                                                                                                                                                          Query Plan                                                                               >
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------->
 Fragment 1 [SOURCE]                                                                                                                                                                                                                               >
     CPU: 1.77m, Scheduled: 1.94m, Input: 2,750,752,401 rows (26.34GB); per task: avg.: 550,150,480.20 std.dev.: 160,840,318.60, Output: 2,750,387,156 rows (10.26GB), 5 tasks                                                                     >
     Output layout: [expr]                                                                                                                                                                                                                         >
     Output partitioning: SINGLE []                                                                                                                                                                                                                >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                                                 >
     - Project[PlanNodeId 273][projectLocality = LOCAL] => [expr:integer]                                                                                                                                                                          >
             Estimates: {source: CostBasedSourceInfo, rows: 2,750,726,059 (12.81GB), cpu: 63,530,977,286.64, memory: 5,259,528.00, network: 5,259,528.00}                                                                                          >
             CPU: 58.50s (55.16%), Scheduled: 1.13m (58.21%), Output: 2,750,387,156 rows (10.26GB)                                                                                                                                                 >
             Input avg.: 34,379,839.45 rows, Input std.dev.: 404.57%                                                                                                                                                                               >
             expr := INTEGER'1'                                                                                                                                                                                                                    >
         - InnerJoin[PlanNodeId 384][("ss_sold_date_sk" = "d_date_sk")] => []                                                                                                                                                                      >
                 Estimates: {source: CostBasedSourceInfo, rows: 2,750,726,059 (12.81GB), cpu: 49,777,346,991.00, memory: 5,259,528.00, network: 5,259,528.00}                                                                                      >
                 CPU: 1.63s (1.54%), Scheduled: 758.00ms (0.65%), Output: 2,750,387,156 rows (0B)                                                                                                                                                  >
                 Distribution: REPLICATED                                                                                                                                                                                                          >
             - TableScan[PlanNodeId 0][TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpcds_sf1000_parquet_varchar, tableName=store_sales, analyzePartitionValues=Optional.empty}', layout='Optional[tpcds_sf1000_par>
                     Estimates: {source: CostBasedSourceInfo, rows: 2,879,987,999 (13.41GB), cpu: 24,883,085,247.00, memory: 0.00, network: 0.00}                                                                                                  >
                     CPU: 45.87s (43.25%), Scheduled: 47.87s (41.09%), Output: 2,750,387,156 rows (26.34GB)                                                                                                                                        >
                     Input avg.: 34,379,839.45 rows, Input std.dev.: 404.57%                                                                                                                                                                       >
                     LAYOUT: tpcds_sf1000_parquet_varchar.store_sales{}                                                                                                                                                                            >
                     ss_sold_date_sk := ss_sold_date_sk:bigint:0:REGULAR (1:31)                                                                                                                                                                    >
                     Input: 2,750,387,156 rows (26.34GB), Filtered: 0.00%                                                                                                                                                                          >
                     Raw input: 2,879,987,999 rows (2.13GB), Filtered: 4.50%                                                                                                                                                                       >
             - LocalExchange[PlanNodeId 454][HASH] (d_date_sk) => [d_date_sk:bigint]                                                                                                                                                               >
                     Estimates: {source: CostBasedSourceInfo, rows: 73,049 (356.68kB), cpu: 1,314,882.00, memory: 0.00, network: 5,259,528.00}                                                                                                     >
                     CPU: 39.00ms (0.04%), Scheduled: 42.00ms (0.04%), Output: 365,245 rows (3.36MB)                                                                                                                                               >
                     Input avg.: 4,565.56 rows, Input std.dev.: 387.30%                                                                                                                                                                            >
                 - RemoteSource[2] => [d_date_sk:bigint]                                                                                                                                                                                           >
                         CPU: 5.00ms (0.00%), Scheduled: 5.00ms (0.00%), Output: 365,245 rows (3.36MB)                                                                                                                                             >
                         Input avg.: 4,565.56 rows, Input std.dev.: 387.30%                                                                                                                                                                        >
                                                                                                                                                                                                                                                   >
 Fragment 2 [SOURCE]                                                                                                                                                                                                                               >
     CPU: 7.87ms, Scheduled: 13.67ms, Input: 73,049 rows (686.88kB); per task: avg.: 14,609.80 std.dev.: 8,093.87, Output: 73,049 rows (571.25kB), 5 tasks                                                                                         >
     Output layout: [d_date_sk]                                                                                                                                                                                                                    >
     Output partitioning: BROADCAST []                                                                                                                                                                                                             >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                                                 >
     - TableScan[PlanNodeId 1][TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpcds_sf1000_parquet_varchar, tableName=date_dim, analyzePartitionValues=Optional.empty}', layout='Optional[tpcds_sf1000_parquet_varcha>
             Estimates: {source: CostBasedSourceInfo, rows: 73,049 (642.03kB), cpu: 657,441.00, memory: 0.00, network: 0.00}                                                                                                                       >
             CPU: 6.00ms (0.01%), Scheduled: 12.00ms (0.01%), Output: 73,049 rows (571.25kB)                                                                                                                                                       >
             Input avg.: 913.11 rows, Input std.dev.: 446.21%                                                                                                                                                                                      >
             LAYOUT: tpcds_sf1000_parquet_varchar.date_dim{}                                                                                                                                                                                       >
             d_date_sk := d_date_sk:bigint:0:REGULAR (1:45)                                                                                                                                                                                        >
             Input: 73,049 rows (686.88kB), Filtered: 0.00%                                                                                                                                                                                        >
                                                                                                                                                                                                                                                   >
                                                                                                                                                                                                                                                   >
(1 row)

Query 20240311_162851_00029_enyyd, FINISHED, 9 nodes
Splits: 1,098 total, 1,098 done (100.00%)
[Latency: client-side: 0:12, server-side: 0:10] [2.88B rows, 2.13GB] [278M rows/s, 210MB/s]
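
As a sanity check on the numbers in the unpartitioned plan above, the "Filtered: 4.50%" on the raw input line is consistent with the difference between the raw input rows and the scan output rows. A small sketch of the arithmetic (variable names are mine, the row counts are copied from the plan):

```python
# Row counts copied from the unpartitioned TableScan in the plan output.
raw_input_rows = 2_879_987_999
scan_output_rows = 2_750_387_156

# Rows dropped by the scan, and the same number as a percentage of raw input.
dropped_rows = raw_input_rows - scan_output_rows
filtered_pct = 100 * dropped_rows / raw_input_rows

print(dropped_rows, f"{filtered_pct:.2f}%")
```

The dropped rows are the ones that the partitioned run later fails to exclude.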

Partitioned

For the partitioned table, the NULL rows are not filtered out by the TableScan. The layout pushed into the table scan for the partition key is [NULL, ["2450816", "2452642"]], which effectively does not filter any rows, so all 2,879,987,999 rows come out of the table scan.

presto:tpcds_sf1000_parquet_varchar_part> explain analyze select 1 from store_sales , date_dim where ss_sold_date_sk = d_date_sk;
                                                                                                                                                                        Query Plan                                                                 >
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------->
 Fragment 1 [SOURCE]                                                                                                                                                                                                                               >
     CPU: 1.08m, Scheduled: 1.16m, Input: 2,880,002,615 rows (2.33MB); per task: avg.: 360,000,326.88 std.dev.: 24,542,981.73, Output: 2,879,987,999 rows (10.74GB), 8 tasks                                                                       >
     Output layout: [expr]                                                                                                                                                                                                                         >
     Output partitioning: SINGLE []                                                                                                                                                                                                                >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                                                 >
     - Project[PlanNodeId 277][projectLocality = LOCAL] => [expr:integer]                                                                                                                                                                          >
             Estimates: {source: CostBasedSourceInfo, rows: 5,092,147,756 (23.71GB), cpu: 117,369,250,354.33, memory: 131,473.80, network: 131,473.80}                                                                                             >
             CPU: 1.00m (93.24%), Scheduled: 1.10m (95.27%), Output: 2,879,987,999 rows (10.74GB)                                                                                                                                                  >
             Input avg.: 22,499,906.24 rows, Input std.dev.: 388.26%                                                                                                                                                                               >
             expr := INTEGER'1'                                                                                                                                                                                                                    >
         - InnerJoin[PlanNodeId 443][("ss_sold_date_sk" = "cast")] => []                                                                                                                                                                           >
                 Estimates: {source: CostBasedSourceInfo, rows: 5,092,147,756 (23.71GB), cpu: 91,908,511,572.52, memory: 131,473.80, network: 131,473.80}                                                                                          >
                 CPU: 1.47s (2.27%), Scheduled: 597.00ms (0.86%), Output: 2,879,987,999 rows (0B)                                                                                                                                                  >
                 Distribution: REPLICATED                                                                                                                                                                                                          >
             - TableScan[PlanNodeId 0][TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpcds_sf1000_parquet_varchar_part, tableName=store_sales, analyzePartitionValues=Optional.empty}', layout='Optional[tpcds_sf100>
                     Estimates: {source: CostBasedSourceInfo, rows: 5,221,121,221 (24.31GB), cpu: 45,953,284,248.96, memory: 0.00, network: 0.00}                                                                                                  >
                     CPU: 2.89s (4.47%), Scheduled: 2.66s (3.83%), Output: 2,879,987,999 rows (2.21MB)                                                                                                                                             >
                     Input avg.: 22,499,906.24 rows, Input std.dev.: 388.26%                                                                                                                                                                       >
                     LAYOUT: tpcds_sf1000_parquet_varchar_part.store_sales{}                                                                                                                                                                       >
                     ss_sold_date_sk := ss_sold_date_sk:bigint:-13:PARTITION_KEY (1:31)                                                                                                                                                            >
                         :: [NULL, ["2450816", "2452642"]]                                                                                                                                                                                         >
                     Input: 2,879,987,999 rows (2.21MB), Filtered: 0.00%                                                                                                                                                                           >
             - LocalExchange[PlanNodeId 567][HASH] (cast) => [cast:bigint]                                                                                                                                                                         >
                     Estimates: {source: CostBasedSourceInfo, rows: 1,826 (8.92kB), cpu: 1,696,561.22, memory: 0.00, network: 131,473.80}                                                                                                          >
                     CPU: 8.00ms (0.01%), Scheduled: 21.00ms (0.03%), Output: 14,616 rows (127.19kB)                                                                                                                                               >
                     Input avg.: 114.19 rows, Input std.dev.: 387.30%                                                                                                                                                                              >
                 - RemoteSource[2] => [cast:bigint]                                                                                                                                                                                                >
                         CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Output: 14,616 rows (127.25kB)                                                                                                                                            >
                         Input avg.: 114.19 rows, Input std.dev.: 387.30%                                                                                                                                                                          >
                                                                                                                                                                                                                                                   >
 Fragment 2 [SOURCE]                                                                                                                                                                                                                               >
     CPU: 2.61ms, Scheduled: 3.05ms, Input: 73,049 rows (351.25kB); per task: avg.: 73,049.00 std.dev.: 0.00, Output: 1,827 rows (14.32kB), 1 tasks                                                                                                >
     Output layout: [cast]                                                                                                                                                                                                                         >
     Output partitioning: BROADCAST []                                                                                                                                                                                                             >
     Stage Execution Strategy: UNGROUPED_EXECUTION                                                                                                                                                                                                 >
     - Filter[PlanNodeId 378][filterPredicate = (cast BETWEEN (BIGINT'2450816') AND (BIGINT'2452642')) OR (IS_NULL(cast))] => [cast:bigint]                                                                                                        >
             Estimates: {source: CostBasedSourceInfo, rows: 1,826 (16.05kB), cpu: 1,680,127.00, memory: 0.00, network: 0.00}                                                                                                                       >
             CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Output: 1,827 rows (14.32kB)                                                                                                                                                          >
             Input avg.: 4,565.56 rows, Input std.dev.: 387.30%                                                                                                                                                                                    >
         - ScanProject[PlanNodeId 1,214][table = TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpcds_sf1000_parquet_varchar_part, tableName=date_dim, analyzePartitionValues=Optional.empty}', layout='Optional[tpcd>
                 Estimates: {source: CostBasedSourceInfo, rows: 73,049 (642.03kB), cpu: 365,245.00, memory: 0.00, network: 0.00}/{source: CostBasedSourceInfo, rows: 73,049 (642.03kB), cpu: 1,022,686.00, memory: 0.00, network: 0.00}            >
                 CPU: 1.00ms (0.00%), Scheduled: 0.00ns (0.00%), Output: 73,049 rows (695.25kB)                                                                                                                                                    >
                 Input avg.: 4,565.56 rows, Input std.dev.: 387.30%                                                                                                                                                                                >
                 cast := CAST(d_date_sk AS bigint) (1:46)                                                                                                                                                                                          >
                 LAYOUT: tpcds_sf1000_parquet_varchar_part.date_dim{}                                                                                                                                                                              >
                 d_date_sk := d_date_sk:int:0:REGULAR (1:45)                                                                                                                                                                                       >
                 Input: 73,049 rows (351.25kB), Filtered: 0.00%                                                                                                                                                                                    >
                                                                                                                                                                                                                                                   >
                                                                                                                                                                                                                                                   >
(1 row)

Query 20240311_162840_00028_enyyd, FINISHED, 9 nodes
Splits: 1,889 total, 1,889 done (100.00%)
[Latency: client-side: 0:12, server-side: 0:11] [2.88B rows, 1.82GB] [261M rows/s, 169MB/s]
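
To make the domain in the partitioned plan concrete: reading [NULL, ["2450816", "2452642"]] as "NULL is allowed, or the value lies in the range" (my interpretation of the printed layout), the scan lets NULL keys through, and it is then up to the join to drop them. A rough sketch with hypothetical helper names, not Presto or Velox code:

```python
from typing import Optional, Set

# Range bounds copied from the plan output.
LOW, HIGH = 2450816, 2452642


def domain_admits(value: Optional[int]) -> bool:
    """Model a domain that allows NULL or any value in [LOW, HIGH]."""
    if value is None:
        return True
    return LOW <= value <= HIGH


def inner_join_matches(value: Optional[int], build_keys: Set[int]) -> bool:
    """Model equi-join matching: a NULL key never matches."""
    return value is not None and value in build_keys


# Invented sample keys: two in range, one NULL.
sample_keys = [2450816, None, 2452642]
build_keys = {2450816, 2452642}

passed_scan = [k for k in sample_keys if domain_admits(k)]
passed_join = [k for k in passed_scan if inner_join_matches(k, build_keys)]
print(len(passed_scan), len(passed_join))
```

Under this reading the scan passing all three keys is expected, and the NULL key should disappear only at the join.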

@yingsu00 Can you take a look at this?

ethanyzhang commented 7 months ago

@tdcmeehan btw to clarify, this issue happens on Prestissimo with Hive tables as well

tdcmeehan commented 7 months ago

@yzhang1991 oh, I just saw the plans where I see we're using Hive. To be clear, this issue is confirmed to exist on Iceberg as well?

ethanyzhang commented 7 months ago

@tdcmeehan Yes, we observe it on Iceberg too.

yingsu00 commented 7 months ago

Thanks @aaneja for the detailed analysis. I will look at the partition key filter pushdown issue. But I wonder why the NDVs for partitioned tables are so different than the unpartitioned ones. Is it possible there are bugs in the stats collection? cc @ZacBlanco

ethanyzhang commented 7 months ago

@yingsu00 I think NDV on partitioned table may not work correctly: https://github.com/prestodb/presto/pull/22162#discussion_r1521025445

yingsu00 commented 7 months ago

There are a couple of things noticed from the plans

  1. Both partitioned and unpartitioned plans do not add explicit filters to the TableScan. So it seems the partitioned case is correct to read all rows from store_sales, since the domainPredicate in the HiveTableLayoutHandle does allow nulls. I tried this on Presto Java with and without filter pushdown enabled, and the result is the same (it reads all rows, including the nulls).
  2. But it doesn't explain why the InnerJoin later didn't filter out the null rows. This may also be a bug in the Join operator.
  3. For the unpartitioned case, the TableScan only read non-null ss_sold_date_sk rows. I guess this is because Velox pushed an implicit dynamic filter, but I can't verify now. My Prestissimo build is not working and I'm rebuilding it now.

Overall, I think this may be related to the join and dynamic filters. I will keep looking.
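
For readers unfamiliar with the idea in point 3, here is a conceptual sketch of how a dynamic filter built from the join's build side could end up dropping NULL probe keys at the scan. This is only an illustration under my own assumptions; the function names are hypothetical and this is not how Velox implements it:

```python
from typing import Iterable, List, Optional, Set


def build_dynamic_filter(build_keys: Iterable[Optional[int]]) -> Set[int]:
    """Collect the non-null join keys seen on the build side."""
    return {k for k in build_keys if k is not None}


def scan_with_filter(probe_keys: Iterable[Optional[int]], key_filter: Set[int]) -> List[int]:
    """Keep only probe rows whose key is in the filter; NULL keys never are."""
    return [k for k in probe_keys if k in key_filter]


# Invented sample data: date_dim-like build keys, store_sales-like probe keys.
build_side = [2450816, 2450817, 2450818]
probe_side = [2450816, None, 2450817, None]

key_filter = build_dynamic_filter(build_side)
scanned = scan_with_filter(probe_side, key_filter)
print(scanned)
```

With a filter like this applied in the scan, the NULL rows would already be gone before the join, which would match what the unpartitioned plan shows.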

aaneja commented 7 months ago

@yingsu00 You're right w.r.t. pushing down NOT NULL filters; the plan does not generate these filters unless joins_not_null_inference_strategy is set to a value other than NONE (which is the default).
I agree that in the un-partitioned case it looks like the NOT NULL filter was being pushed via a dynamic filter. @aditi-pandit is this what we would expect from Prestissimo/Velox's dynamic filter implementation?

I will look into the stats issue for partitioned tables. If you notice, the row counts reported are also incorrect for some of the tables. I will make a new issue with what I find. Update: I've created https://github.com/prestodb/presto/issues/22202

karteekmurthys commented 7 months ago

Just to back up @yingsu00's findings, the only diff between Presto and Prestissimo is the rows with NULLs. Query:

select ss_sold_date_sk, count(*) count from store_sales, date_dim where ss_sold_date_sk=d_date_sk group by ss_sold_date_sk;

Presto Plan:

- ScanFilterProject[PlanNodeId 0,449,714][table = TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpcds_sf1000_parquet_varchar_part, tableName=store_sales, analyzeP>
                         Estimates: {source: CostBasedSourceInfo, rows: 5,221,121,221 (130.32GB), cpu: 45,953,284,248.96, memory: 0.00, network: 0.00}/{source: CostBasedSourceInfo, rows: 5,221,121,221 (130.32GB>
                         CPU: 22.22s (4.98%), Scheduled: 32.97s (4.94%), Output: 2,879,987,999 rows (57.73MB)                                                                                                     >
                         Input avg.: 1,545,887.28 rows, Input std.dev.: 56.86%                                                                                                                                    >
                         $hashvalue_13 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(ss_sold_date_sk), BIGINT'0')) (1:62)                                                                               >
                         LAYOUT: tpcds_sf1000_parquet_varchar_part.store_sales{domains={ss_sold_date_sk=[ NULL, [["2450816", "2452642"]] ]}}                                                                      >
                         ss_sold_date_sk := ss_sold_date_sk:bigint:-13:PARTITION_KEY (1:61)                                                                                                                       >
                             :: [NULL, ["2450816", "2452642"]]                                                                                                                                                    >
                         Input: 2,879,987,999 rows (0B), Filtered: 0.00%        

Prestissimo Plan:

TableScan[PlanNodeId 0][TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpcds_sf1000_parquet_varchar_part, tableName=store_sales, analyzePartitionValues=Optional.empt>
                     Estimates: {source: CostBasedSourceInfo, rows: 5,221,121,221 (86.56GB), cpu: 45,953,284,248.96, memory: 0.00, network: 0.00}                                                                 >
                     CPU: 2.40s (13.68%), Scheduled: 2.30s (13.92%), Output: 2,879,987,999 rows (2.21MB)                                                                                                          >
                     Input avg.: 11,249,953.12 rows, Input std.dev.: 389.66%                                                                                                                                      >
                     LAYOUT: tpcds_sf1000_parquet_varchar_part.store_sales{}                                                                                                                                      >
                     ss_sold_date_sk := ss_sold_date_sk:bigint:-13:PARTITION_KEY (1:61)                                                                                                                           >
                         :: [NULL, ["2450816", "2452642"]]                                                                                                                                                        >
                     Input: 2,879,987,999 rows (2.21MB), Filtered: 0.00%

yingsu00 commented 7 months ago

Ethan and I debugged on the 1TB dataset yesterday. So far the findings are

  1. Dynamic filters on ss_sold_date_sk are pushed down to the scan in both the partitioned and unpartitioned cases, and they do NOT allow nulls. The TableScan failed to eliminate the null values for the partitioned schema.
  2. The splits from not-null partitions looked normal and the rows don't contain null ss_sold_date_sk values.
  3. We still need to check the null partitions to find out why they were not eliminated correctly.

We will produce a debug build with necessary debugging logs for further investigation.

karteekmurthys commented 7 months ago

@Ying I was looking into this as well yesterday. I was trying to verify why it wasn't reproducible in local testing. As you stated, I found that the HashJoin is skipped because of dynamic filters: select ss_sold_date_sk,count(*) count from ss_partitioned group by ss_sold_date_sk order by count desc

From Query json:

"HashProbe.394.replacedWithDynamicFilterRows" : {
          "name" : "HashProbe.394.replacedWithDynamicFilterRows",
          "unit" : "NONE",
          "sum" : 115192,
          "count" : 1783,
          "max" : 265,
          "min" : 6
        },

I commented out this section of code to use the normal HashJoin algorithm: https://github.com/facebookincubator/velox/blob/main/velox/exec/HashProbe.cpp#L536 . But it is still not reproducible locally.

aditi-pandit commented 7 months ago

@yingsu00, @karteekmurthys, @yzhang1991 : Dynamic filters are set to allow NULLs only for right semi-join. So this finding adds up. https://github.com/facebookincubator/velox/blob/main/velox/exec/HashProbe.cpp#L330
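
To make the NULL handling concrete, here is a minimal sketch of a dynamic filter built from the build-side join keys. It is not the Velox implementation: `JoinType`, `KeySetFilter`, and `makeDynamicFilter` are hypothetical names, and the only behavior taken from this thread is that NULLs are allowed solely for right semi-join.

```cpp
#include <cstdint>
#include <optional>
#include <unordered_set>
#include <vector>

// Hypothetical join types; only the two needed for the illustration.
enum class JoinType { kInner, kRightSemi };

// Hypothetical filter holding the distinct build-side join keys.
struct KeySetFilter {
  std::unordered_set<int64_t> keys;
  bool nullAllowed;

  // A probe-side value passes if it is a known key, or if it is NULL
  // (std::nullopt) and NULLs are allowed.
  bool test(const std::optional<int64_t>& value) const {
    if (!value.has_value()) {
      return nullAllowed;
    }
    return keys.count(*value) > 0;
  }
};

// Assumption taken from the discussion: NULLs pass only for right semi-join.
KeySetFilter makeDynamicFilter(
    const std::vector<int64_t>& buildKeys,
    JoinType joinType) {
  return KeySetFilter{
      std::unordered_set<int64_t>(buildKeys.begin(), buildKeys.end()),
      joinType == JoinType::kRightSemi};
}
```

Under this model, an inner join on ss_sold_date_sk = d_date_sk yields a filter that rejects every NULL ss_sold_date_sk, so a scan that applies the filter correctly should never emit rows from the NULL partition.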

aditi-pandit commented 7 months ago

@yingsu00 You're right w.r.t. pushing down NOT NULL filters; the plan does not generate these filters unless joins_not_null_inference_strategy is set to a value other than NONE (which is the default). I agree that in the un-partitioned case it looks like the NOT NULL was being pushed via a dynamic filter. @aditi-pandit is this what we would expect from Prestissimo/Velox's dynamic filter implementation?

I will look into the stats issue for partitioned tables. If you notice, the row counts reported are also incorrect for some of the tables. I will make a new issue with what I find. Update: I've created #22202

@aaneja, @yzhang1991, @karteekmurthys, @yingsu00 : Dynamic filters are set to allow NULLs only for right semi-join. So this finding adds up. https://github.com/facebookincubator/velox/blob/main/velox/exec/HashProbe.cpp#L330.

Of course, that is the intent. We will have to check whether this filter evaluation is problematic. Are the NULL values isolated to a single split or a few splits in this case?

@yzhang1991 : If you provide us with a few split files with NULL values, we can try a Velox repro case as well.

yingsu00 commented 7 months ago

Here are some findings so far:

  1. The dynamic filters were pushed to the scan successfully, so the join expects that the incoming rows do not contain nulls.
  2. On the 1K dataset, the splits on null store_sales partitions were not skipped; instead, all rows in those splits were returned without filtering. We confirmed this by checking the row counts of all null partitions.
  3. SplitReader::prepareSplit is supposed to filter out all null partitions when calling testFilters() and return early. But somehow it fails to run the filters and SplitReader::next is still called, maybe on a stale baseRowReader_.

We're producing a build based on https://github.com/yingsu00/velox/tree/PrintOutForNullSplits, and @karteekmurthys will work on https://github.ibm.com/lakehouse/presto/issues/1002 to expose RuntimeStats.

karteekmurthys commented 7 months ago

I connected to the remote HMS cluster and debugged locally. I noticed that testFilters is never applied to Parquet files. For testFilters to function, the columnStatistics are required, and the ParquetReader returns null for column statistics: https://github.com/facebookincubator/velox/blob/main/velox/dwio/parquet/reader/ParquetReader.h#L95

  std::unique_ptr<dwio::common::ColumnStatistics> columnStatistics(
      uint32_t index) const override {
    return nullptr;
  }

Filters are applied on a ScanSpec only if it has column stats:

  auto columnStats = reader->columnStatistics(typeWithId->id());
  if (columnStats != nullptr &&
      !testFilter(
          child->filter(),
          columnStats.get(),
          totalRows.value(),
          typeWithId->type())) {
    VLOG(1) << "Skipping " << filePath
            << " based on stats and filter for column "
            << child->fieldName();
    return false;
  }

The testFilter logic relies on ColumnStats to know, among other things, whether the column may contain nulls, in order to skip a split. DWRF by default returns the columnStatistics initialized in its base class ReaderBase (Parquet has its own ReaderBase class with no column stats).

Screenshot 2024-03-18 at 1 36 20 PM

cc: @majetideepak @yingsu00

But this is true for both unpartitioned and partitioned data: split-level filtering is not applied for Parquet files. Let me know if I am missing something here.
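
The effect of missing column statistics can be sketched as follows. This is a simplified model of the quoted check, not the Velox code: `ColumnStats`, `RangeFilter`, `mightMatch`, and `shouldReadFile` are hypothetical names, and the statistics are reduced to min, max, and null count.

```cpp
#include <cstdint>
#include <memory>
#include <optional>

// Hypothetical, simplified per-column file statistics.
struct ColumnStats {
  std::optional<int64_t> min;
  std::optional<int64_t> max;
  uint64_t nullCount;
};

// Hypothetical filter: lower <= value <= upper, NULLs optionally allowed.
struct RangeFilter {
  int64_t lower;
  int64_t upper;
  bool nullAllowed;
};

// Returns true if some row in the file might pass the filter.
bool mightMatch(
    const RangeFilter& filter,
    const ColumnStats& stats,
    uint64_t totalRows) {
  if (filter.nullAllowed && stats.nullCount > 0) {
    return true; // A NULL row exists and NULLs pass.
  }
  if (stats.nullCount == totalRows) {
    return false; // No non-NULL rows are left to match the range.
  }
  if (!stats.min.has_value() || !stats.max.has_value()) {
    return true; // Range unknown: stay conservative.
  }
  return *stats.max >= filter.lower && *stats.min <= filter.upper;
}

// Same shape as the quoted snippet: the file is skipped only when stats
// exist AND prove that no row can match.
bool shouldReadFile(
    const RangeFilter& filter,
    const std::unique_ptr<ColumnStats>& stats,
    uint64_t totalRows) {
  if (stats != nullptr && !mightMatch(filter, *stats, totalRows)) {
    return false;
  }
  return true;
}
```

With a reader that always returns nullptr for statistics, `shouldReadFile` always returns true, so no file is ever skipped on this path, whatever the filter says.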

yingsu00 commented 7 months ago

We got the logs today, and they showed that testFilters() passed for the null partitions even though the filter exists, which it shouldn't have. We produced a more detailed debug build today, but I was unable to connect to the cluster using the Presto CLI due to network issues. I will update once we get the new logs.

yingsu00 commented 7 months ago

Regarding @karteekmurthys's finding, it was for normal partitions. For null partitions, execution does not go through that code path, but through the part highlighted in the screenshot. Our new debug build should reveal more once we get the logs. Screenshot 2024-03-19 at 11 28 27 PM

yingsu00 commented 7 months ago

The problem was that the data files created by Spark are different from the ones created by Presto. Unlike Presto, Spark writes the partitioning column as a data field in the Parquet files, whether or not the value is NULL, whereas in Hive the partitioning key is normally not written into the data files. Velox assumes the partitioning key is not in the data file and thus missed the opportunity to filter out the NULL partitions. I will work on a fix.
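
As an illustration of the idea (not the actual Velox fix), a split-level check can evaluate filters on partition keys against the partition value itself, independent of whether the data file also happens to contain that column. All names below (`PartitionValues`, `RangeFilter`, `canSkipSplit`) are hypothetical.

```cpp
#include <cstdint>
#include <map>
#include <optional>
#include <string>

// Hypothetical filter: lower <= value <= upper, NULLs optionally allowed.
struct RangeFilter {
  int64_t lower;
  int64_t upper;
  bool nullAllowed;

  bool test(const std::optional<int64_t>& value) const {
    if (!value.has_value()) {
      return nullAllowed;
    }
    return *value >= lower && *value <= upper;
  }
};

// Partition values of one split, keyed by column name. std::nullopt models a
// NULL partition such as the __HIVE_DEFAULT_PARTITION__ directory.
using PartitionValues = std::map<std::string, std::optional<int64_t>>;

// Returns true if the split can be skipped without opening the file. The
// decision uses only the partition value, so it does not matter whether the
// writer also stored the partition column inside the data file.
bool canSkipSplit(
    const PartitionValues& partitionValues,
    const std::map<std::string, RangeFilter>& filters) {
  for (const auto& [column, filter] : filters) {
    auto it = partitionValues.find(column);
    if (it != partitionValues.end() && !filter.test(it->second)) {
      return true;
    }
  }
  return false;
}
```

In this sketch a split from the NULL ss_sold_date_sk partition is skipped as soon as a filter that disallows NULLs is present on that column, which is the behavior the join relies on.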

Two other minor issues:

  1. The partitioning columns don't have stats. This may also be because Presto/Prestissimo assumes the partitioning columns are not written to the data files.
  2. Raw input positions/bytes were reported incorrectly in Velox. I'll also fix this one.

ethanyzhang commented 7 months ago

This behavioral difference is from Iceberg, regardless of whether you use Spark or Presto. Here is my test process:

  1. We got a TPC-DS scale factor 1GB unpartitioned dataset created using the Iceberg connector in Presto.
  2. In Spark, we create a partitioned version of store_sales with Iceberg, name it store_sales_part, then do INSERT INTO SELECT FROM the Iceberg unpartitioned version.
  3. Inspecting the Parquet files of store_sales_part, we found that the files contain the partition column ss_sold_date_sk.
    
    ❯ parquet-tools inspect s3://presto-workload/tpcds-sf1-parquet-iceberg/store_sales_part/data/ss_sold_date_sk=null/00001-8-2f5a52cb-e502-43f1-bd61-8488c6b70d1f-00001.parquet
    ℹ s3://presto-workload/tpcds-sf1-parquet-iceberg/store_sales_part/data/ss_sold_date_sk=null/00001-8-2f5a52cb-e502-43f1-bd61-8488c6b70d1f-00001.parquet => /var/folders/ld/32yq04mn3t32g2sz0tw8jgnm0000gn/T/tmpx7ntm86o/37c750e7-5d60-4125-8092-3f5d26c4677b.parquet

    ############ file meta data ############
    created_by: parquet-mr version 1.13.1 (build ${buildNumber})
    num_columns: 23
    num_rows: 130093
    num_row_groups: 1
    format_version: 1.0
    serialized_size: 4853

    ############ Columns ############
    ss_sold_time_sk
    ss_item_sk
    ss_customer_sk
    ss_cdemo_sk
    ss_hdemo_sk
    ss_addr_sk
    ss_store_sk
    ss_promo_sk
    ss_ticket_number
    ss_quantity
    ss_wholesale_cost
    ss_list_price
    ss_sales_price
    ss_ext_discount_amt
    ss_ext_sales_price
    ss_ext_wholesale_cost
    ss_ext_list_price
    ss_ext_tax
    ss_coupon_amt
    ss_net_paid
    ss_net_paid_inc_tax
    ss_net_profit
    ss_sold_date_sk

4. Rename its `ss_sold_date_sk=null` data folder to `ss_sold_date_sk=__HIVE_DEFAULT_PARTITION__`, then create an external **Hive** table in Spark using the data files of `store_sales_part`, name it `store_sales_part_ice_as_hive`.
5. Run `repair table store_sales_part_ice_as_hive sync partitions;` in Spark
6. Create a new partitioned `store_sales` table in Spark Hive named `store_sales_part_hive`. Do INSERT INTO SELECT from `store_sales_part_ice_as_hive`.
7. Inspecting the Parquet files, we found that the files do not contain the partition column `ss_sold_date_sk`, even though the INSERT INTO SELECT reads from a table whose data files contain `ss_sold_date_sk`.

    ❯ parquet-tools inspect s3://presto-workload/tpcds-sf1-parquet-iceberg/store_sales_part_hive/ss_sold_date_sk=2452619/part-00042-fa719914-bae5-4eac-802d-2457901c6df0.c000.snappy.parquet
    ℹ s3://presto-workload/tpcds-sf1-parquet-iceberg/store_sales_part_hive/ss_sold_date_sk=2452619/part-00042-fa719914-bae5-4eac-802d-2457901c6df0.c000.snappy.parquet => /var/folders/ld/32yq04mn3t32g2sz0tw8jgnm0000gn/T/tmpw165bvj/80c8f37b-db0f-4da1-ab5d-26f606eb4c3b.parquet

    ############ file meta data ############
    created_by: parquet-mr version 1.13.1 (build ${buildNumber})
    num_columns: 22
    num_rows: 2968
    num_row_groups: 1
    format_version: 1.0
    serialized_size: 4666

    ############ Columns ############
    ss_sold_time_sk
    ss_item_sk
    ss_customer_sk
    ss_cdemo_sk
    ss_hdemo_sk
    ss_addr_sk
    ss_store_sk
    ss_promo_sk
    ss_ticket_number
    ss_quantity
    ss_wholesale_cost
    ss_list_price
    ss_sales_price
    ss_ext_discount_amt
    ss_ext_sales_price
    ss_ext_wholesale_cost
    ss_ext_list_price
    ss_ext_tax
    ss_coupon_amt
    ss_net_paid
    ss_net_paid_inc_tax
    ss_net_profit


8. In Presto, I tried to create a partitioned `store_sales` table with the Iceberg connector and insert from the unpartitioned version. I found that Presto also writes Parquet files with the partition column. This suggests that it is the standard way for Iceberg to write partitioned table data.

tdcmeehan commented 7 months ago

My guess is this is because the partitioning could be based off of a derived value. To be compatible with Hive, you'd need to remove partition columns only when they used the identity transform, which might be weird. Also, it's possible that a change in partitioning could be a metadata-only change (such as increasing the granularity of a timestamp column from minutes to millis).
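
A small sketch of why only identity-transformed partition columns could be dropped from the data files: with any derived transform, the partition value no longer equals the column value, so the column cannot be reconstructed from the partition path. The `Transform` enum below lists common Iceberg transform kinds for illustration; `PartitionField` and `columnsRecoverableFromPartition` are hypothetical names, not an Iceberg API.

```cpp
#include <string>
#include <vector>

// Illustrative partition transform kinds.
enum class Transform { kIdentity, kBucket, kTruncate, kYear, kMonth, kDay, kHour };

// Hypothetical description of one partition field.
struct PartitionField {
  std::string sourceColumn;
  Transform transform;
};

// Returns the source columns whose values can be read back from the
// partition value alone, which is only the case for the identity transform.
std::vector<std::string> columnsRecoverableFromPartition(
    const std::vector<PartitionField>& spec) {
  std::vector<std::string> result;
  for (const auto& field : spec) {
    if (field.transform == Transform::kIdentity) {
      result.push_back(field.sourceColumn);
    }
  }
  return result;
}
```

For a spec that partitions by identity(ss_sold_date_sk) and, say, day(some timestamp column), only ss_sold_date_sk would be recoverable; the timestamp column has to stay in the data files.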

aditi-pandit commented 4 months ago

Closing this. Fix https://github.com/facebookincubator/velox/pull/9275 is in OSS now.