opensearch-project / OpenSearch

🔎 Open source distributed and RESTful search engine.
https://opensearch.org/docs/latest/opensearch/index/
Apache License 2.0

[Concurrent Segment Search] shard_min_doc_count and shard_size should not be evaluated at the slice level #8860

Closed — jed326 closed this issue 1 year ago

jed326 commented 1 year ago

Subtask for https://github.com/opensearch-project/OpenSearch/issues/7357 to focus on the test failures related to shard size parameter:

REPRODUCE WITH: ./gradlew 'null' --tests "org.opensearch.search.aggregations.bucket.TermsShardMinDocCountIT.testShardMinDocCountSignificantTermsTest" -Dtests.seed=A749B9CB2092798B -Dtests.locale=en-IE -Dtests.timezone=Europe/Madrid -Druntime.java=17

java.lang.AssertionError: 
Expected: <0>
     but: was <1>

    at __randomizedtesting.SeedInfo.seed([A749B9CB2092798B:D251D3457B327719]:0)
    at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18)
    at org.junit.Assert.assertThat(Assert.java:964)
    at org.junit.Assert.assertThat(Assert.java:930)
    at org.opensearch.search.aggregations.bucket.TermsShardMinDocCountIT.testShardMinDocCountSignificantTermsTest(TermsShardMinDocCountIT.java:103)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
REPRODUCE WITH: ./gradlew 'null' --tests "org.opensearch.search.aggregations.bucket.TermsShardMinDocCountIT.testShardMinDocCountTermsTest" -Dtests.seed=A749B9CB2092798B -Dtests.locale=en-IE -Dtests.timezone=Europe/Madrid -Druntime.java=17

java.lang.AssertionError: 
Expected: <0>
     but: was <2>

    at __randomizedtesting.SeedInfo.seed([A749B9CB2092798B:BD5353BCAD8763C8]:0)
    at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18)
    at org.junit.Assert.assertThat(Assert.java:964)
    at org.junit.Assert.assertThat(Assert.java:930)
    at org.opensearch.search.aggregations.bucket.TermsShardMinDocCountIT.testShardMinDocCountTermsTest(TermsShardMinDocCountIT.java:169)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
 - org.opensearch.search.aggregations.bucket.ShardSizeTermsIT.testShardSizeEqualsSizeDouble
 - org.opensearch.search.aggregations.bucket.ShardSizeTermsIT.testShardSizeEqualsSizeString
jed326 commented 1 year ago

shard_size and shard_min_doc_count are being evaluated at the slice level, which leads to unexpected results and causes these test failures.

High Level Overview:

High Level Solutions:

I have a few high level solutions in mind. The big problem is with shard_size: we should have some sort of limit on the number of buckets a slice can collect, because if we ignore shard_size at the slice level the bucket count could grow unbounded.

However, introducing any sort of slice level limit on bucket count (let's call this slice_size for now) can result in behavioral changes between concurrent and non-concurrent search if the number of ordinals >> the slice_size. Specifically, the doc_count could be inaccurate, missing docs from some slices. This is analogous to how shard_size behaves in the 1 shard vs. multiple shard case. This is true even if we introduce slice level settings, which I don't believe we should do: the user has no control over, or knowledge of, how documents are distributed across slices, so these controls wouldn't really mean anything to them.
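A hypothetical simulation of the doc_count inaccuracy described above (plain Java, not OpenSearch code; all names are illustrative). Each slice keeps only its top-k buckets, and a term that is frequent shard-wide but ranks low inside one slice loses counts:

```java
import java.util.*;
import java.util.stream.*;

// Illustrative only: shows how a per-slice top-k cutoff ("slice_size")
// can undercount a term when the number of distinct terms exceeds the cutoff.
public class SliceSizeUndercount {

    // Each slice keeps only its top-k terms by count, like a slice-level shard_size.
    static Map<String, Integer> topK(Map<String, Integer> counts, int k) {
        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                .limit(k)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    // Merge slice results at the shard level by summing per-term counts.
    static Map<String, Integer> merge(List<Map<String, Integer>> slices) {
        Map<String, Integer> merged = new HashMap<>();
        for (Map<String, Integer> s : slices)
            s.forEach((term, c) -> merged.merge(term, c, Integer::sum));
        return merged;
    }

    public static void main(String[] args) {
        // Term "x" is frequent overall but only rank 3 inside slice 2.
        Map<String, Integer> slice1 = Map.of("x", 5, "a", 1);
        Map<String, Integer> slice2 = Map.of("b", 9, "c", 8, "x", 7);

        int sliceSize = 2; // per-slice cutoff
        Map<String, Integer> merged = merge(List.of(topK(slice1, sliceSize), topK(slice2, sliceSize)));

        // "x" really occurs 12 times on this shard, but the cutoff dropped
        // its 7 hits from slice 2, so the merged count is only 5.
        System.out.println("merged count for x = " + merged.get("x"));
    }
}
```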

Essentially there are a few different ways we can address both shard_min_doc_count and shard_size and we can combine the solutions in different ways.

shard_min_doc_count:

  1. No changes
  2. Ignore at the slice level
  3. Ignore at the slice level and apply at shard reduce
  4. Introduce new slice level setting + [3]
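Option [3] above could be sketched roughly as follows (hypothetical code, not the OpenSearch implementation): slices return all their buckets, and the threshold is applied only after slice results are merged at the shard level.

```java
import java.util.*;

// Illustrative sketch of shard_min_doc_count option [3]: ignore the
// threshold per slice, apply it once during the shard-level reduce.
public class ShardMinDocCountReduce {

    // Merge per-slice bucket counts, then drop buckets below the threshold.
    static Map<String, Long> reduce(List<Map<String, Long>> slices, long shardMinDocCount) {
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> slice : slices)
            slice.forEach((term, c) -> merged.merge(term, c, Long::sum));
        merged.values().removeIf(c -> c < shardMinDocCount);
        return merged;
    }

    public static void main(String[] args) {
        // "y" is below the threshold in every individual slice but above it
        // shard-wide, so filtering per slice would have wrongly discarded it.
        List<Map<String, Long>> slices = List.of(Map.of("y", 2L, "z", 1L), Map.of("y", 2L));
        System.out.println(reduce(slices, 3L)); // keeps y (4), drops z (1)
    }
}
```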

shard_size:

  1. No changes
  2. Ignore at the slice level
  3. Instead of applying shard_size at the slice level, we use some default higher than shard_size. For example, shard_size defaults to 1.2*size + 10.
  4. Ignore at the slice level, apply at the shard-level reduce. This requires using the same comparator for ordering purposes.
  5. Introduce new slice level setting + [4]
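Option [3] for shard_size might look like the following sketch (illustrative only; the formula mirrors the 1.2*size + 10 overshoot mentioned above and is not an OpenSearch API):

```java
// Hypothetical sketch of shard_size option [3]: instead of applying
// shard_size directly per slice, each slice uses a larger heuristic limit,
// analogous to how shard_size itself defaults to an overshoot of size.
public class SliceSizeHeuristic {

    // Per-slice bucket limit: overshoot shard_size so the shard-level
    // reduce still sees enough candidates from every slice.
    static int sliceLimit(int shardSize) {
        return (int) (1.2 * shardSize) + 10;
    }

    public static void main(String[] args) {
        System.out.println(sliceLimit(25)); // prints 40
    }
}
```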

Currently my recommendation is [3] for shard_min_doc_count and [3] for shard_size. I'm split on whether we should reduce the number of buckets down to shard_size at the shard-level reduce. The intention of shard_size is to give users a way to get more accurate results, so with solution [3] for shard_size I don't see a purpose in doing the reduce there.

jed326 commented 1 year ago

@sohami @reta @andrross Would like to get your input on this. It seems difficult to completely preserve the behavior between concurrent and non-concurrent search cases here.

sohami commented 1 year ago

@jed326 Thanks for the analysis.

The big problem is with shard_size. We should have some sort of limit on the number of buckets a slice can collect, but if we ignore shard_size at the slice level then it could grow unbounded

I didn't understand what you mean by "it could grow unbounded". For the sequential case as well, the shard_size limit is applied after document collection is done on the shards and before returning the output (i.e. in buildAggregation). So if we ignore the shard_size parameter at the slice level but apply it before sending results to the coordinator, the same bounds can still be applied.

I think for both of the shard-level parameters, if we ignore them at the slice level and then apply them as part of the reduce, it should work as expected. What are the challenges for shard_size option [4]?
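The approach described above could be sketched as follows (hypothetical code, not the actual buildAggregation implementation): merge all slice buckets at the shard level, and only then truncate to shard_size before returning to the coordinator, mirroring where the sequential path applies the limit.

```java
import java.util.*;
import java.util.stream.*;

// Illustrative sketch: ignore shard_size per slice, then sort and truncate
// the merged buckets once at the shard level.
public class ShardLevelTruncate {

    static List<Map.Entry<String, Long>> reduceToShardSize(List<Map<String, Long>> slices, int shardSize) {
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> slice : slices)
            slice.forEach((term, c) -> merged.merge(term, c, Long::sum));
        // Same bound the sequential path applies, just after the slice merge.
        return merged.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .limit(shardSize)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map<String, Long>> slices = List.of(Map.of("a", 3L, "b", 1L), Map.of("a", 2L, "c", 4L));
        // Merged: a=5, c=4, b=1; shard_size=2 keeps only a and c.
        System.out.println(reduceToShardSize(slices, 2));
    }
}
```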

jed326 commented 1 year ago

I didn't understand what you mean my it could grow unbounded. For sequential case as well, the shard_size limit is applied after document collection is done on the shards and before returning the output (i.e. in buildAggregation).

You are right that at the shard level the same bounds would still be applied, so from the coordinator's perspective concurrent and non-concurrent search would look the same. However, the unbounded growth I am concerned about is the size of the priority queue at the slice level, which is also the number of buckets each slice returns. Today this is bounded by the shard_size parameter, but if we ignore that then each slice would return a bucket for every single ordinal, which looks like a serious scaling issue to me. I think this is analogous to why the shard_size parameter exists at all, instead of returning every single bucket to the coordinator and applying the size bound there.
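The scaling concern above can be illustrated with a hypothetical bounded min-heap (plain Java, not the OpenSearch priority queue): with a bound, each slice holds at most that many buckets; without one, it would hold a bucket for every distinct ordinal it sees.

```java
import java.util.*;

// Illustrative only: a min-heap of (ordinal, count) pairs capped at a bound,
// keeping slice-level memory at O(bound) instead of O(#ordinals).
public class SliceQueueBound {

    // Once full, a new bucket only enters by evicting the current smallest.
    static PriorityQueue<long[]> boundedTopCounts(long[] countsPerOrdinal, int bound) {
        PriorityQueue<long[]> pq = new PriorityQueue<>(Comparator.comparingLong((long[] e) -> e[1]));
        for (long ord = 0; ord < countsPerOrdinal.length; ord++) {
            pq.offer(new long[] { ord, countsPerOrdinal[(int) ord] });
            if (pq.size() > bound) pq.poll(); // evict the smallest bucket
        }
        return pq;
    }

    public static void main(String[] args) {
        long[] counts = new long[100_000]; // one entry per distinct ordinal
        Arrays.fill(counts, 1L);
        counts[7] = 50L; // one genuinely frequent ordinal survives eviction
        PriorityQueue<long[]> pq = boundedTopCounts(counts, 100);
        // The queue stays at the bound instead of growing to 100,000 entries.
        System.out.println(pq.size()); // prints 100
    }
}
```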