reactor / reactor-netty

TCP/HTTP/UDP/QUIC client/server with Reactor over Netty
https://projectreactor.io
Apache License 2.0
2.57k stars 640 forks source link

LEAK: ByteBuf.release() was not called before it's garbage-collected - resolution: buffer is not released when DataBuffer#asInputStream() #1746

Closed Azbesciak closed 3 years ago

Azbesciak commented 3 years ago

Hello, I am expecting a strange exception; it occurs only on the production service with production load (docker/ubuntu , I tried to replicate both on the production (docker) machine (same image, different run) and on my own windows machine, also in docker for windows - no success - I executed about 30k requests with different frequency, also tried with env var IO_NETTY_LEAK_DETECTION_LEVEL="paranoid").

On the production, it shows up some short time (2-3 mins, about 1k requests executed to external service) after a server boot, when it receives requests. However, it occurs also later (for 1h about 10 times), and have a little bit different stack traces; these below:

1

[reactor-http-epoll-1] io.netty.util.ResourceLeakDetector       : LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
    io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:402)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
    io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115)
    org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:71)
    org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:39)
    org.springframework.http.codec.json.AbstractJackson2Encoder.encodeStreamingValue(AbstractJackson2Encoder.java:280)
    org.springframework.http.codec.json.AbstractJackson2Encoder.lambda$encode$1(AbstractJackson2Encoder.java:168)
    reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:543)
    reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:984)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:543)
    reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:984)
    reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
    reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:421)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onNext(FluxFlattenIterable.java:250)
    io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterSubscriber.hookOnNext(RateLimiterSubscriber.java:42)
    reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
    reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
    reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
    reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
    reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:401)
    reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:420)
    reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:474)
    reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:685)
    reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
    io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1368)
    io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1234)
    io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1280)
    io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
    io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
    io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
    io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.base/java.lang.Thread.run(Unknown Source)

2

[reactor-http-epoll-1] io.netty.util.ResourceLeakDetector       : LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
    io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:402)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
    io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115)
    org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:71)
    org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:39)
    org.springframework.http.codec.json.AbstractJackson2Encoder.encodeStreamingValue(AbstractJackson2Encoder.java:280)
    org.springframework.http.codec.json.AbstractJackson2Encoder.lambda$encode$1(AbstractJackson2Encoder.java:168)
    reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:543)
    reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:984)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:712)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:569)
    reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:984)
    reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
    reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:421)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onNext(FluxFlattenIterable.java:250)
    io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterSubscriber.hookOnNext(RateLimiterSubscriber.java:42)
    reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
    reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
    reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
    reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
    reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:401)
    reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:420)
    reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:474)
    reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:685)
    reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
    io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1368)
    io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1234)
    io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1280)
    io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
    io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
    io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
    io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.base/java.lang.Thread.run(Unknown Source)

3

io.netty.util.ResourceLeakDetector       : LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
    io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:402)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
    io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115)
    org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:71)
    org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:39)
    org.springframework.http.codec.json.AbstractJackson2Encoder.encodeStreamingValue(AbstractJackson2Encoder.java:280)
    org.springframework.http.codec.json.AbstractJackson2Encoder.lambda$encode$1(AbstractJackson2Encoder.java:168)
    reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:543)
    reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:984)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:712)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.innerComplete(FluxFlatMap.java:894)
    reactor.core.publisher.FluxFlatMap$FlatMapInner.onComplete(FluxFlatMap.java:997)
    reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2058)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    reactor.core.publisher.MonoNext$NextSubscriber.onComplete(MonoNext.java:102)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:341)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onComplete(FluxFlattenIterable.java:267)
    io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterSubscriber.hookOnComplete(RateLimiterSubscriber.java:54)
    reactor.core.publisher.BaseSubscriber.onComplete(BaseSubscriber.java:197)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1817)
    reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
    reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
    reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
    reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
    reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:401)
    reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:420)
    reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:474)
    reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:685)
    reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
    io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1368)
    io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1234)
    io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1280)
    io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
    io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
    io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
    io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.base/java.lang.Thread.run(Unknown Source)

Expected Behavior

No leak, after 3 days this service consumes 2x more ram than at the beginning.

Steps to Reproduce

No idea. Maybe it is worth mentioning I cache WebClient for each url path (without query params); later it is assigned to the field, but created at the very beginning (after boot). I had this issue on spring boot 2.3.2, migrated to 2.5.2 but no difference.

Your Environment

Ubuntu 18.04.3 LTS Docker 19.03.5, build 633a0ea838 docker base image: adoptopenjdk/openjdk16:jre-16.0.1_9-alpine

OlegDokuka commented 3 years ago

