opensearch-project / index-management

🗃 Automate periodic data operations, such as deleting indices at a certain age or performing a rollover at a certain size
https://opensearch.org/docs/latest/im-plugin/index/
Apache License 2.0

[BUG] ISM listener for cluster state changes is running expensive nodes info call as part of listener and blocking Cluster Applier thread #1075

Open shwetathareja opened 6 months ago

shwetathareja commented 6 months ago

What is the bug? For any cluster state change, plugins can attach listeners and execute the desired code. These listeners run on the ClusterApplierService#updateTask thread pool, which is single-threaded, so a slow listener blocks the processing and applying of any new state updates. ISM has registered a listener that makes an expensive nodes info call, which is a broadcast call, from inside the listener itself. On top of that, this listener is executed on every node, which looks like unnecessary overhead. On one large cluster we observed, the applier thread on multiple nodes was busy doing this for minutes.

https://github.com/opensearch-project/index-management/blob/6270b2cbd0a344f0e26978bbb1b35aac9cba6d20/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/PluginVersionSweepCoordinator.kt#L59

    override fun clusterChanged(event: ClusterChangedEvent) {
        if (event.nodesChanged() || event.isNewCluster) {
            skipExecution.sweepISMPluginVersion()
            initBackgroundSweepISMPluginVersionExecution()
        }
    }
How can one reproduce the bug? Any time the cluster is bootstrapped for the first time, or nodes join or leave the cluster, this listener will be executed.

What is the expected behavior?

  1. Expensive calls shouldn't be executed in a blocking fashion in a cluster state listener, as that blocks the processing of other cluster state updates; such calls should be moved to a background thread.
  2. This listener is executed on every node, so it results in n nodes info calls, each of which is itself a broadcast action. Why does this skip logic need to be executed on every node?
  3. Is there an opportunity to simplify this skip logic so it does not depend on the nodes info call?
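To illustrate point 1, here is a minimal self-contained sketch (not OpenSearch code; the class and method names are hypothetical) of the general pattern: the listener hands the expensive work to a background executor and returns immediately, so the single applier thread is never blocked.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class NonBlockingListener {
    // Stand-in for a background worker; in OpenSearch this would be one of
    // the node's generic thread pools rather than a dedicated executor.
    private final ExecutorService background = Executors.newSingleThreadExecutor();

    public void clusterChanged(Runnable expensiveCall) {
        // Instead of running the expensive call inline (which would block
        // the single-threaded applier), hand it off and return immediately.
        background.submit(expensiveCall);
    }

    public static void main(String[] args) throws InterruptedException {
        NonBlockingListener listener = new NonBlockingListener();
        CountDownLatch done = new CountDownLatch(1);
        listener.clusterChanged(() -> {
            // Simulated expensive broadcast call (e.g. nodes info).
            try { Thread.sleep(200); } catch (InterruptedException ignored) {}
            done.countDown();
        });
        // The listener has already returned; the work completes later.
        System.out.println("work done: " + done.await(2, TimeUnit.SECONDS));
        listener.background.shutdown();
    }
}
```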

Do you have any screenshots?

::: {da0d0d627631a963288888e4cfa7772c}{HtLQ7pYWQJu9nuKQ7z0akA}{gSQUDGL5RVKKQmIAk8YW4g}
   Hot threads at 2024-01-17T08:27:21.215Z, interval=500ms, busiestThreads=3, ignoreIdleThreads=true:

   100.3% (501.2ms out of 500ms) cpu usage by thread 'opensearch[da0d0d627631a963288888e4cfa7772c][clusterApplierService#updateTask][T#1]'
     10/10 snapshots sharing following 51 elements
       app//org.opensearch.core.common.io.stream.StreamOutput.lambda$writeOptionalArray$27(StreamOutput.java:963)
       app//org.opensearch.core.common.io.stream.StreamOutput$$Lambda$5615/0x0000000801b36258.write(Unknown Source)
       app//org.opensearch.core.common.io.stream.StreamOutput.writeArray(StreamOutput.java:932)
       app//org.opensearch.core.common.io.stream.StreamOutput.writeOptionalArray(StreamOutput.java:945)
       app//org.opensearch.core.common.io.stream.StreamOutput.writeOptionalArray(StreamOutput.java:963)
       app//org.opensearch.action.support.nodes.BaseNodesRequest.writeTo(BaseNodesRequest.java:131)
       app//org.opensearch.action.admin.cluster.node.info.NodesInfoRequest.writeTo(NodesInfoRequest.java:167)
       app//org.opensearch.action.admin.cluster.node.info.TransportNodesInfoAction$NodeInfoRequest.writeTo(TransportNodesInfoAction.java:146)
       app//org.opensearch.transport.OutboundMessage.writeMessage(OutboundMessage.java:104)
       app//org.opensearch.transport.OutboundMessage.serialize(OutboundMessage.java:81)
       app//org.opensearch.transport.OutboundHandler$MessageSerializer.get(OutboundHandler.java:235)
       app//org.opensearch.transport.OutboundHandler$MessageSerializer.get(OutboundHandler.java:221)
       app//org.opensearch.transport.OutboundHandler$SendContext.get(OutboundHandler.java:275)
       app//org.opensearch.transport.OutboundHandler.internalSend(OutboundHandler.java:197)
       app//org.opensearch.transport.OutboundHandler.sendMessage(OutboundHandler.java:192)
       app//org.opensearch.transport.OutboundHandler.sendRequest(OutboundHandler.java:129)
       app//org.opensearch.transport.TcpTransport$NodeChannels.sendRequest(TcpTransport.java:320)
       app//org.opensearch.transport.TransportService.sendRequestInternal(TransportService.java:1038)
       app//org.opensearch.transport.TransportService$$Lambda$3987/0x0000000800fc25f0.sendRequest(Unknown Source)
       com.amazonaws.elasticsearch.iam.IamTransportRequestSender.sendRequest(IamTransportRequestSender.java:94)
       com.amazonaws.elasticsearch.ccs.CrossClusterRequestInterceptor$AddHeaderSender.sendRequest(CrossClusterRequestInterceptor.java:132)
       app//org.opensearch.transport.TransportService.sendRequest(TransportService.java:924)
       app//org.opensearch.transport.TransportService.sendRequest(TransportService.java:861)
       app//org.opensearch.action.support.nodes.TransportNodesAction$AsyncAction.start(TransportNodesAction.java:264)
       app//org.opensearch.action.support.nodes.TransportNodesAction.doExecute(TransportNodesAction.java:153)
       app//org.opensearch.action.support.nodes.TransportNodesAction.doExecute(TransportNodesAction.java:70)
       app//org.opensearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:218)
       org.opensearch.indexmanagement.controlcenter.notification.filter.IndexOperationActionFilter.apply(IndexOperationActionFilter.kt:39)
       app//org.opensearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:216)
       org.opensearch.indexmanagement.rollup.actionfilter.FieldCapsFilter.apply(FieldCapsFilter.kt:118)
       app//org.opensearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:216)
       org.opensearch.performanceanalyzer.action.PerformanceAnalyzerActionFilter.apply(PerformanceAnalyzerActionFilter.java:78)
       app//org.opensearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:216)
       app//org.opensearch.action.support.TransportAction.execute(TransportAction.java:188)
       app//org.opensearch.action.support.TransportAction.execute(TransportAction.java:107)
       app//org.opensearch.client.node.NodeClient.executeLocally(NodeClient.java:110)
       app//org.opensearch.client.node.NodeClient.doExecute(NodeClient.java:97)
       app//org.opensearch.client.support.AbstractClient.execute(AbstractClient.java:476)
       org.opensearch.indexmanagement.indexstatemanagement.SkipExecution.sweepISMPluginVersion(SkipExecution.kt:37)
       org.opensearch.indexmanagement.indexstatemanagement.PluginVersionSweepCoordinator.clusterChanged(PluginVersionSweepCoordinator.kt:61)
       app//org.opensearch.cluster.service.ClusterApplierService.callClusterStateListener(ClusterApplierService.java:625)
       app//org.opensearch.cluster.service.ClusterApplierService.callClusterStateListeners(ClusterApplierService.java:612)
       app//org.opensearch.cluster.service.ClusterApplierService.applyChanges(ClusterApplierService.java:577)
       app//org.opensearch.cluster.service.ClusterApplierService.runTask(ClusterApplierService.java:484)
       app//org.opensearch.cluster.service.ClusterApplierService$UpdateTask.run(ClusterApplierService.java:186)
       app//org.opensearch.common.util.concurrent.ThreadContext$ContextPreservingRunnable.run(ThreadContext.java:858)
       app//org.opensearch.common.util.concurrent.PrioritizedOpenSearchThreadPoolExecutor$TieBreakingPrioritizedRunnable.runAndClean(PrioritizedOpenSearchThreadPoolExecutor.java:282)
       app//org.opensearch.common.util.concurrent.PrioritizedOpenSearchThreadPoolExecutor$TieBreakingPrioritizedRunnable.run(PrioritizedOpenSearchThreadPoolExecutor.java:245)
       java.base@17.0.6/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
       java.base@17.0.6/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
       java.base@17.0.6/java.lang.Thread.run(Thread.java:833)
bowenlan-amzn commented 5 months ago

> This listener is executed on every node, so it results in n nodes info calls, each of which is itself a broadcast action. Why does this skip logic need to be executed on every node?

This listener figures out whether different versions of the ISM plugin are present in the cluster and stops the node from executing if there are. Each node performs this check separately.
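The check described above reduces to a simple comparison; here is a hedged, self-contained sketch of that idea (the method name and version strings are hypothetical, and the real implementation in SkipExecution.kt extracts versions from a nodes info response):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class VersionSweep {
    // Given the ISM plugin version reported by each node (as a nodes info
    // response would contain), execution is skipped when more than one
    // distinct version is present, i.e. during a rolling upgrade.
    static boolean shouldSkipExecution(List<String> ismVersionsPerNode) {
        Set<String> distinct = new HashSet<>(ismVersionsPerNode);
        return distinct.size() > 1;
    }

    public static void main(String[] args) {
        // Homogeneous cluster: no skip needed.
        System.out.println(shouldSkipExecution(List.of("2.11.0", "2.11.0")));
        // Mixed versions mid-upgrade: skip execution.
        System.out.println(shouldSkipExecution(List.of("2.11.0", "2.12.0")));
    }
}
```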

bowenlan-amzn commented 1 month ago

As a short-term solution, we can try switching threads for the nodes info call; in Kotlin the thread switch would be done with a coroutine. The long-term solution is probably for core to provide a mechanism for knowing the cluster upgrade status.
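The suggested thread switch can be sketched in plain JVM terms (a Kotlin coroutine would play the same role; the names and the string result here are hypothetical): the expensive call runs asynchronously and its result is handled in a continuation, so the submitting thread never blocks.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncSweep {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // Run the simulated nodes info call off-thread and process the
        // response in a continuation instead of blocking the caller.
        CompletableFuture<String> sweep = CompletableFuture
            .supplyAsync(() -> "nodes-info-response", pool)
            .thenApply(resp -> "swept:" + resp);
        // The submitting thread is free immediately; join only for the demo.
        System.out.println(sweep.join());
        pool.shutdown();
    }
}
```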