apache / datafusion-comet

Apache DataFusion Comet Spark Accelerator
https://datafusion.apache.org/comet
Apache License 2.0

Performance regression after adding support for SMJ with join filter #901

Open andygrove opened 2 months ago

andygrove commented 2 months ago

Describe the bug

When running TPC-DS benchmarks against a 100 GB data set, I see a large regression in performance. For example, here are the timings for q72 before and after adding support for SMJ with a join condition.

Adding support for SMJ with a join condition means that more of the plan is likely running natively, so the performance issue isn't necessarily directly related to SMJ.

before

    "72": [
        22.729433059692383,
        18.11495876312256,
        17.545786142349243
    ]

after

    "72": [
        38.576566219329834,
        35.433213233947754,
        35.262585401535034
    ]
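
To put a number on the regression, the two sets of timings above can be compared directly. The following is a minimal sketch that uses only the figures quoted in this issue; the choice of the median as the summary statistic and the helper name `slowdown` are assumptions made for illustration, not part of the benchmark tooling.

```python
from statistics import median

# q72 timings in seconds, copied from the "before" and "after" runs above.
before = [22.729433059692383, 18.11495876312256, 17.545786142349243]
after = [38.576566219329834, 35.433213233947754, 35.262585401535034]


def slowdown(baseline, candidate):
    """Return the ratio of median run times (candidate / baseline)."""
    return median(candidate) / median(baseline)


ratio = slowdown(before, after)
print(f"median before: {median(before):.2f} s")
print(f"median after:  {median(after):.2f} s")
print(f"slowdown:      {ratio:.2f}x")
```

The median is used here because the first iteration in each run is noticeably slower than the other two, which suggests warm-up effects.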

A secondary issue is that I do not see metrics for CometSort / CometSortMergeJoin.

Screenshot from 2024-09-01 10-48-04

Steps to reproduce

No response

Expected behavior

No response

Additional context

No response

andygrove commented 2 months ago

Disabling sortMergeJoin via configs restores the original performance.

comphead commented 2 months ago

Thanks @andygrove, I'm planning to profile it. Just for reference, Q72 uses INNER and LEFT join types.

viirya commented 2 months ago

> A secondary issue is that I do not see metrics for CometSort / CometSortMergeJoin.

Hmm, @andygrove, can you verify it again? In the unit tests, we have a test for CometSortMergeJoinExec metrics. It should have SQL metrics.

andygrove commented 2 months ago

I ran again with the latest from main (0033), and then with SMJ + join filter disabled manually (0034). Here are the event logs.

app-20240904131653-0033.gz

app-20240904132048-0034.gz

andygrove commented 2 months ago

Here is a screenshot comparing the plans with SMJ+filter enabled on the left and disabled on the right.

Screenshot from 2024-09-04 13-34-37

viirya commented 2 months ago

Hmm, I will run it locally to see why the metrics are missing there even though they show up in the unit tests.

comphead commented 2 months ago

I'm running a slightly changed Q72 in DataFusion:

    select  i_item_desc
          ,w_warehouse_name
          ,d1.d_week_seq
          ,sum(case when p_promo_sk is null then 1 else 0 end) no_promo
          ,sum(case when p_promo_sk is not null then 1 else 0 end) promo
          ,count(*) total_cnt
    from catalog_sales
    join date_dim d1 on (cs_sold_date_sk = d1.d_date_sk)
    join customer_demographics on (cs_bill_cdemo_sk = cd_demo_sk)
    join household_demographics on (cs_bill_hdemo_sk = hd_demo_sk)
    join item on (i_item_sk = cs_item_sk)
    join inventory on (cs_item_sk = inv_item_sk)
    join warehouse on (w_warehouse_sk=inv_warehouse_sk)
    join date_dim d2 on (inv_date_sk = d2.d_date_sk)
    join date_dim d3 on (cs_ship_date_sk = d3.d_date_sk)
    left outer join promotion on (cs_promo_sk=p_promo_sk)
    left outer join catalog_returns on (cr_item_sk = cs_item_sk and cr_order_number = cs_order_number)
    where d1.d_week_seq = d2.d_week_seq
      and inv_quantity_on_hand < cs_quantity
      and d3.d_date > d1.d_date + interval '5' day
      and d1.d_year = 1999
    group by i_item_desc,w_warehouse_name,d1.d_week_seq
    order by total_cnt desc, i_item_desc, w_warehouse_name, d_week_seq
    LIMIT 100

HashJoin: 9 sec, SMJ: 20 sec

With the LEFT OUTER joins removed, the results are roughly the same:

HashJoin: 8.5 sec, SMJ: 19 sec

I'll build a flamegraph for SMJ soon

comphead commented 2 months ago

smj (flamegraph)

It looks like a lot of time is spent in arrow_select::take::take_impl, which is specific to the filtered join. take uses the matched indices that passed the filter to pull the corresponding rows out of the joined data.
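
To make the role of take concrete, here is a minimal pure-Python sketch of the gather step described above. It is not the DataFusion or arrow-rs implementation; the toy columns, the index pairs, and the helper name `take` are all hypothetical and only illustrate the semantics: match on the key, evaluate the join filter on the matched pairs, then gather the surviving rows by index.

```python
def take(values, indices):
    """Gather values[i] for each i in indices (a None index yields a null row)."""
    return [None if i is None else values[i] for i in indices]


# Hypothetical toy columns standing in for the two join inputs.
cs_quantity = [5, 1, 7, 3]            # left side
inv_quantity_on_hand = [2, 4, 9]      # right side

# Index pairs assumed to come out of the equi-join key matching.
matched_left = [0, 0, 1, 2, 3]
matched_right = [0, 1, 1, 2, 0]

# Evaluate the join filter (inv_quantity_on_hand < cs_quantity) per matched pair.
mask = [
    inv_quantity_on_hand[r] < cs_quantity[l]
    for l, r in zip(matched_left, matched_right)
]

# Keep only the index pairs that passed the filter.
kept_left = [l for l, keep in zip(matched_left, mask) if keep]
kept_right = [r for r, keep in zip(matched_right, mask) if keep]

# Gather the output rows; this per-batch gather is the step that take performs.
out_left = take(cs_quantity, kept_left)
out_right = take(inv_quantity_on_hand, kept_right)
print(list(zip(out_left, out_right)))
```

In a join with many matched pairs per key, such as the inventory join in Q72, this gather runs over every output column, which is one plausible reason it would dominate a profile.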

viirya commented 2 months ago

> > A secondary issue is that I do not see metrics for CometSort / CometSortMergeJoin.
>
> Hmm, @andygrove, can you verify it again? In the unit tests, we have a test for CometSortMergeJoinExec metrics. It should have SQL metrics.

@andygrove I just ran a simple sort merge join query locally on Spark 4.0 + Comet built from the latest main branch:

Screenshot 2024-09-09 at 1 28 20 PM

Looks like the metrics are shown for CometSort / CometSortMergeJoin.