@Azbesciak any code samples which can repo this? Any reference of workflow that causes this. Without seeing it is impossible to say where is the problem.

Also, the root cause can be in your code (also, it can be in reactor-core) since we can not guarantee that you discard netty buffers in your pipeline

Azbesciak commented 3 years ago

As I said - no idea. I use WebClient provision bean as below

@Bean
    fun webClient(builder: WebClient.Builder) = { url: String -> builder.baseUrl(url).build() }

Then consume it at the service provision, for example

@Configuration
internal class ServiceConfiguration(
        private val client: (String) -> WebClient,
        private val rateLimiterProvider: RateLimiterProvider,
)  {
    // lazy because it can be used by multiple beans
    private val service by lazy {
        SomeService(
            client(someUrl),
            rateLimiterProvider
        )
    }

    @Bean("some-service")
    override fun myService() = service 

and inside the service I execute get calls like this

client.get()
          .uri {
              params.forEach { (name, value) -> it.queryParam(name, value) } // params is Array<Pair<String, String>>
              it.build()
          }.retrieve()
          .bodyToMono(resultClass)
          .transformDeferred(RateLimiterOperator.of(rateLimiterProvider[javaClass.simpleName]))

Rate limiter below, whitout this I also noticed some leaks... but to be honest, these are more noticable after adding it

class RateLimiterProvider(private val spec: RateLimiterSpec) {
    private val requestLimiters = ConcurrentHashMap<String, RateLimiter>()
    operator fun get(key: String) = requestLimiters.computeIfAbsent(key) {
        val instanceSpec = spec.default
        RateLimiter.of(
            key,
            RateLimiterConfig.custom()
                .limitForPeriod(instanceSpec.limit) // 200
                .timeoutDuration(instanceSpec.timeout) // 10m
                .limitRefreshPeriod(instanceSpec.period) // 1s
                .build()
        )
    }
}

As mentioned above, service(s) can also be used in batch mode (the client sends multiple requests as one, the response is merged as json-stream). This is represented below

override fun batch(queries: Flux<BatchRequestEntry>) =
            queries.publishOn(scheduler)
                    .flatMap { query ->
                        when {
                            conditionA(query)-> serviceA.invoke(query) // Mono
                            conditionB(query)-> serviceB.invokeMany(query).toMono() // it returns Flux without it
                            conditionC(query) -> serviceC.invoke(query).toMono()
                            else -> Mono.empty()
                        }.map {
                            BatchResultEntry(query.id, result = it)
                        }.onErrorResume { mapError(it, query) }
                    }

    private fun mapError(it: Throwable?, query: BatchRequestEntry): Mono<BatchResultEntry> {
        val e = when (it) {
            is WebClientResponseException -> webClientResponseExceptionManager.parse(it)
            else -> it
        }
        return Mono.just(BatchResultEntry(query.id, error = e))
    }

Here scheduler is created with Schedulers.newParallel("batch", 10)

OlegDokuka commented 3 years ago

@Azbesciak can you try to repro this leak with an enabled logger:

<logger name="_reactor.netty.channel.LeakDetection" level="DEBUG" />

Also, it can be very useful if you can create a small example app that mimics the code you have in your production app but shows the code that causes the issue in isolation. Even though it will not reproduce the problem it can give us an opportunity to play around and see what could be the problem and where is it.

Thanks

Azbesciak commented 3 years ago

reactor.netty.channel.LeakDetection

@Azbesciak can you try to repro this leak with an enabled logger:

<logger name="_reactor.netty.channel.LeakDetection" level="DEBUG" />

Is it somehow possible to set via environment properties? I would not like to rebuild the whole image (I do not have logger specific config file). I set env properties like below

      REACTOR_NETTY_CHANNEL_LEAK_DETECTION: "debug"
      LOGGING_LEVEL_REACTOR_IPC_NETTY: "debug"
      IO_NETTY_LEAK_DETECTION_LEVEL: "paranoid"

but no difference, stack trace is still the same

2021-07-19 19:14:05.731 ERROR 1 --- [reactor-http-epoll-4] io.netty.util.ResourceLeakDetector       : LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
    io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:402)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
    io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115)
    org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:71)
    org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:39)
    org.springframework.http.codec.json.AbstractJackson2Encoder.encodeStreamingValue(AbstractJackson2Encoder.java:280)
    org.springframework.http.codec.json.AbstractJackson2Encoder.lambda$encode$1(AbstractJackson2Encoder.java:168)
    reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:543)
    reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:984)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:543)
    reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:984)
    reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
    reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:421)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onNext(FluxFlattenIterable.java:250)
    io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterSubscriber.hookOnNext(RateLimiterSubscriber.java:42)
    reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
    reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
    reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
    reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
    reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:401)
    reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:420)
    reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:474)
    reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:685)
    reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
    io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1368)
    io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1234)
    io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1280)
    io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
    io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
    io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
    io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.base/java.lang.Thread.run(Unknown Source)
