apache / pinot

Apache Pinot - A realtime distributed OLAP datastore
https://pinot.apache.org/
Apache License 2.0

[feature] Support parallel combine and disk spill for groupBy execution #12080

Open wirybeaver opened 10 months ago

wirybeaver commented 10 months ago

While reading the source code of Pinot's GroupByExecutor, I found that it lacks the following features of Druid's GroupByV2Engine:

  1. Spill to disk when the merge buffer is full. Druid SpillingGrouper
  2. Parallel combine when merging sorted aggregation results. Druid creates a combining thread tree on historical nodes (see the sketch after the reference below). Druid ParallelCombiner
  //        o         <- non-leaf node
  //     / / \ \      <- ICD = 4
  //  o   o   o   o   <- non-leaf nodes
  // / \ / \ / \ / \  <- LCD = 2
  // o o o o o o o o  <- leaf nodes

Reference: Druid GroupBy Tuning Guide
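
For illustration, here is a minimal sketch of the combining-tree idea in Java. All class and method names are made up for this issue (nothing here is the actual Druid or Pinot API); the leaf and intermediate fan-ins correspond to the LCD/ICD values in the diagram above, and `combineFn` stands for any associative merge of two partial group-by results.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.BinaryOperator;

public class CombiningTree<T> {
  private final ExecutorService executor;
  private final BinaryOperator<T> combineFn;   // any associative merge of two partial results
  private final int leafDegree;                // LCD: fan-in at the leaf level (assumed >= 2)
  private final int intermediateDegree;        // ICD: fan-in at the non-leaf levels (assumed >= 2)

  public CombiningTree(ExecutorService executor, BinaryOperator<T> combineFn,
                       int leafDegree, int intermediateDegree) {
    this.executor = executor;
    this.combineFn = combineFn;
    this.leafDegree = leafDegree;
    this.intermediateDegree = intermediateDegree;
  }

  /** Combines the leaf results level by level until a single result remains. */
  public T combineAll(List<T> leafResults) {
    List<CompletableFuture<T>> level = new ArrayList<>();
    for (T leaf : leafResults) {
      level.add(CompletableFuture.completedFuture(leaf));
    }
    level = combineLevel(level, leafDegree);
    while (level.size() > 1) {
      level = combineLevel(level, intermediateDegree);
    }
    return level.get(0).join();
  }

  /** One level of the tree: every `degree` children feed one combining task. */
  private List<CompletableFuture<T>> combineLevel(List<CompletableFuture<T>> children, int degree) {
    List<CompletableFuture<T>> parents = new ArrayList<>();
    for (int i = 0; i < children.size(); i += degree) {
      List<CompletableFuture<T>> group = children.subList(i, Math.min(i + degree, children.size()));
      parents.add(CompletableFuture.allOf(group.toArray(new CompletableFuture[0]))
          .thenApplyAsync(ignored -> group.stream()
              .map(CompletableFuture::join)
              .reduce(combineFn)
              .orElseThrow(), executor));
    }
    return parents;
  }
}
```

With 8 leaf results, leafDegree = 2 and intermediateDegree = 4, combineAll produces the 8 -> 4 -> 1 shape shown in the diagram above.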

As the tuning guide mentions, Druid always sorts the aggregate result by default when limit pushdown is not enabled. I have a strong feeling that integrating a disk-spill feature would allow Pinot to process much larger data volumes and resolve the non-deterministic results of groupBy without orderBy, i.e. https://github.com/apache/pinot/issues/11706. In addition, the non-leaf stages in the multistage (V2) engine could also adopt these two features for partitioned aggregation.

The thread model of Pinot query execution is shown below:

I'm raising this issue to solicit opinions from folks. If there's sufficient support, I will write a design doc for leaf-stage groupBy execution.

chenboat commented 10 months ago

What are the main benefits of this feature for group-by? In what cases will it improve the status quo? Can you be more specific?

wirybeaver commented 10 months ago

Disk spill: currently the num-groups limit is used to avoid OOM, so the returned result is not exact. Enabling disk spill would give a deterministic, accurate result. Apache Druid, Doris, and Velox already support disk-based group-by.

Parallel combine: speeds up the combine process. @chenboat

kishoreg commented 10 months ago

This is a good feature to have. Would love to review the design proposal. It would be good to write a stand-alone algorithm that allows us to leverage it in multiple places/operators.

wirybeaver commented 9 months ago

I have read Druid's related source code and will share details in the coming days.

wirybeaver commented 6 months ago

I didn't have the bandwidth to fully digest the Druid groupBy algorithm, but here is the headline version. Say there are 8 workers and the segments are distributed across them. Each worker has a local hash table that uses linear probing to resolve hash collisions, and each worker is in charge of storing the aggregated results for one partition. If a worker finds that an aggregation key doesn't belong to its local partition, it inserts the aggregated row into the owning worker's hash table. If no local hash table triggers the disk-spill condition, there is no need to merge partitions. However, if any table triggers the disk-spill condition, the workers stop distributing aggregated rows and each worker just pre-aggregates internally. In the combine phase, the results are merged on the fly, combining each hash table's in-memory and on-disk contents.
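
To make the spill side of this concrete, here is a rough per-worker sketch under the assumptions above: aggregate into an in-memory sorted table, write it out as a sorted run once it grows past a threshold, and merge the in-memory table with the spilled runs at the end. All names are illustrative (this is not Druid's SpillingGrouper or any Pinot class), and a real implementation would stream a k-way merge of the sorted runs rather than reloading them into memory as done here.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public class SpillingGroupByTable {
  private final TreeMap<String, Long> table = new TreeMap<>();  // key -> partial aggregate, kept sorted
  private final List<Path> spillFiles = new ArrayList<>();
  private final int maxInMemoryGroups;                          // the disk-spill trigger

  public SpillingGroupByTable(int maxInMemoryGroups) {
    this.maxInMemoryGroups = maxInMemoryGroups;
  }

  public void aggregate(String key, long value) throws IOException {
    table.merge(key, value, Long::sum);
    if (table.size() >= maxInMemoryGroups) {
      spill();
    }
  }

  /** Writes the current in-memory table out as a sorted run, then clears it. */
  private void spill() throws IOException {
    Path file = Files.createTempFile("groupby-spill-", ".tsv");
    try (BufferedWriter writer = Files.newBufferedWriter(file)) {
      for (Map.Entry<String, Long> entry : table.entrySet()) {
        writer.write(entry.getKey() + "\t" + entry.getValue());
        writer.newLine();
      }
    }
    spillFiles.add(file);
    table.clear();
  }

  /** Combine phase: merge the in-memory table with every spilled run. */
  public SortedMap<String, Long> finish() throws IOException {
    TreeMap<String, Long> merged = new TreeMap<>(table);
    for (Path file : spillFiles) {
      try (BufferedReader reader = Files.newBufferedReader(file)) {
        String line;
        while ((line = reader.readLine()) != null) {
          int tab = line.indexOf('\t');
          merged.merge(line.substring(0, tab), Long.parseLong(line.substring(tab + 1)), Long::sum);
        }
      }
    }
    return merged;
  }
}
```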

DuckDB has a similar idea, with some nuances: https://duckdb.org/2024/03/29/external-aggregation.html. Each thread does pre-aggregation without distributing results at all; instead, each worker partitions its data locally, and a second set of threads then merges the results per partition. They also use linear probing with a salt; linear probing is more disk friendly. To support efficient resizing, they implement a two-part aggregate hash table: https://duckdb.org/2022/03/07/aggregate-hashtable

Pinot's groupBy currently doesn't support external aggregation; all threads write into a single in-memory concurrent hash map, which uses chaining.
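
For contrast, a minimal illustration of that shared-map pattern (not actual Pinot code; names are made up):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class SharedMapAggregation {
  // One map shared by every worker thread; ConcurrentHashMap buckets chain
  // colliding entries, and the whole table has to fit in memory.
  private final ConcurrentMap<String, Long> sharedTable = new ConcurrentHashMap<>();

  // Called concurrently by each segment-processing thread for every row it scans.
  public void upsert(String groupKey, long value) {
    sharedTable.merge(groupKey, value, Long::sum);
  }
}
```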

BusTub implements a disk-based hash table and uses latch crabbing to increase concurrency: https://15445.courses.cs.cmu.edu/spring2024/project2/

@chenboat To answer your question: the main goal is low memory usage for groupBy. More generally, each operator could support a disk-based mode, e.g., external sorting.

wirybeaver commented 6 months ago

Personally, I prefer DuckDB's approach, which avoids shared-state concurrency entirely; the trade-off is an additional merge phase. Here is a summary of the DuckDB blog post Parallel Grouped Aggregation:

Parallel aggregation: say we have 4 threads in phase 1. Each thread aggregates on its own and starts partitioning its output once it has seen more than 10,000 rows, say into 10 partitions corresponding to 10 hash tables. In phase 2, 10 threads do the merge, each merging one partition. For example, phase-2 thread 5 merges phase-1 thread 1's partition 5, phase-1 thread 2's partition 5, phase-1 thread 3's partition 5, and so on.
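
A condensed sketch of that two-phase scheme (illustrative names only, and omitting the adaptive 10,000-row trigger): phase-1 tasks build thread-local partitioned tables with no shared state, and each phase-2 task owns one partition and merges it across all phase-1 outputs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.IntStream;

public class TwoPhaseAggregation {
  static final int NUM_PARTITIONS = 10;

  /** Phase 1: called once per thread; builds NUM_PARTITIONS local tables for its rows. */
  static List<Map<String, Long>> localAggregate(List<Map.Entry<String, Long>> rows) {
    List<Map<String, Long>> partitions = new ArrayList<>();
    for (int i = 0; i < NUM_PARTITIONS; i++) {
      partitions.add(new HashMap<>());
    }
    for (Map.Entry<String, Long> row : rows) {
      int p = Math.floorMod(row.getKey().hashCode(), NUM_PARTITIONS);
      partitions.get(p).merge(row.getKey(), row.getValue(), Long::sum);
    }
    return partitions;
  }

  /** Phase 2: called once per partition; merges that partition across all phase-1 outputs. */
  static Map<String, Long> mergePartition(int partition, List<List<Map<String, Long>>> phase1) {
    Map<String, Long> merged = new HashMap<>();
    for (List<Map<String, Long>> perThread : phase1) {
      perThread.get(partition).forEach((k, v) -> merged.merge(k, v, Long::sum));
    }
    return merged;
  }

  static List<Map<String, Long>> run(List<List<Map.Entry<String, Long>>> splits) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(Math.max(splits.size(), NUM_PARTITIONS));
    try {
      // Phase 1: aggregate each input split on its own thread, no shared hash table.
      List<Future<List<Map<String, Long>>>> phase1Futures = new ArrayList<>();
      for (List<Map.Entry<String, Long>> split : splits) {
        phase1Futures.add(pool.submit(() -> localAggregate(split)));
      }
      List<List<Map<String, Long>>> phase1 = new ArrayList<>();
      for (Future<List<Map<String, Long>>> f : phase1Futures) {
        phase1.add(f.get());
      }
      // Phase 2: each partition is merged independently, again in parallel.
      List<Future<Map<String, Long>>> mergeFutures = IntStream.range(0, NUM_PARTITIONS)
          .mapToObj(p -> pool.submit(() -> mergePartition(p, phase1)))
          .toList();
      List<Map<String, Long>> result = new ArrayList<>();
      for (Future<Map<String, Long>> f : mergeFutures) {
        result.add(f.get());
      }
      return result;
    } finally {
      pool.shutdown();
    }
  }
}
```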

Hash table: linear probing rather than chaining, which is more disk friendly. When reading the next entry after a hash collision, linear probing can leverage cache locality.

Two-part (two-layer) aggregate hash table, for efficient resizing.
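
A tiny illustration of the linear-probing + salt idea from the DuckDB posts above, simplified and with made-up field names: the first part is a flat array of (payload index, salt) entries that is probed linearly, and the second part holds the actual keys and aggregate values. Resizing and the full-table case are omitted; the point of the two-part design is that only the first array needs rebuilding when the table grows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ProbingAggregateTable {
  // Part 1: the "pointer" arrays probed linearly; -1 marks an empty slot.
  private final int[] payloadIndex;
  private final byte[] salt;
  // Part 2: the payload, i.e. group keys and their running aggregates.
  private final List<String> keys = new ArrayList<>();
  private final List<Long> sums = new ArrayList<>();

  ProbingAggregateTable(int capacity) {
    payloadIndex = new int[capacity];
    salt = new byte[capacity];
    Arrays.fill(payloadIndex, -1);
  }

  void add(String key, long value) {
    int hash = key.hashCode();
    byte s = (byte) (hash >>> 24);                     // a few hash bits kept inline as the salt
    int slot = Math.floorMod(hash, payloadIndex.length);
    while (true) {
      if (payloadIndex[slot] == -1) {                  // empty slot: insert a new group
        payloadIndex[slot] = keys.size();
        salt[slot] = s;
        keys.add(key);
        sums.add(value);
        return;
      }
      // A salt mismatch lets us skip the (possibly out-of-cache) key comparison.
      if (salt[slot] == s && keys.get(payloadIndex[slot]).equals(key)) {
        int idx = payloadIndex[slot];
        sums.set(idx, sums.get(idx) + value);          // update the existing group
        return;
      }
      slot = (slot + 1) % payloadIndex.length;         // linear probe: try the adjacent slot next
    }
  }
}
```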

wirybeaver commented 5 months ago

https://www.databend.com/blog/2024-04-12-towards-efficient-distributed-group-aggregation.md/

Databend's execution is incredibly fast; they follow DuckDB's approach.

wirybeaver commented 5 months ago

Discussed with Jackie; he suggested separating parallel aggregation from the disk-spill feature.

wirybeaver commented 5 months ago

The doc is in progress: https://docs.google.com/document/d/1GViPFE-wEpz2LIH79a1epr0u1AGcZ4QZzcWIWW77AlM/edit?usp=sharing