apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Avoid extra copies in `CoalesceBatchesExec` to improve performance #7957

Open alamb opened 1 year ago

alamb commented 1 year ago

Is your feature request related to a problem or challenge?

While looking at TPCH query performance for #6782 I noticed several times that CoalesceBatchesExec takes a non-trivial amount of time (around 5% of the overall query time)

Here is a specific example

Create Data:

cd arrow-datafusion/benchmarks
./bench.sh  data tpch10

Run query with datafusion-cli:

cd arrow-datafusion/benchmarks/data/tpch_sf10
datafusion-cli -c "select   o_year, sum(case    when nation = 'BRAZIL' then volume  else 0  end) / sum(volume) as mkt_share from    (   select  extract(year from o_orderdate) as o_year,   l_extendedprice * (1 - l_discount) as volume,   n2.n_name as nation from    part,   supplier,   lineitem,   orders, customer,   nation n1,  nation n2,  region  where   p_partkey = l_partkey   and s_suppkey = l_suppkey   and l_orderkey = o_orderkey and o_custkey = c_custkey   and c_nationkey = n1.n_nationkey    and n1.n_regionkey = r_regionkey    and r_name = 'AMERICA'  and s_nationkey = n2.n_nationkey    and o_orderdate between date '1995-01-01' and date '1996-12-31' and p_type = 'ECONOMY ANODIZED STEEL'   ) as all_nations    group by    o_year  order by    o_year;"
The same query, formatted for readability:

select  o_year,
  sum(case  when nation = 'BRAZIL' then volume  else 0  end) / sum(volume) as mkt_share
from (
  select
    extract(year from o_orderdate) as o_year,
    l_extendedprice * (1 - l_discount) as volume,
    n2.n_name as nation
  from
    part,
    supplier,
    lineitem,
    orders,
    customer,
    nation n1,
    nation n2,
    region
  where
    p_partkey = l_partkey
    and s_suppkey = l_suppkey
    and l_orderkey = o_orderkey
    and o_custkey = c_custkey
    and c_nationkey = n1.n_nationkey
    and n1.n_regionkey = r_regionkey
    and r_name = 'AMERICA'
    and s_nationkey = n2.n_nationkey
    and o_orderdate between date '1995-01-01' and date '1996-12-31'
    and p_type = 'ECONOMY ANODIZED STEEL'
  ) as all_nations
  group by
    o_year
  order by
    o_year;

Here is the full EXPLAIN ANALYZE output: explan-analyze-q8.txt

A small subset shows there is a single CoalesceBatchesExec that takes 3 seconds (elapsed_compute=3.066514072s):

CoalesceBatchesExec: target_batch_size=8192, metrics=[output_rows=59986052, elapsed_compute=3.066514072s]
  RepartitionExec: partitioning=Hash([l_partkey@1], 16), input_partitions=16, metrics=[fetch_time=16.321190026s, repart_time=10.382230411s, send_time=4.650058274s]
     ParquetExec: file_groups={16 groups: [[Users/alamb/Software/arrow-datafusion/benchmarks/data/tpch_sf10

I profiled the query and confirmed that CoalesceBatchesExec takes 5% of the overall time, as shown in this screenshot

[Screenshot 2023-10-27 at 7 30 00 AM]

In diagrams this looks like

┌────────────────────┐        Filter                                                                          
│                    │                    ┌────────────────────┐            Coalesce                          
│                    │    ─ ─ ─ ─ ─ ─ ▶   │    RecordBatch     │             Batches                          
│    RecordBatch     │                    │   num_rows = 234   │─ ─ ─ ─ ─ ┐                                   
│  num_rows = 8000   │                    └────────────────────┘                                              
│                    │                                                    │                                   
│                    │                                                                ┌────────────────────┐  
└────────────────────┘                                                    │           │                    │  
┌────────────────────┐                    ┌────────────────────┐                      │                    │  
│                    │        Filter      │                    │          │           │                    │  
│                    │                    │    RecordBatch     │           ─ ─ ─ ─ ─ ▶│                    │  
│    RecordBatch     │    ─ ─ ─ ─ ─ ─ ▶   │   num_rows = 500   │─ ─ ─ ─ ─ ┐           │                    │  
│  num_rows = 8000   │                    │                    │                      │    RecordBatch     │  
│                    │                    │                    │          └ ─ ─ ─ ─ ─▶│  num_rows = 8000   │  
│                    │                    └────────────────────┘                      │                    │  
└────────────────────┘                                                                │                    │  
                                                    ...                    ─ ─ ─ ─ ─ ▶│                    │  
          ...                   ...                                       │           │                    │  
                                                                                      │                    │  
┌────────────────────┐                                                    │           └────────────────────┘  
│                    │                    ┌────────────────────┐                                              
│                    │       Filter       │                    │          │                                   
│    RecordBatch     │                    │    RecordBatch     │                                              
│  num_rows = 8000   │   ─ ─ ─ ─ ─ ─ ▶    │   num_rows = 333   │─ ─ ─ ─ ─ ┘                                   
│                    │                    │                    │                                              
│                    │                    └────────────────────┘                                              
└────────────────────┘                                                                                        

                      FilterExec                                          RepartitionExec copies the data
                      creates output batches with copies                  *again* to form final large
                      of the matching rows (calls take()                  RecordBatches
                      to make a copy)

Describe the solution you'd like

I think we can avoid this overhead by combining the behavior of CoalesceBatchesExec into the operators that make small batches (FilterExec, JoinExec, and RepartitionExec). Something like

┌────────────────────┐        Filter                                                                        
│                    │                    ┌────────────────────┐          Filter output                     
│                    │    ─ ─ ─ ─ ─ ─ ▶   │        mask        │                                            
│    RecordBatch     │                    │   (BooleanArray)   │─ ─ ─ ─ ─ ┐                                 
│  num_rows = 8000   │                    └────────────────────┘                                            
│                    │                                                    │                                 
│                    │                                                                ┌────────────────────┐
└────────────────────┘                                                    │           │                    │
┌────────────────────┐                    ┌────────────────────┐                      │                    │
│                    │        Filter      │                    │          │           │                    │
│                    │                    │        mask        │           ─ ─ ─ ─ ─ ▶│                    │
│    RecordBatch     │    ─ ─ ─ ─ ─ ─ ▶   │   (BooleanArray)   │─ ─ ─ ─ ─ ┐           │                    │
│  num_rows = 8000   │                    │                    │                      │    RecordBatch     │
│                    │                    │                    │          └ ─ ─ ─ ─ ─▶│  num_rows = 8000   │
│                    │                    └────────────────────┘                      │                    │
└────────────────────┘                                                                │                    │
                                                    ...                    ─ ─ ─ ─ ─ ▶│                    │
          ...                   ...                                       │           │                    │
                                                                                      │                    │
┌────────────────────┐                                                    │           └────────────────────┘
│                    │                    ┌────────────────────┐                                            
│                    │       Filter       │                    │          │                                 
│    RecordBatch     │                    │        mask        │                                            
│  num_rows = 8000   │   ─ ─ ─ ─ ─ ─ ▶    │   (BooleanArray)   │─ ─ ─ ─ ─ ┘                                 
│                    │                    │                    │                                            
│                    │                    └────────────────────┘                                            
└────────────────────┘                                                                                      

                      FilterExec et al.                             The Exec then copies rows from
                      internally buffers RecordBatches and          multiple input batches (based on the
                      filter results until enough rows are          masks) into a single new output
                      ready                                         RecordBatch, avoiding the second data
                                                                    copy

The idea would be to take the core coalescing logic from CoalesceBatchesExec (which calls concat_batches) and, instead of creating new small record batches in FilterExec, HashJoinExec, and RepartitionExec, buffer the inputs until at least target_batch_size rows are available and then call interleave instead

Here is the code in CoalesceBatchesExec that could be adapted: https://github.com/apache/arrow-datafusion/blob/a9d66e2b492843c2fb335a7dfe27fed073629b09/datafusion/physical-plan/src/coalesce_batches.rs#L215-L283

Here is where FilterExec makes the potentially small record batches https://github.com/apache/arrow-datafusion/blob/a9d66e2b492843c2fb335a7dfe27fed073629b09/datafusion/physical-plan/src/filter.rs#L294-L306

The same thing would be done in RepartitionExec: https://github.com/apache/arrow-datafusion/blob/a9d66e2b492843c2fb335a7dfe27fed073629b09/datafusion/physical-plan/src/repartition/mod.rs#L193-L218
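To make the proposed buffering concrete, here is a minimal plain-Rust sketch, with Vec<i32> "columns" and Vec<bool> masks standing in for DataFusion's RecordBatch and BooleanArray; the BufferedFilter type and its methods are hypothetical illustrations, not DataFusion APIs:

```rust
// Hypothetical sketch: buffer (batch, mask) pairs and emit one output only
// once enough selected rows have accumulated, so the matching rows are
// copied exactly once instead of filter-then-concat.
struct BufferedFilter {
    target_batch_size: usize,
    buffered: Vec<(Vec<i32>, Vec<bool>)>, // (input column, filter mask)
    buffered_rows: usize,                 // selected rows accumulated so far
}

impl BufferedFilter {
    fn new(target_batch_size: usize) -> Self {
        Self { target_batch_size, buffered: Vec::new(), buffered_rows: 0 }
    }

    /// Buffer a batch and its mask; return a coalesced output batch once
    /// at least `target_batch_size` selected rows are available.
    fn push(&mut self, batch: Vec<i32>, mask: Vec<bool>) -> Option<Vec<i32>> {
        self.buffered_rows += mask.iter().filter(|&&m| m).count();
        self.buffered.push((batch, mask));
        if self.buffered_rows >= self.target_batch_size {
            self.flush()
        } else {
            None
        }
    }

    /// Copy the selected rows from all buffered batches in one pass
    /// (the single copy the proposal aims for).
    fn flush(&mut self) -> Option<Vec<i32>> {
        if self.buffered_rows == 0 {
            return None;
        }
        let out: Vec<i32> = self
            .buffered
            .drain(..)
            .flat_map(|(batch, mask)| {
                batch
                    .into_iter()
                    .zip(mask)
                    .filter_map(|(v, keep)| keep.then_some(v))
            })
            .collect();
        self.buffered_rows = 0;
        Some(out)
    }
}
```

In the real operators the copy step would use arrow's interleave (or a filter over multiple batches) rather than this scalar loop; the sketch only shows the buffering control flow.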

Describe alternatives you've considered

No response

Additional context

No response

tustvold commented 1 year ago

Is the majority of the concat_batches overhead string concatenation? Just wondering if this is something that StringView might help with?

alamb commented 1 year ago

From an internal conversation, @tustvold points out that interleave is a fair bit slower than the filter kernel, especially for primitives

The current code uses filter then concat_batches

The proposal would use interleave

Thus we may have to special-case how the columns are handled depending on type (e.g. which kernels are called for which operations)

Dandandan commented 11 months ago

I tried (https://github.com/Dandandan/arrow-datafusion/tree/buffer_batches) converting the code to use interleave instead of concat in CoalesceBatches (for hash repartitioning), but this leads to no improvement whatsoever (even a small slowdown):

┏━━━━━━━━━━━━━━┳━━━━━━━━━━┳━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃     main ┃ buffer_batches ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━╇━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 1     │  93.31ms │        90.88ms │     no change │
│ QQuery 2     │  26.18ms │        28.39ms │  1.08x slower │
│ QQuery 3     │  53.96ms │        57.86ms │  1.07x slower │
│ QQuery 4     │  52.33ms │        51.77ms │     no change │
│ QQuery 5     │ 114.93ms │       125.69ms │  1.09x slower │
│ QQuery 6     │   9.38ms │         9.20ms │     no change │
│ QQuery 7     │ 211.60ms │       227.30ms │  1.07x slower │
│ QQuery 8     │  61.54ms │        59.56ms │     no change │
│ QQuery 9     │  60.66ms │        63.80ms │  1.05x slower │
│ QQuery 10    │ 114.31ms │       114.95ms │     no change │
│ QQuery 11    │  20.12ms │        20.54ms │     no change │
│ QQuery 12    │  59.98ms │        59.26ms │     no change │
│ QQuery 13    │  53.37ms │        56.42ms │  1.06x slower │
│ QQuery 14    │  18.47ms │        18.63ms │     no change │
│ QQuery 15    │  58.99ms │        59.31ms │     no change │
│ QQuery 16    │  22.79ms │        22.27ms │     no change │
│ QQuery 17    │  54.41ms │        53.59ms │     no change │
│ QQuery 18    │ 158.61ms │       162.68ms │     no change │
│ QQuery 19    │  36.98ms │        34.78ms │ +1.06x faster │
│ QQuery 20    │  62.49ms │        62.04ms │     no change │
│ QQuery 21    │ 251.42ms │       256.99ms │     no change │
│ QQuery 22    │  14.35ms │        14.14ms │     no change │
└──────────────┴──────────┴────────────────┴───────────────┘

Dandandan commented 11 months ago

Another finding to add to the list: changing RepartitionExec to do a filter (with hash repartitioning) is a bit slower than take or interleave on the TPC-H benchmarks.
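For context on this kernel trade-off, a plain-Rust sketch of the two access patterns being compared (the actual measurements were of arrow's filter, take, and interleave kernels; these scalar loops only illustrate the shapes, not the vectorized implementations):

```rust
// Mask-driven selection: one sequential scan; arrow's `filter` kernel has
// fast paths for primitive types on top of this access pattern.
fn filter_by_mask(values: &[i32], mask: &[bool]) -> Vec<i32> {
    values
        .iter()
        .zip(mask)
        .filter_map(|(&v, &keep)| keep.then_some(v))
        .collect()
}

// Index-driven gather: the shape of `take`/`interleave`, which pays for
// random access and per-element index bookkeeping.
fn take_by_indices(values: &[i32], indices: &[usize]) -> Vec<i32> {
    indices.iter().map(|&i| values[i]).collect()
}
```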

alamb commented 11 months ago

Thanks for the info @Dandandan -- those are some interesting results

alamb commented 3 months ago

I made some pictures for what I had in mind

This is what happens today (FilterExec + RepartitionExec):

┌────────────────────┐        Filter                                                                          
│                    │                    ┌────────────────────┐            Coalesce                          
│                    │    ─ ─ ─ ─ ─ ─ ▶   │    RecordBatch     │             Batches                          
│    RecordBatch     │                    │   num_rows = 234   │─ ─ ─ ─ ─ ┐                                   
│  num_rows = 8000   │                    └────────────────────┘                                              
│                    │                                                    │                                   
│                    │                                                                ┌────────────────────┐  
└────────────────────┘                                                    │           │                    │  
┌────────────────────┐                    ┌────────────────────┐                      │                    │  
│                    │        Filter      │                    │          │           │                    │  
│                    │                    │    RecordBatch     │           ─ ─ ─ ─ ─ ▶│                    │  
│    RecordBatch     │    ─ ─ ─ ─ ─ ─ ▶   │   num_rows = 500   │─ ─ ─ ─ ─ ┐           │                    │  
│  num_rows = 8000   │                    │                    │                      │    RecordBatch     │  
│                    │                    │                    │          └ ─ ─ ─ ─ ─▶│  num_rows = 8000   │  
│                    │                    └────────────────────┘                      │                    │  
└────────────────────┘                                                                │                    │  
                                                    ...                    ─ ─ ─ ─ ─ ▶│                    │  
          ...                   ...                                       │           │                    │  
                                                                                      │                    │  
┌────────────────────┐                                                    │           └────────────────────┘  
│                    │                    ┌────────────────────┐                                              
│                    │       Filter       │                    │          │                                   
│    RecordBatch     │                    │    RecordBatch     │                                              
│  num_rows = 8000   │   ─ ─ ─ ─ ─ ─ ▶    │   num_rows = 333   │─ ─ ─ ─ ─ ┘                                   
│                    │                    │                    │                                              
│                    │                    └────────────────────┘                                              
└────────────────────┘                                                                                        

                      FilterExec                                          RepartitionExec copies the data
                      creates output batches with copies                  *again* to form final large
                      of the matching rows (calls take()                  RecordBatches
                      to make a copy)

Instead do it like this (only buffer the filter mask results and create the final output in one go):

┌────────────────────┐        Filter                                                                        
│                    │                    ┌────────────────────┐          Filter output                     
│                    │    ─ ─ ─ ─ ─ ─ ▶   │        mask        │                                            
│    RecordBatch     │                    │   (BooleanArray)   │─ ─ ─ ─ ─ ┐                                 
│  num_rows = 8000   │                    └────────────────────┘                                            
│                    │                                                    │                                 
│                    │                                                                ┌────────────────────┐
└────────────────────┘                                                    │           │                    │
┌────────────────────┐                    ┌────────────────────┐                      │                    │
│                    │        Filter      │                    │          │           │                    │
│                    │                    │        mask        │           ─ ─ ─ ─ ─ ▶│                    │
│    RecordBatch     │    ─ ─ ─ ─ ─ ─ ▶   │   (BooleanArray)   │─ ─ ─ ─ ─ ┐           │                    │
│  num_rows = 8000   │                    │                    │                      │    RecordBatch     │
│                    │                    │                    │          └ ─ ─ ─ ─ ─▶│  num_rows = 8000   │
│                    │                    └────────────────────┘                      │                    │
└────────────────────┘                                                                │                    │
                                                    ...                    ─ ─ ─ ─ ─ ▶│                    │
          ...                   ...                                       │           │                    │
                                                                                      │                    │
┌────────────────────┐                                                    │           └────────────────────┘
│                    │                    ┌────────────────────┐                                            
│                    │       Filter       │                    │          │                                 
│    RecordBatch     │                    │        mask        │                                            
│  num_rows = 8000   │   ─ ─ ─ ─ ─ ─ ▶    │   (BooleanArray)   │─ ─ ─ ─ ─ ┘                                 
│                    │                    │                    │                                            
│                    │                    └────────────────────┘                                            
└────────────────────┘                                                                                      

                      FilterExec et al.                             The Exec then copies rows from
                      internally buffers RecordBatches and          multiple input batches (based on the
                      filter results until enough rows are          masks) into a single new output
                      ready                                         RecordBatch, avoiding the second data
                                                                    copy

alamb commented 3 months ago

Maybe we could add a filter_multi kernel along side https://docs.rs/arrow/latest/arrow/compute/kernels/filter/fn.filter.html

The signature would be like

/// Filters pairs of arrays / predicates into a single output array
pub fn filter_multi(
    input: impl IntoIterator<Item = (&dyn Array, &BooleanArray)>,
) -> Result<Arc<dyn Array>, ArrowError>

And then the internal machinery could be the same
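A plain-Rust sketch of the proposed semantics, with i32 slices and bool masks standing in for &dyn Array and BooleanArray (filter_multi is the proposed kernel, not an existing arrow API, and this scalar loop ignores the type-dispatch and vectorization a real kernel would need):

```rust
// Each (array, predicate) pair contributes its selected rows, in order,
// to a single output array; mismatched lengths are an error.
fn filter_multi<'a>(
    input: impl IntoIterator<Item = (&'a [i32], &'a [bool])>,
) -> Result<Vec<i32>, String> {
    let mut out = Vec::new();
    for (values, mask) in input {
        if values.len() != mask.len() {
            return Err("array and predicate lengths differ".to_string());
        }
        out.extend(
            values
                .iter()
                .zip(mask)
                .filter_map(|(&v, &keep)| keep.then_some(v)),
        );
    }
    Ok(out)
}
```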

2010YOUY01 commented 3 months ago

TPCH-Q1's expensive CoalesceBatches might be easier to solve 🤔 Unlike TPCH-Q8 (which appears to have a more selective predicate), Q1's filter only throws away a small number of rows:

input_batch(4096 rows) -> [FILTER] -> output_batch(~4000 rows)

And the following CoalesceBatches condition https://github.com/apache/datafusion/blob/49d9d45f36989cd448ed6513af65948b6b0100ec/datafusion/physical-plan/src/coalesce_batches.rs#L228 will be triggered every time, copying the large output batches. But even without coalescing, the output batch still benefits from vectorization, so maybe this coalescing threshold can be tuned, e.g. to something like if self.buffered_rows >= 0.6 * self.target_batch_size {

I remember trying this before, but the overall improvement on Q1 was only about 2%. Still, I think it's possible to set a better threshold for triggering coalescing.
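A sketch of the tunable trigger being suggested: a batch that already holds some fraction of target_batch_size rows is "big enough" to pass through without another copy, and only genuinely small batches get buffered for coalescing. Both big_enough and the 0.6 fraction are hypothetical illustrations, not the actual CoalesceBatchesExec code:

```rust
// Hypothetical pass-through check: avoid coalescing (and its copy) when a
// batch is already a large fraction of the target size, since a mostly-full
// batch still benefits from vectorized execution downstream.
fn big_enough(num_rows: usize, target_batch_size: usize, fraction: f64) -> bool {
    num_rows as f64 >= fraction * target_batch_size as f64
}
```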

alamb commented 3 months ago

Thanks @2010YOUY01 -- that sounds neat. Also thanks for the tip about Q1 being a good candidate query that this could improve.

In general I agree there are likely easier ways to improve specific queries. I am still bullish on the more general idea of trying to avoid the copy (I think it would be good for several percentage points across many queries), though it will be a fairly substantial task

I am going to spend some more time prototyping and see how it goes

alamb commented 3 months ago

I am pretty happy with how this is headed.

I have a PR to start refactoring the code here https://github.com/apache/datafusion/pull/11610

I am also feeling good about my WIP prototype