apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0
5.57k stars 1.04k forks source link

Improve performance for grouping by variable length columns (strings) #9403

Open alamb opened 4 months ago

alamb commented 4 months ago

Is your feature request related to a problem or challenge?

As always I would like faster aggregation performance

Describe the solution you'd like

clickbench, Q17 and Q18 include

SELECT "UserID", "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10;
SELECT "UserID", "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", "SearchPhrase" LIMIT 10;
SELECT "UserID", extract(minute FROM to_timestamp_seconds("EventTime")) AS m, "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", m, "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10;

This is an Int 64 and string

DataFusion CLI v36.0.0
❯ describe 'hits.parquet';
+-----------------------+-----------+-------------+
| column_name           | data_type | is_nullable |
+-----------------------+-----------+-------------+
...
| UserID                | Int64     | NO          |
...
| SearchPhrase          | Utf8      | NO          |
...
+-----------------------+-----------+-------------+
105 rows in set. Query took 0.035 seconds.

In some profiling of Q19, SELECT "UserID", "SearchPhrase", COUNT(*) FROM hits GROUP BY "UserID", "SearchPhrase" ORDER BY COUNT(*) DESC LIMIT 10; I found that 20-30% of the time is spent going from Array --> Row or Row --> Array.

Thus I think adding some special handling for variable length data vs fixed length data in the group management may help

Background

GroupValuesRows, used for the queries above, is here: https://github.com/apache/arrow-datafusion/blob/edec4189242ab07ac65967490537d77e776aad5c/datafusion/physical-plan/src/aggregates/group_values/row.rs#L32

Given a query like SELECT ... GROUP BY i1, i2, s1, where i1 and i2 are integer columns and s1 is a string column

For input looks like this:

                       ┌─────┐ ┌─────────┐                                                       
                       │  0  │ │TheQuickB│                                                       
┌─────┐   ┌─────┐      ├─────┤ │rownFox..│                                                       
│  1  │   │ 10  │      │ 100 │ │.FooSomeO│               In the input Arrow Arrays, variable     
├─────┤   ├─────┤      ├─────┤ │therVeryL│               length columns have offsets into other  
│  2  │   │ 20  │      │ 103 │ │argeStrin│               buffers                                 
├─────┤   ├─────┤      ├─────┤ │gs       │                                                       
│  5  │   │ 50  │      │ 300 │ │         │                                                       
├─────┤   ├─────┤      ├─────┤ └─────────┘                                                       
│ ... │   │ ... │      │ ... │  data (s1)                                                        
├─────┤   ├─────┤      ├─────┤                                                                   
│  6  │   │ 60  │      │ 600 │                                                                   
└─────┘   └─────┘      └─────┘                                                                   
                       offsets (s1)                                                              

   i1        i2                s1                                                                

GroupValuesRows will do

┌────────────────────────────┐                                                                   
│1|10|TheQuickBrownFox....   │                                                                   
└────────────────────────────┘                           In GroupValuesRows, each input row is   
                                                         copied into Row format (simplified      
┌───────────┐                                            version shown here), including the      
│2|20|Foo   │                                            entire string content                   
└───────────┘                                                                                    

┌────────────────────────────────────┐                                                           
│3|30|SomeOtherVeryLargeString...    │                                                           
└────────────────────────────────────┘                                                           

One downside of this approach is that for "large" strings, a substantial amount of copying is required simply to check if the group is already present

Describe alternatives you've considered

The idea is to use a modified version of the group keys where the fixed length part still uses row format, but the variable length columns use an approach like in GroupValuesByes

Something like

 ┌────┐   ┌────────────────┐                        
 │1|10│   │    offset 0    │             ┌─────────┐
 └────┘   │     len 3      │             │FooTheQui│
          └────────────────┘             │ckBrownFo│
 ┌────┐   ┌────────────────┐             │x...SomeO│
 │2|20│   │    offset 3    │             │therVeryL│
 └────┘   │    len 100     │             │argeStrin│
          └────────────────┘             │gs       │
 ┌────┐                                  │         │
 │3|30│   ┌────────────────┐             └─────────┘
 └────┘   │   offset 103   │                data    
          │    len 200     │                        
