elastic / elasticsearch

Free and Open Source, Distributed, RESTful Search Engine
https://www.elastic.co/products/elasticsearch

FetchSearchResult may leak reference on coordinate node when data nodes disconnect #115056

Open ywangd opened 1 day ago

ywangd commented 1 day ago

The following leak report for FetchSearchResult was observed in an integration test:

[2024-10-16T02:23:16,282][ERROR][o.e.t.LeakTracker        ] [[Cleaner-0]] LEAK: resource was not cleaned up before it was garbage-collected.
Recent access records: 
#1:
    in [elasticsearch[node_t0][transport_worker][T#3]][testRandomActivities {seed=[269733E3AE266DE6:753C53E86144F107]}]
    org.elasticsearch.transport.InboundHandler.doHandleResponse(InboundHandler.java:437)
    org.elasticsearch.transport.InboundHandler.handleResponse(InboundHandler.java:382)
    org.elasticsearch.transport.InboundHandler.executeResponseHandler(InboundHandler.java:149)
    org.elasticsearch.transport.InboundHandler.messageReceived(InboundHandler.java:124)
    org.elasticsearch.transport.InboundHandler.inboundMessage(InboundHandler.java:98)
    org.elasticsearch.transport.TcpTransport.inboundMessage(TcpTransport.java:822)
    org.elasticsearch.transport.InboundPipeline.forwardFragments(InboundPipeline.java:125)
    org.elasticsearch.transport.InboundPipeline.doHandleBytes(InboundPipeline.java:97)
    org.elasticsearch.transport.InboundPipeline.handleBytes(InboundPipeline.java:62)
    org.elasticsearch.transport.netty4.Netty4MessageInboundHandler.channelRead(Netty4MessageInboundHandler.java:58)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
    io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:689)
    io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:652)
    io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    java.base/java.lang.Thread.run(Thread.java:1570)
#2:
    in [elasticsearch[node_t0][transport_worker][T#3]][testRandomActivities {seed=[269733E3AE266DE6:753C53E86144F107]}]
    org.elasticsearch.action.search.ArraySearchPhaseResults.consumeResult(ArraySearchPhaseResults.java:47)
    org.elasticsearch.action.search.CountedCollector.onResult(CountedCollector.java:49)
    org.elasticsearch.action.search.FetchSearchPhase$2.innerOnResponse(FetchSearchPhase.java:230)
    org.elasticsearch.action.search.FetchSearchPhase$2.innerOnResponse(FetchSearchPhase.java:225)
    org.elasticsearch.action.search.SearchActionListener.onResponse(SearchActionListener.java:34)
    org.elasticsearch.action.search.SearchActionListener.onResponse(SearchActionListener.java:19)
    org.elasticsearch.action.ActionListenerResponseHandler.handleResponse(ActionListenerResponseHandler.java:49)
    org.elasticsearch.action.search.SearchTransportService$ConnectionCountingHandler.handleResponse(SearchTransportService.java:713)
    org.elasticsearch.transport.TransportService$UnregisterChildTransportResponseHandler.handleResponse(TransportService.java:1766)
    org.elasticsearch.transport.TransportService$ContextRestoreResponseHandler.handleResponse(TransportService.java:1498)
    org.elasticsearch.transport.InboundHandler.doHandleResponse(InboundHandler.java:433)
    org.elasticsearch.transport.InboundHandler.handleResponse(InboundHandler.java:382)
    org.elasticsearch.transport.InboundHandler.executeResponseHandler(InboundHandler.java:149)
    org.elasticsearch.transport.InboundHandler.messageReceived(InboundHandler.java:124)
    org.elasticsearch.transport.InboundHandler.inboundMessage(InboundHandler.java:98)
    org.elasticsearch.transport.TcpTransport.inboundMessage(TcpTransport.java:822)
    org.elasticsearch.transport.InboundPipeline.forwardFragments(InboundPipeline.java:125)
    org.elasticsearch.transport.InboundPipeline.doHandleBytes(InboundPipeline.java:97)
    org.elasticsearch.transport.InboundPipeline.handleBytes(InboundPipeline.java:62)
    org.elasticsearch.transport.netty4.Netty4MessageInboundHandler.channelRead(Netty4MessageInboundHandler.java:58)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
    io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:689)
    io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:652)
    io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    java.base/java.lang.Thread.run(Thread.java:1570)
Created at:
    in [elasticsearch[node_t0][transport_worker][T#3]][testRandomActivities {seed=[269733E3AE266DE6:753C53E86144F107]}]
    org.elasticsearch.search.fetch.FetchSearchResult.<init>(FetchSearchResult.java:34)
    org.elasticsearch.action.ActionListenerResponseHandler.read(ActionListenerResponseHandler.java:64)
    org.elasticsearch.action.ActionListenerResponseHandler.read(ActionListenerResponseHandler.java:26)
    org.elasticsearch.transport.TransportService$UnregisterChildTransportResponseHandler.read(TransportService.java:1785)
    org.elasticsearch.transport.TransportService$UnregisterChildTransportResponseHandler.read(TransportService.java:1754)
    org.elasticsearch.transport.TransportService$ContextRestoreResponseHandler.read(TransportService.java:1489)
    org.elasticsearch.transport.TransportService$ContextRestoreResponseHandler.read(TransportService.java:1476)
    org.elasticsearch.transport.InboundHandler.doHandleResponse(InboundHandler.java:419)
    org.elasticsearch.transport.InboundHandler.handleResponse(InboundHandler.java:382)
    org.elasticsearch.transport.InboundHandler.executeResponseHandler(InboundHandler.java:149)
    org.elasticsearch.transport.InboundHandler.messageReceived(InboundHandler.java:124)
    org.elasticsearch.transport.InboundHandler.inboundMessage(InboundHandler.java:98)
    org.elasticsearch.transport.TcpTransport.inboundMessage(TcpTransport.java:822)
    org.elasticsearch.transport.InboundPipeline.forwardFragments(InboundPipeline.java:125)
    org.elasticsearch.transport.InboundPipeline.doHandleBytes(InboundPipeline.java:97)
    org.elasticsearch.transport.InboundPipeline.handleBytes(InboundPipeline.java:62)
    org.elasticsearch.transport.netty4.Netty4MessageInboundHandler.channelRead(Netty4MessageInboundHandler.java:58)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
    io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:689)
    io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:652)
    io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    java.base/java.lang.Thread.run(Thread.java:1570)

It seems that a FetchSearchResult that arrives after ArraySearchPhaseResults has been closed is not released correctly. This may happen when fetching data from multiple nodes and one node disconnects while the remaining requests are still in flight.
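
To make the suspected race easier to discuss, here is a minimal, self-contained sketch of the pattern. It is not the Elasticsearch code and not a proposed patch: RefCountedResult, ResultHolder and onResponse are hypothetical stand-ins for a ref-counted result, a closeable results holder and the response handler. The assumption is that the holder takes its own reference when it consumes a result and releases it on close, so a result consumed after close keeps a reference that nothing ever releases. The sketch guards against that by refusing to retain late arrivals.

```java
import java.util.concurrent.atomic.AtomicInteger;

class LateResultSketch {

    /** Hypothetical stand-in for a ref-counted response object. Starts with one reference. */
    static final class RefCountedResult {
        private final AtomicInteger refs = new AtomicInteger(1);

        void incRef() {
            // Only increment while the object is still alive.
            if (refs.getAndUpdate(v -> v > 0 ? v + 1 : v) <= 0) {
                throw new IllegalStateException("already released");
            }
        }

        void decRef() {
            if (refs.decrementAndGet() < 0) {
                throw new IllegalStateException("released too many times");
            }
        }

        boolean hasReferences() {
            return refs.get() > 0;
        }
    }

    /** Hypothetical stand-in for a holder that retains results until it is closed. */
    static final class ResultHolder {
        private final RefCountedResult[] slots;
        private boolean closed; // guarded by "this"

        ResultHolder(int size) {
            this.slots = new RefCountedResult[size];
        }

        /** Retains the result unless the holder is already closed. Returns true if retained. */
        synchronized boolean consume(int index, RefCountedResult result) {
            if (closed) {
                return false; // late arrival: taking a reference here would never be released
            }
            result.incRef();
            slots[index] = result;
            return true;
        }

        /** Releases every retained result exactly once. */
        synchronized void close() {
            if (closed) {
                return;
            }
            closed = true;
            for (int i = 0; i < slots.length; i++) {
                if (slots[i] != null) {
                    slots[i].decRef();
                    slots[i] = null;
                }
            }
        }
    }

    /** The handler owns the initial reference and always releases it after handing off. */
    static void onResponse(ResultHolder holder, int index, RefCountedResult result) {
        try {
            holder.consume(index, result);
        } finally {
            result.decRef();
        }
    }

    public static void main(String[] args) {
        ResultHolder holder = new ResultHolder(2);

        RefCountedResult early = new RefCountedResult();
        onResponse(holder, 0, early); // arrives before close, retained by the holder

        holder.close();               // e.g. the phase fails because a node disconnected

        RefCountedResult late = new RefCountedResult();
        onResponse(holder, 1, late);  // arrives after close, must not be retained

        System.out.println("early still referenced: " + early.hasReferences());
        System.out.println("late still referenced: " + late.hasReferences());
    }
}
```

Without the closed check in consume, the late result would end up with one outstanding reference after onResponse returns, which matches the shape of the leak report above. The check and the close loop share one lock so that a result cannot be stored between the moment close starts and the moment it finishes.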

elasticsearchmachine commented 1 day ago

Pinging @elastic/es-search (Team:Search)