apache / druid

Apache Druid: a high performance real-time analytics database.
https://druid.apache.org/
Apache License 2.0
13.3k stars 3.66k forks source link

"java.lang.NullPointerException" occurs when using APPROX_QUANTILE_DS #11544

Open lsee9 opened 2 years ago

lsee9 commented 2 years ago

Affected Version

Description

Hello, I am trying to calculate quantiles using "APPROX_QUANTILE_DS()". but java.lang.NullPointerException occurs in my query. Exception occurs in "org.apache.datasketches.quantiles.DirectUpdateDoublesSketch.growCombinedMemBuffer".

So, I think this is due to out of memory. (There is not enough memory available for the operation) However, increasing the memory does not solve the problem.

Also, the problem only occurs when using some service codes (e.g. 'top', 'cafe')

What I'm curious about is:

  1. Is it a memory problem? Or do you think there is another cause?
  2. For k=128, how much memory is needed for the quantile operation?
  3. Should I use a different aggregator to compute the quantile?

I don't have any good ideas to solve the problem :(

my query:

SELECT COALESCE("mytable".country, '_') AS country,
  (APPROX_QUANTILE_DS("mytable".quantile_duration, 0.9)) AS quantile
FROM "mytable"
WHERE ("mytable".service_code = 'top')
AND __time >= '2021-06-01' AND __time <= '2021-06-01'
GROUP BY COALESCE("mytable".country, '_')

datasource configuration:

full log:

at  org.apache.datasketches.quantiles.DirectUpdateDoublesSketch.growCombinedMemBuffer(DirectUpdateDoublesSketch.java:254)
at  org.apache.datasketches.quantiles.DirectUpdateDoublesSketch.growCombinedBuffer(DirectUpdateDoublesSketch.java:238)
at  org.apache.datasketches.quantiles.DoublesMergeImpl.mergeInto(DoublesMergeImpl.java:84)
at  org.apache.datasketches.quantiles.DoublesUnionImpl.updateLogic(DoublesUnionImpl.java:200)
at  org.apache.datasketches.quantiles.DoublesUnionImpl.update(DoublesUnionImpl.java:118)
at  org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchMergeAggregator.updateUnion(DoublesSketchMergeAggregator.java:80)
at  org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchMergeBufferAggregator.aggregate(DoublesSketchMergeBufferAggregator.java:66)
at  org.apache.druid.query.aggregation.AggregatorAdapters.aggregateBuffered(AggregatorAdapters.java:164)
at  org.apache.druid.query.groupby.epinephelinae.AbstractBufferHashGrouper.aggregate(AbstractBufferHashGrouper.java:161)
at  org.apache.druid.query.groupby.epinephelinae.SpillingGrouper.aggregate(SpillingGrouper.java:172)
at  org.apache.druid.query.groupby.epinephelinae.ConcurrentGrouper.aggregate(ConcurrentGrouper.java:269)
at  org.apache.druid.query.groupby.epinephelinae.Grouper.aggregate(Grouper.java:85)
at  org.apache.druid.query.groupby.epinephelinae.RowBasedGrouperHelper.lambda$createGrouperAccumulatorPair$2(RowBasedGrouperHelper.java:332)
at  org.apache.druid.java.util.common.guava.MappingAccumulator.accumulate(MappingAccumulator.java:40)
at  org.apache.druid.java.util.common.guava.BaseSequence.accumulate(BaseSequence.java:44)
at  org.apache.druid.java.util.common.guava.ConcatSequence.lambda$accumulate$0(ConcatSequence.java:41)
at  org.apache.druid.java.util.common.guava.MappingAccumulator.accumulate(MappingAccumulator.java:40)
at  org.apache.druid.java.util.common.guava.FilteringAccumulator.accumulate(FilteringAccumulator.java:41)
at  org.apache.druid.java.util.common.guava.MappingAccumulator.accumulate(MappingAccumulator.java:40)
at  org.apache.druid.java.util.common.guava.BaseSequence.accumulate(BaseSequence.java:44)
at  org.apache.druid.java.util.common.guava.MappedSequence.accumulate(MappedSequence.java:43)
at  org.apache.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50)
at  org.apache.druid.java.util.common.guava.SequenceWrapper.wrap(SequenceWrapper.java:55)
at  org.apache.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45)
at  org.apache.druid.java.util.common.guava.FilteredSequence.accumulate(FilteredSequence.java:45)
at  org.apache.druid.java.util.common.guava.MappedSequence.accumulate(MappedSequence.java:43)
at  org.apache.druid.java.util.common.guava.ConcatSequence.accumulate(ConcatSequence.java:41)
at  org.apache.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50)
at  org.apache.druid.java.util.common.guava.SequenceWrapper.wrap(SequenceWrapper.java:55)
at  org.apache.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45)
at  org.apache.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50)
at  org.apache.druid.java.util.common.guava.SequenceWrapper.wrap(SequenceWrapper.java:55)
at  org.apache.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45)
at  org.apache.druid.java.util.common.guava.LazySequence.accumulate(LazySequence.java:40)
at  org.apache.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50)
at  org.apache.druid.java.util.common.guava.SequenceWrapper.wrap(SequenceWrapper.java:55)
at  org.apache.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45)
at  org.apache.druid.java.util.common.guava.MappedSequence.accumulate(MappedSequence.java:43)
at  org.apache.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50)
at  org.apache.druid.java.util.common.guava.SequenceWrapper.wrap(SequenceWrapper.java:55)
at  org.apache.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45)
at  org.apache.druid.java.util.common.guava.LazySequence.accumulate(LazySequence.java:40)
at  org.apache.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50)
at  org.apache.druid.java.util.common.guava.SequenceWrapper.wrap(SequenceWrapper.java:55)
at  org.apache.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45)
at  org.apache.druid.query.spec.SpecificSegmentQueryRunner$1.accumulate(SpecificSegmentQueryRunner.java:87)
at  org.apache.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50)
at  org.apache.druid.query.spec.SpecificSegmentQueryRunner.doNamed(SpecificSegmentQueryRunner.java:171)
at  org.apache.druid.query.spec.SpecificSegmentQueryRunner.access$100(SpecificSegmentQueryRunner.java:44)
at  org.apache.druid.query.spec.SpecificSegmentQueryRunner$2.wrap(SpecificSegmentQueryRunner.java:153)
at  org.apache.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45)
at  org.apache.druid.java.util.common.guava.WrappingSequence$1.get(WrappingSequence.java:50)
at  org.apache.druid.query.CPUTimeMetricQueryRunner$1.wrap(CPUTimeMetricQueryRunner.java:78)
at  org.apache.druid.java.util.common.guava.WrappingSequence.accumulate(WrappingSequence.java:45)
at  org.apache.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2$1$1$1.call(GroupByMergingQueryRunnerV2.java:247)
at  org.apache.druid.query.groupby.epinephelinae.GroupByMergingQueryRunnerV2$1$1$1.call(GroupByMergingQueryRunnerV2.java:234)
at  java.util.concurrent.FutureTask.run(FutureTask.java:266)
at  org.apache.druid.query.PrioritizedListenableFutureTask.run(PrioritizedExecutorService.java:247)
at  java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at  java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at  java.lang.Thread.run(Thread.java:748)

Any help would be greatly appreciated.

jihoonson commented 2 years ago

Hi @lsee9, thank you for the report. I would call this a bug of Druid because Druid should have returned a better error than NPE. To answer your questions, I think your assessment is correct about the lack of memory. Please see https://datasketches.apache.org/docs/Quantiles/OrigQuantilesSketch.html for the memory space required per k. I'm not sure why the query exploded in your case though because data seems pretty small. What was druid.processing.buffer.sizeBytes set to in your test? You could try either with a smaller k or a bigger buffer size.

lsee9 commented 2 years ago

Hi @jihoonson, thank you for your reply:) I'll tell you what I tried.

My druid spec:

druid.processing.buffer.sizeBytes=1GiB
druid.processing.numMergeBuffers=10
druid.processing.numThreads=19 (20 core machine)
MaxDirectMemorySize=30g
heap size=32g

BUT, For k = 128, the problem still occurs. Should I increase more??