Use Rows  └────────────────┘               out of   
  for                                       line    
 fixed        offsets +                    buffer   
  part     lengths for each                 for     
            variable part                           

Additional context

No response

yjshen commented 4 months ago

String/binary prefix stored in place similar to ArrowBytesMap might still be a valid plus since it allows us to avoid chase pointers sometimes.

alamb commented 4 months ago

String/binary prefix stored in place similar to ArrowBytesMap might still be a valid plus since it allows us to avoid chase pointers sometimes.

Yes, indeed -- I think an approach similar to or maybe even using ArrowBytesMap would be valuable to explore. The approach in ArrowBytesMap minimizes copies (each output string is copied once, and the final results is emitted without copying)

alamb commented 4 months ago

BTW I may play around with this approach as a fun side project if/when I have time. In general, my high level strategy would be to hack up GroupValuesRows with this approach enough to try and validate that it would actually improve performance

If it did, then I would spend the time obsessing over / optimizing the corner cases

alamb commented 3 months ago

BTW the DuckDB paper (which I have not yet read) seems to describe a very similar layout for variable length strings: https://duckdb.org/2024/03/29/external-aggregation

Screenshot 2024-04-02 at 8 51 55 AM

jayzhan211 commented 1 month ago

I will take a look on this first 👀

jayzhan211 commented 4 weeks ago

I try to an approach that take multiple columns into consideration but found that the time spend on insert_accounted largely increase. Rough idea is split columns into two, fixed width (primitives) and variable length (string, binary). Fixed width columns are converted to arrow::Rows, others follow the idea like ArrowBytesMap.

self.map.insert_accounted(
    new_header,
    |header| header.hash,
    &mut self.map_size,
);

The hash entry includes vector like

struct Entry<O>
where
    O: OffsetSizeTrait,
{
    /// hash of the value (stored to avoid recomputing it in hash table check)
    hash: u64,

   /// each variable length column's offset or inline(short string)
    offset_or_inline: Vec<usize>,

    /// each variable length column's length. None for null.
    len: Vec<Option<O>>,
    group_id: usize,
}

It seems that we need avoid adding Vec into hash entry, so ArrowBytesMap's idea couldn't help much :(

Ref: https://github.com/apache/datafusion/pull/10937

I will try converting variable length column info (maybe group values index) from to ArrayRef, and convert them to Rows together 🤔 .

jayzhan211 commented 3 weeks ago

Another approach #10976 though still beaten by trivial Row approach but the time is close compare to #10937

alamb commented 3 weeks ago

https://github.com/apache/datafusion/pull/10976 is a cool approach. I have been thinking about this more, especially in context of @jayzhan211 's comment here https://github.com/apache/datafusion/issues/10918#issuecomment-2177464410

Here is one potential (intermediate) phase that might be worth exploring. Specifically, change the output type of the first phase of grouping to be StringView -- the reason is that this might help performance but would keep the required changes localized to HashAggregateExec 🤔

                              output DataType from the    
                              final phase could remain    
┌─────────────────────────┐   String                      
│HashAggregateExec        │                               
│(AggregateMode::Final)   │                               
│                         │                               
└─────────────────────────┘                               
             ▲                                            
             │                                            
┌─────────────────────────┐                               
│CoalesceBatches          │   Implement                   
└─────────────────────────┘   StringView/BinaryView       
             ▲                support in RepartitionExec  
             │                and CoalesceBatches         
             │                                            
┌─────────────────────────┐                               
│RepartitionExec          │                               
└─────────────────────────┘                               
             ▲               output DataType of all       
             │               String/Binary *GROUP*        
             │               columns be                   
┌─────────────────────────┐                               
│    HashAggregateExec    │                               
│(AggregateMode::Partial) │                               
└─────────────────────────┘                               
             ▲                                            
             │                                            
             │                                            
             │                                            
       .───────────.                                      
     ,'             `.                                    
    (      Input      )                                   
     '─.           ,─'                                    
        `─────────'