apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

[EPIC] Improve aggregate performance with adaptive sizing in accumulators / avoiding reallocations in accumulators #7065

Open alamb opened 1 year ago

alamb commented 1 year ago

UPDATE: Now that @Rachelint has begun work on #11931 I turned this ticket into an "epic" (aka I'll link related tasks here)

Is your feature request related to a problem or challenge?

Making aggregations fast in DataFusion helps its adoption and makes it even cooler.

As part of the new #6904 work, @yjshen had an idea https://github.com/apache/arrow-datafusion/pull/6800#discussion_r1251142165 that could avoid a copy in the accumulator implementations:

Describe the solution you'd like

Adaptive sizing (perhaps?): How would the hash table header and the states in each accumulator be initialized, and how would they grow afterward?

Here is the structure of the current group operator

                                         ┌──────────────┐   ┌──────────────┐   ┌──────────────┐
                                         │┌────────────┐│   │┌────────────┐│   │┌────────────┐│
    ┌─────┐                              ││accumulator ││   ││accumulator ││   ││accumulator ││
    │  5  │                              ││     0      ││   ││     0      ││   ││     0      ││
    ├─────┤                              ││ ┌────────┐ ││   ││ ┌────────┐ ││   ││ ┌────────┐ ││
    │  9  │                              ││ │ state  │ ││   ││ │ state  │ ││   ││ │ state  │ ││
    ├─────┤                              ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
    │     │                              ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
    ├─────┤                              ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
    │  1  │                              ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
    ├─────┤                              ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
    │     │                              ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
    └─────┘                              ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
                                         ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
                                         ││ └────────┘ ││   ││ └────────┘ ││   ││ └────────┘ ││
                                         │└────────────┘│   │└────────────┘│   │└────────────┘│
    Hash Table                           └──────────────┘   └──────────────┘   └──────────────┘

                                          New NewAccumulator                                   

stores "group indexes"                     There is one GroupsAccumulator per aggregate           
which are indexes into                     (NOT PER GROUP). Internally, each                   
Vec<GroupState>                            GroupsAccumulator manages the state for                
                                           multiple groups                                     

Implementations of this, such as Average, use a Vec<T>. While this approach is simple to implement, it also means that as the Vec grows, the accumulated values may be copied (up to 2 times on average, given a doubling strategy).
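To make the copying cost concrete, here is a minimal sketch that counts how many already-written values a growing `Vec` may have to move. The function name `count_copied_elements` is hypothetical and not part of DataFusion. The growth strategy of `Vec` is not specified by the standard library, and the allocator may sometimes grow a buffer in place, so the count is an upper bound rather than a measurement.

```rust
/// Pushes `n` values into a `Vec<u64>` one at a time and returns
/// `(reallocations, copied)`, where `copied` is an upper bound on the number
/// of existing elements moved because the buffer had to grow.
fn count_copied_elements(n: usize) -> (usize, usize) {
    let mut values: Vec<u64> = Vec::new();
    let mut reallocations = 0;
    let mut copied = 0;
    for i in 0..n {
        if values.len() == values.capacity() {
            // The buffer is full, so the next push must grow it. In the worst
            // case every element stored so far is copied to the new buffer.
            reallocations += 1;
            copied += values.len();
        }
        values.push(i as u64);
    }
    (reallocations, copied)
}
```

Calling `count_copied_elements(1_000_000)` and comparing `copied` with `n` gives a rough feel for the overhead: with a doubling strategy the total stays below `2 * n`, but each individual copy gets larger as the number of groups grows.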

An alternative, suggested by @yjshen, is to segment the state into fixed-size vectors: allocate one vector at a time, fill it until it is full, then create a new vector for subsequent states.

                                         ┌──────────────┐   ┌──────────────┐   ┌──────────────┐
                                         │┌────────────┐│   │┌────────────┐│   │┌────────────┐│
    ┌─────┐                              ││accumulator ││   ││accumulator ││   ││accumulator ││
    │  5  │                              ││     AGG    ││   ││     SUM    ││   ││     0      ││
    ├─────┤                              ││ ┌────────┐ ││   ││ ┌────────┐ ││   ││ ┌────────┐ ││
    │  9  │                              ││ │ state- │ ││   ││ │ state- │ ││   ││ │ state  │ ││
    ├─────┤                              ││ │segment-│ ││   ││ │segment-│ ││   ││ │        │ ││
    │     │                              ││ │   1    │ ││   ││ │   1    │ ││   ││ │        │ ││
    ├─────┤                              ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
    │  1  │                              ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
    ├─────┤                              ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
    │     │                              ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
    └─────┘                              ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
                                         ││ │        │ ││   ││ │        │ ││   ││ │        │ ││
                                         ││ └────────┘ ││   ││ └────────┘ ││   ││ └────────┘ ││
                                         ││            ││   ││            ││   │└────────────┘│
    Hash Table                           ││ ┌────────┐ ││   ││ ┌────────┐ ││   └──────────────┘
                                         ││ │ state- │ ││   ││ │ state- │ ││                   
                                         ││ │segment-│ ││   ││ │segment-│ ││                   
                                         ││ │   2    │ ││   ││ │   2    │ ││                   
                                         ││ │        │ ││   ││ │        │ ││                   
                                         ││ │        │ ││   ││ │        │ ││                   
                                         ││ │        │ ││   ││ │        │ ││                   
                                         ││ │        │ ││   ││ │        │ ││                   
                                         ││ │        │ ││   ││ │        │ ││                   
                                         ││ │        │ ││   ││ │        │ ││                   
                                         ││ └────────┘ ││   ││ └────────┘ ││                   
                                         │└────────────┘│   │└────────────┘│                   
                                         └──────────────┘   └──────────────┘                   

With this segmented approach, we could avoid the memory copy on each resize (the number of resizes can be large for high-cardinality aggregations) and grow the size more predictably.

Admittedly, this approach would also add complexity, both for header pointer management and for updates that span multiple vectors.
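The segmented layout described above can be sketched as follows. This is only an illustration under stated assumptions: `SegmentedStates`, `update_sums`, and the `segment_size` parameter are hypothetical names, not DataFusion APIs, and the sum accumulator is a stand-in for real accumulator state. A group index is mapped to a segment with a division and to an offset with a remainder, which is where the extra bookkeeping shows up.

```rust
/// Hypothetical segmented state store: values live in fixed-size segments,
/// so adding groups never moves values that were already written.
struct SegmentedStates<T> {
    segment_size: usize,
    segments: Vec<Vec<T>>,
    len: usize,
}

impl<T: Clone + Default> SegmentedStates<T> {
    fn new(segment_size: usize) -> Self {
        assert!(segment_size > 0, "segment_size must be non-zero");
        Self { segment_size, segments: Vec::new(), len: 0 }
    }

    /// Ensures default-initialized states exist for group indexes `0..total_groups`.
    fn resize(&mut self, total_groups: usize) {
        while self.len < total_groups {
            // Allocate a fresh segment only when every existing one is full.
            if self.len == self.segments.len() * self.segment_size {
                self.segments.push(Vec::with_capacity(self.segment_size));
            }
            let last = self.segments.last_mut().expect("at least one segment exists");
            let room = self.segment_size - last.len();
            let take = room.min(total_groups - self.len);
            // Stays within the segment's capacity, so no reallocation happens here.
            last.resize(last.len() + take, T::default());
            self.len += take;
        }
    }

    fn get(&self, group_index: usize) -> &T {
        &self.segments[group_index / self.segment_size][group_index % self.segment_size]
    }

    fn get_mut(&mut self, group_index: usize) -> &mut T {
        let segment = group_index / self.segment_size;
        let offset = group_index % self.segment_size;
        &mut self.segments[segment][offset]
    }
}

/// Adds each value to the running sum of its group, in the style of a
/// per-aggregate (not per-group) accumulator.
fn update_sums(
    states: &mut SegmentedStates<i64>,
    group_indices: &[usize],
    values: &[i64],
    total_groups: usize,
) {
    states.resize(total_groups);
    for (&group, &value) in group_indices.iter().zip(values) {
        *states.get_mut(group) += value;
    }
}
```

The outer `Vec<Vec<T>>` can still reallocate, but that only moves the small per-segment headers (pointer, capacity, length), not the accumulated values. The price is the division and remainder on every access, and batch updates whose group indexes land in different segments.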

Implementation Steps:

my-vegetable-has-exploded commented 1 year ago

It seems like deque in C++? But what does adaptive mean in "with adaptive sizing"? Thanks, @alamb

alamb commented 1 year ago

But what does adaptive mean in with adaptive sizing?

I think @yjshen meant something like storing the values in Vec<Vec<T>> rather than Vec<T>

And when more space was needed, create a new Vec::with_capacity rather than push to the existing Vec (which might reallocate and copy)

avantgardnerio commented 4 months ago

We (Coralogix) may want to tackle this, but combine it with ColumnStatistics::distinct_count to allocate only once.

alamb commented 4 months ago

Using statistics to improve the allocation performance certainly seems like a good idea -- though I am not sure the default distinct statistics are very reliable
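A rough sketch of how a distinct-count estimate could drive the initial allocation while limiting the damage from an unreliable estimate. Everything here is an assumption for illustration: `DistinctEstimate` is a local stand-in loosely modeled on an exact / inexact / absent statistic, and `DEFAULT_CAPACITY`, `MAX_PREALLOCATION`, `initial_capacity`, and `new_sum_states` are hypothetical names and values, not DataFusion APIs. It also assumes a single grouping column, since the number of groups for multiple columns is not given by any one column's distinct count.

```rust
/// Local stand-in for a distinct-count statistic (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq)]
enum DistinctEstimate {
    Exact(usize),
    Inexact(usize),
    Absent,
}

/// Hypothetical fallback capacity when no statistic is available.
const DEFAULT_CAPACITY: usize = 1024;
/// Hypothetical upper bound applied to estimates that may be wrong.
const MAX_PREALLOCATION: usize = 1 << 20;

/// Chooses how many group states to reserve up front.
fn initial_capacity(estimate: DistinctEstimate) -> usize {
    match estimate {
        // An exact count is the number of states that will be needed anyway.
        DistinctEstimate::Exact(n) => n,
        // An inexact count is clamped so a bad estimate cannot over-allocate badly.
        DistinctEstimate::Inexact(n) => n.min(MAX_PREALLOCATION),
        DistinctEstimate::Absent => DEFAULT_CAPACITY,
    }
}

/// Creates an empty state vector for a sum-like accumulator, sized from the estimate.
fn new_sum_states(estimate: DistinctEstimate) -> Vec<i64> {
    Vec::with_capacity(initial_capacity(estimate))
}
```

If the estimate turns out to be too low, the vector still grows as usual, so a pre-sized allocation like this would complement a strategy for cheap growth rather than replace it.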

Rachelint commented 3 months ago

take

Rachelint commented 3 months ago

Working on a POC for a performance improvement related to this.