And, I tried with a smaller k (k = 32, 64). Then no error occurs.(I'm going to check a wider time range(more data)). But the deviation of the values ​​is too large. (e.g. quantile 0 ~ 2000 in some cases) So I don't know if I can use this.

If you have any good ideas, please reply!

jihoonson commented 2 years ago

Hmm, were there lots of values per group-by key by any chance? What does this query return? (BTW, I copied the time filter from your comment, but is that correct? It is identical to __time = '2021-06-01')

SELECT COALESCE("mytable".country, '_') AS country, count(*)
FROM "mytable"
WHERE ("mytable".service_code = 'top')
AND __time >= '2021-06-01' AND __time <= '2021-06-01'
GROUP BY COALESCE("mytable".country, '_')
lsee9 commented 2 years ago

Yes!! the time filter is correct. And NPE occurs when I do my query and turn off auto limit.

The result of running the query you said(ORDER BY count DESC):

{"country":"kr","EXPR$1":490}
{"country":"us","EXPR$1":221}
{"country":"jp","EXPR$1":173}
{"country":"ca","EXPR$1":165}
{"country":"au","EXPR$1":155}
{"country":"de","EXPR$1":147}
{"country":"vn","EXPR$1":138}
{"country":"sg","EXPR$1":130}
{"country":"th","EXPR$1":127}
{"country":"hk","EXPR$1":123}
{"country":"nz","EXPR$1":122}
{"country":"gb","EXPR$1":115}
{"country":"ph","EXPR$1":112}
{"country":"tw","EXPR$1":111}
{"country":"id","EXPR$1":108}
...
{"country":"re","EXPR$1":6}
{"country":"ye","EXPR$1":6}
{"country":"bm","EXPR$1":4}
{"country":"gy","EXPR$1":4}
{"country":"li","EXPR$1":4}
{"country":"mc","EXPR$1":4}
{"country":"tc","EXPR$1":4}
{"country":"kp","EXPR$1":3}
{"country":"ad","EXPR$1":2}
{"country":"so","EXPR$1":2}
{"country":"gw","EXPR$1":1}
{"country":"mq","EXPR$1":1}
{"country":"sy","EXPR$1":1}
total num country: 200

each is not so much...

lsee9 commented 2 years ago

☝️ The above comment is the druid table result. This is the value after already rolling-up with quantilesDoublesSketch and becoming ingestion.

The number of rows in the original table is as follows. query:

SELECT
  country,
  SUM("count") AS total_num_rows_original
FROM "mytable"
WHERE __time >= '2021-06-01' AND __time <= '2021-06-01' AND service_code = 'top'
GROUP BY 1
ORDER BY 2 DESC

query result:

{"country":"kr","total_num_rows_original":1082227280}
{"country":"us","total_num_rows_original":10978845}
{"country":"jp","total_num_rows_original":2896190}
{"country":"ca","total_num_rows_original":2767109}
{"country":"au","total_num_rows_original":1862148}
{"country":"vn","total_num_rows_original":1718031}
{"country":"nz","total_num_rows_original":575751}
{"country":"de","total_num_rows_original":556492}
{"country":"sg","total_num_rows_original":536305}
{"country":"id","total_num_rows_original":425479}
{"country":"hk","total_num_rows_original":373920}
{"country":"ph","total_num_rows_original":364786}
{"country":"","total_num_rows_original":361175}
{"country":"th","total_num_rows_original":360037}
{"country":"my","total_num_rows_original":333746}
{"country":"gb","total_num_rows_original":324027}
{"country":"mx","total_num_rows_original":240169}
{"country":"ae","total_num_rows_original":237182}
...
{"country":"ad","total_num_rows_original":3}
{"country":"gw","total_num_rows_original":3}
{"country":"so","total_num_rows_original":3}
{"country":"mq","total_num_rows_original":1}
{"country":"sy","total_num_rows_original":1}

If total aggregation is performed, the number of original rows is about 81 billion, up to 20 times the value of N in table(https://datasketches.apache.org/docs/Quantiles/OrigQuantilesSketch.html)

But the number of bytes required is 2^36 ~ 2^37 about 81 billion rows, increasing by 1 KB on a log scale. Based on this calculation, 30KB to 32KB seems to be sufficient.

jihoonson commented 2 years ago

I think I see what's going on :slightly_smiling_face:. Does your original query work if you add an extra filter of country <> 'kr'?

lsee9 commented 2 years ago

Yes, it does work if I add extra filter country <> 'kr' ! What do you think is the problem here? Is it the big size of the original rows of table??

jihoonson commented 2 years ago

Yes, I think the problem is too many items per country. Druid uses a fixed-size buffer per row to keep the sketch (DoublesSketch). Since the buffer size is fixed but Druid doesn't know the number of items in advance, it estimates the buffer size to be large enough to hold one billion items in the sketch. So, when you have less items than one billion, the sketch can fit in the buffer and everything works well. The interesting part is when you have more items than one billion. In that case, Druid lets the sketch allocate extra heap memory to hold those items that don't fit in the buffer. However, DoublesSketch is not working as we expected and throws NPE when it tries to allocate more memory. This issue is filed in https://github.com/apache/datasketches-java/issues/358.

As a workaround, you could use other functions to compute approximate quantiles, such as DS_QUANTILES_SKETCH or APPROX_QUANTILE. Note that APPROX_QUANTILE uses the deprecated approximate histogram aggregator and its accuracy might be not great.

lsee9 commented 2 years ago

Yes, I understand! Thanks for your help. I'll try the other function you suggested 😄 .

AlexanderSaydakov commented 2 years ago

Druid version 0.21.1 uses datasketches-java-1.3.0-incubating and datasketches-memory-1.2.0-incubating Would it be possible to try reproducing this with the current code in master, which uses datasketches-java-2.0.0 and datasketches-memory-1.3.0?

AlexanderSaydakov commented 2 years ago

Could someone point to the code that allocates this memory for BufferAggregator please?

AlexanderSaydakov commented 2 years ago

if rebuilding Druid is an option, I would suggest increasing this constant: https://github.com/apache/druid/blob/e9d964d504cb510226d58b2caa299cecfec99e15/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java#L66 It will increase the size of pre-allocated buffers in BufferAggregator, but not drastically. Sketches grow very slowly at that point. I suggest this as a temporary measure until we figure out how to fix this and go through release cycles.

jihoonson commented 2 years ago

Hi @AlexanderSaydakov, thank you for taking a look. It does fail in the Druid master branch. You can easily reproduce it by running DoublesSketchAggregatorTest.buildingSketchesAtQueryTime() after setting DoublesSketchAggregatorFactory.MAX_STREAM_LENGTH to something very low, like 10.

Could someone point to the code that allocates this memory for BufferAggregator please?

Those buffers are allocated in DruidProcessingModule.

leerho commented 2 years ago

See comments in datasketches-java/issues#358.

lsee9 commented 2 years ago

Hi, @leerho, thank you for your reply.

As suggested at datasketches-java/issues#358, If you can support a temporary hacked jar, I'd love to get it!

Please let me know if it is feasible:)

AlexanderSaydakov commented 2 years ago

As Lee Rhodes said, it might take quite a while to fix the root cause and go through release cycles for datasketches-memory and datasketches-java. Therefore I would suggest using the workaround that I mentioned above, namely increasing the MAX_STREAM_LENGTH constant. It affects the size pre-allocated for each sketch in the BufferAggregator. The assumption was that due to data fragmentation across multiple dimensions with power-law distribution only a small number of sketches will reach that size and move to on-heap memory. Since this mechanism is broken now, let's set a much higher limit until it is fixed. And let's do it quickly before 0.22 branch is created. I can do a pull request if we agree on the value.

Here is the size of one slot in the BufferAggregator in bytes for the default sketch parameter K=128 for different values of MAX_STREAM_LENGTH: 1B (current): 24608 10B: 28704 100B: 31776 1T: 34848

I suggest setting to 1T.

jihoonson commented 2 years ago

@leerho @AlexanderSaydakov, do you have a rough schedule for the new release of datasketches-memory and datasketches-java? If it's going to take long, perhaps we could add a config that can temporarily live for a couple of Druid releases to control the size of MAX_STREAM_LENGTH. We could use the current size as default, but users could override it if needed to avoid this error.

AlexanderSaydakov commented 2 years ago

This can take weeks if not months. datasketches-memory is being prepared for a major release, which is not quite ready yet, and datasketches-java depends on it, which means a sequential process with voting stage for each and so on. I like your suggestion to make this parameter configurable. It might be useful even after we fix the root cause. So if you know how to do it quickly, please go ahead.

jihoonson commented 2 years ago

@AlexanderSaydakov thanks, sounds good. I will make a PR soon.

jihoonson commented 2 years ago

I created https://github.com/apache/druid/pull/11574.