r2dbc / r2dbc-mssql

R2DBC Driver for Microsoft SQL Server using TDS (Tabular Data Stream) Protocol
Apache License 2.0
183 stars 32 forks source link

Eager buffer allocation in `TdsEncoder.writeChunkedMessage(…)` can lead to memory leaks #195

Closed ajobra76 closed 3 years ago

ajobra76 commented 3 years ago

Using SQLServer 2012 Java 8 Default Configuration in Spring Boot 2.4.3 and Spring Data Reactive save method for Entity containing large String written to nvarchar(max) column leads to

2021-03-25 09:14:28.089 ERROR 11984 --- [actor-tcp-nio-2] io.netty.util.ResourceLeakDetector       : LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
    io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:385)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178)
    io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:115)
    io.r2dbc.mssql.client.TdsEncoder.writeChunkedMessage(TdsEncoder.java:265)
    io.r2dbc.mssql.client.TdsEncoder.doWriteFragment(TdsEncoder.java:216)
    io.r2dbc.mssql.client.TdsEncoder.write(TdsEncoder.java:180)
    io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
    io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
    io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
    io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:115)
    io.r2dbc.mssql.client.ssl.TdsSslHandler.write(TdsSslHandler.java:242)
    io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
    io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:709)
    io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:792)
    io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:702)
    reactor.netty.channel.MonoSendMany$SendManyInner.run(MonoSendMany.java:321)
    reactor.netty.channel.MonoSendMany$SendManyInner.trySchedule(MonoSendMany.java:423)
    reactor.netty.channel.MonoSendMany$SendManyInner.onNext(MonoSendMany.java:223)
    reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onNext(FluxConcatArray.java:177)
    reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
    reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:281)
    reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:860)
    reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onNext(FluxConcatArray.java:177)
    reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
    io.r2dbc.mssql.codec.PlpEncoded$ChunkSubscriber.emitNext(PlpEncoded.java:307)
    io.r2dbc.mssql.codec.PlpEncoded$ChunkSubscriber.drain(PlpEncoded.java:249)
    io.r2dbc.mssql.codec.PlpEncoded$ChunkSubscriber.onNext(PlpEncoded.java:224)
    io.r2dbc.mssql.codec.PlpEncoded$ChunkSubscriber.onNext(PlpEncoded.java:146)
    reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:127)
    reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2397)
    reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:169)
    io.r2dbc.mssql.codec.PlpEncoded$ChunkSubscriber.request(PlpEncoded.java:349)
    reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:162)
    reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2193)
    reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2067)
    reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92)
    io.r2dbc.mssql.codec.PlpEncoded$ChunkSubscriber.onSubscribe(PlpEncoded.java:201)
    reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
    reactor.core.publisher.FluxJust.subscribe(FluxJust.java:68)
    reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)
    io.r2dbc.mssql.codec.PlpEncoded$ChunkOperator.subscribe(PlpEncoded.java:141)
    reactor.core.publisher.Flux.subscribe(Flux.java:8156)
    reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:208)
    reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
    reactor.core.publisher.Flux.subscribe(Flux.java:8156)
    reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:448)
    reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:218)
    reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:164)
    reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86)
    reactor.core.publisher.Flux.subscribe(Flux.java:8156)
    reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onComplete(FluxConcatArray.java:208)
    reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)
    reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)
    reactor.core.publisher.Flux.subscribe(Flux.java:8156)
    reactor.netty.channel.MonoSendMany.subscribe(MonoSendMany.java:102)
    reactor.core.publisher.Mono.subscribe(Mono.java:4046)
    reactor.netty.NettyOutbound.subscribe(NettyOutbound.java:337)
    reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:448)
    reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:250)
    reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
    reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
    reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
    reactor.core.publisher.EmitterProcessor.onNext(EmitterProcessor.java:265)
    reactor.core.publisher.FluxCreate$IgnoreSink.next(FluxCreate.java:618)
    reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:154)
    io.r2dbc.mssql.client.ReactorNettyClient.lambda$null$11(ReactorNettyClient.java:576)
    reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
    reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
    reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:61)
    reactor.core.publisher.Flux.subscribe(Flux.java:8156)
    reactor.core.publisher.Flux.subscribeWith(Flux.java:8329)
    reactor.core.publisher.Flux.subscribe(Flux.java:8126)
    reactor.core.publisher.Flux.subscribe(Flux.java:8050)
    io.r2dbc.mssql.client.ReactorNettyClient.lambda$null$13(ReactorNettyClient.java:569)
    reactor.core.publisher.FluxPeek$PeekSubscriber.onSubscribe(FluxPeek.java:161)
    reactor.core.publisher.EmitterProcessor.subscribe(EmitterProcessor.java:199)
    reactor.core.publisher.Flux.subscribe(Flux.java:8156)
    reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:195)
    reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:160)
    io.r2dbc.mssql.client.ReactorNettyClient$ExchangeRequest$1.onSuccess(ReactorNettyClient.java:753)
    io.r2dbc.mssql.client.ReactorNettyClient$RequestQueue.run(ReactorNettyClient.java:665)
    reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:264)
    reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:212)
    reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:133)
    reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250)
    reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:199)
    reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
    reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
    reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
    reactor.core.publisher.EmitterProcessor.onNext(EmitterProcessor.java:265)
    io.r2dbc.mssql.client.ReactorNettyClient$1.next(ReactorNettyClient.java:244)
    io.r2dbc.mssql.client.ReactorNettyClient$1.next(ReactorNettyClient.java:204)
    io.r2dbc.mssql.message.token.Tabular$TabularDecoder.decode(Tabular.java:424)
    io.r2dbc.mssql.client.ConnectionState$4$1.decode(ConnectionState.java:206)
    io.r2dbc.mssql.client.StreamDecoder.withState(StreamDecoder.java:137)
    io.r2dbc.mssql.client.StreamDecoder.decode(StreamDecoder.java:109)
    io.r2dbc.mssql.client.ReactorNettyClient.lambda$new$6(ReactorNettyClient.java:254)
    reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:184)
    reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:267)
    reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:377)
    reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:381)
    reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
    io.r2dbc.mssql.client.ssl.TdsSslHandler.channelRead(TdsSslHandler.java:380)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
    io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.lang.Thread.run(Thread.java:748)