r2dbc / r2dbc-mssql

R2DBC Driver for Microsoft SQL Server using TDS (Tabular Data Stream) Protocol
Apache License 2.0

Random `IllegalReferenceCountException: refCnt: 0` #245

Closed dstepanov closed 8 months ago

dstepanov commented 2 years ago

We have a test that fails randomly on GitHub Actions. Unfortunately, there are no steps to reproduce.

Versions

mp911de commented 2 years ago

This looks as if the connection was disconnected earlier and some activity still happened afterwards. The `refCnt: 0` comes from the buffer that Netty read for decoding. Do you see any preceding error messages in the log?
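
For readers less familiar with reference-counted buffers, here is a minimal sketch in plain Java of the failure mode described above: a buffer is released (its count drops to zero) and a later step still tries to read it. `CountedBuffer` is a hypothetical stand-in written for illustration, not Netty's `ByteBuf`, and it throws `IllegalStateException` where Netty throws `IllegalReferenceCountException`.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a reference-counted buffer; this is NOT Netty's ByteBuf.
// It is deliberately not thread-safe, to keep the sketch short.
final class CountedBuffer {
    private final byte[] data;
    private int refCnt = 1; // a freshly created buffer starts with one reference

    CountedBuffer(byte[] data) {
        this.data = data.clone();
    }

    int refCnt() {
        return refCnt;
    }

    // Adds one reference, for a component that wants to keep the buffer alive.
    CountedBuffer retain() {
        ensureAccessible();
        refCnt++;
        return this;
    }

    // Drops one reference; returns true when the count reaches zero.
    boolean release() {
        ensureAccessible();
        refCnt--;
        return refCnt == 0;
    }

    byte get(int index) {
        ensureAccessible();
        return data[index];
    }

    // Every access is guarded, so use-after-release fails loudly.
    private void ensureAccessible() {
        if (refCnt == 0) {
            throw new IllegalStateException("refCnt: 0");
        }
    }

    public static void main(String[] args) {
        CountedBuffer buf = new CountedBuffer("tds".getBytes(StandardCharsets.US_ASCII));
        buf.release(); // e.g. connection teardown frees the buffer
        try {
            buf.get(0); // a late decode step still touches it
        } catch (IllegalStateException e) {
            System.out.println("late access failed: " + e.getMessage());
        }
    }
}
```

In this model the error is only a symptom: the interesting question is which earlier event released the buffer, which is why preceding log messages matter.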

dstepanov commented 2 years ago

I haven't seen anything so far. I'm waiting for another failure to check again.

donard-commedagh commented 2 years ago

I'm getting a similar error that I can't reproduce reliably. It might be the same problem:

15:40:00.842 [reactor-tcp-nio-2] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
Created at:
    io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:402)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
    io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
    io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:116)
    io.r2dbc.mssql.message.token.RowToken.doDecodePlp(RowToken.java:259)
    io.r2dbc.mssql.message.token.RowToken.decodeColumnData(RowToken.java:199)
    io.r2dbc.mssql.message.token.NbcRowToken.doDecode(NbcRowToken.java:123)
    io.r2dbc.mssql.message.token.NbcRowToken.decode(NbcRowToken.java:61)
    io.r2dbc.mssql.message.token.Tabular.lambda$decodeFunction$0(Tabular.java:203)
    io.r2dbc.mssql.message.token.Tabular$TabularDecoder.decode(Tabular.java:413)
    io.r2dbc.mssql.client.ConnectionState$4$1.decode(ConnectionState.java:206)
    io.r2dbc.mssql.client.StreamDecoder.withState(StreamDecoder.java:116)
    io.r2dbc.mssql.client.StreamDecoder.decode(StreamDecoder.java:88)
    io.r2dbc.mssql.client.StreamDecoder.decode(StreamDecoder.java:64)
    io.r2dbc.mssql.client.ReactorNettyClient.lambda$new$6(ReactorNettyClient.java:297)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:351)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686)
    reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onNext(FluxFlattenIterable.java:250)
    reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:279)
    reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:388)
    reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:404)
    reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
    io.r2dbc.mssql.client.ssl.TdsSslHandler.channelRead(TdsSslHandler.java:380)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
    io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
    io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
    io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
    io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
    io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
    io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    java.base/java.lang.Thread.run(Thread.java:832)

My stack is: My app > komapper > r2dbc-mssql > SQL Server

The stack trace doesn't touch my code so I can't prove what function is triggering it.

However, I think it is caused by reading from a Blob object: when I comment out the Blob-reading code, the error never triggers.

val buffer = ByteBuffer.allocate(65536)
blobObject.stream().collect<ByteBuffer>(buffer::put)

Note: I am caching the result of the above code, as Blobs can only be read once. I have no reason to believe that a second thread or coroutine might be trying to read before the result is cached.
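
One thing worth ruling out separately from the leak: `ByteBuffer.put(ByteBuffer)` throws `BufferOverflowException` when the source has more remaining bytes than the target, so a fixed 65536-byte target only holds blobs up to that size. A minimal sketch in plain Java of collecting chunks into a growable buffer instead; `BlobChunks` and `concat` are hypothetical names, and the list stands in for the chunks that `Blob.stream()` would emit.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.List;

// Hypothetical helper for illustration; not part of r2dbc or komapper.
final class BlobChunks {

    // Concatenates chunks of any total size into one byte array.
    static byte[] concat(List<ByteBuffer> chunks) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (ByteBuffer chunk : chunks) {
            // Read through a duplicate so the caller's position is left untouched.
            ByteBuffer view = chunk.duplicate();
            byte[] tmp = new byte[view.remaining()];
            view.get(tmp);
            out.write(tmp, 0, tmp.length);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        List<ByteBuffer> chunks = List.of(
                ByteBuffer.wrap(new byte[] {1, 2}),
                ByteBuffer.wrap(new byte[] {3}));
        System.out.println(concat(chunks).length);
    }
}
```

This only addresses the size limit of the target buffer. It does not explain or fix the `LEAK` report, whose allocation site is inside the driver's `RowToken.doDecodePlp`.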

From io/r2dbc/spi/Blob.java

    /**
     * Returns the content stream as a {@link Publisher} emitting {@link ByteBuffer} chunks.
     * <p>
     * The content stream can be consumed ("subscribed to") only once.  Subsequent consumptions result in a {@link IllegalStateException}.
     * <p>
     * Once {@link Publisher#subscribe(Subscriber) subscribed}, {@link Subscription#cancel() canceling} the subscription releases resources associated with this {@link Blob}.
     *
     * @return a {@link Publisher} emitting {@link ByteBuffer} chunks.
     */
    Publisher<ByteBuffer> stream();

I suspect that cancel() may not be getting called properly once the Blob is fully read, but I can't figure out how to test this.

dstepanov commented 1 year ago

The issue is still there:

2022-11-14T03:07:02.6207388Z       at io.micronaut.data.r2dbc.operations.DefaultR2dbcRepositoryOperations.blockOptional(DefaultR2dbcRepositoryOperations.java:213)
2022-11-14T03:07:02.6209345Z       at io.micronaut.data.operations.reactive.BlockingExecutorReactorRepositoryOperations.executeDelete(BlockingExecutorReactorRepositoryOperations.java:105)
2022-11-14T03:07:02.6210444Z       at io.micronaut.data.runtime.intercept.DefaultDeleteAllInterceptor.intercept(DefaultDeleteAllInterceptor.java:53)
2022-11-14T03:07:02.6248347Z       at io.micronaut.data.runtime.intercept.DefaultDeleteAllInterceptor.intercept(DefaultDeleteAllInterceptor.java:35)
2022-11-14T03:07:02.6249490Z       at io.micronaut.data.intercept.DataIntroductionAdvice.intercept(DataIntroductionAdvice.java:81)
2022-11-14T03:07:02.6251260Z       at io.micronaut.aop.chain.MethodInterceptorChain.proceed(MethodInterceptorChain.java:137)
2022-11-14T03:07:02.6252087Z       at io.micronaut.validation.ValidatingInterceptor.intercept(ValidatingInterceptor.java:143)
2022-11-14T03:07:02.6261824Z       at io.micronaut.aop.chain.MethodInterceptorChain.proceed(MethodInterceptorChain.java:137)
2022-11-14T03:07:02.6262700Z       at io.micronaut.data.tck.tests.AbstractRepositorySpec.cleanupData(AbstractRepositorySpec.groovy:164)
2022-11-14T03:07:02.6264233Z       at io.micronaut.data.tck.tests.AbstractRepositorySpec.cleanup(AbstractRepositorySpec.groovy:176)
2022-11-14T03:07:02.6265126Z   Caused by: io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionException
2022-11-14T03:07:02.6267089Z       at io.r2dbc.mssql.client.ReactorNettyClient.lambda$handleConnectionError$23(ReactorNettyClient.java:688)
2022-11-14T03:07:02.6267714Z       at io.r2dbc.mssql.client.ReactorNettyClient.drainError(ReactorNettyClient.java:698)
2022-11-14T03:07:02.6268345Z       at io.r2dbc.mssql.client.ReactorNettyClient.handleConnectionError(ReactorNettyClient.java:688)
2022-11-14T03:07:02.6268952Z       at io.r2dbc.mssql.client.ReactorNettyClient.resumeError(ReactorNettyClient.java:366)
2022-11-14T03:07:02.6269536Z       at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
2022-11-14T03:07:02.6270209Z       at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:321)
2022-11-14T03:07:02.6270891Z       at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:686)
2022-11-14T03:07:02.6271552Z       at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.request(FluxFlattenIterable.java:274)
2022-11-14T03:07:02.6272189Z       at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2158)
2022-11-14T03:07:02.6272761Z       at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:549)
2022-11-14T03:07:02.6273151Z       ... 69 more
2022-11-14T03:07:02.6283901Z   Caused by: io.netty.util.IllegalReferenceCountException: refCnt: 0
2022-11-14T03:07:02.6284470Z       at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1454)
2022-11-14T03:07:02.6285086Z       at io.netty.buffer.PooledUnsafeDirectByteBuf.memoryAddress(PooledUnsafeDirectByteBuf.java:241)
2022-11-14T03:07:02.6285657Z       at io.netty.buffer.UnsafeByteBufUtil.setBytes(UnsafeByteBufUtil.java:524)
2022-11-14T03:07:02.6286219Z       at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:193)
2022-11-14T03:07:02.6286761Z       at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1104)
2022-11-14T03:07:02.6287242Z       at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1096)
2022-11-14T03:07:02.6287712Z       at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1087)
2022-11-14T03:07:02.6288225Z       at io.r2dbc.mssql.client.StreamDecoder$DecoderState.initial(StreamDecoder.java:180)
2022-11-14T03:07:02.6288736Z       at io.r2dbc.mssql.client.StreamDecoder.decode(StreamDecoder.java:85)
2022-11-14T03:07:02.6289228Z       at io.r2dbc.mssql.client.StreamDecoder.decode(StreamDecoder.java:64)
2022-11-14T03:07:02.6289746Z       at io.r2dbc.mssql.client.ReactorNettyClient.lambda$new$6(ReactorNettyClient.java:297)
2022-11-14T03:07:02.6290366Z       at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:351)
2022-11-14T03:07:02.6290818Z       ... 73 more

aironi commented 1 year ago

Hi everybody!

We are seeing what is possibly the same problem, together with the memory-leak warning from Netty's leak detector.

Our setup uses r2dbc, r2dbc-pool, Reactor, and Spring, connecting to MS SQL Server.

Leak nag:

[2023-04-25T10:57:48.525Z] Apr 25, 2023 1:57:48 PM io.netty.util.ResourceLeakDetector reportTracedLeak
[2023-04-25T10:57:48.534Z] SEVERE: LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information.

Stack trace regarding refCnt:

Exception: IllegalReferenceCountException: refCnt: 0
Stack: java.lang.reflect.InvocationTargetException
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at com.microsoft.azure.functions.worker.broker.JavaMethodInvokeInfo.invoke(JavaMethodInvokeInfo.java:22)
    at com.microsoft.azure.functions.worker.broker.EnhancedJavaMethodExecutorImpl.execute(EnhancedJavaMethodExecutorImpl.java:22)
    at com.microsoft.azure.functions.worker.chain.FunctionExecutionMiddleware.invoke(FunctionExecutionMiddleware.java:19)
    at com.microsoft.azure.functions.worker.chain.InvocationChain.doNext(InvocationChain.java:21)
    at com.microsoft.azure.functions.worker.broker.JavaFunctionBroker.invokeMethod(JavaFunctionBroker.java:125)
    at com.microsoft.azure.functions.worker.handler.InvocationRequestHandler.execute(InvocationRequestHandler.java:34)
    at com.microsoft.azure.functions.worker.handler.InvocationRequestHandler.execute(InvocationRequestHandler.java:10)
    at com.microsoft.azure.functions.worker.handler.MessageHandler.handle(MessageHandler.java:44)
    at com.microsoft.azure.functions.worker.JavaWorkerClient$StreamingMessagePeer.lambda$onNext$0(JavaWorkerClient.java:94)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.springframework.dao.DataAccessResourceFailureException: executeMany; SQL [< secrets >]; null
    at org.springframework.r2dbc.connection.ConnectionFactoryUtils.convertR2dbcException(ConnectionFactoryUtils.java:231)
    Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below:
Assembly trace from producer [reactor.core.publisher.FluxOnErrorResume] :
    reactor.core.publisher.Flux.onErrorMap
    org.springframework.r2dbc.core.DefaultDatabaseClient.inConnectionMany(DefaultDatabaseClient.java:149)
Error has been observed at the following site(s):
    *_____Flux.onErrorMap ⇢ at org.springframework.r2dbc.core.DefaultDatabaseClient.inConnectionMany(DefaultDatabaseClient.java:149)
    |_                    ⇢ at org.springframework.r2dbc.core.DefaultFetchSpec.all(DefaultFetchSpec.java:85)
    |_          Flux.name ⇢ at < secrets >
    |_           Flux.tag ⇢ at
    |_       Flux.metrics ⇢ at
    |_           Flux.map ⇢ at
    *_________Flux.concat ⇢ at
    |_          Flux.name ⇢ at
    |_           Flux.tag ⇢ at
    |_       Flux.metrics ⇢ at
    |_       Flux.flatMap ⇢ at
    |_       Flux.flatMap ⇢ at
    |_     Flux.publishOn ⇢ at
    |_       Flux.flatMap ⇢ at
    |_       Flux.flatMap ⇢ at
    |_           Flux.map ⇢ at
    |_ Flux.switchIfEmpty ⇢ at
    *________Flux.flatMap ⇢ at
    |_ Flux.switchIfEmpty ⇢ at
    *________Flux.flatMap ⇢ at
    |_        Flux.filter ⇢ at
    |_          Flux.sort ⇢ at
    |_          Flux.skip ⇢ at
    |_          Flux.take ⇢ at
    |_                    ⇢ at
    |_     Flux.doOnError ⇢ at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeFunction(SimpleFunctionRegistry.java:834)
    |_          Flux.from ⇢ at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertOutputPublisherIfNecessary(SimpleFunctionRegistry.java:1415)
    |_           Flux.map ⇢ at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.convertOutputPublisherIfNecessary(SimpleFunctionRegistry.java:1415)
Original Stack Trace:
        at org.springframework.r2dbc.connection.ConnectionFactoryUtils.convertR2dbcException(ConnectionFactoryUtils.java:231)
        at org.springframework.r2dbc.core.DefaultDatabaseClient.lambda$inConnectionMany$8(DefaultDatabaseClient.java:150)
        at reactor.core.publisher.Flux.lambda$onErrorMap$28(Flux.java:7123)
        at reactor.core.publisher.Flux.lambda$onErrorResume$29(Flux.java:7176)
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
        at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.deferredError(FluxUsingWhen.java:398)
        at reactor.core.publisher.FluxUsingWhen$RollbackInner.onComplete(FluxUsingWhen.java:475)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2205)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onComplete(FluxPeekFuseable.java:277)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2205)
        at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:128)
        at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:128)
        at reactor.core.publisher.Operators.complete(Operators.java:137)
        at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4444)
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:278)
        at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84)
        at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:200)
        at reactor.core.publisher.MonoSupplier$MonoSupplierSubscription.request(MonoSupplier.java:145)
        at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.request(FluxHandleFuseable.java:259)
        at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onSubscribe(MonoIgnoreElements.java:72)
        at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onSubscribe(FluxHandleFuseable.java:163)
        at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:48)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4444)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263)
        at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4444)
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
        at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:142)
        at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onError(FluxFilterFuseable.java:162)
        at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onError(FluxFilterFuseable.java:382)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onError(FluxMapFuseable.java:340)
        at reactor.core.publisher.Operators.error(Operators.java:198)
        at reactor.core.publisher.MonoError.subscribe(MonoError.java:53)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
        at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4444)
        at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onError(FluxUsingWhen.java:364)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:842)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:608)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.request(FluxFlatMap.java:347)
        at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.request(FluxUsingWhen.java:319)
        at reactor.core.publisher.Operators$DeferredSubscription.request(Operators.java:1717)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2305)
        at reactor.core.publisher.FluxMetrics$MetricsSubscriber.request(FluxMetrics.java:204)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
        at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.request(FluxConcatArray.java:276)
        at reactor.core.publisher.FluxMetrics$MetricsSubscriber.request(FluxMetrics.java:204)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:790)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.request(FluxFlatMap.java:347)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:790)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.innerComplete(FluxFlatMap.java:894)
        at reactor.core.publisher.FluxFlatMap$FlatMapInner.onComplete(FluxFlatMap.java:997)
        at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:85)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:159)
        at reactor.core.publisher.MonoMetricsFuseable$MetricsFuseableSubscriber.onNext(MonoMetricsFuseable.java:130)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
        at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:488)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:421)
        at reactor.core.publisher.FluxBuffer$BufferExactSubscriber.onComplete(FluxBuffer.java:185)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2205)
        at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.deferredComplete(FluxUsingWhen.java:392)
        at reactor.core.publisher.FluxUsingWhen$CommitInner.onComplete(FluxUsingWhen.java:527)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2205)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onComplete(FluxPeekFuseable.java:277)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2205)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:209)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:209)
        at reactor.pool.SimpleDequePool.maybeRecycleAndDrain(SimpleDequePool.java:531)
        at reactor.pool.SimpleDequePool$QueuePoolRecyclerInner.onComplete(SimpleDequePool.java:761)
        at reactor.core.publisher.Operators.complete(Operators.java:137)
        at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4444)
        at reactor.pool.SimpleDequePool$QueuePoolRecyclerMono.subscribe(SimpleDequePool.java:873)
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:240)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
        at reactor.core.publisher.Operators.complete(Operators.java:137)
        at reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:46)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4444)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263)
        at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:240)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
        at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89)
        at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onComplete(FluxHandleFuseable.java:238)
        at reactor.core.publisher.MonoSupplier$MonoSupplierSubscription.request(MonoSupplier.java:148)
        at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.request(FluxHandleFuseable.java:259)
        at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onSubscribe(MonoIgnoreElements.java:72)
        at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onSubscribe(FluxHandleFuseable.java:163)
        at reactor.core.publisher.MonoSupplier.subscribe(MonoSupplier.java:48)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4444)
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263)
        at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4444)
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103)
        at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:84)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onError(FluxMapFuseable.java:142)
        at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.onError(FluxFilterFuseable.java:162)
        at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onError(FluxFilterFuseable.java:382)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onError(FluxMapFuseable.java:340)
        at reactor.core.publisher.Operators.error(Operators.java:198)
        at reactor.core.publisher.MonoError.subscribe(MonoError.java:53)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
        at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55)
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
        at reactor.core.publisher.Mono.subscribe(Mono.java:4444)
        at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onComplete(FluxUsingWhen.java:384)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:846)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:608)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:465)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:152)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:152)
        at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.checkTerminated(FluxWindowPredicate.java:540)
        at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.drainLoop(FluxWindowPredicate.java:488)
        at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.drain(FluxWindowPredicate.java:432)
        at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.onComplete(FluxWindowPredicate.java:312)
        at reactor.core.publisher.SerializedSubscriber.onComplete(SerializedSubscriber.java:146)
        at reactor.core.publisher.FluxTakeUntilOther$TakeUntilMainSubscriber.cancelMainAndComplete(FluxTakeUntilOther.java:200)
        at reactor.core.publisher.FluxTakeUntilOther$TakeUntilOtherSubscriber.onComplete(FluxTakeUntilOther.java:128)
        at reactor.core.publisher.SinkEmptyMulticast$VoidInner.complete(SinkEmptyMulticast.java:272)
        at reactor.core.publisher.SinkEmptyMulticast.tryEmitEmpty(SinkEmptyMulticast.java:86)
        at reactor.core.publisher.SinkEmptySerialized.tryEmitEmpty(SinkEmptySerialized.java:46)
        at io.r2dbc.mssql.RpcQueryMessageFlow$OnCursorComplete.run(RpcQueryMessageFlow.java:686)
        at io.r2dbc.mssql.RpcQueryMessageFlow.onDone(RpcQueryMessageFlow.java:387)
        at io.r2dbc.mssql.RpcQueryMessageFlow.handleMessage(RpcQueryMessageFlow.java:377)
        at io.r2dbc.mssql.RpcQueryMessageFlow.handleMessage(RpcQueryMessageFlow.java:315)
        at io.r2dbc.mssql.RpcQueryMessageFlow.lambda$exchange$1(RpcQueryMessageFlow.java:141)
        at reactor.core.publisher.FluxHandleFuseable$HandleFuseableConditionalSubscriber.onNext(FluxHandleFuseable.java:488)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.onNext(FluxPeekFuseable.java:503)
        at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:113)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.onNext(FluxPeekFuseable.java:503)
        at reactor.core.publisher.FluxHandleFuseable$HandleFuseableConditionalSubscriber.onNext(FluxHandleFuseable.java:504)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
        at reactor.core.publisher.SinkManyEmitterProcessor.drain(SinkManyEmitterProcessor.java:471)
        at reactor.core.publisher.SinkManyEmitterProcessor.tryEmitNext(SinkManyEmitterProcessor.java:269)
        at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
        at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
        at io.r2dbc.mssql.client.ReactorNettyClient$1.next(ReactorNettyClient.java:295)
        at io.r2dbc.mssql.client.ReactorNettyClient$1.next(ReactorNettyClient.java:214)
        at io.r2dbc.mssql.client.ReactorNettyClient$2.onNext(ReactorNettyClient.java:326)
        at io.r2dbc.mssql.client.ReactorNettyClient$2.onNext(ReactorNettyClient.java:315)
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
        at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:453)
        at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:724)
        at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onNext(FluxFlattenIterable.java:256)
        at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:292)
        at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:401)
        at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:411)
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:113)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
        at io.r2dbc.mssql.client.ssl.TdsSslHandler.channelRead(TdsSslHandler.java:380)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionException
    at io.r2dbc.mssql.client.ReactorNettyClient.lambda$handleConnectionError$24(ReactorNettyClient.java:692)
    Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below:
Assembly trace from producer [reactor.core.publisher.FluxPeek] :
    reactor.core.publisher.Flux.doOnSubscribe
    io.r2dbc.mssql.client.ReactorNettyClient.lambda$exchange$18(ReactorNettyClient.java:628)
Error has been observed at the following site(s):
    *_____Flux.doOnSubscribe ⇢ at io.r2dbc.mssql.client.ReactorNettyClient.lambda$exchange$18(ReactorNettyClient.java:628)
    *_______Mono.flatMapMany ⇢ at io.r2dbc.mssql.client.ReactorNettyClient.exchange(ReactorNettyClient.java:652)
    |_           Flux.handle ⇢ at io.r2dbc.mssql.client.ReactorNettyClient.exchange(ReactorNettyClient.java:652)
    |_ Flux.doAfterTerminate ⇢ at io.r2dbc.mssql.client.ReactorNettyClient.exchange(ReactorNettyClient.java:662)
    |_        Flux.doFinally ⇢ at io.r2dbc.mssql.client.ReactorNettyClient.exchange(ReactorNettyClient.java:662)
    |_       Flux.doOnCancel ⇢ at io.r2dbc.mssql.client.ReactorNettyClient.exchange(ReactorNettyClient.java:662)
    |_           Flux.handle ⇢ at io.r2dbc.mssql.RpcQueryMessageFlow.exchange(RpcQueryMessageFlow.java:137)
    |_           Flux.filter ⇢ at io.r2dbc.mssql.RpcQueryMessageFlow.exchange(RpcQueryMessageFlow.java:143)
    |_       Flux.doOnCancel ⇢ at io.r2dbc.mssql.RpcQueryMessageFlow.exchange(RpcQueryMessageFlow.java:144)
    |_    Flux.doOnSubscribe ⇢ at io.r2dbc.mssql.RpcQueryMessageFlow.exchange(RpcQueryMessageFlow.java:146)
    |_                       ⇢ at io.r2dbc.mssql.util.Operators.discardOnCancel(Operators.java:60)
    |_      Flux.doOnDiscard ⇢ at io.r2dbc.mssql.RpcQueryMessageFlow.lambda$exchange$3(RpcQueryMessageFlow.java:149)
    |_        Flux.transform ⇢ at io.r2dbc.mssql.RpcQueryMessageFlow.exchange(RpcQueryMessageFlow.java:149)
    |_   Flux.takeUntilOther ⇢ at io.r2dbc.mssql.RpcQueryMessageFlow.exchange(RpcQueryMessageFlow.java:149)
    |_                       ⇢ at io.r2dbc.mssql.MssqlStatementSupport.potentiallyAttachTimeout(MssqlStatementSupport.java:100)
    |_      Flux.windowUntil ⇢ at io.r2dbc.mssql.ParametrizedMssqlStatement.lambda$execute$8(ParametrizedMssqlStatement.java:139)
    |_              Flux.map ⇢ at io.r2dbc.mssql.ParametrizedMssqlStatement.lambda$execute$8(ParametrizedMssqlStatement.java:139)
    *_____________Flux.defer ⇢ at io.r2dbc.mssql.ParametrizedMssqlStatement.execute(ParametrizedMssqlStatement.java:119)
    |_             Flux.from ⇢ at org.springframework.r2dbc.core.DefaultDatabaseClient$DefaultGenericExecuteSpec.lambda$getResultFunction$6(DefaultDatabaseClient.java:398)
    |_             Flux.cast ⇢ at org.springframework.r2dbc.core.DefaultDatabaseClient$DefaultGenericExecuteSpec.lambda$getResultFunction$6(DefaultDatabaseClient.java:399)
    |_            checkpoint ⇢ SQL  < secrets > [DatabaseClient]
    |_          Flux.flatMap ⇢ at org.springframework.r2dbc.core.DefaultFetchSpec.lambda$all$1(DefaultFetchSpec.java:87)
    *_________Flux.usingWhen ⇢ at org.springframework.r2dbc.core.DefaultDatabaseClient.inConnectionMany(DefaultDatabaseClient.java:137)
Original Stack Trace:
        at io.r2dbc.mssql.client.ReactorNettyClient.lambda$handleConnectionError$24(ReactorNettyClient.java:692)
        at io.r2dbc.mssql.client.ReactorNettyClient.drainError(ReactorNettyClient.java:702)
        at io.r2dbc.mssql.client.ReactorNettyClient.handleConnectionError(ReactorNettyClient.java:692)
        at io.r2dbc.mssql.client.ReactorNettyClient.resumeError(ReactorNettyClient.java:370)
        at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)
        at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:351)
        at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:724)
        at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.request(FluxFlattenIterable.java:303)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2305)
        at reactor.core.publisher.SinkManyEmitterProcessor.drain(SinkManyEmitterProcessor.java:483)
        at reactor.core.publisher.SinkManyEmitterProcessor$EmitterInner.drainParent(SinkManyEmitterProcessor.java:615)
        at reactor.core.publisher.FluxPublish$PubSubInner.request(FluxPublish.java:602)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.request(MonoFlatMapMany.java:112)
        at reactor.core.publisher.FluxHandleFuseable$HandleFuseableConditionalSubscriber.request(FluxHandleFuseable.java:653)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.request(FluxPeekFuseable.java:437)
        at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.request(FluxDoFinally.java:140)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableConditionalSubscriber.request(FluxPeekFuseable.java:437)
        at reactor.core.publisher.FluxHandleFuseable$HandleFuseableConditionalSubscriber.request(FluxHandleFuseable.java:653)
        at reactor.core.publisher.FluxFilterFuseable$FilterFuseableSubscriber.request(FluxFilterFuseable.java:191)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144)
        at io.r2dbc.mssql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.request(FluxDiscardOnCancel.java:115)
        at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.request(FluxContextWrite.java:136)
        at reactor.core.publisher.FluxTakeUntilOther$TakeUntilMainSubscriber.request(FluxTakeUntilOther.java:182)
        at reactor.core.publisher.SerializedSubscriber.request(SerializedSubscriber.java:151)
        at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drainRegular(FluxWindowPredicate.java:684)
        at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drain(FluxWindowPredicate.java:748)
        at reactor.core.publisher.FluxWindowPredicate$WindowFlux.request(FluxWindowPredicate.java:835)
        at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.request(FluxHandleFuseable.java:259)
        at reactor.core.publisher.FluxFlatMap$FlatMapInner.request(FluxFlatMap.java:1008)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:729)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.request(FluxFlatMap.java:347)
        at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.request(FluxUsingWhen.java:319)
        at reactor.core.publisher.Operators$DeferredSubscription.request(Operators.java:1717)
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2305)
        at reactor.core.publisher.FluxMetrics$MetricsSubscriber.request(FluxMetrics.java:204)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
        at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.request(FluxConcatArray.java:276)
        at reactor.core.publisher.FluxMetrics$MetricsSubscriber.request(FluxMetrics.java:204)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:790)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:588)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.request(FluxFlatMap.java:347)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:790)
        at reactor.core.publisher.FluxFlatMap$FlatMapMain.innerComplete(FluxFlatMap.java:894)
        at reactor.core.publisher.FluxFlatMap$FlatMapInner.onComplete(FluxFlatMap.java:997)
        at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:85)
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:159)
        at reactor.core.publisher.MonoMetricsFuseable$MetricsFuseableSubscriber.onNext(MonoMetricsFuseable.java:130)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
        at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
        at reactor.core.publisher < clipped by Azure ... >

aironi commented 8 months ago

Hi all!

I believe this problem would be corrected by moving handleConnectionError(throwable) from line 370 to 381:

https://github.com/r2dbc/r2dbc-mssql/blob/main/src/main/java/io/r2dbc/mssql/client/ReactorNettyClient.java#L370

Please confirm.

mp911de commented 8 months ago

Changing order will complete the request queue first and then discard the pending response handlers. It will guarantee that no new request will be accepted and I think, that is a useful change. I'm not sure whether it will fix the issue though.
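
To illustrate why the order matters, here is a minimal plain-JDK sketch. It is not the driver's code: `Client`, `submit`, `failClosingQueueFirst` and `failDrainingFirst` are hypothetical names made up for this example. It assumes a pending response handler reacts to the connection error by submitting a new request, and shows that the request is only rejected when the queue is closed before the handlers are failed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

public class ShutdownOrderDemo {

    // Hypothetical, heavily simplified client: a flag guarding new requests
    // plus a queue of error callbacks for responses that are still pending.
    static final class Client {
        private final Queue<Consumer<Throwable>> pending = new ArrayDeque<>();
        private boolean accepting = true;
        final List<String> log = new ArrayList<>();

        // Registers a request; returns false once the client no longer accepts requests.
        boolean submit(String name, Consumer<Throwable> onError) {
            if (!accepting) {
                log.add("rejected " + name);
                return false;
            }
            pending.add(onError);
            log.add("accepted " + name);
            return true;
        }

        // Order after the change: stop accepting requests, then fail pending handlers.
        void failClosingQueueFirst(Throwable cause) {
            accepting = false;
            drain(cause);
        }

        // Order before the change: fail pending handlers while requests are still accepted.
        void failDrainingFirst(Throwable cause) {
            drain(cause);
            accepting = false;
        }

        private void drain(Throwable cause) {
            Consumer<Throwable> handler;
            while ((handler = pending.poll()) != null) {
                handler.accept(cause);
            }
        }
    }

    // Runs one scenario and returns the accept/reject log.
    static List<String> run(boolean closeQueueFirst) {
        Client client = new Client();
        // This handler reacts to the error by submitting a follow-up request.
        client.submit("q1", err -> client.submit("retry", ignored -> { }));
        RuntimeException cause = new RuntimeException("connection closed");
        if (closeQueueFirst) {
            client.failClosingQueueFirst(cause);
        } else {
            client.failDrainingFirst(cause);
        }
        return client.log;
    }

    public static void main(String[] args) {
        System.out.println("close queue first: " + run(true));
        System.out.println("drain first:       " + run(false));
    }
}
```

With the queue closed first, the follow-up request is rejected; with the handlers drained first, it is still accepted on a connection that is already failing.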

Snapshot builds are available now, please retest.

aironi commented 8 months ago

> Changing order will complete the request queue first and then discard the pending response handlers. It will guarantee that no new request will be accepted and I think, that is a useful change. I'm not sure whether it will fix the issue though.
>
> Snapshot builds are available now, please retest.

Thanks, I tested this with a custom snapshot that I built locally and the refCnt error disappeared. Instead I got the "connection closed" stack trace. We are still not sure why the connection gets closed. We run a lot of queries against SQL Server and have only one troublesome use case, where a lot of data is fetched with multiple complex queries. But that seems to be a different problem, and this bug only hid the actual stack trace. Anyway, since I am not familiar with this package, I was not completely sure whether this was a proper fix, hence the comment instead of a PR :slightly_smiling_face:

I will test with your snapshot as soon as possible! Thanks again!

mp911de commented 8 months ago

> Instead I got the "connection closed" stack trace.

I think this is actually an improvement. Connection closed can happen when there is too much pressure on the connection (i.e. server TCP recv buffers exhausted). Can you check how much activity is going on, especially parallel activity? Switching to concatMap instead of using flatMap or large prefetch sizes should get you there.
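
As a rough plain-JDK analogy of that advice (it does not use Reactor, and `InFlightDemo`, `FakeQuery`, `peakSequential` and `peakConcurrent` are hypothetical names), the sketch below counts how many simulated queries run at the same time. Chaining each query after the previous one resembles what `concatMap` does, while starting all of them eagerly resembles `flatMap` with high concurrency.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class InFlightDemo {

    // Hypothetical stand-in for a database query; it only tracks how many
    // calls overlap in time and remembers the highest overlap seen.
    static final class FakeQuery {
        final AtomicInteger inFlight = new AtomicInteger();
        final AtomicInteger peak = new AtomicInteger();

        int run(int id) {
            int now = inFlight.incrementAndGet();
            peak.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(20); // simulated server round trip
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                inFlight.decrementAndGet();
            }
            return id;
        }
    }

    // Each query starts only after the previous one has completed.
    static int peakSequential(int n) {
        FakeQuery query = new FakeQuery();
        ExecutorService pool = Executors.newFixedThreadPool(n);
        try {
            CompletableFuture<Integer> chain = CompletableFuture.completedFuture(0);
            for (int i = 0; i < n; i++) {
                final int id = i;
                chain = chain.thenApplyAsync(previous -> query.run(id), pool);
            }
            chain.join();
        } finally {
            pool.shutdown();
        }
        return query.peak.get();
    }

    // All queries are started eagerly and may overlap.
    static int peakConcurrent(int n) {
        FakeQuery query = new FakeQuery();
        ExecutorService pool = Executors.newFixedThreadPool(n);
        try {
            List<CompletableFuture<Integer>> all = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                final int id = i;
                all.add(CompletableFuture.supplyAsync(() -> query.run(id), pool));
            }
            CompletableFuture.allOf(all.toArray(new CompletableFuture[0])).join();
        } finally {
            pool.shutdown();
        }
        return query.peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak in flight, sequential: " + peakSequential(4));
        System.out.println("peak in flight, concurrent: " + peakConcurrent(4));
    }
}
```

The sequential variant never has more than one simulated query in flight, whereas the concurrent variant can reach up to `n`, which is the kind of parallel pressure on a connection that the comment above suggests reducing.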

Once I get the confirmation from your side that the fix is appropriate, I'll close the ticket here.

aironi commented 8 months ago

Bad news: I tried my use case with 1.1.0-BUILD-SNAPSHOT, but it seems that I am hitting #276, and to my old eyes the queries appear to get mixed up between connections, as mentioned in #273. I am using a pool.

Good news: I could not see the refCnt problem with this version.

For me, the error is the following (real parameter names replaced):

[2023-11-22T17:54:19.789Z] Exception: ExceptionFactory.MssqlNonTransientException: The parameterized query '(@P0_actualId nvarchar(4000),@P1_anotherId nvarchar(4000))SELECT ' expects the parameter '@P1_anotherId', which was not supplied.
...

[2023-11-22T17:54:19.790Z] Caused by: org.springframework.r2dbc.UncategorizedR2dbcException: executeMany; SQL [SELECT TOP(1) * FROM TABLE WITH(NOLOCK) WHERE ActualId = (:actualId)]; The parameterized query '(@P0_actualId nvarchar(4000),@P1_anotherId nvarchar(4000))SELECT ' expects the parameter '@P1_anotherId', which was not supplied.
[2023-11-22T17:54:19.790Z]  at org.springframework.r2dbc.connection.ConnectionFactoryUtils.convertR2dbcException(ConnectionFactoryUtils.java:245)
[2023-11-22T17:54:19.790Z]  Suppressed: The stacktrace has been enhanced by Reactor, refer to additional information below: 
[2023-11-22T17:54:19.791Z] Assembly trace from producer [reactor.core.publisher.FluxOnErrorResume] :
[2023-11-22T17:54:19.791Z]  reactor.core.publisher.Flux.onErrorMap
[2023-11-22T17:54:19.791Z]  org.springframework.r2dbc.core.DefaultDatabaseClient.inConnectionMany(DefaultDatabaseClient.java:151)
[2023-11-22T17:54:19.791Z] Error has been observed at the following site(s):
[2023-11-22T17:54:19.791Z]  *_____Flux.onErrorMap ⇢ at org.springframework.r2dbc.core.DefaultDatabaseClient.inConnectionMany(DefaultDatabaseClient.java:151)
[2023-11-22T17:54:19.791Z]  |_                    ⇢ at org.springframework.r2dbc.core.DefaultFetchSpec.all(DefaultFetchSpec.java:83)
[2023-11-22T17:54:19.791Z]  |_        Flux.buffer ⇢ at org.springframework.r2dbc.core.DefaultFetchSpec.one(DefaultFetchSpec.java:62)
[2023-11-22T17:54:19.791Z]  |_       Flux.flatMap ⇢ at org.springframework.r2dbc.core.DefaultFetchSpec.one(DefaultFetchSpec.java:63)
[2023-11-22T17:54:19.792Z]  |_          Flux.next ⇢ at org.springframework.r2dbc.core.DefaultFetchSpec.one(DefaultFetchSpec.java:73)
[2023-11-22T17:54:19.792Z]  |_          Mono.name ⇢ at com.mycode.MyRepositoryImpl.findXyzSql(MyRepositoryImpl.java:123)

But the query in question does not even reference anotherId in its SQL.

I will also comment on this in #276.

We are using 1.0.0.RELEASE in our released solution at the moment.

mp911de commented 8 months ago

Thanks a lot for confirming. Closing this one as done.

The other issues are different ones that we need to sort out.