2021-07-19 19:14:05.732 ERROR 1 --- [reactor-http-epoll-4] io.netty.util.ResourceLeakDetector       : LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
    io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:402)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
    io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115)
    org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:71)
    org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:39)
    org.springframework.http.codec.json.AbstractJackson2Encoder.encodeStreamingValue(AbstractJackson2Encoder.java:280)
    org.springframework.http.codec.json.AbstractJackson2Encoder.lambda$encode$1(AbstractJackson2Encoder.java:168)
    reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:543)
    reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:984)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:712)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.innerComplete(FluxFlatMap.java:894)
    reactor.core.publisher.FluxFlatMap$FlatMapInner.onComplete(FluxFlatMap.java:997)
    reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2058)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    reactor.core.publisher.MonoNext$NextSubscriber.onComplete(MonoNext.java:102)
    reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:83)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:421)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onNext(FluxFlattenIterable.java:250)
    io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterSubscriber.hookOnNext(RateLimiterSubscriber.java:42)
    reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
    reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
    reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
    reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
    reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:401)
    reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:420)
    reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:474)
    reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:685)
    reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
    io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1368)
    io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1234)
    io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1280)
    io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
    io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
    io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
    io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.base/java.lang.Thread.run(Unknown Source)
2021-07-19 19:16:11.504 ERROR 1 --- [reactor-http-epoll-3] io.netty.util.ResourceLeakDetector       : LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
    io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:402)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
    io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115)
    org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:71)
    org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:39)
    org.springframework.http.codec.json.AbstractJackson2Encoder.encodeStreamingValue(AbstractJackson2Encoder.java:280)
    org.springframework.http.codec.json.AbstractJackson2Encoder.lambda$encode$1(AbstractJackson2Encoder.java:168)
    reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:543)
    reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:984)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:712)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:569)
    reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:984)
    reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
    reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:421)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onNext(FluxFlattenIterable.java:250)
    io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterSubscriber.hookOnNext(RateLimiterSubscriber.java:42)
    reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
    reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
    reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
    reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
    reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:401)
    reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:420)
    reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:474)
    reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:685)
    reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
    io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1368)
    io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1234)
    io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1280)
    io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
    io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
    io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
    io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.base/java.lang.Thread.run(Unknown Source)
2021-07-19 19:18:16.931 ERROR 1 --- [reactor-http-epoll-2] io.netty.util.ResourceLeakDetector       : LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
    io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:402)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
    io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115)
    org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:71)
    org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:39)
    org.springframework.http.codec.json.AbstractJackson2Encoder.encodeStreamingValue(AbstractJackson2Encoder.java:280)
    org.springframework.http.codec.json.AbstractJackson2Encoder.lambda$encode$1(AbstractJackson2Encoder.java:168)
    reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:543)
    reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:984)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:712)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.innerComplete(FluxFlatMap.java:894)
    reactor.core.publisher.FluxFlatMap$FlatMapInner.onComplete(FluxFlatMap.java:997)
    reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2058)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    reactor.core.publisher.MonoNext$NextSubscriber.onComplete(MonoNext.java:102)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:341)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onComplete(FluxFlattenIterable.java:267)
    io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterSubscriber.hookOnComplete(RateLimiterSubscriber.java:54)
    reactor.core.publisher.BaseSubscriber.onComplete(BaseSubscriber.java:197)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1817)
    reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
    reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
    reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
    reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
    reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:401)
    reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:420)
    reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:474)
    reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:685)
    reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
    io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1368)
    io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1234)
    io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1280)
    io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
    io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
    io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
    io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.base/java.lang.Thread.run(Unknown Source)
2021-07-19 19:19:14.140 ERROR 1 --- [reactor-http-epoll-2] io.netty.util.ResourceLeakDetector       : LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
    io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:402)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
    io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115)
    org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:71)
    org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:39)
    org.springframework.http.codec.json.AbstractJackson2Encoder.encodeStreamingValue(AbstractJackson2Encoder.java:280)
    org.springframework.http.codec.json.AbstractJackson2Encoder.lambda$encode$1(AbstractJackson2Encoder.java:168)
    reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:543)
    reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:984)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:543)
    reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:984)
    reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
    reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:421)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onNext(FluxFlattenIterable.java:250)
    io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterSubscriber.hookOnNext(RateLimiterSubscriber.java:42)
    reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
    reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
    reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
    reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
    reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:401)
    reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:420)
    reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:474)
    reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:685)
    reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
    io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1368)
    io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1234)
    io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1280)
    io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
    io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
    io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe$1.run(AbstractEpollChannel.java:425)
    io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
    io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.base/java.lang.Thread.run(Unknown Source)

