apache / datafusion-comet

Apache DataFusion Comet Spark Accelerator
https://datafusion.apache.org/comet
Apache License 2.0
808 stars 157 forks source link

Improve performance of broadcast hash join #808

Open andygrove opened 2 months ago

andygrove commented 2 months ago

What is the problem the feature request solves?

Query:

select ss_sold_date_sk, ss_sold_time_sk, ss_quantity, d_year, d_moy, d_dom
from date_dim join store_sales on d_date_sk = ss_sold_date_sk
where d_year = 2000;

Benchmark results:

AMD Ryzen 9 7950X3D 16-Core Processor
TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
join_inner                                          495            509          13        582.3           1.7       1.0X
join_inner: Comet (Scan)                            736            750          14        391.4           2.6       0.7X
join_inner: Comet (Scan, Exec)                     1094           1110          22        263.3           3.8       0.5X

Native metrics (for one task).

ProjectionExec: expr=[col_0@4 as col_0, col_1@5 as col_1, col_2@6 as col_2, col_1@1 as col_3, col_2@2 as col_4, col_3@3 as col_5], metrics=[output_rows=582202, elapsed_compute=181.746µs]
  HashJoinExec: mode=Partitioned, join_type=Inner, on=[(col_0@0, col_0@0)], metrics=[output_rows=582202, build_input_rows=366, output_batches=370, build_input_batches=1, input_rows=2894083, input_batches=370, build_mem_used=15032, build_time=46.427µs, join_time=18.433483ms]
    CopyExec, metrics=[output_rows=366, elapsed_compute=9.938µs]
      ScanExec: source=[BroadcastExchange (unknown)], schema=[col_0: Int32, col_1: Int32, col_2: Int32, col_3: Int32], metrics=[output_rows=366, elapsed_compute=560ns]
    CopyExec, metrics=[output_rows=2894083, elapsed_compute=4.801962ms]
      FilterExec: col_0@0 IS NOT NULL, metrics=[output_rows=2894083, elapsed_compute=23.927183ms]
        ScanExec: source=[CometScan parquet spark_catalog.default.store_sales (unknown)], schema=[col_0: Int32, col_1: Int32, col_2: Int32], metrics=[output_rows=3030375, elapsed_compute=12.800384ms]

Describe the potential solution

No response

Additional context

No response

andygrove commented 2 months ago

The FilterExec in the above example is even more expensive than the HashJoinExec. Evaluating the predicate is cheap but copying data to the filtered batch takes 99% of the time. We could potentially avoid this copy by using a selection vector approach instead.

Time to compute filter mask on batch of 32768 rows is: 581ns
Time to filter batch is: 252.194µs
andygrove commented 2 months ago

The filter on the probe input is very simple (col_0@0 IS NOT NULL) and it should be possible to push down to the parquet scan?

edit: we do push the filter down to the scan:

      +- CometFilter [ss_sold_date_sk#1545, ss_sold_time_sk#1546, ss_quantity#1555], isnotnull(ss_sold_date_sk#1545)
         +- CometScan parquet ...  PushedFilters: [IsNotNull(ss_sold_date_sk)], ...

The FilterExec does receive batches where ss_sold_date_sk is null though:

predicate: length=8192, true=7843, false=349
predicate: length=8192, true=7832, false=360
predicate: length=8192, true=7846, false=346
andygrove commented 2 months ago

Latest results after merging https://github.com/apache/datafusion-comet/pull/835

sf 10

AMD Ryzen 9 7950X3D 16-Core Processor
TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
join_inner                                           98            110           8        293.4           3.4       1.0X
join_inner: Comet (Scan)                            125            137          11        231.2           4.3       0.8X
join_inner: Comet (Scan, Exec)                      151            164          12        190.9           5.2       0.7X

sf 100

OpenJDK 64-Bit Server VM 11.0.24+8-post-Ubuntu-1ubuntu322.04 on Linux 6.5.0-45-generic
AMD Ryzen 9 7950X3D 16-Core Processor
TPCDS Micro Benchmarks:                   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
join_inner                                          516            535          23        558.4           1.8       1.0X
join_inner: Comet (Scan)                            747            767          10        385.8           2.6       0.7X
join_inner: Comet (Scan, Exec)                      990           1018          17        291.1           3.4       0.5X