elastic / elasticsearch

Free and Open Source, Distributed, RESTful Search Engine
https://www.elastic.co/products/elasticsearch

ES|QL: broken "duplicate" aggs at the coordinator/data nodes boundary #113393

Open astefan opened 3 weeks ago

astefan commented 3 weeks ago

Description

  1. from employees | stats cd1=count_distinct(salary, 3000), cd2=count_distinct(salary, 3000 + 1000 - 1000), cd3=count_distinct(salary, 1000)

fails with

        "type": "illegal_argument_exception",
        "reason": null,
        "stack_trace": "java.lang.IllegalArgumentException at org.elasticsearch.server@9.0.0-SNAPSHOT/org.elasticsearch.search.aggregations.metrics.HyperLogLogPlusPlus.merge(HyperLogLogPlusPlus.java:170)
at org.elasticsearch.compute.aggregation.HllStates$SingleState.merge(HllStates.java:115)
at org.elasticsearch.compute.aggregation.CountDistinctIntAggregator.combineIntermediate(CountDistinctIntAggregator.java:33)
at org.elasticsearch.compute.aggregation.CountDistinctIntAggregatorFunction.addIntermediateInput(CountDistinctIntAggregatorFunction.java:143)
at org.elasticsearch.compute.aggregation.Aggregator.processPage(Aggregator.java:41)
at org.elasticsearch.compute.operator.AggregationOperator.addInput(AggregationOperator.java:99)
at org.elasticsearch.compute.operator.Driver.runSingleLoopIteration(Driver.java:266)

The problem surfaces on the data nodes, which expect the sink they write to to have three channels (one per aggregation). On the coordinator node (in the AggregateMapper), however, only two intermediate attributes are created: the first two count_distincts are identical once their precision arguments are folded, so they share a single attribute.
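The channel mismatch can be illustrated with a toy model. This is plain Python, not Elasticsearch code, and `fold` and `intermediate_layout` are hypothetical names invented for the sketch. It assumes the data node emits one intermediate channel per aggregate, while the coordinator keys its intermediate attributes by the folded form of each aggregate:

```python
# Toy model only: each aggregate is (function, field, precision expression).
aggs = [
    ("count_distinct", "salary", "3000"),
    ("count_distinct", "salary", "3000 + 1000 - 1000"),
    ("count_distinct", "salary", "1000"),
]

def fold(expr):
    """Constant-fold a space-separated chain of integer + and - terms."""
    tokens = expr.split()
    total = int(tokens[0])
    for op, num in zip(tokens[1::2], tokens[2::2]):
        total = total + int(num) if op == "+" else total - int(num)
    return total

def intermediate_layout(aggs, dedupe):
    """Return the intermediate channels one side of the exchange expects."""
    folded = [(func, field, fold(expr)) for func, field, expr in aggs]
    if not dedupe:
        return folded
    # dict.fromkeys keeps the first occurrence and preserves order
    return list(dict.fromkeys(folded))

data_node_channels = intermediate_layout(aggs, dedupe=False)
coordinator_channels = intermediate_layout(aggs, dedupe=True)

print(len(data_node_channels), len(coordinator_channels))  # prints: 3 2
```

In this model the two sides disagree on the number of channels, which is the shape of the failure reported in the stack trace above.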

At that point in the code (AggregateMapper), "duplicated" aggregations are unexpectedly(?) still present. The logical optimizer has a rule that deduplicates identical aggregates, ReplaceStatsAggExpressionWithEval, but it runs before any folding takes place, so aggregates that only become identical after folding are not caught.
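A minimal sketch of why the rule order matters, again as a toy Python model rather than the actual optimizer. `dedupe` and `fold_all` are hypothetical stand-ins for the deduplication rule and for constant folding:

```python
# Toy model only: each aggregate is (function, field, precision expression).
aggs = [
    ("count_distinct", "salary", "3000"),
    ("count_distinct", "salary", "3000 + 1000 - 1000"),
    ("count_distinct", "salary", "1000"),
]

def fold(expr):
    """Constant-fold a space-separated chain of integer + and - terms."""
    tokens = expr.split()
    total = int(tokens[0])
    for op, num in zip(tokens[1::2], tokens[2::2]):
        total = total + int(num) if op == "+" else total - int(num)
    return str(total)

def fold_all(aggs):
    """Stand-in for constant folding applied to every aggregate."""
    return [(func, field, fold(expr)) for func, field, expr in aggs]

def dedupe(aggs):
    """Stand-in for deduplicating syntactically identical aggregates."""
    return list(dict.fromkeys(aggs))

dedupe_then_fold = fold_all(dedupe(aggs))  # order described in the issue
fold_then_dedupe = dedupe(fold_all(aggs))  # alternative order

print(len(dedupe_then_fold), len(fold_then_dedupe))  # prints: 3 2
```

Deduplicating first sees three syntactically different aggregates and keeps all of them; folding afterwards makes the first two identical, which is the state the AggregateMapper then encounters.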

  2. Thanks to @ivancea for uncovering this one:

from employees | stats m = median(salary_change), p50 = percentile(salary_change, 50), count = count(salary_change)

fails with

        "type": "class_cast_exception",
        "reason": "class org.elasticsearch.compute.data.BytesRefVectorBlock cannot be cast to class org.elasticsearch.compute.data.LongBlock (org.elasticsearch.compute.data.BytesRefVectorBlock and org.elasticsearch.compute.data.LongBlock are in unnamed module of loader java.net.FactoryURLClassLoader @1c8e8fed)",
        "stack_trace": "java.lang.ClassCastException: class org.elasticsearch.compute.data.BytesRefVectorBlock cannot be cast to class org.elasticsearch.compute.data.LongBlock (org.elasticsearch.compute.data.BytesRefVectorBlock and org.elasticsearch.compute.data.LongBlock are in unnamed module of loader java.net.FactoryURLClassLoader @1c8e8fed)
at org.elasticsearch.compute.aggregation.CountAggregatorFunction.addIntermediateInput(CountAggregatorFunction.java:117)
at org.elasticsearch.compute.aggregation.Aggregator.processPage(Aggregator.java:41)
at org.elasticsearch.compute.operator.AggregationOperator.addInput(AggregationOperator.java:99)
at org.elasticsearch.compute.operator.Driver.runSingleLoopIteration(Driver.java:266)

For this one, the intermediate attributes are deduplicated in the AggregateMapper, and median and percentile are considered identical because the median is rewritten as percentile(salary_change, 50).

At the logical optimizer level, again ReplaceStatsAggExpressionWithEval cannot do the deduplication of aggregations because SubstituteSurrogates runs after it.
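One plausible reading of the ClassCastException can be sketched with a toy Python model. It is not Elasticsearch code. The names, the one-channel-per-aggregate layout, and the `STATE_TYPE` mapping are simplifying assumptions made for illustration. After the surrogate rewrite, one side still carries three channels while the other expects the deduplicated two, so the channel where count is expected holds a percentile state instead:

```python
# Toy model only; all names and the layout are hypothetical.
aggs = [
    ("median", "salary_change"),
    ("percentile", "salary_change", 50),
    ("count", "salary_change"),
]

def substitute_surrogates(aggs):
    """Rewrite median(x) as percentile(x, 50), as described in the issue."""
    return [("percentile", a[1], 50) if a[0] == "median" else a for a in aggs]

def dedupe(aggs):
    """Drop repeated aggregates, keeping the first occurrence in order."""
    return list(dict.fromkeys(aggs))

# Assumed block type of each aggregate's intermediate state.
STATE_TYPE = {"percentile": "BytesRef", "count": "Long"}

sent = substitute_surrogates(aggs)  # three channels, the first two identical
expected = dedupe(sent)             # two channels after deduplication

# Where the deduplicated layout expects count, and what actually sits there.
count_channel = expected.index(("count", "salary_change"))
received_type = STATE_TYPE[sent[count_channel][0]]

print(count_channel, received_type)  # prints: 1 BytesRef
```

Under these assumptions count reads channel 1 and finds a BytesRef block where it expects a Long one, which matches the shape of the reported error.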

elasticsearchmachine commented 3 weeks ago

Pinging @elastic/es-analytical-engine (Team:Analytics)

not-napoleon commented 3 weeks ago

I wrote a CSV test that reproduces the first of these using only a ROW command to eliminate any data dependency:

count distinct bug
ROW salary = 5.2 
| STATS cd1=count_distinct(salary, 1000), cd2=count_distinct(salary, 3000 - 1000 + 1000), cd3=count_distinct(salary, 3000)
;

cd1:long|cd2:long|cd3:long
1       |1       |1
;