also, about the code sample - it is really all that I am using; I nearly copy-pasted all the service connection logic. myService bean is used in the controller just like this, and nothing more. I use Here API via HTTPS, but it rather does not matter.

Azbesciak commented 3 years ago

I changed env props to

      LOGGING_LEVEL_REACTOR_NETTY: "debug"
      IO_NETTY_LEAK_DETECTION_LEVEL: "paranoid"

Below is the whole log till the first leak. leak_log.txt

violetagg commented 3 years ago

@Azbesciak you need to set DEBUG level for that specific logger _reactor.netty.channel.LeakDetection. Note that it starts with _ and that's because we don't want this to be enabled with the rest of Reactor Netty logs, so this LOGGING_LEVEL_REACTOR_NETTY: "debug will not set the needed debug level for leak detection.

Azbesciak commented 3 years ago

@Azbesciak you need to set DEBUG level for that specific logger _reactor.netty.channel.LeakDetection. Note that it starts with _ and that's because we don't want this to be enabled with the rest of Reactor Netty logs, so this LOGGING_LEVEL_REACTOR_NETTY: "debug will not set the needed debug level for leak detection.

Yes, I saw it, and asked about it.

Is it somehow possible to set via environment properties? I would not like to rebuild the whole image (I do not have logger specific config file).

violetagg commented 3 years ago

@Azbesciak you need to set DEBUG level for that specific logger _reactor.netty.channel.LeakDetection. Note that it starts with _ and that's because we don't want this to be enabled with the rest of Reactor Netty logs, so this LOGGING_LEVEL_REACTOR_NETTY: "debug will not set the needed debug level for leak detection.

Yes, I saw it, and asked about it.

Is it somehow possible to set via environment properties? I would not like to rebuild the whole image (I do not have logger specific config file).

Are you able to add -Dlogging.level._reactor.netty=debug, I don't think that the env variable will work for logger starting with _ :(

Azbesciak commented 3 years ago

@violetagg I am affraid that this -Dlogging.level._reactor.netty=debug does not work. I added this flag to my entrypoint like below

 ENTRYPOINT  [  "java",  "-Dlogging.file.name=./log/app.log",  "-Dlogging.file.max-history=10",  "-Dlogging.level._reactor.netty=debug",  "-jar",  "/app.jar"]

but no difference. later I also added -Dio.netty.leakDetectionLevel=paranoid -Dreactor.netty.channel.LeakDetection=debug -D_reactor.netty.channel.LeakDetection=debug but again - no change.

still the same log (nothing before and after)

2021-07-20 19:15:46.353 ERROR 1 --- [reactor-http-epoll-1] io.netty.util.ResourceLeakDetector       : LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
    io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:402)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
    io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115)
    org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:71)
    org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:39)
    org.springframework.http.codec.json.AbstractJackson2Encoder.encodeStreamingValue(AbstractJackson2Encoder.java:280)
    org.springframework.http.codec.json.AbstractJackson2Encoder.lambda$encode$1(AbstractJackson2Encoder.java:168)
    reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:543)
    reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:984)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:712)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.innerComplete(FluxFlatMap.java:894)
    reactor.core.publisher.FluxFlatMap$FlatMapInner.onComplete(FluxFlatMap.java:997)
    reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2058)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    reactor.core.publisher.MonoNext$NextSubscriber.onComplete(MonoNext.java:102)
    reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:83)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:421)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onNext(FluxFlattenIterable.java:250)
    io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterSubscriber.hookOnNext(RateLimiterSubscriber.java:42)
    reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
    reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
    reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
    reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
    reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:401)
    reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:420)
    reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:474)
    reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:685)
    reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
    io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1368)
    io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1234)
    io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1280)
    io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
    io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
    io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
    io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.base/java.lang.Thread.run(Unknown Source)
2021-07-20 19:15:46.353 ERROR 1 --- [reactor-http-epoll-3] io.netty.util.ResourceLeakDetector       : LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
    io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:402)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
    io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115)
    org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:71)
    org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:39)
    org.springframework.http.codec.json.AbstractJackson2Encoder.encodeStreamingValue(AbstractJackson2Encoder.java:280)
    org.springframework.http.codec.json.AbstractJackson2Encoder.lambda$encode$1(AbstractJackson2Encoder.java:168)
    reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:543)
    reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:984)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:712)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:569)
    reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:984)
    reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
    reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:421)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onNext(FluxFlattenIterable.java:250)
    io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterSubscriber.hookOnNext(RateLimiterSubscriber.java:42)
    reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
    reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
    reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
    reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
    reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:401)
    reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:420)
    reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:474)
    reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:685)
    reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
    io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1368)
    io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1234)
    io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1280)
    io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
    io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
    io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
    io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.base/java.lang.Thread.run(Unknown Source)
Azbesciak commented 3 years ago

To be sure it is not RateLimiter fault, I removed it - unfortunately, no change (notice that it was in the stacktrace also)

2021-07-21 04:14:31.811 ERROR 1 --- [reactor-http-epoll-1] io.netty.util.ResourceLeakDetector       : LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
    io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:402)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
    io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115)
    org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:71)
    org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:39)
    org.springframework.http.codec.json.AbstractJackson2Encoder.encodeStreamingValue(AbstractJackson2Encoder.java:280)
    org.springframework.http.codec.json.AbstractJackson2Encoder.lambda$encode$1(AbstractJackson2Encoder.java:168)
    reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:543)
    reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:984)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:543)
    reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:984)
    reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
    reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:421)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onNext(FluxFlattenIterable.java:250)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1798)
    reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
    reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
    reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
    reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
    reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:401)
    reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:420)
    reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:474)
    reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:685)
    reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
    io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1368)
    io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1234)
    io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1280)
    io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
    io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
    io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
    io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.base/java.lang.Thread.run(Unknown Source)
violetagg commented 3 years ago

@Azbesciak It looks like the two JVM settings are not applied

If this is applied -Dio.netty.leakDetectionLevel=paranoid you should be able to see more the stack traces from Netty - see here for example https://netty.io/wiki/reference-counted-objects.html#troubleshooting-buffer-leaks

With -Dlogging.level._reactor.netty.channel.LeakDetection=debug you should see even more stack traces from Reactor Netty

Azbesciak commented 3 years ago

Ok, I believe that is true, but look at this comment and one before - when I removed one setting, log appeared, so maybe these are in conflict? I cannot experiment on production during the day, and it is also cumbersome :/ What should I expect when it would work (how this log looks like)? I saw reactor debug logs (mentioned comment) but I guess you do not expect this.

violetagg commented 3 years ago

@Azbesciak

I cannot experiment on production during the day, and it is also cumbersome :/

Yeah I understand, it is a tricky one :worried:

What should I expect when it would work (how this log looks like)? I saw reactor debug logs (mentioned comment) but I guess you do not expect this.

Take a look at this issue https://github.com/reactor/reactor-netty/issues/1603#issue-857920607

The first stack is with the standard settings, then the author added -Dio.netty.leakDetectionLevel=advance - you can see that there are many stacks after Recent access records:

When you enable the logger _reactor.netty.channel.LeakDetection then you will be able to see hints like this one

#1:
    Hint: [id: 0x2c17d156, L:/127.0.0.1:62487 - R:localhost/127.0.0.1:8080] Buffered ByteBufHolder in Inbound Flux Queue
...
#2:
    Hint: 'reactor.right.reactiveBridge' will handle the message from this point.
violetagg commented 3 years ago

@Azbesciak Were you able to enable the leak detection in Netty/Reactor Netty?

Azbesciak commented 3 years ago

Hi, I did not have time for that, no option in the coming days, and taking into account that I already tried some combinations of parameters, and these did not work, no idea when I can provide it. But, screenshot from yesterday... geocode_service_mem_leak

violetagg commented 3 years ago

@Azbesciak We need somehow to enabled those options otherwise we cannot proceed with the investigation ... :(

Azbesciak commented 3 years ago

@violetagg I have 'good' and bad news. First, I replicated this on my local windows machine. The only thing that was required was a high number of concurrent requests task, let say 20k at once. I have rate limiter 200/s - these were scheduled. But now bad news. First I run just with --debug, log was exactly like the all before. then I added logback config like below (notice last node), without --debug

<configuration>

    <property name="HOME_LOG" value="log2/app.log"/>

    <appender name="FILE-ROLLING" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${HOME_LOG}</file>

        <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
            <fileNamePattern>logs/archived/app.%d{yyyy-MM-dd}.%i.log.gz</fileNamePattern>
            <!-- each archived file, size max 10MB -->
            <maxFileSize>10MB</maxFileSize>
            <!-- total size of all archive files, if total size > 20GB, it will delete old archived file -->
            <totalSizeCap>20GB</totalSizeCap>
            <!-- 60 days to keep -->
            <maxHistory>60</maxHistory>
        </rollingPolicy>

        <encoder>
            <pattern>%d %p %c{1.} [%t] %m%n</pattern>
        </encoder>
    </appender>
    <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
        <appender-ref ref="FILE-ROLLING" />
    </appender>

    <root level="debug">
        <appender-ref ref="ASYNC"/>
    </root>

    <logger name="_reactor.netty.channel.LeakDetection" level="DEBUG" />

</configuration>

... did not really help, to be honest, nothing changed. I also added -Dlogging.level._reactor.netty=debug - no big difference. After adding -Dio.netty.leakDetectionLevel=paranoid (it was notified in logs) ... and also no difference

2021-07-31 09:49:51,346 DEBUG org.springframework.http.codec.json.Jackson2JsonDecoder [reactor-http-nio-3] [79a70eb0] [b0789712-4, L:/<internal-ip> - R:<address>/<external-ip>] Decoded [...]
2021-07-31 09:49:51,346 DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions [reactor-http-nio-3] [79a70eb0] Cancel signal (to close connection)
2021-07-31 09:49:51,346 DEBUG org.springframework.http.codec.json.Jackson2JsonEncoder [reactor-http-nio-3] [106d7b0f-1, L:/[0:0:0:0:0:0:0:1]:8083 - R:/[0:0:0:0:0:0:0:1]:64386] Encoding [..., bou (truncated)...]
2021-07-31 09:49:51,346 DEBUG reactor.netty.resources.DefaultPooledConnectionProvider [reactor-http-nio-3] [id:b0789712, L:/<internal-ip> - R:<address>/<external-ip>] onStateChange(GET{uri=<address>, connection=PooledConnection{channel=[id: 0xb0789712, L:/192.168.5.64:64586 - R:<address>/<external-ip>]}}, [response_completed])
2021-07-31 09:49:51,346 DEBUG reactor.netty.resources.DefaultPooledConnectionProvider [reactor-http-nio-3] [id:b0789712, L:/<internal-ip> - R:<address>/<external-ip>] onStateChange(GET{uri=<address>, connection=PooledConnection{channel=[id: 0xb0789712, L:/192.168.5.64:64586 - R:<address>/<external-ip>]}}, [disconnecting])
2021-07-31 09:49:51,346 DEBUG reactor.netty.resources.DefaultPooledConnectionProvider [reactor-http-nio-3] [id:b0789712, L:/<internal-ip> - R:<address>/<external-ip>] Releasing channel
2021-07-31 09:49:51,346 DEBUG reactor.netty.resources.PooledConnectionProvider [reactor-http-nio-3] [id:b0789712, L:/<internal-ip> - R:<address>/<external-ip>] Channel cleaned, now: 7 active connections, 193 inactive connections and 0 pending acquire requests.
2021-07-31 09:49:51,346 ERROR io.netty.util.ResourceLeakDetector [reactor-http-nio-4] LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
    io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:402)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
    io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115)
    org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:71)
    org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:39)
    org.springframework.http.codec.json.AbstractJackson2Encoder.encodeStreamingValue(AbstractJackson2Encoder.java:280)
    org.springframework.http.codec.json.AbstractJackson2Encoder.lambda$encode$1(AbstractJackson2Encoder.java:168)
    reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:543)
    reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:984)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:543)
    reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:984)
    reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
    reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:421)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onNext(FluxFlattenIterable.java:250)
    io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterSubscriber.hookOnNext(RateLimiterSubscriber.java:42)
    reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
    reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
    reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
    reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
    reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:401)
    reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:420)
    reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:474)
    reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:685)
    reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
    io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1368)
    io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1234)
    io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1280)
    io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
    io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.base/java.lang.Thread.run(Thread.java:831)
