Open jtuglu-netflix opened 3 weeks ago
Use a configurable (per-query, per-segment) fixed size byte amount that aggregators can take up.
This part of the PR description confused me a little bit because the configuration is really at a query level and not at a segment level (which would be a bit hard to set the right limit for).
Since this is initialized per-runner (which can be across different segments on different historicals, etc.), this is technically unique per (query-id, segment-id), or more generally per SpecificSegmentQueryRunner.
This now tracks at the (query, historical) level.
Adds a way to monitor aggregator map entry increases for TopN queries. This is in response to a problem we've seen where a TopN query runs an expensive aggregator on a high-cardinality dimension. I've reproduced this problem locally and confirmed that this change fixes the issue in most cases; the remaining cases are outlined in Drawbacks.
Description
Approach
Use a configurable, fixed-size byte budget per (query, historical) that aggregators can take up. The byte count is maintained at the segment level during each query runner's pass through a segment (while building the result sequence). I opted for this instead of a %-of-heap-based approach because, in high-traffic scenarios, multiple queries could race to allocate memory for aggregators, and each could read, say, 5% of the total available heap (assuming that is the permissible % to allocate); if heap usage is already at 80-90%, this could end badly. Using a fixed amount is a bit more cumbersome, but at least guarantees a realistic upper bound on how much memory N concurrent queries could theoretically use. Changing to either approach (or another) is easy enough; I found the fixed-amount approach performed more consistently in local testing with artificially low heap sizes and parallel queries. Another alternative I considered was a shared buffer pool that all running queries could "borrow" from for their queries, a bit like what GroupBy does.
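For illustration, here is a minimal sketch of the fixed-budget idea; the class and method names are hypothetical and this is not the actual TopNAggregatorResourceHelper code. One budget instance would be created per (query, historical) and shared by every per-segment runner of that query, so each segment scan charges the footprint of newly created aggregators against the same counter and the query fails fast once the limit is exceeded.

```java
// Hypothetical sketch of the fixed-budget approach (illustrative names only,
// not the PR's actual implementation). One budget is created per
// (query, historical) and shared by all per-segment runners of that query.
import java.util.concurrent.atomic.AtomicLong;

public class AggregatorByteBudget
{
  private final long maxBytes;                        // configured per query, e.g. via query context
  private final AtomicLong usedBytes = new AtomicLong();

  public AggregatorByteBudget(long maxBytes)
  {
    this.maxBytes = maxBytes;
  }

  /** Charged by a segment scan whenever aggregators are created for a new dimension value. */
  public void add(long bytes)
  {
    long total = usedBytes.addAndGet(bytes);
    if (total > maxBytes) {
      throw new RuntimeException(
          "TopN aggregators exceeded byte limit: used " + total + " of " + maxBytes
      );
    }
  }
}
```

Because every SpecificSegmentQueryRunner of the query would charge the same shared instance, the limit is enforced per (query, historical) rather than per segment.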
Drawbacks
processing.numMergeBuffers: I could use this as an "automated" way of determining the available buffer size instead of hardcoding a default constant, for example: some % of the max JVM heap / # of max parallel queries (see the sketch below).
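A minimal sketch of how such an automated default could be computed, assuming the budget is derived from the JVM heap and a stand-in for the maximum number of parallel queries (the names and the 5% fraction below are illustrative, not part of this PR):

```java
// Illustrative only: derive a default aggregator byte budget from the JVM heap
// instead of a hardcoded constant. "maxParallelQueries" is a stand-in for
// whatever concurrency setting would be used (e.g. merge buffers or processing threads).
public class DefaultAggregatorLimit
{
  public static long compute(double heapFraction, int maxParallelQueries)
  {
    long maxHeapBytes = Runtime.getRuntime().maxMemory();   // max JVM heap
    return (long) (maxHeapBytes * heapFraction) / maxParallelQueries;
  }

  public static void main(String[] args)
  {
    // e.g. 5% of the heap shared by up to 8 concurrent queries
    System.out.println(compute(0.05, 8));
  }
}
```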
Release note
Key changed/added classes in this PR
extensions-contrib/distinctcount/src/test/java/org/apache/druid/query/aggregation/distinctcount/DistinctCountTopNQueryTest.java
processing/src/main/java/org/apache/druid/query/topn/BaseTopNAlgorithm.java
processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java
processing/src/main/java/org/apache/druid/query/topn/TopNAggregatorResourceHelper.java
processing/src/main/java/org/apache/druid/query/topn/TopNQuery.java
processing/src/main/java/org/apache/druid/query/topn/TopNQueryEngine.java
processing/src/main/java/org/apache/druid/query/topn/TopNQueryQueryToolChest.java
processing/src/main/java/org/apache/druid/query/topn/TopNQueryRunnerFactory.java
processing/src/test/java/org/apache/druid/query/topn/TopNQueryRunnerFailureTest.java
processing/src/test/java/org/apache/druid/segment/CursorHolderPreaggTest.java
processing/src/test/java/org/apache/druid/segment/incremental/IncrementalIndexCursorFactoryTest.java
This PR has: