spring-projects / spring-framework

Spring Framework
https://spring.io/projects/spring-framework
Apache License 2.0

An autonomous transaction cancellation by a reactive chain timeout operator results in r2dbc connections leakage #32115

Open evgenyvsmirnov opened 8 months ago

evgenyvsmirnov commented 8 months ago

Greetings! Please consider the test below (reactor + r2dbc-postgresql + r2dbc-pool). A reactive chain within the test begins a transaction (outer), inserts a record, and then begins an autonomous transaction (inner). The inner transaction attempts to insert a record with the same data. Because the outer transaction is still in progress, the inner one cannot proceed and is cancelled by the reactive chain's timeout operator:

import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.RepeatedTest
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.r2dbc.core.DatabaseClient
import org.springframework.transaction.ReactiveTransactionManager
import org.springframework.transaction.TransactionDefinition
import org.springframework.transaction.reactive.TransactionalOperator
import org.springframework.transaction.support.DefaultTransactionDefinition
import reactor.core.publisher.Mono
import java.time.Duration
import java.util.*
import java.util.concurrent.TimeoutException

class TimeoutsTest : R2dbcRepositoryTestBase() {

    @Autowired
    private lateinit var databaseClient: DatabaseClient

    @Autowired
    private lateinit var transactionManager: ReactiveTransactionManager

    //@Test
    @RepeatedTest(value = 30, failureThreshold = 1)
    fun testTimeout() {
        // JDBC is used only for DB initialization.
        SERVER.getJdbcOperations()
            ?.execute(
                """
                    DROP TABLE IF EXISTS users_TimeoutsTest;

                    CREATE TABLE IF NOT EXISTS users_TimeoutsTest (
                         userId SERIAL PRIMARY KEY,
                         userName VARCHAR NOT NULL,
                         email VARCHAR NOT NULL,
                         CONSTRAINT unique_user_TimeoutsTest UNIQUE (userName),
                         CONSTRAINT unique_email_TimeoutsTest UNIQUE (email)
                    );
                """.trimIndent()
            )

        val outerTransactionalOperator = TransactionalOperator.create(
            transactionManager,
            DefaultTransactionDefinition().apply {
                isolationLevel = TransactionDefinition.ISOLATION_READ_COMMITTED
            }
        )

        val result: Mono<String> =
            databaseClient.sql("INSERT INTO users_TimeoutsTest (userId, userName, email) VALUES (1000, 'test', 'test@example.com') RETURNING userId")
                .map { row, _ -> row.get(0).toString() }
                .one()
                .flatMap {
                    val innerTransactionalOperator = TransactionalOperator.create(
                        transactionManager,
                        DefaultTransactionDefinition().apply {
                            isolationLevel = TransactionDefinition.ISOLATION_REPEATABLE_READ
                            propagationBehavior = TransactionDefinition.PROPAGATION_REQUIRES_NEW
                        }
                    )
                    databaseClient.sql("INSERT INTO users_TimeoutsTest (userId, userName, email) VALUES (1001, 'test', 'test@example.com') RETURNING userId")
                        .map { row, _ -> row.get(0).toString() }
                        .one()
                        .`as`(innerTransactionalOperator::transactional)
                        .timeout(Duration.ofSeconds(5))
                        .onErrorResume(TimeoutException::class.java) { Mono.just("Inner failure") }
                }
                .`as`(outerTransactionalOperator::transactional)
                .onErrorResume {
                    databaseClient.sql("SELECT COUNT(*) FROM users_TimeoutsTest WHERE userId=1000")
                        .map { row, _ -> row.get(0) }
                        .one()
                        .map { if (it == 1) "Inner failure" else "Outer failure" }
                }

        assertEquals("Inner failure", result.block())
    }
}

Environment:

  1. springframework: 6.1.2
  2. reactor-core: 3.6.1
  3. r2dbc-pool: 1.0.1
  4. r2dbc-postgresql: 1.0.3

The outcomes:

  1. The following stack trace always appears:

    java.lang.IllegalStateException: No value for key [ConnectionPool[PostgreSQL]] bound to context
    at org.springframework.transaction.reactive.TransactionSynchronizationManager.unbindResource(TransactionSynchronizationManager.java:151) ~[spring-tx-6.1.2.jar:6.1.2]
    at org.springframework.r2dbc.connection.R2dbcTransactionManager.lambda$doCleanupAfterCompletion$14(R2dbcTransactionManager.java:350) ~[spring-r2dbc-6.1.2.jar:6.1.2]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:45) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:264) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:53) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:264) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4512) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:103) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onError(MonoIgnoreThen.java:279) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.Operators.error(Operators.java:198) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:49) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:264) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:204) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2231) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:210) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2231) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89) ~[reactor-core-3.6.1.jar:3.6.1]
    at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onComplete(FluxDiscardOnCancel.java:104) ~[r2dbc-postgresql-1.0.3.RELEASE.jar:1.0.3.RELEASE]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:223) ~[reactor-core-3.6.1.jar:3.6.1]
    at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onComplete(FluxDiscardOnCancel.java:104) ~[r2dbc-postgresql-1.0.3.RELEASE.jar:1.0.3.RELEASE]
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.FluxCreate$BaseSink.complete(FluxCreate.java:465) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:871) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.FluxCreate$BufferAsyncSink.complete(FluxCreate.java:819) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.FluxCreate$SerializedFluxSink.drainLoop(FluxCreate.java:249) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.FluxCreate$SerializedFluxSink.drain(FluxCreate.java:215) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.FluxCreate$SerializedFluxSink.complete(FluxCreate.java:206) ~[reactor-core-3.6.1.jar:3.6.1]
    at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.complete(ReactorNettyClient.java:668) ~[r2dbc-postgresql-1.0.3.RELEASE.jar:1.0.3.RELEASE]
    at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:934) ~[r2dbc-postgresql-1.0.3.RELEASE.jar:1.0.3.RELEASE]
    at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:810) ~[r2dbc-postgresql-1.0.3.RELEASE.jar:1.0.3.RELEASE]
    at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:716) ~[r2dbc-postgresql-1.0.3.RELEASE.jar:1.0.3.RELEASE]
    at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:129) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224) ~[reactor-core-3.6.1.jar:3.6.1]
    at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:294) ~[reactor-netty-core-1.1.14.jar:1.1.14]
    at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:403) ~[reactor-netty-core-1.1.14.jar:1.1.14]
    at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:426) ~[reactor-netty-core-1.1.14.jar:1.1.14]
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114) ~[reactor-netty-core-1.1.14.jar:1.1.14]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
    at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
  2. The exchange with the database is as follows:

        372     3.019813    127.0.0.1   56603   127.0.0.1   32799   PGSQL   110 >Q ----------> BEGIN ISOLATION LEVEL READ COMMITTED, READ WRITE
        373     3.019838    127.0.0.1   32799   127.0.0.1   56603   TCP 56  32799 → 56603 [ACK] Seq=1 Ack=55 Win=6373 Len=0 TSval=3610326549 TSecr=3240910966
        386     3.034130    127.0.0.1   32799   127.0.0.1   56603   PGSQL   73  <C/Z
        387     3.034157    127.0.0.1   56603   127.0.0.1   32799   TCP 56  56603 → 32799 [ACK] Seq=55 Ack=18 Win=6370 Len=0 TSval=3240910980 TSecr=3610326563
        434     3.038892    127.0.0.1   56603   127.0.0.1   32799   PGSQL   177 >Q ----------> INSERT INTO users_TimeoutsTest (userId, userName, email) VALUES (1000, 'test', 'test@example.com') RETURNING userId
        435     3.038927    127.0.0.1   32799   127.0.0.1   56603   TCP 56  32799 → 56603 [ACK] Seq=18 Ack=176 Win=6372 Len=0 TSval=3610326568 TSecr=3240910985
        460     3.064530    127.0.0.1   32799   127.0.0.1   56603   PGSQL   125 <T/D/C/Z
        461     3.064544    127.0.0.1   56603   127.0.0.1   32799   TCP 56  56603 → 32799 [ACK] Seq=176 Ack=87 Win=6369 Len=0 TSval=3240911010 TSecr=3610326593
        711     3.093204    127.0.0.1   56601   127.0.0.1   32799   PGSQL   111 >Q ----------> BEGIN ISOLATION LEVEL REPEATABLE READ, READ WRITE (another connection)
        712     3.093236    127.0.0.1   32799   127.0.0.1   56601   TCP 56  32799 → 56601 [ACK] Seq=1 Ack=56 Win=6373 Len=0 TSval=200857867 TSecr=298528505
        729     3.104023    127.0.0.1   32799   127.0.0.1   56601   PGSQL   73  <C/Z
        730     3.104031    127.0.0.1   56601   127.0.0.1   32799   TCP 56  56601 → 32799 [ACK] Seq=56 Ack=18 Win=6370 Len=0 TSval=298528516 TSecr=200857878
        731     *REF*       127.0.0.1   56601   127.0.0.1   32799   PGSQL   177 >Q ----------> INSERT INTO users_TimeoutsTest (userId, userName, email) VALUES (1001, 'test', 'test@example.com') RETURNING userId
        732     0.000010    127.0.0.1   32799   127.0.0.1   56601   TCP 56  32799 → 56601 [ACK] Seq=18 Ack=177 Win=6372 Len=0 TSval=200857878 TSecr=298528516
        1155    5.006252    127.0.0.1   56601   127.0.0.1   32799   PGSQL   70  >Q ----------> ROLLBACK (inner)
        1156    5.006272    127.0.0.1   32799   127.0.0.1   56601   TCP 56  32799 → 56601 [ACK] Seq=18 Ack=191 Win=6371 Len=0 TSval=200862884 TSecr=298533522
        1179    5.017974    127.0.0.1   56603   127.0.0.1   32799   PGSQL   68  >Q ----------> COMMIT (outer)
        1180    5.017995    127.0.0.1   32799   127.0.0.1   56603   TCP 56  32799 → 56603 [ACK] Seq=87 Ack=188 Win=6371 Len=0 TSval=3610331651 TSecr=3240916068
        1186    5.047064    127.0.0.1   32799   127.0.0.1   56603   PGSQL   74  <C/Z
        1187    5.047120    127.0.0.1   56603   127.0.0.1   32799   TCP 56  56603 → 32799 [ACK] Seq=188 Ack=105 Win=6369 Len=0 TSval=3240916098 TSecr=3610331681
        1190    5.050271    127.0.0.1   32799   127.0.0.1   56601   PGSQL   289 <E/Z
        1191    5.050313    127.0.0.1   56601   127.0.0.1   32799   TCP 56  56601 → 32799 [ACK] Seq=191 Ack=251 Win=6367 Len=0 TSval=298533567 TSecr=200862929
        1204    5.054634    127.0.0.1   32799   127.0.0.1   56601   PGSQL   76  <C/Z
  3. Once in a while the test fails: the chain produces «Outer failure» (some concurrency issue?)

  4. Last and worst: when RepeatedTest is used, the test hangs after POOL_SIZE repetitions because a connection can no longer be acquired from the pool. The screenshot of the pool state below suggests that one of the two connections is not returned to the pool on every test run. [Screenshot 2024-01-25 at 14 09 41]
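
The arithmetic behind outcome 4 can be sketched with a toy model. The snippet below is not r2dbc-pool: `ToyPool`, `leakyRun` and `completedRunsBeforeExhaustion` are hypothetical names, and it assumes (based on the pool state described above) that each run borrows two connections and returns only one.

```kotlin
import java.util.concurrent.Semaphore

// Hypothetical stand-in for a bounded connection pool; NOT the r2dbc-pool API.
class ToyPool(size: Int) {
    private val permits = Semaphore(size)

    // Non-blocking acquire: returns false when no connection is free.
    fun tryAcquire(): Boolean = permits.tryAcquire()

    fun release() = permits.release()

    fun idle(): Int = permits.availablePermits()
}

// One simulated test run: the outer and the inner transaction each take
// a connection, but only the outer one is returned (assumed leak of the inner one).
// Returns false when a required connection could not be obtained.
fun leakyRun(pool: ToyPool): Boolean {
    if (!pool.tryAcquire()) return false   // outer connection
    if (!pool.tryAcquire()) {              // inner connection (REQUIRES_NEW)
        pool.release()                     // give the outer connection back
        return false
    }
    pool.release()                         // only the outer connection is returned
    return true
}

// Counts how many runs complete before a run can no longer get both connections.
fun completedRunsBeforeExhaustion(poolSize: Int): Int {
    val pool = ToyPool(poolSize)
    var runs = 0
    while (leakyRun(pool)) runs++
    return runs
}
```

Under these assumptions, `completedRunsBeforeExhaustion(10)` completes 9 runs and the 10th cannot obtain its second connection, so the number of repetitions before the stall grows with the pool size. The real pool presumably waits for a free connection instead of returning `false`, which would match the observed hang rather than a failure.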

sdeleuze commented 8 months ago

Can you please provide a reproducer as an attached archive or a link to a repository? If a database is needed, please provide a docker-compose.yml.

evgenyvsmirnov commented 8 months ago

gh32115.zip The attached archive contains the aforementioned test. The test uses Testcontainers (PostgreSQL 15.5). The test's display name briefly summarizes its logic.

pkgonan commented 5 months ago

@evgenyvsmirnov @sdeleuze I have the same problem. How did you solve this?

pkgonan commented 5 months ago

I think I'm having this problem because I'm using Propagation.REQUIRES_NEW.

@Transactional(propagation = Propagation.REQUIRES_NEW)

evgenyvsmirnov commented 5 months ago

> @evgenyvsmirnov @sdeleuze I have the same problem. How did you solve this?

I haven't – I've just reported it.