Rachelint opened this issue 1 month ago
take
I will push this forward after the tasks listed in https://github.com/apache/datafusion/issues/11680#issuecomment-2368735093 are finished.
@waynexia may also be interested in this?
Introduce a partitioned hashtable in partial aggregation, and partition the data before inserting it into the hashtable. Then we push the results into final aggregation partition by partition, rather than splitting them again in repartition and merging them again in coalesce.
I'm not clear on how this proposal works. Could you please explain why it provides performance benefits compared to partial aggregation, exchange, and final aggregation? Is the proposal aimed explicitly at accelerating high cardinality aggregation, or is it intended to enhance aggregation performance?
I think it enhances aggregation performance generally?
Currently we can think of `GroupValues` and `GroupAccumulator` as using a single `Vec` to manage intermediate states in the partial aggregation.
After finishing work in the partial aggregation, we pass the batch to the exchange, and then we recompute the hashes of the batch. Actually the hashes have already been computed in `GroupValues`, so this recomputation is the first avoidable CPU cost.
Then we split the batch into multiple batches, according to the partition numbers computed from the hashes. The splitting needs to create multiple new batches to hold the values from the source batch and copy data into them, and that is the second avoidable CPU cost.
Finally, before passing data to the final aggregation of a partition, we need to copy that partition's small split batches into a buffer in the coalesce step first, until the buffer is large enough (usually the default batch size, 8192), and that is the third avoidable CPU cost.
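The three avoidable costs above can be sketched with a toy model, where a "batch" is just a `Vec` of group keys; the function names `repartition` and `coalesce` here are illustrative stand-ins, not DataFusion's actual API:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Toy "batch": just a vector of group keys.
type Batch = Vec<String>;

fn hash_key(key: &str) -> u64 {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    h.finish()
}

/// Cost 1: the exchange re-hashes every row, even though the partial
/// aggregation already hashed it when probing `GroupValues`.
/// Cost 2: splitting copies every row into a new per-partition batch.
fn repartition(batch: &Batch, partitions: usize) -> Vec<Batch> {
    let mut out = vec![Batch::new(); partitions];
    for key in batch {
        let hash = hash_key(key); // recomputed hash (avoidable)
        out[(hash as usize) % partitions].push(key.clone()); // copy (avoidable)
    }
    out
}

/// Cost 3: coalesce buffers small batches until they reach the target
/// size (8192 by default), which copies the rows yet again.
fn coalesce(small_batches: Vec<Batch>, target: usize) -> Vec<Batch> {
    let mut out = Vec::new();
    let mut buf = Batch::new();
    for b in small_batches {
        buf.extend(b); // another copy (avoidable)
        if buf.len() >= target {
            out.push(std::mem::take(&mut buf));
        }
    }
    if !buf.is_empty() {
        out.push(buf);
    }
    out
}

fn main() {
    let batch: Batch = (0..10).map(|i| format!("key{i}")).collect();
    let parts = repartition(&batch, 4);
    assert_eq!(parts.iter().map(|p| p.len()).sum::<usize>(), 10);
    let buffered = coalesce(parts, 8);
    println!("{} buffered batches", buffered.len());
}
```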
After using the partitioned approach in `GroupValues` and `GroupAccumulator`:

- We can naturally reuse the hashes computed in `GroupValues` when calculating the partition numbers of the batches.
- We store the intermediate states in the partial aggregation partition by partition, and when we submit them to the final aggregation, we just submit them partition by partition, rather than splitting first and merging after.
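A minimal sketch of the partitioned idea, using hypothetical simplified types (not DataFusion's real `GroupValues`): the partial side keeps one hashtable per output partition, routes each key by a hash computed exactly once, and later emits each partition's states directly:

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Hypothetical partitioned group state: one map per output partition,
/// so states are already grouped the way the final aggregation wants them.
struct PartitionedGroups {
    partitions: Vec<HashMap<String, i64>>, // key -> running count
}

impl PartitionedGroups {
    fn new(n: usize) -> Self {
        Self {
            partitions: (0..n).map(|_| HashMap::new()).collect(),
        }
    }

    /// The hash is computed once here; the same hash also picks the
    /// partition, so there is no re-hashing in a separate exchange step.
    fn insert(&mut self, key: &str) {
        let mut h = DefaultHasher::new();
        key.hash(&mut h);
        let p = (h.finish() as usize) % self.partitions.len();
        *self.partitions[p].entry(key.to_string()).or_insert(0) += 1;
    }

    /// Emit states partition by partition: no splitting, no coalescing.
    fn emit(self) -> impl Iterator<Item = (usize, HashMap<String, i64>)> {
        self.partitions.into_iter().enumerate()
    }
}

fn main() {
    let mut groups = PartitionedGroups::new(4);
    for key in ["a", "b", "a", "c", "b", "a"] {
        groups.insert(key);
    }
    // Every input row is accounted for across the partitions.
    let total: i64 = groups.emit().map(|(_, m)| m.values().sum::<i64>()).sum();
    assert_eq!(total, 6);
    println!("ok");
}
```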
I think our goal is to combine partial + repartition + final into single operator, and partial + repartition fusing is the first step of this. After that we could try doing final aggr step as well.
Yes, it may be attractive if we combine them in some way; we seem to have a chance to do more optimizations. 🤔
Currently we build a `record batch` from `partial`'s internal states, and then rebuild it into `final`'s internal states. It seems to be expensive for bytes and strings? Maybe we can pass the internal states directly to `final`, and avoid some copies?
Is your feature request related to a problem or challenge?
I implemented a PoC in https://github.com/apache/datafusion/pull/12526 and found this idea can actually improve performance.
But for the reasons stated in https://github.com/apache/datafusion/issues/11680#issuecomment-2368735093,
I think this improvement is not so suitable to push forward currently.
I just filed this issue to track it.
Describe the solution you'd like
Introduce a partitioned hashtable in `partial aggregation`, and partition the data before inserting it into the hashtable. Then push the results into `final aggregation` partition by partition, rather than splitting them again in `repartition` and merging them again in `coalesce`.
Describe alternatives you've considered
No response
Additional context
No response