opensearch-project / ml-commons

ml-commons provides a set of common machine learning algorithms, e.g. k-means, or linear regression, to help developers build ML related features within OpenSearch.
Apache License 2.0

[BUG] Conversational search random timeout exception #2334

Open ylwu-amzn opened 2 months ago

ylwu-amzn commented 2 months ago

What is the bug? When testing on a multi-node cluster, conversational search requests fail frequently and at random with a timeout exception:

{
  "error": {
    "root_cause": [
      {
        "type": "timeout_exception",
        "reason": "java.util.concurrent.TimeoutException: Timeout waiting for task."
      }
    ],
    "type": "timeout_exception",
    "reason": "java.util.concurrent.TimeoutException: Timeout waiting for task.",
    "caused_by": {
      "type": "timeout_exception",
      "reason": "Timeout waiting for task."
    }
  },
  "status": 500
}

How can one reproduce the bug? Steps to reproduce the behavior:

  1. Follow this doc https://opensearch.org/docs/latest/search-plugins/conversational-search/

What is the expected behavior? According to the error log, the timeout comes from the blocking call at https://github.com/opensearch-project/ml-commons/blob/2.11/search-processors/src/main/java/org/opensearch/searchpipelines/questionanswering/generative/client/ConversationalMemoryClient.java#L98

            GetInteractionsResponse response = client
                .execute(GetInteractionsAction.INSTANCE, new GetInteractionsRequest(conversationId, maxResults, from))
                .actionGet(DEFAULT_TIMEOUT_IN_MILLIS);

This code should not use actionGet, which blocks the calling thread. I suggest changing it to use an action listener instead.
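To make the suggestion concrete, here is a minimal, self-contained sketch of the callback style. It does not use the real OpenSearch classes: `Listener`, `FakeMemoryClient`, and `fetch` are hypothetical stand-ins (for something like `ActionListener` and `ConversationalMemoryClient`), and the lookup itself is simulated on a pool thread. The point is only that the caller hands over a listener and returns immediately instead of waiting on the result.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class ListenerSketch {
    /** Hypothetical stand-in for an action listener: one callback per outcome. */
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    /** Hypothetical client whose lookup completes on another thread. */
    static final class FakeMemoryClient {
        void getInteractions(String conversationId, Listener<List<String>> listener) {
            CompletableFuture
                .supplyAsync(() -> List.of("interaction for " + conversationId))
                .whenComplete((result, error) -> {
                    // Report the outcome through the listener; nothing waits on it here.
                    if (error != null) {
                        listener.onFailure(new RuntimeException(error));
                    } else {
                        listener.onResponse(result);
                    }
                });
        }
    }

    /** Starts the lookup and returns at once; the future only lets the demo observe the callback. */
    static CompletableFuture<List<String>> fetch(String conversationId) {
        CompletableFuture<List<String>> observed = new CompletableFuture<>();
        new FakeMemoryClient().getInteractions(conversationId, new Listener<List<String>>() {
            @Override
            public void onResponse(List<String> interactions) {
                observed.complete(interactions);
            }

            @Override
            public void onFailure(Exception e) {
                observed.completeExceptionally(e);
            }
        });
        return observed;
    }

    public static void main(String[] args) {
        // join() is used only so the demo can print before exiting; production code would not block.
        System.out.println(fetch("c1").join());
    }
}
```

In the real client, the equivalent change would presumably be to pass a listener to `client.execute(...)` rather than calling `.actionGet(DEFAULT_TIMEOUT_IN_MILLIS)` on the returned future, so the calling thread is never parked while it waits.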

What is your host/environment?

Do you have any screenshots? No

Do you have any additional context? Error log

[2024-04-17T22:08:54,044][WARN ][r.suppressed             ] [1a96c8c1276b3e88ebb2d33448e12e1e] path: /test_population_data/_search, params: {pretty=true, search_pipeline=demo_rag_pipeline, index=test_population_data}
SearchPipelineProcessingException[OpenSearchTimeoutException[java.util.concurrent.TimeoutException: Timeout waiting for task.]; nested: TimeoutException[Timeout waiting for task.];]; nested: OpenSearchTimeoutException[java.util.concurrent.TimeoutException: Timeout waiting for task.]; nested: TimeoutException[Timeout waiting for task.];
        at org.opensearch.search.pipeline.Pipeline.transformResponse(Pipeline.java:199)
        at org.opensearch.search.pipeline.PipelinedRequest.transformResponse(PipelinedRequest.java:31)
        at org.opensearch.action.search.TransportSearchAction.lambda$executeRequest$0(TransportSearchAction.java:453)
        at org.opensearch.core.action.ActionListener$1.onResponse(ActionListener.java:82)
        at org.opensearch.core.action.ActionListener$5.onResponse(ActionListener.java:268)
        at org.opensearch.action.search.AbstractSearchAsyncAction.sendSearchResponse(AbstractSearchAsyncAction.java:707)
        at org.opensearch.action.search.ExpandSearchPhase.run(ExpandSearchPhase.java:132)
        at org.opensearch.action.search.SearchPhase.recordAndRun(SearchPhase.java:59)
        at org.opensearch.action.search.AbstractSearchAsyncAction.executePhase(AbstractSearchAsyncAction.java:456)
        at org.opensearch.action.search.AbstractSearchAsyncAction.executeNextPhase(AbstractSearchAsyncAction.java:440)
        at org.opensearch.action.search.FetchSearchPhase.moveToNextPhase(FetchSearchPhase.java:298)
        at org.opensearch.action.search.FetchSearchPhase.lambda$innerRun$1(FetchSearchPhase.java:138)
        at org.opensearch.action.search.CountedCollector.countDown(CountedCollector.java:66)
        at org.opensearch.action.search.ArraySearchPhaseResults.consumeResult(ArraySearchPhaseResults.java:61)
        at org.opensearch.action.search.CountedCollector.onResult(CountedCollector.java:74)
        at org.opensearch.action.search.FetchSearchPhase$2.innerOnResponse(FetchSearchPhase.java:243)
        at org.opensearch.action.search.FetchSearchPhase$2.innerOnResponse(FetchSearchPhase.java:238)
        at org.opensearch.action.search.SearchActionListener.onResponse(SearchActionListener.java:59)
        at org.opensearch.action.search.SearchActionListener.onResponse(SearchActionListener.java:44)
        at org.opensearch.action.ActionListenerResponseHandler.handleResponse(ActionListenerResponseHandler.java:70)
        at org.opensearch.action.search.SearchTransportService$ConnectionCountingHandler.handleResponse(SearchTransportService.java:744)
        at org.opensearch.transport.TransportService$6.handleResponse(TransportService.java:897)
        at org.opensearch.security.transport.SecurityInterceptor$RestoringTransportResponseHandler.handleResponse(SecurityInterceptor.java:419)
        at org.opensearch.transport.TransportService$ContextRestoreResponseHandler.handleResponse(TransportService.java:1516)
        at org.opensearch.transport.InboundHandler.doHandleResponse(InboundHandler.java:411)
        at org.opensearch.transport.InboundHandler.handleResponse(InboundHandler.java:403)
        at org.opensearch.transport.InboundHandler.messageReceived(InboundHandler.java:168)
        at org.opensearch.transport.InboundHandler.inboundMessage(InboundHandler.java:123)
        at org.opensearch.transport.TcpTransport.inboundMessage(TcpTransport.java:770)
        at org.opensearch.transport.InboundPipeline.forwardFragments(InboundPipeline.java:175)
        at org.opensearch.transport.InboundPipeline.doHandleBytes(InboundPipeline.java:150)
        at org.opensearch.transport.InboundPipeline.handleBytes(InboundPipeline.java:115)
        at org.opensearch.transport.netty4.Netty4MessageChannelHandler.channelRead(Netty4MessageChannelHandler.java:95)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:280)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1471)
        at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1334)
        at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1383)
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:689)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:652)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: OpenSearchTimeoutException[java.util.concurrent.TimeoutException: Timeout waiting for task.]; nested: TimeoutException[Timeout waiting for task.];
        at org.opensearch.common.util.concurrent.FutureUtils.get(FutureUtils.java:96)
        at org.opensearch.action.support.AdapterActionFuture.actionGet(AdapterActionFuture.java:79)
        at org.opensearch.action.support.AdapterActionFuture.actionGet(AdapterActionFuture.java:68)
        at org.opensearch.searchpipelines.questionanswering.generative.client.ConversationalMemoryClient.getInteractions(ConversationalMemoryClient.java:98)
        at org.opensearch.searchpipelines.questionanswering.generative.GenerativeQAResponseProcessor.processResponse(GenerativeQAResponseProcessor.java:138)
        at org.opensearch.search.pipeline.Pipeline.transformResponse(Pipeline.java:177)
        ... 60 more
Caused by: java.util.concurrent.TimeoutException: Timeout waiting for task.
        at org.opensearch.common.util.concurrent.BaseFuture$Sync.get(BaseFuture.java:257)
        at org.opensearch.common.util.concurrent.BaseFuture.get(BaseFuture.java:82)
        at org.opensearch.common.util.concurrent.FutureUtils.get(FutureUtils.java:94)
        ... 65 more

ylwu-amzn commented 2 months ago

Changing the processor to implement the SearchResponseProcessor.processResponseAsync method should fix this. @austintlee, we need to prioritize this issue. Do you have enough bandwidth?

cc @mashah @jonfritz
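
As a rough illustration of the processResponseAsync idea, the sketch below uses stand-in types only. `ResponseProcessor`, `Listener`, `getInteractionsAsync`, and `GenerativeQaSketch` are hypothetical, responses are plain strings, and the default method shown is an assumption about how an async entry point might wrap the synchronous one. The real OpenSearch signature takes more parameters and may differ.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class AsyncProcessorSketch {
    /** Hypothetical stand-in for an action listener. */
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    /** Hypothetical processor interface with a sync and an async entry point. */
    interface ResponseProcessor {
        String processResponse(String searchResponse) throws Exception;

        // Assumed default: run the synchronous method and report through the listener.
        default void processResponseAsync(String searchResponse, Listener<String> listener) {
            try {
                listener.onResponse(processResponse(searchResponse));
            } catch (Exception e) {
                listener.onFailure(e);
            }
        }
    }

    /** Hypothetical non-blocking memory lookup; completes on a pool thread. */
    static void getInteractionsAsync(String conversationId, Listener<List<String>> listener) {
        CompletableFuture.runAsync(() -> listener.onResponse(List.of("previous question")));
    }

    /** Overrides the async method so no thread waits for the memory lookup. */
    static final class GenerativeQaSketch implements ResponseProcessor {
        @Override
        public String processResponse(String searchResponse) {
            throw new UnsupportedOperationException("use processResponseAsync");
        }

        @Override
        public void processResponseAsync(String searchResponse, Listener<String> listener) {
            getInteractionsAsync("conversation-1", new Listener<List<String>>() {
                @Override
                public void onResponse(List<String> history) {
                    // Continue the pipeline from the callback once the history arrives.
                    listener.onResponse(searchResponse + " + history" + history);
                }

                @Override
                public void onFailure(Exception e) {
                    listener.onFailure(e);
                }
            });
        }
    }

    /** Bridges the callback to a future so the demo can observe the result. */
    static CompletableFuture<String> run(String searchResponse) {
        CompletableFuture<String> done = new CompletableFuture<>();
        new GenerativeQaSketch().processResponseAsync(searchResponse, new Listener<String>() {
            @Override
            public void onResponse(String processed) {
                done.complete(processed);
            }

            @Override
            public void onFailure(Exception e) {
                done.completeExceptionally(e);
            }
        });
        return done;
    }

    public static void main(String[] args) {
        System.out.println(run("hits").join());
    }
}
```

The design choice here is that each step continues from the previous step's callback, so a failure in the memory lookup reaches the caller through `onFailure` rather than as a timeout on a blocked thread.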

ylwu-amzn commented 2 months ago

@austintlee I found some other places that use the same blocking call:

https://github.com/opensearch-project/ml-commons/blob/main/search-processors/src/main/java/org/opensearch/searchpipelines/questionanswering/generative/client/ConversationalMemoryClient.java#L61

        CreateConversationResponse response = client
            .execute(CreateConversationAction.INSTANCE, new CreateConversationRequest(name))
            .actionGet(DEFAULT_TIMEOUT_IN_MILLIS);

https://github.com/opensearch-project/ml-commons/blob/main/search-processors/src/main/java/org/opensearch/searchpipelines/questionanswering/generative/client/ConversationalMemoryClient.java#L82C13-L82C23

        CreateInteractionResponse res = client
            .execute(
                CreateInteractionAction.INSTANCE,
                new CreateInteractionRequest(conversationId, input, promptTemplate, response, origin, additionalInfo)
            )
            .actionGet(DEFAULT_TIMEOUT_IN_MILLIS);

https://github.com/opensearch-project/ml-commons/blob/main/search-processors/src/main/java/org/opensearch/searchpipelines/questionanswering/generative/client/ConversationalMemoryClient.java#L127

            GetInteractionsResponse response = client
                .execute(GetInteractionsAction.INSTANCE, new GetInteractionsRequest(conversationId, maxResults, from))
                .actionGet(DEFAULT_TIMEOUT_IN_MILLIS);