Closed ashking94 closed 8 months ago
On further investigation, the Circuit Breaker for one of the threads is not being reset to zero. The stack trace is as follows:
[java.base/java.lang.Thread.getStackTrace(Thread.java:1602), org.opensearch.common.breaker.ChildMemoryCircuitBreaker.addWithoutBreaking(ChildMemoryCircuitBreaker.java:222), org.opensearch.common.util.BigArrays.adjustBreaker(BigArrays.java:481), org.opensearch.common.util.AbstractArray.close(AbstractArray.java:62), org.opensearch.common.util.io.IOUtils.close(IOUtils.java:89), org.opensearch.common.lease.Releasables.close(Releasables.java:70), org.opensearch.gateway.PersistedClusterStateService$DocumentBuffer.close(PersistedClusterStateService.java:999), org.opensearch.gateway.PersistedClusterStateService$Writer.updateMetadata(PersistedClusterStateService.java:793), org.opensearch.gateway.PersistedClusterStateService$Writer.writeIncrementalStateAndCommit(PersistedClusterStateService.java:690), org.opensearch.gateway.GatewayMetaState$LucenePersistedState.setLastAcceptedState(GatewayMetaState.java:609), org.opensearch.cluster.coordination.CoordinationState.handlePublishRequest(CoordinationState.java:452), org.opensearch.cluster.coordination.Coordinator.handlePublishRequest(Coordinator.java:451), org.opensearch.cluster.coordination.PublicationTransportHandler.acceptState(PublicationTransportHandler.java:224), org.opensearch.cluster.coordination.PublicationTransportHandler.handleIncomingPublishRequest(PublicationTransportHandler.java:206), org.opensearch.cluster.coordination.PublicationTransportHandler.lambda$new$0(PublicationTransportHandler.java:117), org.opensearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:106), org.opensearch.transport.InboundHandler$RequestHandler.doRun(InboundHandler.java:480), org.opensearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:911), org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52), java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128), 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628), java.base/java.lang.Thread.run(Thread.java:829)]
The random seed that reproduces the failure quickly is -Dtests.seed=F3F7FF2B9B35932E
The exception received on test failure is as follows:
java.lang.AssertionError: Request breaker not reset to 0 on node: node_s0
Expected: <0L>
but: was <6553888L>
at __randomizedtesting.SeedInfo.seed([221689B6AE8054B1:9A44B4F6414086FB]:0)
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18)
at org.junit.Assert.assertThat(Assert.java:964)
at org.opensearch.test.InternalTestCluster.lambda$ensureEstimatedStats$42(InternalTestCluster.java:2691)
at org.opensearch.test.OpenSearchTestCase.assertBusy(OpenSearchTestCase.java:1086)
at org.opensearch.test.OpenSearchTestCase.assertBusy(OpenSearchTestCase.java:1059)
at org.opensearch.test.InternalTestCluster.ensureEstimatedStats(InternalTestCluster.java:2689)
at org.opensearch.test.TestCluster.assertAfterTest(TestCluster.java:104)
at org.opensearch.test.InternalTestCluster.assertAfterTest(InternalTestCluster.java:2744)
at org.opensearch.test.OpenSearchIntegTestCase.afterInternal(OpenSearchIntegTestCase.java:619)
at org.opensearch.test.OpenSearchIntegTestCase.cleanUpCluster(OpenSearchIntegTestCase.java:2289)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
at java.base/java.lang.reflect.Method.invoke(Method.java:578)
at com.carrotsearch.randomizedtesting.RandomizedRunner.invoke(RandomizedRunner.java:1750)
at com.carrotsearch.randomizedtesting.RandomizedRunner$10.evaluate(RandomizedRunner.java:996)
at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.apache.lucene.tests.util.TestRuleSetupTeardownChained$1.evaluate(TestRuleSetupTeardownChained.java:48)
at org.apache.lucene.tests.util.AbstractBeforeAfterRule$1.evaluate(AbstractBeforeAfterRule.java:43)
at org.apache.lucene.tests.util.TestRuleThreadAndTestName$1.evaluate(TestRuleThreadAndTestName.java:45)
at org.apache.lucene.tests.util.TestRuleIgnoreAfterMaxFailures$1.evaluate(TestRuleIgnoreAfterMaxFailures.java:60)
at org.apache.lucene.tests.util.TestRuleMarkFailure$1.evaluate(TestRuleMarkFailure.java:44)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
at com.carrotsearch.randomizedtesting.ThreadLeakControl$StatementRunner.run(ThreadLeakControl.java:368)
at com.carrotsearch.randomizedtesting.ThreadLeakControl.forkTimeoutingTask(ThreadLeakControl.java:817)
at com.carrotsearch.randomizedtesting.ThreadLeakControl$3.evaluate(ThreadLeakControl.java:468)
at com.carrotsearch.randomizedtesting.RandomizedRunner.runSingleTest(RandomizedRunner.java:947)
at com.carrotsearch.randomizedtesting.RandomizedRunner$5.evaluate(RandomizedRunner.java:832)
at com.carrotsearch.randomizedtesting.RandomizedRunner$6.evaluate(RandomizedRunner.java:883)
at com.carrotsearch.randomizedtesting.RandomizedRunner$7.evaluate(RandomizedRunner.java:894)
at org.apache.lucene.tests.util.AbstractBeforeAfterRule$1.evaluate(AbstractBeforeAfterRule.java:43)
at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
at org.apache.lucene.tests.util.TestRuleStoreClassName$1.evaluate(TestRuleStoreClassName.java:38)
at com.carrotsearch.randomizedtesting.rules.NoShadowingOrOverridesOnMethodsRule$1.evaluate(NoShadowingOrOverridesOnMethodsRule.java:40)
at com.carrotsearch.randomizedtesting.rules.NoShadowingOrOverridesOnMethodsRule$1.evaluate(NoShadowingOrOverridesOnMethodsRule.java:40)
at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
at org.apache.lucene.tests.util.TestRuleAssertionsRequired$1.evaluate(TestRuleAssertionsRequired.java:53)
at org.apache.lucene.tests.util.AbstractBeforeAfterRule$1.evaluate(AbstractBeforeAfterRule.java:43)
at org.apache.lucene.tests.util.TestRuleMarkFailure$1.evaluate(TestRuleMarkFailure.java:44)
at org.apache.lucene.tests.util.TestRuleIgnoreAfterMaxFailures$1.evaluate(TestRuleIgnoreAfterMaxFailures.java:60)
at org.apache.lucene.tests.util.TestRuleIgnoreTestSuites$1.evaluate(TestRuleIgnoreTestSuites.java:47)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
at com.carrotsearch.randomizedtesting.ThreadLeakControl$StatementRunner.run(ThreadLeakControl.java:368)
at java.base/java.lang.Thread.run(Thread.java:1623)
Suppressed: java.lang.AssertionError: Request breaker not reset to 0 on node: node_s0
The test fails only when concurrent search is enabled, and the exception above hints at a memory leak in one of the threads. On closer inspection, the following appears to be the cause of the leak:
The test CardinalityWithRequestBreakerIT.testRequestBreaker creates an index, adds multiple documents to it, sets a Circuit Breaker limit, and runs a search query with a Terms aggregator that has a Cardinality aggregator as a sub-aggregator. The search query is expected to throw a CircuitBreakingException, and the test then checks that the Circuit Breaker is reset to zero.
In the concurrent search case, the search thread hands the work to multiple index_searcher threads, one per slice, using the slice-level parallelism that comes from Lucene. Each search thread has an associated SearchContext that holds the releasables (in this case a GlobalOrdinalsStringTermsAggregator and a CardinalityAggregator per slice). While the search executes, memory for its operations is allocated through BigArrays and accounted for by the CircuitBreaker. When one of the index_searcher threads throws a CircuitBreakingException, Lucene does not wait for the other threads to finish, and the exception is propagated to the search thread right away. SearchContext.close() is then called, which closes the releasables, including all the aggregators. However, if another index_searcher thread allocates from BigArrays in the interim, that allocation is added to the CircuitBreaker but never released, because the SearchContext has already been closed. This leaves the breaker above zero, which the test reports as a memory leak.
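The race described above can be reproduced in miniature with plain JDK classes. This is only a sketch under stated assumptions: `ToyContext`, `breakerBytes`, and the byte sizes are hypothetical stand-ins for SearchContext, the request breaker, and BigArrays allocations, and a latch forces the unlucky ordering that the real test hits only occasionally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

class BreakerLeakDemo {
    /** Hypothetical stand-in for the request circuit breaker: a plain byte counter. */
    static final AtomicLong breakerBytes = new AtomicLong();

    /** Hypothetical stand-in for SearchContext: releases what it knows about when closed. */
    static final class ToyContext implements AutoCloseable {
        private final List<Long> reserved = new ArrayList<>();

        synchronized void allocate(long bytes) {
            breakerBytes.addAndGet(bytes);
            reserved.add(bytes);
        }

        @Override
        public synchronized void close() {
            // Gives back only the allocations made before close() ran.
            for (long bytes : reserved) {
                breakerBytes.addAndGet(-bytes);
            }
            reserved.clear();
        }
    }

    /** Returns the bytes still reserved after a failed two-slice "search". */
    static long leakedBytes() {
        breakerBytes.set(0);
        ToyContext context = new ToyContext();
        CountDownLatch contextClosed = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Callable<Void> failingSlice = () -> {
                context.allocate(1024);
                throw new IllegalStateException("breaker tripped");
            };
            Callable<Void> slowSlice = () -> {
                contextClosed.await();   // forces the allocation to happen after close()
                context.allocate(2048);
                return null;
            };
            Future<Void> failing = pool.submit(failingSlice);
            Future<Void> slow = pool.submit(slowSlice);
            try {
                failing.get();
            } catch (ExecutionException e) {
                context.close();         // caller cleans up without waiting for slowSlice
            }
            contextClosed.countDown();
            slow.get();
            return breakerBytes.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("bytes still reserved: " + leakedBytes()); // 2048
    }
}
```

The 1024 bytes reserved by the failing slice are released by `close()`, while the 2048 bytes reserved afterwards stay on the counter, which is the same shape as the "Request breaker not reset to 0" assertion.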
This theory is supported by the fact that adding a sleep before throwing the exception makes the failure go away: https://github.com/neetikasinghal/OpenSearch/commit/f38531a6444725ef007dbc527756ffe54f8a1c3e
The memory leak can be fixed in one of the following ways:
Option 1: Terminate the other slices early when a CircuitBreakingException is thrown during concurrent search. If one of the slices throws a CircuitBreakingException, a flag maintained in SearchContext is set and the other slices terminate early, which avoids the memory leak. For reference, https://github.com/opensearch-project/OpenSearch/pull/11731/files shows a draft implementation of this solution (validated by 500 successful test runs). This solution is limited to CircuitBreakingException, so memory leaks caused by other exception types that have not yet been discovered would remain. There can also still be a race condition, where one index_searcher thread has already reached an allocation when another thread throws the exception, which again leads to a memory leak. This is therefore not the recommended solution.
Option 2: Wait for all tasks to complete before rethrowing. When operations such as query rewrite, search, or createWeight are parallelized, one of the tasks may throw an exception. In that case Lucene does not wait for all tasks to complete before rethrowing the exception that was caught. Lucene has already fixed this in https://github.com/apache/lucene/commit/1200ecce3a299f798095e04584cc11ac530ddea8. This commit is included in Lucene 9.9.0 but not in Lucene 9.8.0.
We need to upgrade to Lucene 9.9.0 to pick up this commit in OpenSearch. With it, we should be able to fix this issue and any other issue that may arise from not waiting for the other threads to complete after one thread has thrown an exception. I validated this by creating a new class TaskExecutor.java in OpenSearch with simple logic that waits for the other threads, as follows:
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.ThreadInterruptedException;

class TaskExecutor {
    private final Executor executor;

    TaskExecutor(Executor executor) {
        this.executor = Objects.requireNonNull(executor, "Executor is null");
    }

    /**
     * Execute all the tasks provided as an argument, wait for them to complete and return the
     * obtained results.
     *
     * @param tasks the tasks to execute
     * @return a list containing the results from the tasks execution
     * @param <T> the return type of the task execution
     */
    final <T> List<T> invokeAll(Collection<RunnableFuture<T>> tasks) throws IOException {
        for (Runnable task : tasks) {
            executor.execute(task);
        }
        final List<T> results = new ArrayList<>();
        // First failure seen. Kept local so that concurrent or repeated calls share no state.
        ExecutionException failure = null;
        for (Future<T> future : tasks) {
            try {
                results.add(future.get());
            } catch (InterruptedException e) {
                throw new ThreadInterruptedException(e);
            } catch (ExecutionException e) {
                // Do not rethrow yet: keep looping so that every task finishes first.
                if (failure == null) {
                    failure = e;
                }
            }
        }
        if (failure != null) {
            throw IOUtils.rethrowAlways(failure.getCause());
        }
        return results;
    }
}
The above solution is also validated by 500 successful test runs.
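To illustrate why waiting helps, here is a JDK-only sketch of the same wait-for-all idea with a small driver. `WaitForAllDemo`, `invokeAllAndWait`, and `completedWhenFailureSurfaced` are hypothetical names for this illustration and are not Lucene or OpenSearch APIs. The driver counts how many non-failing slices had finished by the time the failure reached the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

class WaitForAllDemo {
    /** Runs every task, waits for all of them, and only then rethrows the first failure. */
    static <T> List<T> invokeAllAndWait(Executor executor, List<Callable<T>> callables)
            throws ExecutionException {
        List<FutureTask<T>> tasks = new ArrayList<>();
        for (Callable<T> callable : callables) {
            FutureTask<T> task = new FutureTask<>(callable);
            tasks.add(task);
            executor.execute(task);
        }
        List<T> results = new ArrayList<>();
        ExecutionException firstFailure = null;
        for (Future<T> future : tasks) {
            try {
                results.add(future.get());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            } catch (ExecutionException e) {
                if (firstFailure == null) {
                    firstFailure = e; // remember it, but keep waiting for the rest
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
        return results;
    }

    /** Returns how many slow slices had completed when the failure reached the caller. */
    static int completedWhenFailureSurfaced() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();
        List<Callable<Integer>> slices = new ArrayList<>();
        slices.add(() -> {
            throw new IllegalStateException("breaker tripped");
        });
        slices.add(() -> {
            Thread.sleep(50); // still busy when the first slice fails
            return completed.incrementAndGet();
        });
        try {
            invokeAllAndWait(pool, slices);
            return -1; // not reached: the first slice always fails
        } catch (ExecutionException e) {
            return completed.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("slices finished before rethrow: " + completedWhenFailureSurfaced()); // 1
    }
}
```

Because the slow slice has already finished when the exception surfaces, any cleanup the caller performs afterwards sees all allocations made by every slice.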
Hence, I would recommend option 2, which also looks to be a cleaner solution coming from upstream.
@sohami @andrross @reta I would love to hear your thoughts on this.
Thanks @neetikasinghal for looking into this. Taking Lucene side of changes makes sense to me instead of re-implementing it in OpenSearch. Also there is a follow-up PR in lucene to make it cancel already running tasks too. Ref here which will further improve it. We can keep an eye on that for future releases as an improvement.
Thanks a lot @neetikasinghal , I side with you (and @sohami ) here to rely on Apache Lucene 9.9.x (the https://github.com/opensearch-project/OpenSearch/pull/11421 should be integrated soon).
The reported test has failed in one of the PR builds - https://build.ci.opensearch.org/job/gradle-check/32188/. Reopening this issue.
I am able to reproduce this with seed: -Dtests.seed=998015FD102898B9. This is not specific to concurrent search, though: I can also reproduce it with the concurrent search flag turned off. This looks to be a different problem than before. The nested exception is as follows:
Expected: <0L>
but: was <568L>
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18)
at org.junit.Assert.assertThat(Assert.java:964)
at org.opensearch.test.InternalTestCluster.lambda$ensureEstimatedStats$42(InternalTestCluster.java:2705)
at org.opensearch.test.OpenSearchTestCase.assertBusy(OpenSearchTestCase.java:1077)
... 44 more
java.lang.RuntimeException: 1 arrays have not been released
at org.opensearch.common.util.MockBigArrays.ensureAllArraysAreReleased(MockBigArrays.java:88)
at org.opensearch.test.OpenSearchTestCase.checkStaticState(OpenSearchTestCase.java:633)
at org.opensearch.test.OpenSearchTestCase.after(OpenSearchTestCase.java:431)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at com.carrotsearch.randomizedtesting.RandomizedRunner.invoke(RandomizedRunner.java:1750)
at com.carrotsearch.randomizedtesting.RandomizedRunner$10.evaluate(RandomizedRunner.java:996)
at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.apache.lucene.tests.util.TestRuleSetupTeardownChained$1.evaluate(TestRuleSetupTeardownChained.java:48)
at org.apache.lucene.tests.util.AbstractBeforeAfterRule$1.evaluate(AbstractBeforeAfterRule.java:43)
at org.apache.lucene.tests.util.TestRuleThreadAndTestName$1.evaluate(TestRuleThreadAndTestName.java:45)
at org.apache.lucene.tests.util.TestRuleIgnoreAfterMaxFailures$1.evaluate(TestRuleIgnoreAfterMaxFailures.java:60)
at org.apache.lucene.tests.util.TestRuleMarkFailure$1.evaluate(TestRuleMarkFailure.java:44)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
at com.carrotsearch.randomizedtesting.ThreadLeakControl$StatementRunner.run(ThreadLeakControl.java:368)
at com.carrotsearch.randomizedtesting.ThreadLeakControl.forkTimeoutingTask(ThreadLeakControl.java:817)
at com.carrotsearch.randomizedtesting.ThreadLeakControl$3.evaluate(ThreadLeakControl.java:468)
at com.carrotsearch.randomizedtesting.RandomizedRunner.runSingleTest(RandomizedRunner.java:947)
at com.carrotsearch.randomizedtesting.RandomizedRunner$5.evaluate(RandomizedRunner.java:832)
at com.carrotsearch.randomizedtesting.RandomizedRunner$6.evaluate(RandomizedRunner.java:883)
at com.carrotsearch.randomizedtesting.RandomizedRunner$7.evaluate(RandomizedRunner.java:894)
at org.apache.lucene.tests.util.AbstractBeforeAfterRule$1.evaluate(AbstractBeforeAfterRule.java:43)
at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
at org.apache.lucene.tests.util.TestRuleStoreClassName$1.evaluate(TestRuleStoreClassName.java:38)
at com.carrotsearch.randomizedtesting.rules.NoShadowingOrOverridesOnMethodsRule$1.evaluate(NoShadowingOrOverridesOnMethodsRule.java:40)
at com.carrotsearch.randomizedtesting.rules.NoShadowingOrOverridesOnMethodsRule$1.evaluate(NoShadowingOrOverridesOnMethodsRule.java:40)
at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
at org.apache.lucene.tests.util.TestRuleAssertionsRequired$1.evaluate(TestRuleAssertionsRequired.java:53)
at org.apache.lucene.tests.util.AbstractBeforeAfterRule$1.evaluate(AbstractBeforeAfterRule.java:43)
at org.apache.lucene.tests.util.TestRuleMarkFailure$1.evaluate(TestRuleMarkFailure.java:44)
at org.apache.lucene.tests.util.TestRuleIgnoreAfterMaxFailures$1.evaluate(TestRuleIgnoreAfterMaxFailures.java:60)
at org.apache.lucene.tests.util.TestRuleIgnoreTestSuites$1.evaluate(TestRuleIgnoreTestSuites.java:47)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at com.carrotsearch.randomizedtesting.rules.StatementAdapter.evaluate(StatementAdapter.java:36)
at com.carrotsearch.randomizedtesting.ThreadLeakControl$StatementRunner.run(ThreadLeakControl.java:368)
at java.base/java.lang.Thread.run(Thread.java:829)
I will dive deeper into this to check further.
I was able to figure out the root cause of the memory leak.
During the index search in the test, a GlobalOrdinalsStringTermsAggregator is created, and its constructor initializes a collection strategy.
The collection strategy initialization flow is as follows:
new RemapGlobalOrds()
-> LongKeyedBucketOrds.build()
-> new FromSingle(bigArrays)
-> new ReorganizingLongHash(bigArrays)
-> ReorganizingLongHash constructor
In ReorganizingLongHash's constructor, two big arrays are initialized, and their memory is accounted for by the Circuit Breaker here.
In the happy path, GlobalOrdinalsStringTermsAggregator is fully constructed: the collectionStrategy is initialized and the arrays allocated in ReorganizingLongHash's constructor are accounted for by the CircuitBreaker. If a CircuitBreakingException is then hit in any other code path, SearchContext.close() is called, which in turn closes GlobalOrdinalsStringTermsAggregator. Because the collectionStrategy is not null, the arrays in ReorganizingLongHash are closed as well and their memory is deducted from the CircuitBreaker, so there is no memory leak. However, when the CircuitBreakingException is thrown while the keys array is being initialized in ReorganizingLongHash's constructor, the collectionStrategy is still null, so close is never called on ReorganizingLongHash. The table array that the constructor had already allocated is therefore never closed, its memory is never deducted from the CircuitBreaker, and the result is a memory leak.
To deal with this, ReorganizingLongHash's constructor needs to explicitly close whatever it has already allocated when an exception is encountered. This is done as part of the PR https://github.com/opensearch-project/OpenSearch/pull/11953
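The constructor problem and the shape of the fix can be sketched with toy classes. Everything here is an assumption made for illustration: `ToyArray`, `LeakyHash`, `SafeHash`, the 100-byte limit, and the sizes are hypothetical and do not reproduce the actual ReorganizingLongHash code or the change in the PR.

```java
import java.util.concurrent.atomic.AtomicLong;

class ConstructorCleanupDemo {
    /** Hypothetical breaker limit and counter, standing in for the request breaker. */
    static final long LIMIT = 100;
    static final AtomicLong breakerBytes = new AtomicLong();

    /** Toy array: reserves its size on creation and gives it back on close(). */
    static final class ToyArray implements AutoCloseable {
        private final long bytes;

        ToyArray(long bytes) {
            if (breakerBytes.get() + bytes > LIMIT) {
                throw new IllegalStateException("breaker tripped");
            }
            breakerBytes.addAndGet(bytes);
            this.bytes = bytes;
        }

        @Override
        public void close() {
            breakerBytes.addAndGet(-bytes);
        }
    }

    /** Leaky variant: if the second allocation fails, the first one is orphaned. */
    static final class LeakyHash {
        final ToyArray table;
        final ToyArray keys;

        LeakyHash() {
            table = new ToyArray(80);
            keys = new ToyArray(80); // trips the toy breaker; nobody can close table now
        }
    }

    /** Safe variant: the constructor releases what it already allocated before rethrowing. */
    static final class SafeHash {
        final ToyArray table;
        final ToyArray keys;

        SafeHash() {
            ToyArray t = null;
            ToyArray k = null;
            try {
                t = new ToyArray(80);
                k = new ToyArray(80);
            } catch (RuntimeException e) {
                if (k != null) k.close();
                if (t != null) t.close();
                throw e;
            }
            table = t;
            keys = k;
        }
    }

    /** Runs a constructor that is expected to fail and reports the bytes left reserved. */
    static long bytesLeftAfterFailure(Runnable constructor) {
        breakerBytes.set(0);
        try {
            constructor.run();
        } catch (IllegalStateException expected) {
            // The caller never receives an object, so it has nothing to close.
        }
        return breakerBytes.get();
    }

    public static void main(String[] args) {
        System.out.println("leaky: " + bytesLeftAfterFailure(LeakyHash::new)); // 80
        System.out.println("safe:  " + bytesLeftAfterFailure(SafeHash::new));  // 0
    }
}
```

The key point is that a caller cannot close an object whose constructor threw, so the constructor itself has to undo its partial allocations.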
@reta @andrross would appreciate your feedback on this.
Describe the bug
org.opensearch.search.aggregations.metrics.CardinalityWithRequestBreakerIT.testRequestBreaker is flaky.
To Reproduce
org.opensearch.search.aggregations.metrics.CardinalityWithRequestBreakerIT.testRequestBreaker {p0={"search.concurrent_segment_search.enabled":"true"}}
org.opensearch.search.aggregations.metrics.CardinalityWithRequestBreakerIT.testRequestBreaker {p0={"search.concurrent_segment_search.enabled":"false"}}
Expected behavior
Test should always pass.
Additional context
CI - https://build.ci.opensearch.org/job/gradle-check/25992/