elastic / elasticsearch

Free and Open Source, Distributed, RESTful Search Engine
https://www.elastic.co/products/elasticsearch

[ML] GetDatafeedRunningStateAction can fail if multiple local datafeed tasks exist #104160

Open droberts195 opened 10 months ago

droberts195 commented 10 months ago

The following exception trace was observed in a serverless project:

java.lang.IllegalStateException: Duplicate key datafeed-v3_linux_system_user_discovery (attempted merging values org.elasticsearch.xpack.core.ml.action.GetDatafeedRunningStateAction$Response$RunningState@7d2c9ddc and org.elasticsearch.xpack.core.ml.action.GetDatafeedRunningStateAction$Response$RunningState@131739)
    at java.base/java.util.stream.Collectors.duplicateKeyException(Collectors.java:135)
    at java.base/java.util.stream.Collectors.lambda$uniqKeysMapAccumulator$1(Collectors.java:182)
    at java.base/java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
    at java.base/java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:179)
    at java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1858)
    at java.base/java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:762)
    at java.base/java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:276)
    at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1708)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:509)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:499)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:921)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:682)
    at org.elasticsearch.xcore@8.13.0/org.elasticsearch.xpack.core.ml.action.GetDatafeedRunningStateAction$Response.fromResponses(GetDatafeedRunningStateAction.java:154)
    at org.elasticsearch.ml@8.13.0/org.elasticsearch.xpack.ml.action.TransportGetDatafeedRunningStateAction.newResponse(TransportGetDatafeedRunningStateAction.java:77)
    at org.elasticsearch.ml@8.13.0/org.elasticsearch.xpack.ml.action.TransportGetDatafeedRunningStateAction.newResponse(TransportGetDatafeedRunningStateAction.java:38)
    at org.elasticsearch.server@8.13.0/org.elasticsearch.action.support.tasks.TransportTasksAction$1.onCompletion(TransportTasksAction.java:146)
    at org.elasticsearch.server@8.13.0/org.elasticsearch.action.support.tasks.TransportTasksAction$1.onCompletion(TransportTasksAction.java:91)
    at org.elasticsearch.server@8.13.0/org.elasticsearch.action.ActionListener.completeWith(ActionListener.java:268)
    at org.elasticsearch.server@8.13.0/org.elasticsearch.action.support.CancellableFanOut.lambda$run$0(CancellableFanOut.java:63)
    at org.elasticsearch.server@8.13.0/org.elasticsearch.action.support.CancellableFanOut$SubtasksCompletionHandler.run(CancellableFanOut.java:197)
    at org.elasticsearch.base@8.13.0/org.elasticsearch.core.AbstractRefCounted$1.closeInternal(AbstractRefCounted.java:118)
    at org.elasticsearch.base@8.13.0/org.elasticsearch.core.AbstractRefCounted.decRef(AbstractRefCounted.java:70)
    at org.elasticsearch.server@8.13.0/org.elasticsearch.action.support.RefCountingRunnable.close(RefCountingRunnable.java:112)
    at org.elasticsearch.base@8.13.0/org.elasticsearch.core.IOUtils.close(IOUtils.java:71)
    at org.elasticsearch.base@8.13.0/org.elasticsearch.core.Releasables.close(Releasables.java:34)
    at org.elasticsearch.base@8.13.0/org.elasticsearch.core.Releasables.closeExpectNoException(Releasables.java:58)
    at org.elasticsearch.server@8.13.0/org.elasticsearch.action.ActionListenerImplementations$2.run(ActionListenerImplementations.java:49)
    at org.elasticsearch.server@8.13.0/org.elasticsearch.action.ActionListenerImplementations$RunAfterActionListener.onResponse(ActionListenerImplementations.java:262)
    at org.elasticsearch.server@8.13.0/org.elasticsearch.action.ActionListenerResponseHandler.handleResponse(ActionListenerResponseHandler.java:49)
    at org.elasticsearch.server@8.13.0/org.elasticsearch.transport.TransportService$UnregisterChildTransportResponseHandler.handleResponse(TransportService.java:1709)
    at org.elasticsearch.server@8.13.0/org.elasticsearch.transport.TransportService$ContextRestoreResponseHandler.handleResponse(TransportService.java:1425)
    at org.elasticsearch.server@8.13.0/org.elasticsearch.transport.InboundHandler.doHandleResponse(InboundHandler.java:433)
    at org.elasticsearch.server@8.13.0/org.elasticsearch.transport.InboundHandler.handleResponse(InboundHandler.java:382)
    at org.elasticsearch.server@8.13.0/org.elasticsearch.transport.InboundHandler.executeResponseHandler(InboundHandler.java:147)
    at org.elasticsearch.server@8.13.0/org.elasticsearch.transport.InboundHandler.messageReceived(InboundHandler.java:122)
    at org.elasticsearch.server@8.13.0/org.elasticsearch.transport.InboundHandler.inboundMessage(InboundHandler.java:96)
    at org.elasticsearch.server@8.13.0/org.elasticsearch.transport.TcpTransport.inboundMessage(TcpTransport.java:825)
    at org.elasticsearch.server@8.13.0/org.elasticsearch.transport.InboundPipeline.forwardFragments(InboundPipeline.java:124)
    at org.elasticsearch.server@8.13.0/org.elasticsearch.transport.InboundPipeline.doHandleBytes(InboundPipeline.java:96)
    at org.elasticsearch.server@8.13.0/org.elasticsearch.transport.InboundPipeline.handleBytes(InboundPipeline.java:61)
    at org.elasticsearch.transport.netty4@8.13.0/org.elasticsearch.transport.netty4.Netty4MessageInboundHandler.channelRead(Netty4MessageInboundHandler.java:48)
    at io.netty.transport@4.1.94.Final/io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at io.netty.transport@4.1.94.Final/io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.transport@4.1.94.Final/io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.codec@4.1.94.Final/io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    at io.netty.transport@4.1.94.Final/io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at io.netty.transport@4.1.94.Final/io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.transport@4.1.94.Final/io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.handler@4.1.94.Final/io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1383)
    at io.netty.handler@4.1.94.Final/io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1246)
    at io.netty.handler@4.1.94.Final/io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1295)
    at io.netty.codec@4.1.94.Final/io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
    at io.netty.codec@4.1.94.Final/io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
    at io.netty.codec@4.1.94.Final/io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
    at io.netty.transport@4.1.94.Final/io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at io.netty.transport@4.1.94.Final/io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.transport@4.1.94.Final/io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.transport@4.1.94.Final/io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.transport@4.1.94.Final/io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
    at io.netty.transport@4.1.94.Final/io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.transport@4.1.94.Final/io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.transport@4.1.94.Final/io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.transport@4.1.94.Final/io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
    at io.netty.transport@4.1.94.Final/io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:689)
    at io.netty.transport@4.1.94.Final/io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:652)
    at io.netty.transport@4.1.94.Final/io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
    at io.netty.common@4.1.94.Final/io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.common@4.1.94.Final/io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at java.base/java.lang.Thread.run(Thread.java:1583)

Because GetDatafeedStatsAction calls GetDatafeedRunningStateAction as a sub-action, this failure causes the whole GetDatafeedStatsAction to fail.

Analysis of the logs shows that the cause of the duplicate was as follows:

  1. The datafeed is force-stopped, and its persistent task is deleted from the cluster state.
  2. The local persistent task executor on the ML node notices this and cancels the corresponding local task.
  3. The local cancellation takes a long time (10+ minutes) because repeated circuit breaker exceptions on the search and indexing nodes lead to repeated retries. Cancelling a local task is usually much quicker, which is why this issue hasn't been seen before.
  4. The datafeed is restarted before the first local task has finished cancelling. This is possible because the absence of a persistent task in the cluster state is taken to mean that the datafeed isn't running.
  5. A second local task for the same datafeed is started, so there are now two tasks for it on the ML node: one cancelled and one starting up.
  6. Get datafeed stats is called during this overlap period. It calls get datafeed running state as a sub-action, which gets results from both tasks and produces the duplicate key error.
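
The failure at Collectors.duplicateKeyException can be reproduced in isolation. The sketch below is a simplified, hypothetical stand-in: `TaskState`, its fields, and `DuplicateKeyDemo` are invented for illustration and are not the real `GetDatafeedRunningStateAction$Response$RunningState`, and the real code also flattens per-node response maps first. It only shows that `Collectors.toMap` without a merge function throws `IllegalStateException` when two entries share a key, which is what happens when both the cancelling task and the new task report the same datafeed ID.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class DuplicateKeyDemo {
    // Hypothetical per-task state; not the real Elasticsearch RunningState class.
    record TaskState(String datafeedId, long startTimeMillis, boolean cancelled) {}

    static Map<String, TaskState> collect(List<TaskState> states) {
        // toMap without a merge function throws IllegalStateException on a duplicate key.
        return states.stream().collect(Collectors.toMap(TaskState::datafeedId, s -> s));
    }

    public static void main(String[] args) {
        List<TaskState> states = List.of(
            // Old local task, still cancelling after the force-stop.
            new TaskState("datafeed-v3_linux_system_user_discovery", 1_000L, true),
            // New local task started by the restart.
            new TaskState("datafeed-v3_linux_system_user_discovery", 2_000L, false));
        try {
            collect(states);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```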

We can avoid this problem by sorting the responses by creation time and keeping only the state of the most recent task when a datafeed has more than one.
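
A minimal sketch of that fix, using a hypothetical `TaskState` record (its fields, including `startTimeMillis` as a stand-in for task creation time, are assumptions and not the real response class). Rather than sorting explicitly, it passes a merge function to `Collectors.toMap` that keeps whichever of two colliding entries has the later creation time. This gives the same result as sorting by creation time and keeping the last entry per datafeed.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.stream.Collectors;

class LatestTaskWins {
    // Hypothetical per-task state; field names are illustrative assumptions.
    record TaskState(String datafeedId, long startTimeMillis, boolean cancelled) {}

    static Map<String, TaskState> collect(List<TaskState> states) {
        // When two tasks report the same datafeed ID, keep the one created most
        // recently instead of throwing IllegalStateException.
        BinaryOperator<TaskState> keepNewest =
            BinaryOperator.maxBy(Comparator.comparingLong(TaskState::startTimeMillis));
        return states.stream()
            .collect(Collectors.toMap(TaskState::datafeedId, s -> s, keepNewest));
    }

    public static void main(String[] args) {
        List<TaskState> states = List.of(
            new TaskState("datafeed-v3_linux_system_user_discovery", 1_000L, true),
            new TaskState("datafeed-v3_linux_system_user_discovery", 2_000L, false));
        Map<String, TaskState> result = collect(states);
        // Prints the state of the newer, non-cancelled task.
        System.out.println(result.get("datafeed-v3_linux_system_user_discovery"));
    }
}
```

Using `BinaryOperator.maxBy` keeps the fix local to the collector and makes the outcome independent of the order in which the task responses arrive.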

elasticsearchmachine commented 10 months ago

Pinging @elastic/ml-core (Team:ML)