andygrove opened 1 year ago
Possibly related: https://github.com/apache/arrow-datafusion/issues/5995
Suggestion I posted in Slack:
I wonder whether there might be some regressions here with respect to scalability with the number of cores. https://github.com/apache/arrow-datafusion/pull/4219 comes to mind, which makes building the hash map for the build side of a join single-threaded for smaller tables; this could be checked by changing `datafusion.optimizer.hash_join_single_partition_threshold`.
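For anyone reproducing this, here is a minimal sketch of changing that setting through the Python bindings. The value `0` is an assumption intended to disable the single-partition path entirely; it is not part of the original suggestion:

```python
from datafusion import SessionContext

ctx = SessionContext()
# DataFusion configuration options can be changed with SQL SET statements.
# Assumption: setting the threshold (bytes) to 0 keeps the hash-join build
# side partitioned instead of collecting it into a single partition.
ctx.sql("SET datafusion.optimizer.hash_join_single_partition_threshold = 0")
```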
For TPC-H q1 (SF=1), the complete plan with metrics is as follows:
```
SortExec: [l_returnflag@0 ASC NULLS LAST,l_linestatus@1 ASC NULLS LAST], metrics=[output_rows=4, elapsed_compute=25.042µs, spill_count=0, spilled_bytes=0]
  CoalescePartitionsExec, metrics=[output_rows=4, elapsed_compute=3.167µs, spill_count=0, spilled_bytes=0, mem_used=0]
    ProjectionExec: expr=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus, SUM(lineitem.l_quantity)@2 as sum_qty, SUM(lineitem.l_extendedprice)@3 as sum_base_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)@4 as sum_disc_price, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax)@5 as sum_charge, AVG(lineitem.l_quantity)@6 as avg_qty, AVG(lineitem.l_extendedprice)@7 as avg_price, AVG(lineitem.l_discount)@8 as avg_disc, COUNT(UInt8(1))@9 as count_order], metrics=[output_rows=4, elapsed_compute=8.376µs, spill_count=0, spilled_bytes=0, mem_used=0]
      AggregateExec: mode=FinalPartitioned, gby=[l_returnflag@0 as l_returnflag, l_linestatus@1 as l_linestatus], aggr=[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))], metrics=[output_rows=4, elapsed_compute=181.588µs, spill_count=0, spilled_bytes=0, mem_used=0]
        CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=64, elapsed_compute=350.731µs, spill_count=0, spilled_bytes=0, mem_used=0]
          RepartitionExec: partitioning=Hash([Column { name: "l_returnflag", index: 0 }, Column { name: "l_linestatus", index: 1 }], 16), input_partitions=16, metrics=[repart_time=296.707µs, fetch_time=1.652179502s, send_time=110.333µs]
            AggregateExec: mode=Partial, gby=[l_returnflag@5 as l_returnflag, l_linestatus@6 as l_linestatus], aggr=[SUM(lineitem.l_quantity), SUM(lineitem.l_extendedprice), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount), SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount * Int64(1) + lineitem.l_tax), AVG(lineitem.l_quantity), AVG(lineitem.l_extendedprice), AVG(lineitem.l_discount), COUNT(UInt8(1))], metrics=[output_rows=64, elapsed_compute=670.144742ms, spill_count=0, spilled_bytes=0, mem_used=0]
              ProjectionExec: expr=[CAST(l_extendedprice@1 AS Decimal128(38, 4)) * CAST(Some(100),23,2 - CAST(l_discount@2 AS Decimal128(23, 2)) AS Decimal128(38, 4)) as CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2))CAST(lineitem.l_discount AS Decimal128(23, 2))lineitem.l_discountDecimal128(Some(100),23,2)CAST(lineitem.l_extendedprice AS Decimal128(38, 4))lineitem.l_extendedprice, l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax, l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus], metrics=[output_rows=5916591, elapsed_compute=162.828798ms, spill_count=0, spilled_bytes=0, mem_used=0]
                CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=5916591, elapsed_compute=29.656446ms, spill_count=0, spilled_bytes=0, mem_used=0]
                  FilterExec: l_shipdate@6 <= 10471, metrics=[output_rows=5916591, elapsed_compute=54.707373ms, spill_count=0, spilled_bytes=0, mem_used=0]
                    ParquetExec: limit=None, partitions={16 groups: [[...]]}, predicate=l_shipdate <= Date32("10471"), pruning_predicate=l_shipdate_min@0 <= 10471, projection=[l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate], metrics=[output_rows=6001215, elapsed_compute=16ns, spill_count=0, spilled_bytes=0, mem_used=0, row_groups_pruned=0, predicate_evaluation_errors=0, bytes_scanned=44132502, num_predicate_creation_errors=0, pushdown_rows_filtered=0, page_index_rows_filtered=0, pushdown_eval_time=32ns, time_elapsed_scanning_until_data=493.033123ms, time_elapsed_scanning_total=1.643740915s, time_elapsed_processing=290.136543ms, time_elapsed_opening=6.494334ms, page_index_eval_time=32ns]
```
Relevant metrics:
I ran the benchmarks with a simpler version of q1 with just the table scan and filter:
```sql
select
    *
from
    lineitem
where
    l_shipdate <= date '1998-12-01' - interval '113 days';
```
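For reference, a rough sketch of how such a run can be timed with the DataFusion Python bindings (the path is taken from the directory listing later in this thread; the timing harness itself is an assumption, not the actual sqlbench-runners code):

```python
import time
from datafusion import SessionContext

ctx = SessionContext()
# Assumed location of the SF=10 lineitem data; adjust for your environment.
ctx.register_parquet("lineitem", "/mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/")

query = """
select *
from lineitem
where l_shipdate <= date '1998-12-01' - interval '113 days'
"""

start = time.time()
batches = ctx.sql(query).collect()  # plan, execute, and materialize all batches
elapsed_ms = (time.time() - start) * 1000
print(f"{elapsed_ms:.1f} ms, {sum(b.num_rows for b in batches)} rows")
```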
Here are the results:
Cores | DataFusion Python 22.0.0 | DuckDB | DuckDB Speedup (×) |
---|---|---|---|
1 | 14376 | 15000.9 | 0.96 |
2 | 7547.9 | 7915.7 | 0.95 |
4 | 4243.9 | 3850.7 | 1.10 |
8 | 2655.8 | 1869.3 | 1.42 |
16 | 2096.6 | 954.9 | 2.20 |
32 | 2252.2 | 531.7 | 4.24 |
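Deriving parallel speedup relative to the single-core run from the table above makes the scaling gap explicit (this is just arithmetic on the numbers already shown):

```python
# Filter + scan timings from the table above.
datafusion = {1: 14376, 2: 7547.9, 4: 4243.9, 8: 2655.8, 16: 2096.6, 32: 2252.2}
duckdb = {1: 15000.9, 2: 7915.7, 4: 3850.7, 8: 1869.3, 16: 954.9, 32: 531.7}

for cores in datafusion:
    df_speedup = datafusion[1] / datafusion[cores]
    duck_speedup = duckdb[1] / duckdb[cores]
    print(f"{cores:>2} cores: DataFusion {df_speedup:4.1f}x, DuckDB {duck_speedup:4.1f}x")
# At 32 cores DataFusion is ~6.4x its 1-core time (and regresses vs 16 cores),
# while DuckDB reaches ~28x.
```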
Here are the results for just the table scan:
```sql
select
    *
from
    lineitem;
```
Cores | DataFusion Python 22.0.0 | DuckDB | DuckDB Speedup (×) |
---|---|---|---|
1 | 14501.3 | 14751 | 0.98 |
2 | 7562.5 | 8135.8 | 0.93 |
4 | 4508.7 | 3618.6 | 1.25 |
8 | 2363.2 | 1837.5 | 1.29 |
16 | 2135.4 | 888 | 2.40 |
32 | 2355 | 511.4 | 4.61 |
This screenshot shows CPU usage with the DataFusion run on the left and the DuckDB run on the right, both using 32 cores.
Here is the directory listing of my lineitem directory:
$ ls -l /mnt/bigdata/tpch/sf10-parquet/lineitem.parquet/
total 2134532
-rw-rw-r-- 1 andy andy 91100218 Feb 3 08:45 part-0.parquet
-rw-rw-r-- 1 andy andy 90998179 Feb 3 08:46 part-10.parquet
-rw-rw-r-- 1 andy andy 91113498 Feb 3 08:46 part-11.parquet
-rw-rw-r-- 1 andy andy 90979631 Feb 3 08:46 part-12.parquet
-rw-rw-r-- 1 andy andy 91042031 Feb 3 08:46 part-13.parquet
-rw-rw-r-- 1 andy andy 91013004 Feb 3 08:46 part-14.parquet
-rw-rw-r-- 1 andy andy 91102199 Feb 3 08:46 part-15.parquet
-rw-rw-r-- 1 andy andy 91113429 Feb 3 08:46 part-16.parquet
-rw-rw-r-- 1 andy andy 91057291 Feb 3 08:46 part-17.parquet
-rw-rw-r-- 1 andy andy 90968073 Feb 3 08:46 part-18.parquet
-rw-rw-r-- 1 andy andy 91085729 Feb 3 08:47 part-19.parquet
-rw-rw-r-- 1 andy andy 91065849 Feb 3 08:45 part-1.parquet
-rw-rw-r-- 1 andy andy 90977725 Feb 3 08:47 part-20.parquet
-rw-rw-r-- 1 andy andy 91092444 Feb 3 08:47 part-21.parquet
-rw-rw-r-- 1 andy andy 91114853 Feb 3 08:47 part-22.parquet
-rw-rw-r-- 1 andy andy 91029929 Feb 3 08:47 part-23.parquet
-rw-rw-r-- 1 andy andy 91139350 Feb 3 08:46 part-2.parquet
-rw-rw-r-- 1 andy andy 91167180 Feb 3 08:46 part-3.parquet
-rw-rw-r-- 1 andy andy 91083996 Feb 3 08:46 part-4.parquet
-rw-rw-r-- 1 andy andy 91024700 Feb 3 08:46 part-5.parquet
-rw-rw-r-- 1 andy andy 91022201 Feb 3 08:46 part-6.parquet
-rw-rw-r-- 1 andy andy 91101885 Feb 3 08:46 part-7.parquet
-rw-rw-r-- 1 andy andy 91198702 Feb 3 08:46 part-8.parquet
-rw-rw-r-- 1 andy andy 91120030 Feb 3 08:46 part-9.parquet
@Dandandan I added more benchmark results. The issues appear to be related to the `ParquetExec`.
I could also replicate the issue of non-perfect scaling when loading the tables into memory.
One thing I noticed is that the current round-robin `RepartitionExec` doesn't spread the batches evenly over the number of output channels, which can already be seen in `MemoryExec` itself:

```
MemoryExec: partitions=32, partition_sizes=[32, 32, 32, 32, 32, 32, 32, 32, 26, 26, 26, 25, 25, 25, 25, 25, 25, 25, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16, 16], metrics=[]
```

It has a bias towards the starting partitions, as the outputs always go first to those channels.
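A tiny simulation of that behaviour (illustrative only, not the actual DataFusion code) shows how low-index channels systematically receive more batches when every input partition starts sending at channel 0:

```python
# Each input partition sends its batches round-robin starting from channel 0,
# so whenever a partition's batch count is not a multiple of the channel count,
# the leftover batches all land on the lowest-index channels.
def round_robin_counts(batches_per_input: list[int], num_channels: int) -> list[int]:
    counts = [0] * num_channels
    for num_batches in batches_per_input:
        for i in range(num_batches):
            counts[i % num_channels] += 1
    return counts

# e.g. 16 input partitions each producing 10 batches, fanned out to 32 channels:
print(round_robin_counts([10] * 16, 32))
# -> channels 0-9 each receive 16 batches, channels 10-31 receive none.
```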
Also, I wonder whether `DistributionSender` / `DistributionReceiver` introduced any regressions here. FYI @crepererum
> Also, I wonder whether `DistributionSender` / `DistributionReceiver` introduced any regressions here. FYI @crepererum
Definitely possible, even if it's only due to changes it triggers in tokio's scheduler. On the other hand, tokio's scheduler isn't really optimized for the DataFusion use case in the first place (as @tustvold probably pointed out before).
There is a small application written in Rust called odbc2parquet that reads data from databases and exports parquet files. I think it is useful for testing on dummy data.
Here are the latest results with DataFusion Python 23, which fixes a bug around how the Tokio runtime was being managed.
Cores | DuckDB 0.7.1 | DataFusion Python 21.0.0 | DataFusion Python 23.0.0 | DuckDB Speedup vs 23.0.0 (×) |
---|---|---|---|---|
1 | 315,631 | 318,038 | 218,604 | 0.7 |
2 | 155,035 | 197,257 | 142,480 | 0.9 |
4 | 75,110 | 111,243 | 93,804 | 1.2 |
8 | 37,585 | 73,142 | 58,083 | 1.5 |
16 | 18,880 | 55,072 | 45,016 | 2.4 |
Interestingly, the speedup from 21.0.0 to 23.0.0 seems bigger for fewer cores.
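For completeness, the per-core speedup of 23.0.0 over 21.0.0 implied by the table above (just arithmetic on the numbers already shown):

```python
df21 = {1: 318_038, 2: 197_257, 4: 111_243, 8: 73_142, 16: 55_072}
df23 = {1: 218_604, 2: 142_480, 4: 93_804, 8: 58_083, 16: 45_016}

for cores in df21:
    print(f"{cores:>2} cores: {df21[cores] / df23[cores]:.2f}x faster in 23.0.0")
# Roughly 1.45x at 1 core, tapering to about 1.22x at 16 cores.
```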
There have been quite a few improvements for Q1 between 21 and 23 for aggregates and casts, which might explain most of the difference.
I believe https://github.com/apache/arrow-datafusion/pull/6929 from @YjyJeff will perhaps help with this
Latest results with DF 33
I believe that https://github.com/apache/arrow-datafusion/issues/6937 (basically, the fact that we do much more work as the number of cores goes up, since each group gets processed on each core) contributes to this problem.
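The effect is visible in the q1 plan earlier in this thread: the partial aggregate emits roughly one row per group per partition, so the data flowing into the repartition and final aggregation grows with the core count even though the final result is only 4 rows. A back-of-the-envelope illustration:

```python
# q1 groups on (l_returnflag, l_linestatus), which yields 4 groups.
groups = 4
for partitions in (1, 8, 16, 32):
    print(f"{partitions:>2} partitions -> up to {groups * partitions} partial rows")
# With 16 partitions that is 64 rows, matching output_rows=64 on the
# mode=Partial AggregateExec in the plan above.
```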
I produced the chart using scripts at https://github.com/sql-benchmarks/sqlbench-runners/blob/main/scripts
@sunchao @viirya cc ^^^^
The regression between 32 and 33 seems to be caused by my "improvement" to filter statistics, although others saw big improvements with 33 (such as https://x.com/mim_djo/status/1725510410567307702?s=46&t=8JleG5FR5SAVU1vl35CPLg), so maybe this is environment specific. I will continue to debug this in the next few days.
```
$ git bisect bad
6fe00ce2e30d2af2b64d3a97b877a96109c215f9 is the first bad commit
commit 6fe00ce2e30d2af2b64d3a97b877a96109c215f9
Author: Andy Grove <andygrove73@gmail.com>
Date:   Sun Nov 12 01:41:15 2023 -0700

    Fix join order for TPCH Q17 & Q18 by improving FilterExec statistics (#8126)
```
I wonder if this is just bad configuration? I created a PR to update the DF configurations to use the defaults. https://github.com/sql-benchmarks/sqlbench-runners/pull/31
@Dandandan was correct. It was the configs that were the issue. Here is a run of just versions 32 and 33 using default configs.
**Is your feature request related to a problem or challenge?**
I ran some benchmarks in constrained Docker containers and found that DataFusion is pretty close to DuckDB speed when running on a single core but does not scale as well as DuckDB when more cores are added.
At 16 cores, DuckDB was ~2.9x faster than DataFusion for this particular test.
**Describe the solution you'd like**
I would like DataFusion to scale as well as DuckDB as more cores are added.
**Describe alternatives you've considered**
No response
**Additional context**
Instructions for creating the Docker images can be found at https://github.com/sql-benchmarks/sqlbench-runners