2021-07-31 09:49:51,347 DEBUG reactor.netty.http.client.HttpClientOperations [reactor-http-nio-3] [id:a4d4bc8e-4, L:/<internal-ip> - R:<address>/<external-ip>] Received response (auto-read:false) : [Cache-Control=no-store, must-revalidate, Content-Type=application/json, Date=Sat, 31 Jul 2021 07:49:53 GMT, Expires=Sun, 26 Apr 1970 20:00:00 GMT, Pragma=no-cache, Server=openresty, X-Request-Id=REQ-51ea2ff7-ec4a-4a94-a2c7-710a9cc9b76f, transfer-encoding=chunked, Connection=keep-alive]
2021-07-31 09:49:51,347 DEBUG reactor.netty.resources.DefaultPooledConnectionProvider [reactor-http-nio-3] [id:a4d4bc8e-4, L:/<internal-ip> - R:<address>/<external-ip>] onStateChange(GET{uri=<address>, connection=PooledConnection{channel=[id: 0xa4d4bc8e, L:/<internal-ip> - R:<address>/<external-ip>]}}, [response_received])
2021-07-31 09:49:51,347 DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions [reactor-http-nio-3] [9c84504] [a4d4bc8e-4, L:/<internal-ip> - R:<address>/<external-ip>] Response 200 OK
2021-07-31 09:49:51,347 DEBUG reactor.netty.channel.FluxReceive [reactor-http-nio-3] [id:a4d4bc8e-4, L:/<internal-ip> - R:<address>/<external-ip>] FluxReceive{pending=0, cancelled=false, inboundDone=false, inboundError=null}: subscribing inbound receiver
2021-07-31 09:49:51,347 ERROR io.netty.util.ResourceLeakDetector [reactor-http-nio-4] LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
    io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:402)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
    io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115)
    org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:71)
    org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:39)
    org.springframework.http.codec.json.AbstractJackson2Encoder.encodeStreamingValue(AbstractJackson2Encoder.java:280)
    org.springframework.http.codec.json.AbstractJackson2Encoder.lambda$encode$1(AbstractJackson2Encoder.java:168)
    reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:543)
    reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:984)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:712)
    reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmit(FluxFlatMap.java:569)
    reactor.core.publisher.FluxFlatMap$FlatMapInner.onNext(FluxFlatMap.java:984)
    reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
    reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:421)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onNext(FluxFlattenIterable.java:250)
    io.github.resilience4j.reactor.ratelimiter.operator.RateLimiterSubscriber.hookOnNext(RateLimiterSubscriber.java:42)
    reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
    reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:388)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
    reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
    reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:295)
    reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:159)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
    reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
    reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:401)
    reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:420)
    reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:474)
    reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:685)
    reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
    io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1368)
    io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1234)
    io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1280)
    io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
    io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.base/java.lang.Thread.run(Thread.java:831)
2021-07-31 09:49:51,347 DEBUG reactor.netty.http.client.HttpClientOperations [reactor-http-nio-3] [id:a4d4bc8e-4, L:/<internal-ip> - R:<address>/<external-ip>] Received last HTTP packet
2021-07-31 09:49:51,347 DEBUG org.springframework.http.codec.json.Jackson2JsonDecoder [reactor-http-nio-3] [9c84504] [a4d4bc8e-4, L:/<internal-ip> - R:<address>/<external-ip>] Decoded [...]
2021-07-31 09:49:51,347 DEBUG org.springframework.web.reactive.function.client.ExchangeFunctions [reactor-http-nio-3] [9c84504] Cancel signal (to close connection)
2021-07-31 09:49:51,347 DEBUG org.springframework.http.codec.json.Jackson2JsonEncoder [reactor-http-nio-3] [106d7b0f-1, L:/[0:0:0:0:0:0:0:1]:8083 - R:/[0:0:0:0:0:0:0:1]:64386] Encoding [...]
2021-07-31 09:49:51,347 DEBUG reactor.netty.resources.DefaultPooledConnectionProvider [reactor-http-nio-3] [id:a4d4bc8e, L:/<internal-ip> - R:<address>/<external-ip>] onStateChange(GET{uri=<address>, connection=PooledConnection{channel=[id: 0xa4d4bc8e, L:/<internal-ip> - R:<address>/<external-ip>]}}, [response_completed])
2021-07-31 09:49:51,347 DEBUG reactor.netty.resources.DefaultPooledConnectionProvider [reactor-http-nio-3] [id:a4d4bc8e, L:/<internal-ip> - R:<address>/<external-ip>] onStateChange(GET{uri=<address>, connection=PooledConnection{channel=[id: 0xa4d4bc8e, L:/<internal-ip> - R:<address>/<external-ip>]}}, [disconnecting])
2021-07-31 09:49:51,347 DEBUG reactor.netty.resources.DefaultPooledConnectionProvider [reactor-http-nio-3] [id:a4d4bc8e, L:/<internal-ip> - R:<address>/<external-ip>] Releasing channel

You can give me the exact logger config and env parameters you want, I can run it and we will see... PS: with -Dio.netty.leakDetectionLevel=advance instead of paranoid no difference.

Azbesciak commented 3 years ago

@violetagg ? Did you see my question about the exact required run configuration? Unfortunately, as you see above I was not able to receive the required result with described steps. I can also send you directly some code parts with requests - it is probably redundant (the exact code, requests, and keys), but...

violetagg commented 3 years ago

@Azbesciak It seems that Spring Boot 2.5 has a property spring.netty.leak-detection=paranoid https://github.com/spring-projects/spring-boot/issues/27104 This property overrides the setting -Dio.netty.leakDetectionLevel=paranoid, so you need to specify the property above.

Azbesciak commented 3 years ago

@OlegDokuka did you see https://github.com/reactor/reactor-netty/issues/1746#issuecomment-883873831?

OlegDokuka commented 3 years ago

@Azbesciak yeah, noticed that after, thus removed my comment then. My bad

Azbesciak commented 3 years ago

