r2dbc / r2dbc-pool

Connection Pooling for Reactive Relational Database Connectivity
https://r2dbc.io
Apache License 2.0
337 stars 55 forks source link

Connection pool hangs after DB restart #138

Open konsmak opened 3 years ago

konsmak commented 3 years ago

Bug Report

Hello. The application I'm working on is unable to obtain a connection from the pool when the DB is restarted while the app is under high load. It's not always reproducible, this problem only happens after the following exception is thrown:

description=Cannot exchange messages because the connection is closed,
cause=io.r2dbc.postgresql.client.ReactorNettyClient$PostgresConnectionClosedException: Cannot exchange messages because the connection is closed

Please see the full stack trace below. Can be a duplicate for the https://github.com/r2dbc/r2dbc-pool/issues/107 which is closed with no fix.

Please let me know if you need more information from me or steps on how to reproduce it. As noticed earlier, this only happens when the application is processing a high number of requests.

Versions

Current Behavior

Stack trace ``` io.r2dbc.postgresql.client.ReactorNettyClient$PostgresConnectionClosedException: Cannot exchange messages because the connection is closed at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.lambda$addConversation$2(ReactorNettyClient.java:797) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE] at reactor.core.publisher.FluxCreate.subscribe(FluxCreate.java:94) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62) ~[reactor-core-3.4.2.jar:3.4.2] at io.r2dbc.postgresql.util.FluxDiscardOnCancel.subscribe(FluxDiscardOnCancel.java:49) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE] at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54) ~[reactor-core-3.4.2.jar:3.4.2] at io.r2dbc.postgresql.util.FluxDiscardOnCancel.subscribe(FluxDiscardOnCancel.java:49) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE] at reactor.core.publisher.FluxFromMonoOperator.subscribe(FluxFromMonoOperator.java:83) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54) ~[reactor-core-3.4.2.jar:3.4.2] at io.r2dbc.postgresql.util.FluxDiscardOnCancel.subscribe(FluxDiscardOnCancel.java:49) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE] at reactor.core.publisher.Mono.subscribe(Mono.java:4046) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:173) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.Mono.subscribe(Mono.java:4046) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:173) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.Mono.subscribe(Mono.java:4046) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.Operators$MonoSubscriber.onError(Operators.java:1836) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreInner.onError(MonoIgnoreThen.java:243) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:139) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.Mono.subscribe(Mono.java:4046) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.Operators$MonoSubscriber.onError(Operators.java:1836) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onError(MonoIgnoreThen.java:315) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.Operators.error(Operators.java:196) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.MonoError.subscribe(MonoError.java:52) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.Mono.subscribe(Mono.java:4046) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:221) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onError(MonoIgnoreElements.java:83) ~[reactor-core-3.4.2.jar:3.4.2] at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onError(FluxDiscardOnCancel.java:92) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE] at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onError(FluxDiscardOnCancel.java:92) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE] at reactor.core.publisher.FluxHandle$HandleSubscriber.onError(FluxHandle.java:202) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.FluxCreate$BaseSink.error(FluxCreate.java:453) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:781) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.FluxCreate$BufferAsyncSink.error(FluxCreate.java:726) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.FluxCreate$SerializedFluxSink.drainLoop(FluxCreate.java:230) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.FluxCreate$SerializedFluxSink.drain(FluxCreate.java:206) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.FluxCreate$SerializedFluxSink.error(FluxCreate.java:182) ~[reactor-core-3.4.2.jar:3.4.2] at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.onError(ReactorNettyClient.java:746) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE] at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.close(ReactorNettyClient.java:1018) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE] at io.r2dbc.postgresql.client.ReactorNettyClient.drainError(ReactorNettyClient.java:518) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE] at io.r2dbc.postgresql.client.ReactorNettyClient.handleClose(ReactorNettyClient.java:506) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE] at io.r2dbc.postgresql.client.ReactorNettyClient.access$200(ReactorNettyClient.java:94) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE] at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onComplete(ReactorNettyClient.java:909) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE] at reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:212) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onComplete(FluxPeekFuseable.java:940) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:269) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:269) ~[reactor-core-3.4.2.jar:3.4.2] at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:457) ~[reactor-netty-core-1.0.3.jar:1.0.3] at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:258) ~[reactor-netty-core-1.0.3.jar:1.0.3] at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:393) ~[reactor-netty-core-1.0.3.jar:1.0.3] at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:396) ~[reactor-netty-core-1.0.3.jar:1.0.3] at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:452) ~[reactor-netty-core-1.0.3.jar:1.0.3] at reactor.netty.channel.ChannelOperations.onInboundClose(ChannelOperations.java:413) ~[reactor-netty-core-1.0.3.jar:1.0.3] at reactor.netty.channel.ChannelOperationsHandler.channelInactive(ChannelOperationsHandler.java:74) ~[reactor-netty-core-1.0.3.jar:1.0.3] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final] at io.netty.handler.codec.ByteToMessageDecoder.channelInputClosed(ByteToMessageDecoder.java:389) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final] at io.netty.handler.codec.ByteToMessageDecoder.channelInactive(ByteToMessageDecoder.java:354) ~[netty-codec-4.1.58.Final.jar:4.1.58.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final] at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final] at io.r2dbc.postgresql.client.ReactorNettyClient$EnsureSubscribersCompleteChannelHandler.channelInactive(ReactorNettyClient.java:535) ~[r2dbc-postgresql-0.8.6.RELEASE.jar:0.8.6.RELEASE] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final] at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final] at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final] at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final] at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:819) ~[netty-transport-4.1.58.Final.jar:4.1.58.Final] at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[netty-common-4.1.58.Final.jar:4.1.58.Final] at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) ~[netty-common-4.1.58.Final.jar:4.1.58.Final] at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384) ~[netty-transport-native-epoll-4.1.58.Final-linux-x86_64.jar:4.1.58.Final] at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.58.Final.jar:4.1.58.Final] at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.58.Final.jar:4.1.58.Final] at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.58.Final.jar:4.1.58.Final] at java.lang.Thread.run(Thread.java:834) [?:?] ```

