ethanyzhang opened 6 months ago
The diff is between choosing a broadcast and a partitioned join. Will try to repro this with just the base tables. Plan 1 - Hive, plan 2 - Iceberg.
We also noticed that the statistics for the same tables in the Iceberg and Hive versions don't match up. Specifically, the data size for many of the Hive columns is null, while Iceberg does have these statistics. This could be why the optimizer chose this distribution type, though I have not confirmed it yet.
(hive on left, iceberg on right)
I ran a simple test query to confirm that the REPLICATED join is indeed faster than PARTITIONED for a two-table join on the same Hive tables -
presto:tpcds_sf1000_parquet_varchar> explain analyze select 1 from store_sales , customer where ss_customer_sk = c_customer_sk;
Query Plan >
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------->
Fragment 1 [HASH] >
CPU: 2.83m, Scheduled: 3.75m, Input: 2,891,987,999 rows (93.26GB); per task: avg.: 1,445,993,999.50 std.dev.: 64,361,414.50, Output: 2,750,397,233 rows (10.26GB), 2 tasks >
Output layout: [expr] >
Output partitioning: SINGLE [] >
Stage Execution Strategy: UNGROUPED_EXECUTION >
- Project[PlanNodeId 273][projectLocality = LOCAL] => [expr:integer] >
Estimates: {source: CostBasedSourceInfo, rows: 2,750,397,233 (12.81GB), cpu: 88,833,483,754.00, memory: 108,000,000.00, network: 24,991,165,863.00} >
CPU: 1.22m (22.66%), Scheduled: 1.38m (15.09%), Output: 2,750,397,233 rows (10.26GB) >
Input avg.: 343,799,654.13 rows, Input std.dev.: 173.21% >
expr := INTEGER'1' >
- InnerJoin[PlanNodeId 384][("ss_customer_sk" = "c_customer_sk")] => [] >
Estimates: {source: CostBasedSourceInfo, rows: 2,750,397,233 (12.81GB), cpu: 75,081,497,589.00, memory: 108,000,000.00, network: 24,991,165,863.00} >
CPU: 1.20m (22.45%), Scheduled: 1.75m (19.15%), Output: 2,750,397,233 rows (0B) >
Distribution: PARTITIONED >
- RemoteSource[2] => [ss_customer_sk:bigint] >
CPU: 23.87s (7.42%), Scheduled: 36.14s (6.59%), Output: 2,879,987,999 rows (93.14GB) >
Input avg.: 359,998,499.88 rows, Input std.dev.: 173.44% >
- LocalExchange[PlanNodeId 460][HASH] (c_customer_sk) => [c_customer_sk:bigint] >
Estimates: {source: CostBasedSourceInfo, rows: 12,000,000 (57.22MB), cpu: 324,000,000.00, memory: 0.00, network: 108,000,000.00} >
CPU: 380.00ms (0.12%), Scheduled: 960.00ms (0.18%), Output: 12,000,000 rows (115.97MB) >
Input avg.: 1,500,000.00 rows, Input std.dev.: 173.21% >
- RemoteSource[3] => [c_customer_sk:bigint] >
CPU: 167.00ms (0.05%), Scheduled: 270.00ms (0.05%), Output: 12,000,000 rows (115.97MB) >
Input avg.: 1,500,000.00 rows, Input std.dev.: 173.21% >
>
Fragment 2 [SOURCE] >
CPU: 2.53m, Scheduled: 5.37m, Input: 2,879,987,999 rows (26.73GB); per task: avg.: 1,439,993,999.50 std.dev.: 84,252,353.50, Output: 2,879,987,999 rows (20.68GB), 2 tasks >
Output layout: [ss_customer_sk] >
Output partitioning: HASH [ss_customer_sk] >
Stage Execution Strategy: UNGROUPED_EXECUTION >
- TableScan[PlanNodeId 0][TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpcds_sf1000_parquet_varchar, tableName=store_sales, analyzePartitionValues=Optional.empty}', layout='Optional[tpcds_sf1000_parquet_var>
Estimates: {source: CostBasedSourceInfo, rows: 2,879,987,999 (23.17GB), cpu: 24,883,165,863.00, memory: 0.00, network: 0.00} >
CPU: 2.53m (47.14%), Scheduled: 5.37m (58.82%), Output: 2,879,987,999 rows (20.68GB) >
Input avg.: 359,998,499.88 rows, Input std.dev.: 173.60% >
LAYOUT: tpcds_sf1000_parquet_varchar.store_sales{} >
ss_customer_sk := ss_customer_sk:bigint:3:REGULAR (1:31) >
Input: 2,879,987,999 rows (26.73GB), Filtered: 0.00% >
>
Fragment 3 [SOURCE] >
CPU: 487.53ms, Scheduled: 688.26ms, Input: 12,000,000 rows (112.39MB); per task: avg.: 6,000,000.00 std.dev.: 2,952,680.00, Output: 12,000,000 rows (91.61MB), 2 tasks >
Output layout: [c_customer_sk] >
Output partitioning: HASH [c_customer_sk] >
Stage Execution Strategy: UNGROUPED_EXECUTION >
- TableScan[PlanNodeId 1][TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpcds_sf1000_parquet_varchar, tableName=customer, analyzePartitionValues=Optional.empty}', layout='Optional[tpcds_sf1000_parquet_varcha>
Estimates: {source: CostBasedSourceInfo, rows: 12,000,000 (103.00MB), cpu: 108,000,000.00, memory: 0.00, network: 0.00} >
CPU: 487.00ms (0.15%), Scheduled: 689.00ms (0.13%), Output: 12,000,000 rows (91.61MB) >
Input avg.: 1,500,000.00 rows, Input std.dev.: 199.22% >
LAYOUT: tpcds_sf1000_parquet_varchar.customer{} >
c_customer_sk := c_customer_sk:bigint:0:REGULAR (1:45) >
Input: 12,000,000 rows (112.39MB), Filtered: 0.00% >
>
>
Query 20240313_050510_00048_wguzs, FINISHED, 3 nodes
Splits: 1,086 total, 1,086 done (100.00%)
[Latency: client-side: 1:48, server-side: 1:48] [2.89B rows, 8.02GB] [26.8M rows/s, 76.3MB/s]
presto:tpcds_sf1000_parquet_varchar> set session join_max_broadcast_table_size='150MB';
SET SESSION
presto:tpcds_sf1000_parquet_varchar> explain analyze select 1 from store_sales , customer where ss_customer_sk = c_customer_sk;
Query Plan >
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------->
Fragment 1 [SOURCE] >
CPU: 3.14m, Scheduled: 3.86m, Input: 2,903,987,999 rows (26.95GB); per task: avg.: 1,451,993,999.50 std.dev.: 96,904,309.50, Output: 2,750,397,233 rows (10.26GB), 2 tasks >
Output layout: [expr] >
Output partitioning: SINGLE [] >
Stage Execution Strategy: UNGROUPED_EXECUTION >
- Project[PlanNodeId 273][projectLocality = LOCAL] => [expr:integer] >
Estimates: {source: CostBasedSourceInfo, rows: 2,750,397,233 (12.81GB), cpu: 64,058,317,891.00, memory: 216,000,000.00, network: 216,000,000.00} >
CPU: 1.33m (42.15%), Scheduled: 1.27m (32.88%), Output: 2,750,397,233 rows (10.26GB) >
Input avg.: 343,799,654.13 rows, Input std.dev.: 173.73% >
expr := INTEGER'1' >
- InnerJoin[PlanNodeId 384][("ss_customer_sk" = "c_customer_sk")] => [] >
Estimates: {source: CostBasedSourceInfo, rows: 2,750,397,233 (12.81GB), cpu: 50,306,331,726.00, memory: 216,000,000.00, network: 216,000,000.00} >
CPU: 1.14m (36.33%), Scheduled: 1.22m (31.55%), Output: 2,750,397,233 rows (0B) >
Distribution: REPLICATED >
- TableScan[PlanNodeId 0][TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpcds_sf1000_parquet_varchar, tableName=store_sales, analyzePartitionValues=Optional.empty}', layout='Optional[tpcds_sf1000_par>
Estimates: {source: CostBasedSourceInfo, rows: 2,879,987,999 (13.41GB), cpu: 24,883,165,863.00, memory: 0.00, network: 0.00} >
CPU: 39.39s (20.86%), Scheduled: 1.33m (34.32%), Output: 2,879,987,999 rows (26.73GB) >
Input avg.: 359,998,499.88 rows, Input std.dev.: 173.73% >
LAYOUT: tpcds_sf1000_parquet_varchar.store_sales{} >
ss_customer_sk := ss_customer_sk:bigint:3:REGULAR (1:31) >
Input: 2,879,987,999 rows (26.73GB), Filtered: 0.00% >
- LocalExchange[PlanNodeId 454][HASH] (c_customer_sk) => [c_customer_sk:bigint] >
Estimates: {source: CostBasedSourceInfo, rows: 12,000,000 (57.22MB), cpu: 216,000,000.00, memory: 0.00, network: 216,000,000.00} >
CPU: 707.00ms (0.37%), Scheduled: 2.03s (0.87%), Output: 24,000,000 rows (232.00MB) >
Input avg.: 3,000,000.00 rows, Input std.dev.: 173.21% >
- RemoteSource[2] => [c_customer_sk:bigint] >
CPU: 312.00ms (0.17%), Scheduled: 544.00ms (0.23%), Output: 24,000,000 rows (232.00MB) >
Input avg.: 3,000,000.00 rows, Input std.dev.: 173.21% >
>
Fragment 2 [SOURCE] >
CPU: 233.28ms, Scheduled: 318.09ms, Input: 12,000,000 rows (112.39MB); per task: avg.: 6,000,000.00 std.dev.: 2,952,680.00, Output: 12,000,000 rows (91.61MB), 2 tasks >
Output layout: [c_customer_sk] >
Output partitioning: BROADCAST [] >
Stage Execution Strategy: UNGROUPED_EXECUTION >
- TableScan[PlanNodeId 1][TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpcds_sf1000_parquet_varchar, tableName=customer, analyzePartitionValues=Optional.empty}', layout='Optional[tpcds_sf1000_parquet_varcha>
Estimates: {source: CostBasedSourceInfo, rows: 12,000,000 (103.00MB), cpu: 108,000,000.00, memory: 0.00, network: 0.00} >
CPU: 233.00ms (0.12%), Scheduled: 317.00ms (0.14%), Output: 12,000,000 rows (91.61MB) >
Input avg.: 1,500,000.00 rows, Input std.dev.: 199.22% >
LAYOUT: tpcds_sf1000_parquet_varchar.customer{} >
c_customer_sk := c_customer_sk:bigint:0:REGULAR (1:45) >
Input: 12,000,000 rows (112.39MB), Filtered: 0.00% >
>
>
(1 row)
Query 20240313_051826_00050_wguzs, FINISHED, 3 nodes
Splits: 1,080 total, 1,080 done (100.00%)
[Latency: client-side: 0:46, server-side: 0:45] [2.89B rows, 8.02GB] [64.2M rows/s, 182MB/s]
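The two Hive runs above make the effect concrete: same query, same tables, with only the join distribution differing. Comparing the server-side latencies:

```python
# Server-side latencies from the two EXPLAIN ANALYZE runs above
partitioned_s = 1 * 60 + 48  # 1:48 with Distribution: PARTITIONED
replicated_s = 0 * 60 + 45   # 0:45 with Distribution: REPLICATED

print(f"{partitioned_s / replicated_s:.1f}x speedup")  # 2.4x speedup
```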
Now consider the same query executed on the Iceberg tables -
presto:tpcds_sf1000_parquet_iceberg> explain analyze select 1 from store_sales , customer where ss_customer_sk = c_customer_sk;
Query Plan >
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------->
Fragment 1 [SOURCE] >
CPU: 3.65m, Scheduled: 7.98m, Input: 2,903,987,999 rows (27.04GB); per task: avg.: 1,451,993,999.50 std.dev.: 59,296,407.50, Output: 2,750,397,233 rows (10.26GB), 2 tasks >
Output layout: [expr] >
Output partitioning: SINGLE [] >
Stage Execution Strategy: UNGROUPED_EXECUTION >
- Project[PlanNodeId 273][projectLocality = LOCAL] => [expr:integer] >
Estimates: {source: CostBasedSourceInfo, rows: 2,750,397,233 (12.81GB), cpu: 22,461,196,857.00, memory: 52,076,096.00, network: 52,076,096.00} >
CPU: 1.18m (32.20%), Scheduled: 1.00m (12.29%), Output: 2,750,397,233 rows (10.26GB) >
Input avg.: 343,799,654.13 rows, Input std.dev.: 173.40% >
expr := INTEGER'1' >
- InnerJoin[PlanNodeId 384][("ss_customer_sk" = "c_customer_sk")] => [] >
Estimates: {source: CostBasedSourceInfo, rows: 2,750,397,233 (12.81GB), cpu: 8,709,210,692.00, memory: 52,076,096.00, network: 52,076,096.00} >
CPU: 1.05m (28.77%), Scheduled: 1.06m (12.98%), Output: 2,750,397,233 rows (0B) >
Distribution: REPLICATED >
- TableScan[PlanNodeId 0][TableHandle {connectorId='iceberg', connectorHandle='store_sales$data@Optional[664188066876659448]', layout='Optional[store_sales$data@Optional[664188066876659448]]'}, grouped = false] => [ss_customer_sk:>
Estimates: {source: CostBasedSourceInfo, rows: 2,879,987,999 (13.41GB), cpu: 4,289,510,226.00, memory: 0.00, network: 0.00} >
CPU: 1.40m (38.27%), Scheduled: 5.87m (71.89%), Output: 2,879,987,999 rows (26.82GB) >
Input avg.: 359,998,499.88 rows, Input std.dev.: 173.40% >
ss_customer_sk := 4:ss_customer_sk:bigint (1:31) >
Input: 2,879,987,999 rows (26.82GB), Filtered: 0.00% >
- LocalExchange[PlanNodeId 454][HASH] (c_customer_sk) => [c_customer_sk:bigint] >
Estimates: {source: CostBasedSourceInfo, rows: 12,000,000 (57.22MB), cpu: 52,076,096.00, memory: 0.00, network: 52,076,096.00} >
CPU: 701.00ms (0.32%), Scheduled: 2.01s (0.41%), Output: 24,000,000 rows (226.18MB) >
Input avg.: 3,000,000.00 rows, Input std.dev.: 173.21% >
- RemoteSource[2] => [c_customer_sk:bigint] >
CPU: 300.00ms (0.14%), Scheduled: 428.00ms (0.09%), Output: 24,000,000 rows (226.18MB) >
Input avg.: 3,000,000.00 rows, Input std.dev.: 173.21% >
>
Fragment 2 [SOURCE] >
CPU: 674.02ms, Scheduled: 11.44s, Input: 12,000,000 rows (112.40MB); per task: avg.: 12,000,000.00 std.dev.: 0.00, Output: 12,000,000 rows (91.61MB), 1 tasks >
Output layout: [c_customer_sk] >
Output partitioning: BROADCAST [] >
Stage Execution Strategy: UNGROUPED_EXECUTION >
- TableScan[PlanNodeId 1][TableHandle {connectorId='iceberg', connectorHandle='customer$data@Optional[6277298338347287396]', layout='Optional[customer$data@Optional[6277298338347287396]]'}, grouped = false] => [c_customer_sk:bigint] >
Estimates: {source: CostBasedSourceInfo, rows: 12,000,000 (24.83MB), cpu: 26,038,048.00, memory: 0.00, network: 0.00} >
CPU: 674.00ms (0.31%), Scheduled: 11.44s (2.34%), Output: 12,000,000 rows (91.61MB) >
Input avg.: 3,000,000.00 rows, Input std.dev.: 173.21% >
c_customer_sk := 1:c_customer_sk:bigint (1:45) >
Input: 12,000,000 rows (112.40MB), Filtered: 0.00% >
>
>
(1 row)
Query 20240313_044810_00024_wguzs, FINISHED, 3 nodes
Splits: 860 total, 860 done (100.00%)
[Latency: client-side: 1:16, server-side: 1:15] [2.89B rows, 2.15GB] [38.3M rows/s, 29.1MB/s]
Points of note - Consider the scan of the build side:
- TableScan[PlanNodeId 1][TableHandle {connectorId='iceberg', connectorHandle='customer$data@Optional[6277298338347287396]', layout='Optional[customer$data@Optional[6277298338347287396]]'}, grouped = false] => [c_customer_sk:bigint] >
Estimates: {source: CostBasedSourceInfo, rows: 12,000,000 (24.83MB), cpu: 26,038,048.00, memory: 0.00, network: 0.00} >
CPU: 674.00ms (0.31%), Scheduled: 11.44s (2.34%), Output: 12,000,000 rows (91.61MB) >
Input avg.: 3,000,000.00 rows, Input std.dev.: 173.21% >
c_customer_sk := 1:c_customer_sk:bigint (1:45) >
Input: 12,000,000 rows (112.40MB), Filtered: 0.00%
The estimated size of this scan is 24.83MB, which is under the default broadcast join threshold of 100MB. The actual output size (91.61MB) is very different from the estimate. The 24.83MB estimate is derived from the column stats of c_customer_sk -

PlanNodeStatsEstimate{outputRowCount=1.2E7, totalSize=1.4038048E7, variableStatistics={c_customer_sk=VariableStatsEstimate{range=[1.0-1.2E7], nulls=0.0, ndv=1.2E7, rowSize=1.1698373333333334}}, sourceInfo=CostBasedSourceInfo{}, joinNodeSpecificStatsEstimate=JoinNodeStatsEstimate{nullJoinBuildKeyCount=NaN, joinBuildKeyCount=NaN, nullJoinProbeKeyCount=NaN, joinProbeKeyCount=NaN}}

The function that calculates the outputSize uses the averageRowSize = 1.1698373333333334 to come up with this estimate. Look at the stats from the Hive execution -
Fragment 3 [SOURCE] >
CPU: 487.53ms, Scheduled: 688.26ms, Input: 12,000,000 rows (112.39MB); per task: avg.: 6,000,000.00 std.dev.: 2,952,680.00, Output: 12,000,000 rows (91.61MB), 2 tasks >
Output layout: [c_customer_sk] >
Output partitioning: HASH [c_customer_sk] >
Stage Execution Strategy: UNGROUPED_EXECUTION >
- TableScan[PlanNodeId 1][TableHandle {connectorId='hive', connectorHandle='HiveTableHandle{schemaName=tpcds_sf1000_parquet_varchar, tableName=customer, analyzePartitionValues=Optional.empty}', layout='Optional[tpcds_sf1000_parquet_varcha>
Estimates: {source: CostBasedSourceInfo, rows: 12,000,000 (103.00MB), cpu: 108,000,000.00, memory: 0.00, network: 0.00} >
CPU: 487.00ms (0.15%), Scheduled: 689.00ms (0.13%), Output: 12,000,000 rows (91.61MB) >
Input avg.: 1,500,000.00 rows, Input std.dev.: 199.22% >
LAYOUT: tpcds_sf1000_parquet_varchar.customer{} >
c_customer_sk := c_customer_sk:bigint:0:REGULAR (1:45) >
Input: 12,000,000 rows (112.39MB), Filtered: 0.00%
We see an estimate of 103.00MB. Since the backing column stat has the data size missing, we use the default size per row (8 bytes) to arrive at this estimate. The actual output size is 91.61MB.
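Both the Iceberg (24.83MB) and Hive (103.00MB) estimates can be reproduced with quick arithmetic, under the assumption that the cost model adds about one byte of per-row overhead on top of the per-value size. That overhead is inferred from the numbers here, not something stated in the plans:

```python
MiB = 1024 * 1024
rows = 12_000_000
per_row_overhead = 1  # assumed bookkeeping byte; inferred, not stated in the plans

# Iceberg: averageRowSize comes from the connector's data-size stats
iceberg_estimate = rows * (1.1698373333333334 + per_row_overhead)
print(f"iceberg: {iceberg_estimate / MiB:.2f}MB")  # iceberg: 24.83MB

# Hive: the data-size stat is null, so a default 8 bytes per bigint value is used
hive_estimate = rows * (8 + per_row_overhead)
print(f"hive:    {hive_estimate / MiB:.2f}MB")     # hive:    103.00MB
```

Both results match the plan estimates exactly, which supports the theory that the missing Hive data-size stat (rather than a connector-specific code path) drives the different distribution choices.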
We can tweak the join_max_broadcast_table_size session property to see if we are near the threshold. I ran Q23 on Hive with set session join_max_broadcast_table_size='104MB'; (up from the default of 100MB).
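The plan choices above all follow from a simple size gate. This is a deliberately simplified sketch (the optimizer's actual rule, DetermineJoinDistributionType, also compares costs), but it is enough to explain every plan seen here:

```python
def choose_distribution(build_estimate_mb, broadcast_threshold_mb):
    # Simplified: broadcast (REPLICATED) only if the estimated build side
    # fits under join_max_broadcast_table_size; otherwise repartition both sides.
    if build_estimate_mb <= broadcast_threshold_mb:
        return "REPLICATED"
    return "PARTITIONED"

print(choose_distribution(103.00, 100))  # Hive, default threshold -> PARTITIONED
print(choose_distribution(103.00, 104))  # Hive, threshold raised  -> REPLICATED
print(choose_distribution(24.83, 100))   # Iceberg                 -> REPLICATED
```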
The query latency now matches what we observe with Iceberg - https://grafana.ibm.prestodb.dev/d/cc6a4ccf-2fc7-4220-8f4e-397b03061acf/query-detail-comparisons?orgId=1&from=now-90d&from=1710111542252&to=now&to=1710112384953&var-query1=20240313_110243_00020_u8q26&var-query2=20240311_235004_00031_awbwk
No plan diffs observed between the two queries anymore
We need to experiment with a higher broadcast size threshold in our benchmark runs (for Hive and Iceberg). The hardware we're using does allow for larger broadcasts, and we may get better perf out of it.
@aaneja should we create an issue to fix the Iceberg estimates to be more conservative?
Yes, we need to create one IMO. Will create one. Update - I've created https://github.com/prestodb/presto/issues/22201
We need to understand why Iceberg is doing so much better than Hive on Prestissimo for Q23. Dashboard