Closed rogtejada closed 2 years ago
I applied a change that ensures cancel signals do not interrupt the connection allocation sequence. Can you test against the 0.8.8 snapshots (r2dbc-pool-0.8.8.BUILD-20211027.135219-2.jar), available from OSS Sonatype, to verify whether you can still reproduce the issue or whether it is fixed?
@mp911de
We have repeated the tests against the new version; the connections still keep getting "stuck".
| pid  | datname  | usename  | application_name | backend_start           | query_start             | query    |
|------|----------|----------|------------------|-------------------------|-------------------------|----------|
| 7463 | postgres | postgres | sample           | 2021-10-27 11:51:31.626 | 2021-10-27 11:51:34.253 | SELECT 1 |
| 7464 | postgres | postgres | sample           | 2021-10-27 11:51:31.626 | 2021-10-27 11:51:34.490 | SELECT 1 |
| 7465 | postgres | postgres | sample           | 2021-10-27 11:51:34.505 | 2021-10-27 11:51:37.468 | SELECT 1 |
Log file:
https://github.com/jfrossetto/connection-stuck/blob/stress-cancel-pool088/log_connectionStuck2.txt
I'm still analyzing what is going on. My thesis right now is that we've uncovered a bigger issue that might span across all involved components. I suspect that we've addressed one issue in R2DBC Pool already, but there's more involved such as Spring's Transaction management that can potentially lead to resource leaks due to cancel interrupting active flows.
I had a look at the involved components. Both Spring's ConnectionFactoryUtils and AbstractReactiveTransactionManager perform a number of operations on the connection and the transaction, so a cancel signal can easily interrupt those operations. The result is a connection that gets allocated somewhere and then dropped mid-processing.
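To make that failure mode concrete outside of Spring and R2DBC, here is a minimal sketch in plain java.util.concurrent (all names are illustrative, none of them are r2dbc-pool internals): a "pool" is just a queue of tokens, and when the release step hangs off normal completion only, a cancel that lands between acquisition and the query strands the connection. Attaching the release to every completion path, cancellation included, closes the window.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Minimal sketch of the class of bug discussed in this thread, with no R2DBC
 * types: a cancelled chain strands the acquired token unless its release is
 * attached to every completion path.
 */
public class CancelLeakDemo {

    static final BlockingQueue<String> pool = new LinkedBlockingQueue<>();

    static String take() {
        try {
            return pool.take();
        } catch (InterruptedException e) {
            throw new CompletionException(e);
        }
    }

    static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Release lives in a dependent stage: it is skipped when the caller cancels. */
    static int leakyScenario() throws Exception {
        pool.clear();
        pool.put("conn-1");
        ExecutorService exec = Executors.newCachedThreadPool();
        CompletableFuture<String> acquired = CompletableFuture.supplyAsync(CancelLeakDemo::take, exec);
        CompletableFuture<String> query = acquired.thenApplyAsync(conn -> {
            sleep(300);                       // the "SELECT ..." is still in flight
            return conn + ": SELECT 1";
        }, exec);
        query.thenRun(() -> acquired.thenAccept(pool::offer)); // never runs after a cancel
        sleep(100);                           // acquisition done, query running
        query.cancel(true);                   // caller gives up, like a cancelled HTTP request
        sleep(400);
        exec.shutdownNow();
        return pool.size();                   // 0: the connection never came back
    }

    /** Release via whenComplete: it runs on success, error, and cancellation. */
    static int guardedScenario() throws Exception {
        pool.clear();
        pool.put("conn-2");
        ExecutorService exec = Executors.newCachedThreadPool();
        CompletableFuture<String> acquired = CompletableFuture.supplyAsync(CancelLeakDemo::take, exec);
        CompletableFuture<String> query = acquired.thenApplyAsync(conn -> {
            sleep(300);
            return conn + ": SELECT 1";
        }, exec);
        query.whenComplete((r, t) -> acquired.thenAccept(pool::offer)); // fires on cancel too
        sleep(100);
        query.cancel(true);
        sleep(400);
        exec.shutdownNow();
        return pool.size();                   // 1: released despite the cancel
    }

    public static void main(String[] args) throws Exception {
        System.out.println("leaky   -> pool size " + leakyScenario());
        System.out.println("guarded -> pool size " + guardedScenario());
    }
}
```

In Reactor terms this corresponds to releasing the connection somewhere that also observes the cancel signal, rather than only in the happy-path continuation; the sketch only models the lifecycle, not the additional work a real pool must do to await or abort the in-flight statement.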
I think we did everything we could on the pooling side. Now it's time to tackle the issue within Spring Framework.
New issue on Spring Framework https://github.com/spring-projects/spring-data-r2dbc/issues/674
@mp911de
We have created a new project without Spring dependencies and the connections still get stuck.
You can find the source code here: https://github.com/jfrossetto/r2dbc-stress-cancel
| pid  | datname  | usename  | application_name | backend_start           | query_start             | query    |
|------|----------|----------|------------------|-------------------------|-------------------------|----------|
| 3065 | postgres | postgres | sample           | 2021-11-01 10:02:41.308 | 2021-11-01 10:02:45.159 | SELECT 1 |
| 3064 | postgres | postgres | sample           | 2021-11-01 10:02:41.307 | 2021-11-01 10:02:46.358 | SELECT 1 |
| 3066 | postgres | postgres | sample           | 2021-11-01 10:02:45.640 | 2021-11-01 10:02:50.739 | SELECT 1 |
The logfile for the tests: https://github.com/jfrossetto/r2dbc-stress-cancel/blob/master/log_stresscancel.txt
Bug Report
Versions
Current Behavior
This issue was first opened on r2dbc-postgresql, some discussion can be seen there. https://github.com/pgjdbc/r2dbc-postgresql/issues/459
We are facing issues with stuck connections in the pool. After some investigation we could pinpoint the problem to our search flow. Our frontend has a combobox with a kind of "debounce" that cancels requests while the user keeps typing and sends only the last request to our server. When such a request gets cancelled right after the validation query has run (establishing the connection as healthy) and before the real SQL query starts, the connection never gets released. This is a very specific moment in the chain, and it is hard to reproduce.
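For readers unfamiliar with the pattern, the frontend behaviour described above can be sketched in plain Java (the real implementation is client-side JavaScript; all names here are illustrative): every keystroke cancels the previously scheduled request, so rapid typing produces a steady stream of cancellations and only the final request actually fires.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Illustrative sketch of a "debounce": each keystroke cancels the pending request. */
public class Debounce {

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> pending;

    /** Called on every keystroke; sendRequest fires only after a quiet period. */
    public synchronized void onKeystroke(Runnable sendRequest) {
        if (pending != null) {
            pending.cancel(true); // this cancel is what the server-side chain observes
        }
        pending = scheduler.schedule(sendRequest, 250, TimeUnit.MILLISECONDS);
    }

    public void shutdown() {
        scheduler.shutdown();
    }

    public static void main(String[] args) throws Exception {
        Debounce debounce = new Debounce();
        int[] requestsSent = {0};
        debounce.onKeystroke(() -> requestsSent[0]++); // "s"
        debounce.onKeystroke(() -> requestsSent[0]++); // "sa"
        debounce.onKeystroke(() -> requestsSent[0]++); // "sam"
        Thread.sleep(600);
        System.out.println("requests sent: " + requestsSent[0]); // only the last keystroke fires
        debounce.shutdown();
    }
}
```

The report's frontend additionally cancels requests that are already in flight, and it is that in-flight cancellation, landing between the validation query and the real query, that exposes the race in the pool.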
We have found this post, which seems related, but the suggested answer does not really help, and its code sample does not reproduce our problem. https://stackoverflow.com/questions/68407202/r2dbc-pool-connection-not-released-after-cancel
We could reproduce the behavior by calling any endpoint that fetches data from the database, setting a breakpoint after the connection is established but before the query starts, forcing the request's cancel, and then releasing the breakpoint; this way the connection always gets stuck.
The last query run by the connection is always the validation query, in our case a simple "SELECT 1":
The breakpoint was placed on the class DefaultFetchSpec from org.springframework.r2dbc.core, inside the public Flux all() method, but this is not a consistent way to reproduce it.
Another way we were able to reproduce it was by "stressing" the application and simulating several consecutive "network failures" that force the cancel of the request. The log file produced by this approach can be found here: https://github.com/rogtejada/connection-stuck/blob/main/log_connectionStuck.txt
Analyzing the logs, it seems that when a connection gets "stuck", FluxDiscardOnCancel does not show up in the log. All stuck connections have their query field filled with SELECT 1.
The following stack trace does not show up with our original implementation, but it does appear when creating the connection exactly as shown in the pool README.
Stack trace (when creating the connection just as specified on driver README)
```
org.springframework.dao.DataAccessResourceFailureException: Failed to obtain R2DBC Connection; nested exception is io.r2dbc.postgresql.ExceptionFactory$PostgresqlNonTransientResourceException: [53300] sorry, too many clients already
	at org.springframework.r2dbc.connection.ConnectionFactoryUtils.lambda$getConnection$0(ConnectionFactoryUtils.java:88) ~[spring-r2dbc-5.3.7.jar:5.3.7]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
	|_ checkpoint ⇢ Handler com.example.connectionstuck.UserController#sample(List, ServerWebExchange) [DispatcherHandler]
	|_ checkpoint ⇢ HTTP GET "/v1/sample?ids=3,2" [ExceptionHandlingWebHandler]
Stack trace:
	at org.springframework.r2dbc.connection.ConnectionFactoryUtils.lambda$getConnection$0(ConnectionFactoryUtils.java:88) ~[spring-r2dbc-5.3.7.jar:5.3.7]
	at reactor.core.publisher.Mono.lambda$onErrorMap$31(Mono.java:3488) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:132) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.Operators.error(Operators.java:197) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.MonoError.subscribe(MonoError.java:52) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.Mono.subscribe(Mono.java:4150) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.Operators.error(Operators.java:197) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.MonoError.subscribe(MonoError.java:52) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.Mono.subscribe(Mono.java:4150) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:106) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.Operators.error(Operators.java:197) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.MonoError.subscribe(MonoError.java:52) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.Mono.subscribe(Mono.java:4150) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:132) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.MonoDelayUntil$DelayUntilTrigger.onError(MonoDelayUntil.java:514) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onError(FluxHandle.java:202) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:124) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.FluxHandle$HandleConditionalSubscriber.onNext(FluxHandle.java:326) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:154) ~[reactor-core-3.4.6.jar:3.4.6]
	at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.emit(ReactorNettyClient.java:735) ~[r2dbc-postgresql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
	at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:986) ~[r2dbc-postgresql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
	at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:860) ~[r2dbc-postgresql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
	at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:767) ~[r2dbc-postgresql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:118) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:280) ~[reactor-netty-core-1.0.7.jar:1.0.7]
	at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:389) ~[reactor-netty-core-1.0.7.jar:1.0.7]
	at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:401) ~[reactor-netty-core-1.0.7.jar:1.0.7]
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94) ~[reactor-netty-core-1.0.7.jar:1.0.7]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.65.Final.jar:4.1.65.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.65.Final.jar:4.1.65.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795) ~[netty-transport-native-epoll-4.1.65.Final-linux-x86_64.jar:4.1.65.Final]
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480) ~[netty-transport-native-epoll-4.1.65.Final-linux-x86_64.jar:4.1.65.Final]
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[netty-transport-native-epoll-4.1.65.Final-linux-x86_64.jar:4.1.65.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.65.Final.jar:4.1.65.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.65.Final.jar:4.1.65.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.65.Final.jar:4.1.65.Final]
	at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: io.r2dbc.postgresql.ExceptionFactory$PostgresqlNonTransientResourceException: sorry, too many clients already
	at io.r2dbc.postgresql.ExceptionFactory.createException(ExceptionFactory.java:99) ~[r2dbc-postgresql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
	at io.r2dbc.postgresql.ExceptionFactory.handleErrorResponse(ExceptionFactory.java:111) ~[r2dbc-postgresql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:102) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.FluxHandle$HandleConditionalSubscriber.onNext(FluxHandle.java:326) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.next(FluxCreate.java:154) ~[reactor-core-3.4.6.jar:3.4.6]
	at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.emit(ReactorNettyClient.java:735) ~[r2dbc-postgresql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
	at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:986) ~[r2dbc-postgresql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
	at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:860) ~[r2dbc-postgresql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
	at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:767) ~[r2dbc-postgresql-0.8.8.RELEASE.jar:0.8.8.RELEASE]
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:118) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220) ~[reactor-core-3.4.6.jar:3.4.6]
	at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:280) ~[reactor-netty-core-1.0.7.jar:1.0.7]
	at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:389) ~[reactor-netty-core-1.0.7.jar:1.0.7]
	at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:401) ~[reactor-netty-core-1.0.7.jar:1.0.7]
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94) ~[reactor-netty-core-1.0.7.jar:1.0.7]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.65.Final.jar:4.1.65.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.65.Final.jar:4.1.65.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795) ~[netty-transport-native-epoll-4.1.65.Final-linux-x86_64.jar:4.1.65.Final]
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480) ~[netty-transport-native-epoll-4.1.65.Final-linux-x86_64.jar:4.1.65.Final]
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) ~[netty-transport-native-epoll-4.1.65.Final-linux-x86_64.jar:4.1.65.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.65.Final.jar:4.1.65.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.65.Final.jar:4.1.65.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.65.Final.jar:4.1.65.Final]
	at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
```

Table schema
Input Code

```sql
-- your SQL here;
```

Steps to reproduce
The timing of the request's cancel seems to be what causes the bug, which makes it hard to reproduce. We have not managed to find a consistent way to reproduce it.
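Because the window is timing-dependent, reproduction attempts boil down to a stress loop that cancels at random points in the chain. The sketch below models that approach with plain java.util.concurrent (hypothetical names, not the actual test project): with a cancel-safe release, every connection returns to the pool; the reported bug manifests as this invariant failing, with the pool shrinking run after run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/** Plain-Java sketch of the stress-cancel reproduction idea: fire many short
 *  "queries", cancel each at a random point, then check whether every
 *  connection made it back to the pool. All names are illustrative. */
public class StressCancel {

    static int run() throws Exception {
        final int POOL_SIZE = 5;
        BlockingQueue<String> pool = new LinkedBlockingQueue<>();
        for (int i = 0; i < POOL_SIZE; i++) {
            pool.offer("conn-" + i);
        }
        ExecutorService exec = Executors.newFixedThreadPool(8);
        List<CompletableFuture<Void>> releases = new ArrayList<>();
        ThreadLocalRandom rnd = ThreadLocalRandom.current();

        for (int round = 0; round < 200; round++) {
            CompletableFuture<String> acquired = CompletableFuture.supplyAsync(() -> {
                try {
                    return pool.take();
                } catch (InterruptedException e) {
                    throw new CompletionException(e);
                }
            }, exec);
            CompletableFuture<String> query = acquired.thenApplyAsync(conn -> {
                try {
                    Thread.sleep(2); // stand-in for "SELECT 1"
                } catch (InterruptedException ignored) {
                }
                return conn;
            }, exec);
            // Release on every completion path (success, error, cancellation):
            releases.add(query.handle((r, t) -> null)
                    .thenCompose(x -> acquired.thenAccept(pool::offer)));
            Thread.sleep(rnd.nextInt(4)); // cancel at a random point in the chain
            query.cancel(true);
        }

        // Wait until every release has actually run, then count survivors.
        CompletableFuture.allOf(releases.toArray(new CompletableFuture[0]))
                .get(10, TimeUnit.SECONDS);
        exec.shutdown();
        return pool.size(); // a leak-free chain always ends at POOL_SIZE
    }

    public static void main(String[] args) throws Exception {
        System.out.println("connections back in pool after 200 cancelled rounds: " + run());
    }
}
```

The real test project drives an R2DBC pool the same way; the logs linked above show the R2DBC chain failing this invariant, with stuck backends accumulating in pg_stat_activity.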
Input Code
```java
final ConnectionFactory connectionFactory = new PostgresqlConnectionFactory(
        PostgresqlConnectionConfiguration.builder()
                .host(host)
                .port(Integer.parseInt(port))
                .username(username)
                .password(password)
                .database(database)
                .connectTimeout(Duration.ofMillis(Long.parseLong(connectionTimeout)))
                .fetchSize(1000)
                .preparedStatementCacheQueries(-1)
                .schema("public")
                .tcpKeepAlive(false)
                .tcpNoDelay(true)
                .options(options)
                .applicationName(applicationName)
                .build());

return new ConnectionPool(
        ConnectionPoolConfiguration.builder(connectionFactory)
                .maxIdleTime(Duration.ofMillis(Long.parseLong(idleTimeout)))
                .initialSize(Integer.parseInt(initialSize))
                .maxSize(Integer.parseInt(maxSize))
                .acquireRetry(3)
                .maxLifeTime(Duration.ofMillis(Long.parseLong(maxLifetime)))
                .validationQuery("SELECT 1")
                .build());
```

Expected behavior/code
I would expect the connection to always be released, no matter at which moment the request gets canceled.
Possible Solution
There was an interesting discussion running here https://github.com/r2dbc/r2dbc-spi/issues/35
Maybe it could help with this problem.
Additional context