Table schema

Input Code ```sql -- your SQL here; ```

Steps to reproduce

Input Code ```java // your code here; ```

Expected behavior/code

Possible Solution

Additional context

mp911de commented 3 years ago

How do you configure the connection pool? Specifically, do you configure ValidationDepth or a validation query?

konsmak commented 3 years ago

I used the pool with default settings and the number of connections reduced to 1 because it's easier to reproduce the issue that way. Can still reproduce it with 10 connections, which is the default value, but it takes more DB restarts to get into a bad state. It looks like each DB restart causes a few connections to get stale. As suggested, I tried adding the following validation properties, but they didn't make any difference:

validationQuery: "SELECT 1"
validationDepth: REMOTE

I've also noticed that this happens only when I use transactions, disabling them fixes the issue. Transactions are enabled in the following way:

val txOperator = TransactionalOperator.create(txManager)
val flux = /* flux assembly */
flux.`as`(txOperator::transactional)
konsmak commented 2 years ago

Hi. Could you please let me know whether there are any updates on this issue or maybe you could advise what to look at so I could try debugging it myself? Thank you!

rplowright commented 2 years ago

Hi, are there any updates on this issue?

mp911de commented 2 years ago

Right now, I don't have a clue whether the driver has an issue or whether the pool is missing a guard somewhere. Also, restarting a server while a connection is active (and therefore, the connection breaks) should result in an error like PostgresConnectionClosedException. However, after the server is back up again, PostgresConnectionClosedException should go away as the pool should identify that the connection is not valid anymore.

judeKim commented 2 years ago

I also had the same case. However, it was resolved after updating to the latest version. @konsmak Please refer below to update the version and test it.

common : postgreSQL 13.5

before environment

current environment (resolved)

amorozov commented 9 months ago

Likely the pool should intercept errors of these kinds (in my case io.r2dbc.spi.R2dbcNonTransientResourceException) and simply re-establish connection(s). At least I would expect it to behave this way.

I can easily reproduce the error if I run r2dbc connection to PG over an SSH tunnel. Just restart the tunnel, and all current connections are effectively broken yet not closed.

menakaprabu commented 8 months ago

Hi @mp911de , Thanks for finding the root cause and posting it. I am getting the same issue "org.springframework.dao.DataAccessResourceFailureException: Failed to obtain R2DBC Connection nested exception is io.r2dbc.spi.R2dbcNonTransientResourceException: Connection validation failed" in MSSQL. I am using below libraries(Version).

io.r2dbc r2dbc-pool 1.0.0.RELEASE
    <dependency>
        <groupId>io.r2dbc</groupId>
        <artifactId>r2dbc-mssql</artifactId>
        <version>1.0.0.RELEASE</version>
    </dependency>

r2dbc-spi-1.0.0.RELEASE.jar Please help.