opensearch-project / OpenSearch

🔎 Open source distributed and RESTful search engine.
https://opensearch.org/docs/latest/opensearch/index/
Apache License 2.0

[BUG] Cluster Manager task throttler blocks network threads #13741

Open Bukhtawar opened 1 month ago

Bukhtawar commented 1 month ago

Describe the bug

"opensearch[b869f183befc74cff9f3b5572821ec21][transport_worker][T#7]" #49 daemon prio=5 os_prio=0 cpu=83095452.97ms elapsed=286686.74s tid=0x0000fffe34008180 nid=0x5638 waiting for monitor entry  [0x0000fffef0315000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at java.util.concurrent.ConcurrentHashMap.computeIfPresent(java.base@17.0.9/ConcurrentHashMap.java:1819)
        - waiting to lock <0x00000011e9000000> (a java.util.concurrent.ConcurrentHashMap$Node)
        at org.opensearch.cluster.service.ClusterManagerTaskThrottler.onBeginSubmit(ClusterManagerTaskThrottler.java:221)
        at org.opensearch.cluster.service.TaskBatcher.submitTasks(TaskBatcher.java:85)
        at org.opensearch.cluster.service.MasterService.submitStateUpdateTasks(MasterService.java:998)
        at org.opensearch.cluster.service.ClusterService.submitStateUpdateTasks(ClusterService.java:373)
        at org.opensearch.cluster.service.ClusterService.submitStateUpdateTask(ClusterService.java:351)
        at org.opensearch.snapshots.SnapshotsService.innerUpdateSnapshotState(SnapshotsService.java:3856)
        at org.opensearch.snapshots.SnapshotsService$UpdateSnapshotStatusAction.clusterManagerOperation(SnapshotsService.java:3951)
        at org.opensearch.snapshots.SnapshotsService$UpdateSnapshotStatusAction.clusterManagerOperation(SnapshotsService.java:3913)
        at org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction.masterOperation(TransportClusterManagerNodeAction.java:177)
        at org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction.clusterManagerOperation(TransportClusterManagerNodeAction.java:186)
        at org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction$AsyncSingleAction.lambda$doStart$1(TransportClusterManagerNodeAction.java:292)
        at org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction$AsyncSingleAction$$Lambda$6044/0x0000008801c64240.accept(Unknown Source)
        at org.opensearch.action.ActionRunnable$2.doRun(ActionRunnable.java:89)
        at org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52)
        at org.opensearch.common.util.concurrent.OpenSearchExecutors$DirectExecutorService.execute(OpenSearchExecutors.java:412)
        at org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction$AsyncSingleAction.doStart(TransportClusterManagerNodeAction.java:289)
        at org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction$AsyncSingleAction.tryAction(TransportClusterManagerNodeAction.java:239)
        at org.opensearch.action.support.RetryableAction$1.doRun(RetryableAction.java:139)
        at org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52)
        at org.opensearch.common.util.concurrent.OpenSearchExecutors$DirectExecutorService.execute(OpenSearchExecutors.java:412)
        at org.opensearch.action.support.RetryableAction.run(RetryableAction.java:117)
        at org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction.doExecute(TransportClusterManagerNodeAction.java:200)
        at org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction.doExecute(TransportClusterManagerNodeAction.java:88)
        at org.opensearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:218)
        at org.opensearch.indexmanagement.controlcenter.notification.filter.IndexOperationActionFilter.apply(IndexOperationActionFilter.kt:39)
        at org.opensearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:216)
        at org.opensearch.indexmanagement.rollup.actionfilter.FieldCapsFilter.apply(FieldCapsFilter.kt:118)
        at org.opensearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:216)
        at org.opensearch.security.filter.SecurityFilter.apply0(SecurityFilter.java:294)
        at org.opensearch.security.filter.SecurityFilter.apply(SecurityFilter.java:165)
        at org.opensearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:216)
        at org.opensearch.performanceanalyzer.action.PerformanceAnalyzerActionFilter.apply(PerformanceAnalyzerActionFilter.java:78)
        at org.opensearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:216)
        at org.opensearch.action.support.TransportAction.execute(TransportAction.java:188)
        at org.opensearch.action.support.HandledTransportAction$TransportHandler.messageReceived(HandledTransportAction.java:133)
        at org.opensearch.action.support.HandledTransportAction$TransportHandler.messageReceived(HandledTransportAction.java:129)
        at org.opensearch.security.ssl.transport.SecuritySSLRequestHandler.messageReceivedDecorate(SecuritySSLRequestHandler.java:210)
        at org.opensearch.security.transport.SecurityRequestHandler.messageReceivedDecorate(SecurityRequestHandler.java:323)
        at org.opensearch.security.ssl.transport.SecuritySSLRequestHandler.messageReceived(SecuritySSLRequestHandler.java:158)
        at org.opensearch.security.OpenSearchSecurityPlugin$6$1.messageReceived(OpenSearchSecurityPlugin.java:852)
        at org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor$interceptHandler$1.messageReceived(RollupInterceptor.kt:114)
        at org.opensearch.performanceanalyzer.transport.PerformanceAnalyzerTransportRequestHandler.messageReceived(PerformanceAnalyzerTransportRequestHandler.java:43)
        at org.opensearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:106)
        at org.opensearch.transport.InboundHandler.handleRequest(InboundHandler.java:271)
        at org.opensearch.transport.InboundHandler.messageReceived(InboundHandler.java:144)
        at org.opensearch.transport.InboundHandler.inboundMessage(InboundHandler.java:127)
        at org.opensearch.transport.TcpTransport.inboundMessage(TcpTransport.java:770)
        at org.opensearch.transport.netty4.Netty4MessageChannelHandler$$Lambda$5488/0x00000088013adaa0.accept(Unknown Source)

82.2% (8.2s out of 10s) cpu usage by thread 'opensearch[b869f183befc74cff9f3b5572821ec21][transport_worker][T#6]'
     6/10 snapshots sharing following 87 elements
       java.base@17.0.9/java.util.concurrent.ConcurrentHashMap.computeIfPresent(ConcurrentHashMap.java:1819)
       app//org.opensearch.cluster.service.ClusterManagerTaskThrottler.onBeginSubmit(ClusterManagerTaskThrottler.java:221)
       app//org.opensearch.cluster.service.TaskBatcher.submitTasks(TaskBatcher.java:85)
       app//org.opensearch.cluster.service.MasterService.submitStateUpdateTasks(MasterService.java:998)
       app//org.opensearch.cluster.service.ClusterService.submitStateUpdateTasks(ClusterService.java:373)
       app//org.opensearch.cluster.service.ClusterService.submitStateUpdateTask(ClusterService.java:351)
       app//org.opensearch.snapshots.SnapshotsService.innerUpdateSnapshotState(SnapshotsService.java:3856)
       app//org.opensearch.snapshots.SnapshotsService$UpdateSnapshotStatusAction.clusterManagerOperation(SnapshotsService.java:3951)
       app//org.opensearch.snapshots.SnapshotsService$UpdateSnapshotStatusAction.clusterManagerOperation(SnapshotsService.java:3913)
       app//org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction.masterOperation(TransportClusterManagerNodeAction.java:177)
       app//org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction.clusterManagerOperation(TransportClusterManagerNodeAction.java:186)
       app//org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction$AsyncSingleAction.lambda$doStart$1(TransportClusterManagerNodeAction.java:292)
       app//org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction$AsyncSingleAction$$Lambda$6044/0x0000008801c64240.accept(Unknown Source)
       app//org.opensearch.action.ActionRunnable$2.doRun(ActionRunnable.java:89)
       app//org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52)
       app//org.opensearch.common.util.concurrent.OpenSearchExecutors$DirectExecutorService.execute(OpenSearchExecutors.java:412)
       app//org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction$AsyncSingleAction.doStart(TransportClusterManagerNodeAction.java:289)
       app//org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction$AsyncSingleAction.tryAction(TransportClusterManagerNodeAction.java:239)
       app//org.opensearch.action.support.RetryableAction$1.doRun(RetryableAction.java:139)
       app//org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52)
       app//org.opensearch.common.util.concurrent.OpenSearchExecutors$DirectExecutorService.execute(OpenSearchExecutors.java:412)
       app//org.opensearch.action.support.RetryableAction.run(RetryableAction.java:117)
       app//org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction.doExecute(TransportClusterManagerNodeAction.java:200)
       app//org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction.doExecute(TransportClusterManagerNodeAction.java:88)
       app//org.opensearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:218)
       org.opensearch.indexmanagement.controlcenter.notification.filter.IndexOperationActionFilter.apply(IndexOperationActionFilter.kt:39)
       app//org.opensearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:216)
       org.opensearch.indexmanagement.rollup.actionfilter.FieldCapsFilter.apply(FieldCapsFilter.kt:118)
       app//org.opensearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:216)
       org.opensearch.security.filter.SecurityFilter.apply0(SecurityFilter.java:294)
       org.opensearch.security.filter.SecurityFilter.apply(SecurityFilter.java:165)
       app//org.opensearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:216)
       org.opensearch.performanceanalyzer.action.PerformanceAnalyzerActionFilter.apply(PerformanceAnalyzerActionFilter.java:78)
       app//org.opensearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:216)
       app//org.opensearch.action.support.TransportAction.execute(TransportAction.java:188)
       app//org.opensearch.action.support.HandledTransportAction$TransportHandler.messageReceived(HandledTransportAction.java:133)
       app//org.opensearch.action.support.HandledTransportAction$TransportHandler.messageReceived(HandledTransportAction.java:129)
       org.opensearch.security.ssl.transport.SecuritySSLRequestHandler.messageReceivedDecorate(SecuritySSLRequestHandler.java:210)
       org.opensearch.security.transport.SecurityRequestHandler.messageReceivedDecorate(SecurityRequestHandler.java:323)
       org.opensearch.security.ssl.transport.SecuritySSLRequestHandler.messageReceived(SecuritySSLRequestHandler.java:158)
       org.opensearch.security.OpenSearchSecurityPlugin$6$1.messageReceived(OpenSearchSecurityPlugin.java:852)
       org.opensearch.indexmanagement.rollup.interceptor.RollupInterceptor$interceptHandler$1.messageReceived(RollupInterceptor.kt:114)
       org.opensearch.performanceanalyzer.transport.PerformanceAnalyzerTransportRequestHandler.messageReceived(PerformanceAnalyzerTransportRequestHandler.java:43)
       app//org.opensearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:106)
       app//org.opensearch.transport.InboundHandler.handleRequest(InboundHandler.java:271)

"opensearch[b869f183befc74cff9f3b5572821ec21][transport_worker][T#4]" #46 daemon prio=5 os_prio=0 cpu=80430471.06ms elapsed=289343.25s tid=0x0000fffe7ee12660 nid=0x5635 runnable  [0x0000fffef0614000]
   java.lang.Thread.State: RUNNABLE
        at java.lang.Object.notifyAll(java.base@17.0.9/Native Method)
        at org.apache.logging.log4j.core.async.TimeoutBlockingWaitStrategy.signalAllWhenBlocking(TimeoutBlockingWaitStrategy.java:104)
        - locked <0x0000001001000b10> (a java.lang.Object)
        at com.lmax.disruptor.MultiProducerSequencer.publish(MultiProducerSequencer.java:218)
        at com.lmax.disruptor.RingBuffer.translateAndPublish(RingBuffer.java:990)
        at com.lmax.disruptor.RingBuffer.tryPublishEvent(RingBuffer.java:538)
        at org.apache.logging.log4j.core.async.AsyncLoggerConfigDisruptor.tryEnqueue(AsyncLoggerConfigDisruptor.java:387)
        at org.apache.logging.log4j.core.async.AsyncLoggerConfig.logToAsyncDelegate(AsyncLoggerConfig.java:157)
        at org.apache.logging.log4j.core.async.AsyncLoggerConfig.log(AsyncLoggerConfig.java:138)
        at org.apache.logging.log4j.core.config.LoggerConfig.log(LoggerConfig.java:560)
        at org.apache.logging.log4j.core.config.AwaitCompletionReliabilityStrategy.log(AwaitCompletionReliabilityStrategy.java:82)
        at org.apache.logging.log4j.core.Logger.log(Logger.java:163)
        at org.apache.logging.log4j.spi.AbstractLogger.tryLogMessage(AbstractLogger.java:2168)
        at org.apache.logging.log4j.spi.AbstractLogger.logMessageTrackRecursion(AbstractLogger.java:2122)
        at org.apache.logging.log4j.spi.AbstractLogger.logMessageSafely(AbstractLogger.java:2105)
        at org.apache.logging.log4j.spi.AbstractLogger.logMessage(AbstractLogger.java:2009)
        at org.apache.logging.log4j.spi.AbstractLogger.logIfEnabled(AbstractLogger.java:1878)
        at org.apache.logging.log4j.spi.AbstractLogger.warn(AbstractLogger.java:2757)
        at org.opensearch.cluster.service.ClusterManagerTaskThrottler.lambda$onBeginSubmit$0(ClusterManagerTaskThrottler.java:227)
        at org.opensearch.cluster.service.ClusterManagerTaskThrottler$$Lambda$6671/0x0000008801f25c08.apply(Unknown Source)
        at java.util.concurrent.ConcurrentHashMap.computeIfPresent(java.base@17.0.9/ConcurrentHashMap.java:1828)
        - locked <0x00000011e9000000> (a java.util.concurrent.ConcurrentHashMap$Node)
        at org.opensearch.cluster.service.ClusterManagerTaskThrottler.onBeginSubmit(ClusterManagerTaskThrottler.java:221)
        at org.opensearch.cluster.service.TaskBatcher.submitTasks(TaskBatcher.java:85)
        at org.opensearch.cluster.service.MasterService.submitStateUpdateTasks(MasterService.java:998)
        at org.opensearch.cluster.service.ClusterService.submitStateUpdateTasks(ClusterService.java:373)
        at org.opensearch.cluster.service.ClusterService.submitStateUpdateTask(ClusterService.java:351)
        at org.opensearch.snapshots.SnapshotsService.innerUpdateSnapshotState(SnapshotsService.java:3856)
        at org.opensearch.snapshots.SnapshotsService$UpdateSnapshotStatusAction.clusterManagerOperation(SnapshotsService.java:3951)
        at org.opensearch.snapshots.SnapshotsService$UpdateSnapshotStatusAction.clusterManagerOperation(SnapshotsService.java:3913)
        at org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction.masterOperation(TransportClusterManagerNodeAction.java:177)
        at org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction.clusterManagerOperation(TransportClusterManagerNodeAction.java:186)
        at org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction$AsyncSingleAction.lambda$doStart$1(TransportClusterManagerNodeAction.java:292)
        at org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction$AsyncSingleAction$$Lambda$6044/0x0000008801c64240.accept(Unknown Source)
        at org.opensearch.action.ActionRunnable$2.doRun(ActionRunnable.java:89)
        at org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52)

Related component

Cluster Manager

Expected behavior

  1. Network threads shouldn't get blocked.
  2. Task submission should be faster.

peternied commented 1 month ago

[Triage] @Bukhtawar thank you for creating this issue to track this problem. In the future, please include more details so it's clearer how to identify this issue and how to know whether it is resolved.

rajiv-kv commented 3 weeks ago

Looking at the stack traces in the thread dump, the workers are waiting for the lock on the throttling key (update-snapshot-state), and the worker holding the lock is busy logging to a file. The log is emitted at WARN level whenever a task is throttled, just before the transport call is failed.

One possible fix to avoid lock contention is to move the following two code blocks out of computeIfPresent

That way the critical section is limited to incrementing the request count and does no other computation. This would keep transport workers from blocking one another when all of them happen to enqueue tasks that belong to the same throttling key.
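
A minimal sketch of that idea, under stated assumptions: `ThrottlerSketch`, its fields, and its method bodies are hypothetical stand-ins and not the actual `ClusterManagerTaskThrottler` code. The lambda passed to `computeIfPresent` only compares and updates the counter; the WARN log and the exception happen after the per-key lock has been released.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Logger;

// Hypothetical, simplified stand-in for the throttler; not the OpenSearch class.
final class ThrottlerSketch {
    private static final Logger logger = Logger.getLogger(ThrottlerSketch.class.getName());

    // In-flight task count and configured limit per throttling key.
    private final ConcurrentMap<String, Long> taskCount = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, Long> limits = new ConcurrentHashMap<>();

    void setLimit(String key, long limit) {
        limits.put(key, limit);
        taskCount.putIfAbsent(key, 0L);
    }

    /** Admits numTasks for the key, or throws if the limit would be exceeded. */
    void onBeginSubmit(String key, int numTasks) {
        Long limit = limits.get(key);
        if (limit == null) {
            return; // no throttling configured for this key
        }
        // Written inside the lambda, read after it returns; local to this call.
        boolean[] throttled = new boolean[1];
        taskCount.computeIfPresent(key, (k, count) -> {
            // Critical section: compare and update only. No logging, no throwing.
            if (count + numTasks > limit) {
                throttled[0] = true;
                return count;
            }
            return count + numTasks;
        });
        if (throttled[0]) {
            // Runs outside the map's per-key lock, so slow logging cannot block other submitters.
            logger.warning("Throttling task for key [" + key + "], limit [" + limit + "]");
            throw new IllegalStateException("Throttling exception: too many tasks for " + key);
        }
    }

    void onTaskDone(String key, int numTasks) {
        taskCount.computeIfPresent(key, (k, count) -> Math.max(0, count - numTasks));
    }

    long inFlight(String key) {
        return taskCount.getOrDefault(key, 0L);
    }
}
```

The admit-or-reject decision is still atomic per key, because it is made inside the lambda; only the side effects move out.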

@Bukhtawar @shwetathareja - Thoughts ?

Bukhtawar commented 3 weeks ago

Do you think we should move the task submission and throttling logic off network threads to avoid getting into retry loops and stalling transport?

rajiv-kv commented 3 weeks ago

I see, the task submission involves the following two operations

Let me know if I am missing something.

Both of these operations look lightweight in the current implementation and should not take significant time. I think we can profile submitTask and evaluate whether some of the operations need to move to a background thread (working on a snapshot of the PendingTask queue). For instance, the throttling decider could move to a background thread, and submitTask would only enforce the throttling decision.
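
One way to picture the "background decider" variant is sketched below. All names here (`BackgroundDeciderSketch`, `refreshDecisions`, `trySubmit`) are hypothetical and nothing in it is taken from the OpenSearch code base. The submit path only reads a precomputed set of throttled keys and bumps a counter; a separate thread (for example a `ScheduledExecutorService` task) would call `refreshDecisions()` periodically.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch: the decision is computed off the submit path.
final class BackgroundDeciderSketch {
    private final ConcurrentMap<String, LongAdder> pending = new ConcurrentHashMap<>();
    private final ConcurrentMap<String, Long> limits = new ConcurrentHashMap<>();

    // Immutable snapshot of keys that are currently throttled; replaced wholesale on refresh.
    private volatile Set<String> throttledKeys = Set.of();

    void setLimit(String key, long limit) {
        limits.put(key, limit);
    }

    /** Intended to run on a background thread; recomputes which keys are at or over their limit. */
    void refreshDecisions() {
        Set<String> next = new HashSet<>();
        limits.forEach((key, limit) -> {
            LongAdder adder = pending.get(key);
            if (adder != null && adder.sum() >= limit) {
                next.add(key);
            }
        });
        throttledKeys = Set.copyOf(next);
    }

    /** Submit path: enforce the last decision, then count the task. Returns false if throttled. */
    boolean trySubmit(String key) {
        if (throttledKeys.contains(key)) {
            return false;
        }
        pending.computeIfAbsent(key, k -> new LongAdder()).increment();
        return true;
    }

    void onTaskDone(String key) {
        LongAdder adder = pending.get(key);
        if (adder != null) {
            adder.decrement();
        }
    }
}
```

The trade-off is staleness: between two refreshes the submit path can admit more tasks than the limit, so this gives approximate rather than exact throttling.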

On retrying a request once it is throttled, I think this should happen in the caller, rather than adding retry handlers in the cluster manager. We can have the cluster manager send additional signals about the PendingTaskQueue when a request is throttled, which the caller can use to decide on retries.
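
As a rough illustration of caller-side retries driven by such a signal, the sketch below assumes the throttling response carries the current pending-task count. That field, the class `CallerRetrySketch`, and all constants are assumptions made for the example; no such API exists in the thread above. The caller backs off exponentially and waits longer when the reported queue is deeper.

```java
// Hypothetical caller-side retry policy; the pendingTasks signal is an assumed field
// on the throttling response, not an existing OpenSearch API.
final class CallerRetrySketch {
    static final long BASE_DELAY_MS = 100;
    static final long MAX_DELAY_MS = 10_000;
    static final int MAX_ATTEMPTS = 5;

    /**
     * @param attempt      number of retries already made (0 for the first retry)
     * @param pendingTasks pending-task count reported by the cluster manager
     * @return delay in milliseconds before the next retry, or -1 to give up
     */
    static long nextDelayMillis(int attempt, int pendingTasks) {
        if (attempt >= MAX_ATTEMPTS) {
            return -1;
        }
        long exponential = BASE_DELAY_MS << attempt;   // 100, 200, 400, ...
        long loadFactor = 1 + pendingTasks / 1000;     // one extra multiple per 1000 queued tasks
        return Math.min(MAX_DELAY_MS, exponential * loadFactor);
    }
}
```

A real policy would probably add random jitter so that many throttled callers do not retry at the same instant; it is left out here to keep the sketch deterministic.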