Open shiyu-bytedance opened 2 months ago
> there can be a number of drivers partialAgg(each run by a driver), but always 1 finalAgg/singleAgg
This is not the case. Final / Single agg can run multi-threaded as long as inputs are hash-partitioned on group-by keys.
@mbasmanova Thank you for the insight. Could this improvement be applicable for cases where input is not hash-partitioned on group-by keys?
> Could this improvement be applicable for cases where input is not hash-partitioned on group-by keys?
To run final agg concurrently, one needs to partition data on grouping keys. This can be achieved with local or remote shuffle.
Description
Background
Currently, there can be any number of partialAgg operators (each run by its own driver), but always exactly 1 finalAgg/singleAgg, meaning finalAgg/singleAgg always uses a single core. Our goal is to alter FinalAggregation/SingleAggregation so that it uses all cores on a node.
Current HashTable architecture
![Blank diagram (1)](https://github.com/facebookincubator/velox/assets/105019964/364d6d65-3f2b-49ff-a5bb-e44627ad6a7d)
The TagPointerPair array is a contiguous allocation of memory that contains pointers to the Row payloads in the RowContainer. The TagPointerPair array is not thread-safe and thus can only be updated by 1 thread at a time. Rows are allocated from the Operator's MemoryPool. The Operator's MemoryPool is not thread-safe and is a proxy to the global MemoryAllocator. The global MemoryAllocator is thread-safe through an internal mutex.
Proposed HashTable architecture
![Blank diagram (2)](https://github.com/facebookincubator/velox/assets/105019964/14d2c65b-e007-4b79-be99-e1e0f635aac6)
The overall idea is to eliminate thread unsafe data structures so that finalAgg can use all available cores on a node to run. Assuming a machine with 3 cores, we will create three:
RowContainer itself will remain unchanged.
New finalAgg/SingleAgg addInput flow
Upon receiving an input RowVector, finalAgg/singleAgg logic will remain unchanged until hashes of all input rows are computed. At this point, assuming a machine of 3 cores, the remaining work will be done by 3 threads, each working on a 3-way partitioned range of the total hash range.
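The flow above can be sketched in standalone C++ as follows. This is only an illustration of the idea, not Velox code: `PartialTable`, `mixHash`, `partitionOf`, and `addInputPartitioned` are hypothetical names, a `std::unordered_map` stands in for the per-thread hash table, and a sum stands in for the aggregate function. Hashes are computed once on the calling thread, then each worker thread handles only the rows whose hash falls in its range and writes only to its own table, so no table is shared between threads.

```cpp
#include <cstddef>
#include <cstdint>
#include <thread>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for one thread's private hash table (key -> running sum).
using PartialTable = std::unordered_map<int64_t, int64_t>;

// Stand-in hash: a splitmix64-style bit mixer so that small keys spread over
// the whole 64-bit hash range. Not the hash function Velox uses.
inline uint64_t mixHash(int64_t key) {
  uint64_t x = static_cast<uint64_t>(key);
  x ^= x >> 30;
  x *= 0xbf58476d1ce4e5b9ULL;
  x ^= x >> 27;
  x *= 0x94d049bb133111ebULL;
  x ^= x >> 31;
  return x;
}

// Maps a hash to one of numPartitions contiguous, near-equal hash ranges.
inline size_t partitionOf(uint64_t hash, size_t numPartitions) {
  if (numPartitions <= 1) {
    return 0;
  }
  // Width of each range; the +1 guarantees the result stays below numPartitions.
  const uint64_t width = UINT64_MAX / numPartitions + 1;
  return static_cast<size_t>(hash / width);
}

// Sketch of the proposed addInput: hash once, then aggregate per hash range.
std::vector<PartialTable> addInputPartitioned(
    const std::vector<int64_t>& keys,
    const std::vector<int64_t>& values,
    size_t numThreads) {
  // Step 1 (unchanged, single-threaded): compute hashes of all input rows.
  std::vector<uint64_t> hashes(keys.size());
  for (size_t row = 0; row < keys.size(); ++row) {
    hashes[row] = mixHash(keys[row]);
  }

  // Step 2: one worker per hash range; worker t writes only to tables[t].
  std::vector<PartialTable> tables(numThreads);
  std::vector<std::thread> workers;
  workers.reserve(numThreads);
  for (size_t t = 0; t < numThreads; ++t) {
    workers.emplace_back([&keys, &values, &hashes, &tables, numThreads, t] {
      for (size_t row = 0; row < keys.size(); ++row) {
        if (partitionOf(hashes[row], numThreads) == t) {
          tables[t][keys[row]] += values[row];
        }
      }
    });
  }
  for (auto& worker : workers) {
    worker.join();
  }
  return tables;
}
```

Calling `addInputPartitioned(keys, values, 3)` returns three tables whose key sets are disjoint, because every occurrence of a key hashes to the same range. In this sketch each worker scans all hashes and skips rows outside its range; a real implementation would more likely build per-partition row lists first to avoid the repeated scan.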
New finalAgg/singleAgg getOutput flow
getOutput will remain essentially unchanged because it will retrieve output rows through the rowContainer::listRows() API.
Complication with spilling
Writing out spills: Currently, the spiller uses rowContainer::listRows() to read rows to spill; this logic can remain as is.
Reading back in spills: When spills are read back in, they are merged into a brand new mergedRows_ RowContainer. This logic is orthogonal to the proposed changes and the additional logic in this doc.
Result measurements
Local benchmark: A local benchmark demonstrating the gains in the finalAgg.addInput path will be authored and results collected.
Production: Similar to updateSpillFillTime, which times a particular section of Operator logic and collects the result, we will add logic to time finalAgg.addInput and compare the before/after performance.
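One possible shape for the production timing is a scoped timer that accumulates elapsed time into a per-operator counter. The sketch below is an assumption about how this could look, not Velox's actual stats API: `AggTimingStats`, `ScopedAddInputTimer`, and `timedAddInput` are hypothetical names, and the loop only stands in for the real addInput work.

```cpp
#include <chrono>
#include <cstdint>

// Hypothetical per-operator counters; not part of Velox.
struct AggTimingStats {
  uint64_t addInputNanos = 0;
  uint64_t addInputCalls = 0;
};

// RAII timer: adds the lifetime of the object to the stats on destruction.
class ScopedAddInputTimer {
 public:
  explicit ScopedAddInputTimer(AggTimingStats& stats)
      : stats_(stats), start_(std::chrono::steady_clock::now()) {}

  ~ScopedAddInputTimer() {
    const auto elapsed = std::chrono::steady_clock::now() - start_;
    stats_.addInputNanos += static_cast<uint64_t>(
        std::chrono::duration_cast<std::chrono::nanoseconds>(elapsed).count());
    ++stats_.addInputCalls;
  }

  ScopedAddInputTimer(const ScopedAddInputTimer&) = delete;
  ScopedAddInputTimer& operator=(const ScopedAddInputTimer&) = delete;

 private:
  AggTimingStats& stats_;
  std::chrono::steady_clock::time_point start_;
};

// Stand-in for finalAgg.addInput: the timed section is the whole function body.
uint64_t timedAddInput(AggTimingStats& stats, uint64_t numRows) {
  ScopedAddInputTimer timer(stats);
  uint64_t sum = 0;
  for (uint64_t i = 0; i < numRows; ++i) {
    sum += i;
  }
  return sum;
}
```

Comparing `addInputNanos / addInputCalls` before and after the change would give the per-call cost of the addInput path. A steady clock is used here so that wall-clock adjustments do not distort the measurement.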
@mbasmanova @xiaoxmeng @oerl