Open · yuzefovich opened 3 years ago
#57817 added the ability to spill to disk when the hash aggregator exceeds its memory limit (which is `workmem/2`, 32 MiB by default). In order to be able to spill the aggregation, we have added a `spillingQueue` that tracks all input tuples (the queue can use the other half of the `workmem` limit), which is needed in order to "export all buffered" tuples (a concept that our `diskSpiller` infrastructure requires). The tracking does have a performance hit (the microbenchmarks showed anywhere from a 20-30% to a 2x hit, and I also saw a 30% hit on TPCH Q1), and it is incurred even when the aggregator doesn't need to fall back to the external strategy.

The external aggregator uses the hash-based partitioner, which applies the logic of Grace hash join with "join" replaced by "aggregation". It works by fully consuming the input source, dividing it up into partitions based on the grouping columns, and aggregating each partition separately (using the in-memory hash aggregator); the partitions are split recursively until they fit under the limit or they need to be processed using the "external sort + ordered aggregation" strategy.
Ideally, we would refactor that code so that the hash aggregator can spill to disk if necessary without incurring the cost of tracking the whole input set of tuples. The difficulty there is transforming the in-memory state of the hash aggregator (that it has accumulated before hitting the limit) into a form that the external aggregator can somehow consume. That in-memory state consists of two main parts:
- the aggregate functions' intermediate results (`buckets`)
- the grouping columns' values (`ht.vals`, which correspond 1:1 to `buckets`).

One option for how to export this state is to transform each bucket with its grouping column values into a tuple (in effect, we would be "synthesizing" or "serializing" a "fake" tuple); the external aggregator would then consume these serialized tuples as if they were coming from the input source itself.
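As a rough illustration of what such an export could look like, here is a hedged sketch assuming a hypothetical `spillableAggFunc` extension of the aggregate functions; none of these names exist in the vectorized engine today.

```go
package spillsketch

// spillableAggFunc is a hypothetical aggregate function that can dump its
// intermediate state as one or more column values and later resume from them.
type spillableAggFunc interface {
	// exportState appends the intermediate aggregation state as extra columns.
	exportState(appendTo []interface{}) []interface{}
	// importState absorbs previously exported state and returns how many
	// columns it consumed.
	importState(state []interface{}) int
}

// bucket mirrors the hash aggregator's per-group in-memory state: the grouping
// columns' values plus one aggregate function per output column.
type bucket struct {
	groupingVals []interface{}
	aggFns       []spillableAggFunc
}

// exportSpilledTuples synthesizes a "fake" tuple per bucket: the grouping
// values followed by the serialized state of every aggregate function. The
// external aggregator could consume these tuples as if they came from the
// input source itself.
func exportSpilledTuples(buckets []bucket) [][]interface{} {
	tuples := make([][]interface{}, 0, len(buckets))
	for _, b := range buckets {
		tup := append([]interface{}{}, b.groupingVals...)
		for _, fn := range b.aggFns {
			tup = fn.exportState(tup)
		}
		tuples = append(tuples, tup)
	}
	return tuples
}
```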
The complication here is that some intermediate results cannot be expressed as a single column value (for example, for `AVG` we need to export both `sum` and `count`). The crux of the approach here would be to figure out how to do this serialization for each of the aggregate functions (including the ones handled by the vectorized default aggregate function) and make it play nicely with the existing code of the hash aggregator.
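For instance, an `AVG`-like aggregate under the hypothetical interface sketched above would have to export, and later merge back, two columns; `avgAgg` below is purely illustrative, not the actual colexec aggregate.

```go
package spillsketch

// avgAgg keeps the two-part intermediate state of AVG: a running sum and count.
type avgAgg struct {
	sum   float64
	count int64
}

// add folds one input value into the running state.
func (a *avgAgg) add(v float64) {
	a.sum += v
	a.count++
}

// exportState appends the running sum and count as two extra columns of the
// synthesized "fake" tuple.
func (a *avgAgg) exportState(appendTo []interface{}) []interface{} {
	return append(appendTo, a.sum, a.count)
}

// importState merges previously exported state; it consumes exactly the two
// columns that exportState produced.
func (a *avgAgg) importState(state []interface{}) int {
	a.sum += state[0].(float64)
	a.count += state[1].(int64)
	return 2
}

// result finalizes the aggregate once all real and "fake" tuples are consumed.
func (a *avgAgg) result() float64 {
	if a.count == 0 {
		return 0
	}
	return a.sum / float64(a.count)
}
```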
Another option is not trying to export the state for later consumption by the external aggregator as if it came from the input source, but to somehow "inject" the relevant state into the hash aggregator for each of the partitions. What I imagine here is roughly as follows:

- the in-memory hash aggregator's internals (`buckets` and `ht.vals`) are exposed for direct access
- when the external aggregator splits `buckets` into each partition, that data is not spilled to disk; instead, the external aggregator keeps in-memory state about the assignment of these "fake partitions" to match the actual partitions made later (and updates the fake ones on the recursive repartitioning accordingly)
- when the in-memory hash aggregator is used on an actual partition, the corresponding `buckets` and `vals` (based on the "fake partition" assignment) are injected into it, and then the in-memory aggregator runs on the spilled tuples coming from the actual partitions.

This approach seems a bit hacky and breaks the abstraction barriers a bit more than the first one. It is also currently not clear to me how we would implement the fallback strategy of the "external sort + ordered aggregator" - we don't have the full tuple set to sort there. We could probably implement it by handling a partition one group at a time (choose a single tuple, then iterate over all tuples looking for matches, somehow mark all of these tuples as processed - maybe by writing a new partition without the processed tuples - and feed the matches into the ordered aggregator).
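Purely to illustrate the bookkeeping, here is a rough sketch of the "fake partition" assignment and injection, reusing the illustrative `bucket` type from the earlier sketch; `inMemoryHashAggregator`, `injectState`, and `fakePartitions` are hypothetical names, not existing colexec APIs.

```go
package spillsketch

// inMemoryHashAggregator stands in for the in-memory hash aggregator whose
// internals are exposed for direct access.
type inMemoryHashAggregator struct {
	buckets []bucket
}

// injectState seeds the aggregator with previously accumulated buckets before
// it starts consuming the spilled tuples of the corresponding partition.
func (a *inMemoryHashAggregator) injectState(pre []bucket) {
	a.buckets = append(a.buckets, pre...)
}

// fakePartitions tracks, per partition index, the buckets that were kept in
// memory instead of being spilled; it has to be updated whenever a partition
// is recursively repartitioned.
type fakePartitions struct {
	byPartition map[int][]bucket
}

func (f *fakePartitions) assign(part int, b bucket) {
	if f.byPartition == nil {
		f.byPartition = make(map[int][]bucket)
	}
	f.byPartition[part] = append(f.byPartition[part], b)
}

// processPartition shows the intended flow for one actual partition.
func processPartition(part int, spilled [][]interface{}, f *fakePartitions) *inMemoryHashAggregator {
	agg := &inMemoryHashAggregator{}
	// Seed the aggregator with the matching fake partition first...
	agg.injectState(f.byPartition[part])
	// ...then it would aggregate the actual spilled tuples of this partition.
	_ = spilled
	return agg
}
```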
Jira issue: CRDB-3292