apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Manage group values and states by blocks in aggregation #11931

Open Rachelint opened 2 months ago

Rachelint commented 2 months ago

Is your feature request related to a problem or challenge?

Currently we manage the group values and the aggregation states in a single big vector that grows constantly. This solution is simple to implement, but according to the CPU profile it leads to extra CPU cost. Maybe we should manage them in blocks, like DuckDB does.

Describe the solution you'd like

This may be a big piece of work; I want to finish it through the following steps:

The general design is similar to #7065, but it is introduced into GroupValues as well, not only GroupAccumulators.

Describe alternatives you've considered

No response

Additional context

The cpu cost flamegraph: https://github.com/Rachelint/drawio-store/blob/main/cpucosts0811.png

Rachelint commented 2 months ago

take

2010YOUY01 commented 2 months ago

Is the plan to manage group values and states in two different kinds of blocks, or a unified block?

There are so many good optimizations in the aggregation code now that the implementation is a bit hard to understand. I was thinking that managing group values + states in a single block could also be a good cleanup.

Rachelint commented 2 months ago

> Is the plan to manage group values and states in two different kinds of blocks, or a unified block?
>
> There are so many good optimizations in the aggregation code now that the implementation is a bit hard to understand. I was thinking that managing group values + states in a single block could also be a good cleanup.

The plan is to use two kinds of blocks: group values are an inner struct of GroupValues, while states live inside the different GroupAccumulators, so it seems hard to manage them in the same place.

The design is similar to #7065, but it is introduced into GroupValues as well, not only GroupAccumulators.

The reason for doing this is that, according to the CPU flamegraph, growing the single big group values vector in GroupValues and the states in the respective GroupAccumulators costs a lot of CPU (due to the copying caused by growth).
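To illustrate why blocks avoid that copying, here is a minimal sketch of block-based storage. It is not the code from the PR: the `Blocks` type and its methods are hypothetical names chosen for illustration. The point it shows is that when a block fills up, a new fixed-capacity block is allocated and the rows already stored are left in place.

```rust
/// Hypothetical append-only storage that grows by adding fixed-capacity
/// blocks instead of reallocating one large vector.
pub struct Blocks<T> {
    block_size: usize,
    blocks: Vec<Vec<T>>,
}

impl<T> Blocks<T> {
    pub fn new(block_size: usize) -> Self {
        assert!(block_size > 0, "block size must be non-zero");
        Self { block_size, blocks: Vec::new() }
    }

    /// Appends a value and returns its (block id, offset within block).
    pub fn push(&mut self, value: T) -> (usize, usize) {
        let need_new_block = self
            .blocks
            .last()
            .map_or(true, |block| block.len() == self.block_size);
        if need_new_block {
            // Only the outer vector of block handles may reallocate here;
            // rows stored in earlier blocks are never copied.
            self.blocks.push(Vec::with_capacity(self.block_size));
        }
        let block_id = self.blocks.len() - 1;
        let block = &mut self.blocks[block_id];
        block.push(value);
        (block_id, block.len() - 1)
    }

    /// Looks up a value by block id and offset.
    pub fn get(&self, block_id: usize, offset: usize) -> Option<&T> {
        self.blocks.get(block_id)?.get(offset)
    }

    pub fn num_blocks(&self) -> usize {
        self.blocks.len()
    }

    /// Total number of stored values across all blocks.
    pub fn len(&self) -> usize {
        self.blocks.iter().map(Vec::len).sum()
    }

    pub fn is_empty(&self) -> bool {
        self.blocks.is_empty()
    }
}
```

With a block size of 4, pushing ten values fills two blocks and leaves two values in a third; a single growing `Vec` would instead copy its contents on each capacity increase.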

Rachelint commented 2 months ago

The sketch's detailed design

1. When will the blocked method be triggered?

2. Introduce new emit modes used in blocked method

The new modes let GroupValues and GroupAccumulators emit multiple blocks:

/// Describes how many rows should be emitted during grouping.
#[derive(Debug, Clone, Copy)]
pub enum EmitTo {
    /// Emit all groups
    All,
    /// Emit only the first `n` groups and shift all existing group
    /// indexes down by `n`.
    ///
    /// For example, if `n=10`, group_index `0, 1, ... 9` are emitted
    /// and group indexes `10, 11, 12, ...` become `0, 1, 2, ...`.
    First(usize),
    /// Emit all groups managed by blocks
    AllBlocks,
    /// Emit only the first `n` group blocks,
    /// similar to `First`, but used in blocked `GroupValues` and `GroupAccumulator`.
    ///
    /// For example, with `n=3` and `block size=4`, 12 groups are returned.
    FirstBlocks(usize),
    FirstBlocks(usize),
}
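
As a rough sketch of how the two blocked modes might be consumed (this is an illustration, not the PR's code), the hypothetical helper `emit_blocks` below removes blocks from the front of a queue. The `VecDeque<Vec<T>>` layout and the choice to return `None` for the flat modes are assumptions made only for this example; `EmitTo` is repeated so the snippet compiles on its own.

```rust
use std::collections::VecDeque;

/// Copy of the enum above, repeated so this sketch is self-contained.
#[derive(Debug, Clone, Copy)]
pub enum EmitTo {
    All,
    First(usize),
    AllBlocks,
    FirstBlocks(usize),
}

/// Hypothetical helper: removes and returns whole blocks from the front.
/// Returns `None` for the flat modes, which this sketch does not handle.
pub fn emit_blocks<T>(
    blocks: &mut VecDeque<Vec<T>>,
    emit_to: EmitTo,
) -> Option<Vec<Vec<T>>> {
    match emit_to {
        EmitTo::AllBlocks => Some(blocks.drain(..).collect()),
        EmitTo::FirstBlocks(n) => {
            // Clamp so that asking for more blocks than exist is not a panic.
            let n = n.min(blocks.len());
            Some(blocks.drain(..n).collect())
        }
        EmitTo::All | EmitTo::First(_) => None,
    }
}
```

Because whole blocks are handed over, the remaining blocks do not need their contents shifted the way `First(n)` shifts group indexes in a flat vector.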

To allow incremental development of the blocked method across the many concrete GroupValues and GroupAccumulator impls, this sketch PR does a lot of compatibility work, and the following combinations are allowed:

3. Introduce GroupIndices to do communication between GroupValues and GroupAccumulator

One problem is how to let a GroupAccumulator know whether the group indices are flat or blocked. I introduced GroupIndices for this, but it does lead to an API change for GroupAccumulator.

pub enum GroupIndices<'a> {
    Flat(&'a [u64]),
    Blocked(&'a [u64]),
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GroupIndicesType {
    Flat,
    Blocked,
}

impl GroupIndicesType {
    pub fn typed_group_indices<'a>(&self, indices: &'a [u64]) -> GroupIndices<'a> {
        match self {
            GroupIndicesType::Flat => GroupIndices::Flat(indices),
            GroupIndicesType::Blocked => GroupIndices::Blocked(indices),
        }
    }
}
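
A sketch of how an accumulator might branch on `GroupIndices` follows. The thread does not say how a blocked index is encoded in a `u64`, so the packing used here (block id in the high 32 bits, offset in the low 32 bits) is purely an assumption for illustration, as are the `pack`, `unpack`, and `CountAccumulator` names.

```rust
/// Copy of the enum above, repeated so this sketch is self-contained.
pub enum GroupIndices<'a> {
    Flat(&'a [u64]),
    Blocked(&'a [u64]),
}

/// ASSUMPTION: a blocked index packs the block id into the high 32 bits
/// and the offset within the block into the low 32 bits.
pub fn pack(block_id: u32, offset: u32) -> u64 {
    ((block_id as u64) << 32) | offset as u64
}

pub fn unpack(packed: u64) -> (usize, usize) {
    ((packed >> 32) as usize, (packed & 0xFFFF_FFFF) as usize)
}

/// Hypothetical accumulator that counts rows per group.
pub struct CountAccumulator {
    pub block_size: usize,
    pub flat: Vec<u64>,
    pub blocks: Vec<Vec<u64>>,
}

impl CountAccumulator {
    pub fn new(block_size: usize) -> Self {
        Self { block_size, flat: Vec::new(), blocks: Vec::new() }
    }

    pub fn update(&mut self, indices: GroupIndices<'_>) {
        match indices {
            GroupIndices::Flat(raw) => {
                for &index in raw {
                    let index = index as usize;
                    if index >= self.flat.len() {
                        // Flat mode: one vector that grows as groups appear.
                        self.flat.resize(index + 1, 0);
                    }
                    self.flat[index] += 1;
                }
            }
            GroupIndices::Blocked(raw) => {
                for &packed in raw {
                    let (block_id, offset) = unpack(packed);
                    assert!(offset < self.block_size, "offset out of range");
                    // Blocked mode: add whole blocks; existing ones stay put.
                    while self.blocks.len() <= block_id {
                        self.blocks.push(vec![0; self.block_size]);
                    }
                    self.blocks[block_id][offset] += 1;
                }
            }
        }
    }
}
```

Carrying the flat/blocked distinction in the enum means the accumulator never has to guess how to interpret the raw `u64` values it receives.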

jayzhan211 commented 2 months ago

> /// For example, `n=10`, `block size=4`, `n` will be aligned to 12,
> /// and finally 3 blocks will be returned.
> FirstBlocks(usize)

I think emitting "n" blocks is much more straightforward: with n = 3 and block size = 4, emit 3 * 4 = 12 elements.

Rachelint commented 2 months ago

> /// For example, `n=10`, `block size=4`, `n` will be aligned to 12,
> /// and finally 3 blocks will be returned.
> FirstBlocks(usize)
>
> I think emitting "n" blocks is much more straightforward: with n = 3 and block size = 4, emit 3 * 4 = 12 elements.

That is indeed clearer! I have switched to this in the code.
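
For reference, the two interpretations of `n` discussed above can be written as a small arithmetic sketch. The function names are hypothetical, and both assume that every emitted block is full.

```rust
/// Earlier semantics: `n` counts groups and is aligned up to a whole
/// number of blocks. Returns how many blocks would be emitted.
pub fn blocks_for_aligned_groups(n: usize, block_size: usize) -> usize {
    assert!(block_size > 0, "block size must be non-zero");
    (n + block_size - 1) / block_size
}

/// Adopted semantics: `n` counts blocks directly. Returns how many
/// groups would be emitted, assuming all `n` blocks are full.
pub fn groups_for_blocks(n: usize, block_size: usize) -> usize {
    n * block_size
}
```

With block size 4, the earlier form turns `n = 10` into 3 blocks (12 groups), while the adopted form states `n = 3` blocks up front and yields the same 12 groups without any alignment step.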

Rachelint commented 2 months ago

@alamb I have finished the draft framework for managing aggregation intermediates in blocks, and on top of it we can incrementally implement the blocked method for the different GroupAccumulators and GroupValues. Would you mind taking a quick look?

The general design is described here: https://github.com/apache/datafusion/issues/11931#issuecomment-2283176521

And the pr is here: https://github.com/apache/datafusion/pull/11943

cc @jayzhan211 @2010YOUY01 @JasonLi-cn