Ok, did not notice that you removed. @violetagg I will check it in next 2 days, hope tomorrow . Thanks.

OlegDokuka commented 3 years ago

@Azbesciak Let me write a test around the same set of operators as in your original issue, to see if a leak may happen on that level. I lean toward the probability that something is on the reactor-core side rather than on the reactor-netty one

rstoyanchev commented 3 years ago

@Azbesciak can you confirm the media type for the content? It looks like it should be a streaming format but just double checking this is the case as we are looking at code paths and possibilities.

Azbesciak commented 3 years ago

Yes, it is stream+json

rstoyanchev commented 3 years ago

@Azbesciak I have a potential fix in the Spring Framework. If you're able to give it a try with Spring Framework 5.3.10-SNAPSHOT that would be great.

Azbesciak commented 3 years ago

Unfortunately, it did not change a lot - only that I noticed that it is easier to invoke the leak, but it is just a personal observation. I set spring.netty.leak-detection: paranoid and it worked, thanks @violetagg :) Below logs with paranoid and my service's dependencies

leak_log.txt dependencies.txt

violetagg commented 3 years ago

@Azbesciak This is the last component that accessed the buffer (see below). I cannot comment the code with package uniopti.route.spring.response, but io.netty.buffer.ByteBufInputStream if this is not created with releaseOnClose, the code must ensure buffer release. Also I don't know whether you have some cleanup for these buffers when error/cancel happens. Such cleanups can be achieved with doOnDiscard for example.

    io.netty.buffer.AdvancedLeakAwareByteBuf.readBytes(AdvancedLeakAwareByteBuf.java:502)
    io.netty.buffer.ByteBufInputStream.read(ByteBufInputStream.java:183)
    com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.ensureLoaded(ByteSourceJsonBootstrapper.java:539)
    com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.detectEncoding(ByteSourceJsonBootstrapper.java:133)
    com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper.constructParser(ByteSourceJsonBootstrapper.java:256)
    com.fasterxml.jackson.core.JsonFactory._createParser(JsonFactory.java:1656)
    com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:1085)
    com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3585)
    uniopti.route.spring.response.ResponseFieldsFilter$SquigglyServerResponseDecorator.asFilteredJsonString(ResponseFieldsFilter.kt:99)
    uniopti.route.spring.response.ResponseFieldsFilter$SquigglyServerResponseDecorator.processServerEventStream(ResponseFieldsFilter.kt:91)
    uniopti.route.spring.response.ResponseFieldsFilter$SquigglyServerResponseDecorator.processBuffer$lambda-2(ResponseFieldsFilter.kt:62)
Azbesciak commented 3 years ago

Thank you @violetagg and sorry - I had some plug-in I've forgotten about... The problem was in it, in DataBuffer.asInputStream() without arguments - by default it does not release resources.

jarpz commented 2 years ago

Hi I'm getting the same in an intermittent way.. I got these errors on the docker environment. Also, I run the same service on a local environment with constraints of memory XX:MaxRam, Xmx, -Dio.netty.leakDetectionLevel=paranoid ; _reactor.netty.channel.LeakDetection = DEBUG and can not get any evidence of this error.

Also doing profiling over the service I don't have evidence of memory leaks.

Here is my stacktrace:

LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.Recent access records: 
Createdat:io.netty.buffer.SimpleLeakAwareByteBuf.unwrappedDerived(SimpleLeakAwareByteBuf.java:143)
io.netty.buffer.SimpleLeakAwareByteBuf.readRetainedSlice(SimpleLeakAwareByteBuf.java:67)
io.netty.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:333)
io.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:225)
io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1368)
io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1245)
io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1282)
io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:507)
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:446)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795)
io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)
io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)java.base/java.lang.Thread.run(Unknown Source)

Some suggestions about how to find the cause?

@violetagg

Regards

violetagg commented 2 years ago

@jarpz The issue in this ticket was in the authors's code.

Please open new issue where we will investigate your particular problem. Please provide e reproducible example and logs where the memory leak detection is enabled.

linuxdf commented 7 months ago

[reactor-http-epoll-8] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information. Recent access records: Created at: io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:385) io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187) io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178) io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115) org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:71) org.springframework.core.io.buffer.NettyDataBufferFactory.allocateBuffer(NettyDataBufferFactory.java:39) org.springframework.core.codec.CharSequenceEncoder.encodeValue(CharSequenceEncoder.java:91) org.springframework.core.codec.CharSequenceEncoder.lambda$encode$0(CharSequenceEncoder.java:75) reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107) reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782) reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:144) reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:96) reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:287) reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:330) reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1782) reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:152) reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252) reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:374) reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:373) reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:492) reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:216) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795) io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:475) io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) java.lang.Thread.run(Thread.java:750)