r2dbc / r2dbc-pool

Connection Pooling for Reactive Relational Database Connectivity
https://r2dbc.io
Apache License 2.0

Connection leaks (not released) when concurrent query is canceled #198

Open agorbachenko opened 11 months ago

agorbachenko commented 11 months ago

Bug Report

Current Behavior

When a database query is zipped with another action that fails for some reason, the query is canceled and its database connection is released. However, the cancellation also seems to affect another query running concurrently, whose connection is then never released.

See the reproducer project for the details: https://github.com/agorbachenko/r2dbc-connection-leak-demo

Output sample:

2023-09-07T10:15:44.303+03:00  INFO 34234 --- [           main] c.e.r.R2dbcConnectionLeakDemoApplication : Started R2dbcConnectionLeakDemoApplication in 3.649 seconds (process running for 4.734)
2023-09-07T10:15:44.560+03:00 DEBUG 34234 --- [actor-tcp-nio-1] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2023-09-07T10:15:44.564+03:00 DEBUG 34234 --- [actor-tcp-nio-1] io.r2dbc.pool.PooledConnection           : Releasing connection
2023-09-07T10:15:45.326+03:00 DEBUG 34234 --- [     parallel-2] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2023-09-07T10:15:45.331+03:00 DEBUG 34234 --- [     parallel-2] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT demo.* FROM demo]
2023-09-07T10:15:45.345+03:00  INFO 34234 --- [     parallel-5] c.example.r2dbc.R2dbcConnectionLeakDemo  : Canceled
2023-09-07T10:15:45.346+03:00 DEBUG 34234 --- [     parallel-5] io.r2dbc.pool.PooledConnection           : Releasing connection
2023-09-07T10:15:45.347+03:00 DEBUG 34234 --- [     parallel-5] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2023-09-07T10:15:45.348+03:00 DEBUG 34234 --- [     parallel-5] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT demo.* FROM demo WHERE demo.id = $1 LIMIT 2]
2023-09-07T10:15:45.364+03:00 DEBUG 34234 --- [actor-tcp-nio-1] io.r2dbc.pool.PooledConnection           : Releasing connection
2023-09-07T10:15:46.326+03:00  INFO 34234 --- [     parallel-3] c.example.r2dbc.R2dbcConnectionLeakDemo  : r2dbc.pool.acquired: 0.0
2023-09-07T10:15:46.327+03:00  INFO 34234 --- [     parallel-3] c.example.r2dbc.R2dbcConnectionLeakDemo  : r2dbc.pool.pending: 0.0
2023-09-07T10:15:46.327+03:00  INFO 34234 --- [     parallel-3] c.example.r2dbc.R2dbcConnectionLeakDemo  : r2dbc.pool.idle: 1.0
2023-09-07T10:15:46.365+03:00 DEBUG 34234 --- [     parallel-6] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2023-09-07T10:15:46.366+03:00 DEBUG 34234 --- [     parallel-6] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT demo.* FROM demo]
2023-09-07T10:15:46.378+03:00  INFO 34234 --- [     parallel-8] c.example.r2dbc.R2dbcConnectionLeakDemo  : Canceled
2023-09-07T10:15:46.378+03:00 DEBUG 34234 --- [     parallel-8] io.r2dbc.pool.PooledConnection           : Releasing connection
2023-09-07T10:15:46.379+03:00 DEBUG 34234 --- [     parallel-8] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2023-09-07T10:15:46.379+03:00 DEBUG 34234 --- [     parallel-8] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT demo.* FROM demo WHERE demo.id = $1 LIMIT 2]
2023-09-07T10:15:47.393+03:00  INFO 34234 --- [     parallel-2] c.example.r2dbc.R2dbcConnectionLeakDemo  : Canceled
2023-09-07T10:15:48.322+03:00  INFO 34234 --- [     parallel-3] c.example.r2dbc.R2dbcConnectionLeakDemo  : r2dbc.pool.acquired: 1.0
2023-09-07T10:15:48.322+03:00  INFO 34234 --- [     parallel-3] c.example.r2dbc.R2dbcConnectionLeakDemo  : r2dbc.pool.pending: 0.0
2023-09-07T10:15:48.322+03:00  INFO 34234 --- [     parallel-3] c.example.r2dbc.R2dbcConnectionLeakDemo  : r2dbc.pool.idle: 0.0
2023-09-07T10:15:48.413+03:00  INFO 34234 --- [     parallel-4] c.example.r2dbc.R2dbcConnectionLeakDemo  : Canceled
2023-09-07T10:15:49.427+03:00  INFO 34234 --- [     parallel-6] c.example.r2dbc.R2dbcConnectionLeakDemo  : Canceled
2023-09-07T10:15:50.323+03:00  INFO 34234 --- [     parallel-3] c.example.r2dbc.R2dbcConnectionLeakDemo  : r2dbc.pool.acquired: 1.0
2023-09-07T10:15:50.323+03:00  INFO 34234 --- [     parallel-3] c.example.r2dbc.R2dbcConnectionLeakDemo  : r2dbc.pool.pending: 0.0
2023-09-07T10:15:50.323+03:00  INFO 34234 --- [     parallel-3] c.example.r2dbc.R2dbcConnectionLeakDemo  : r2dbc.pool.idle: 0.0
2023-09-07T10:15:50.446+03:00  INFO 34234 --- [     parallel-8] c.example.r2dbc.R2dbcConnectionLeakDemo  : Canceled
2023-09-07T10:15:51.464+03:00  INFO 34234 --- [     parallel-2] c.example.r2dbc.R2dbcConnectionLeakDemo  : Canceled

This connection was not released:

2023-09-07T10:15:46.379+03:00 DEBUG 34234 --- [     parallel-8] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2023-09-07T10:15:46.379+03:00 DEBUG 34234 --- [     parallel-8] o.s.r2dbc.core.DefaultDatabaseClient     : Executing SQL statement [SELECT demo.* FROM demo WHERE demo.id = $1 LIMIT 2]

pg_stat_activity shows that the connection has not changed its state since then.
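One rough way to spot such a connection in a long log is to balance the two DEBUG messages seen in the output above. The sketch below is a hypothetical helper (the class and method names are made up for illustration) and only a heuristic: "Obtaining new connection from the pool" marks an acquire attempt rather than a completed hand-over, so a positive balance is a hint, not proof of a leak.

```java
import java.util.List;

/** Hypothetical helper: balances acquire and release log lines to hint at an unreleased connection. */
final class PoolLogScan {

    // Message fragments copied from the DEBUG output in this report.
    static final String OBTAIN = "Obtaining new connection from the pool";
    static final String RELEASE = "Releasing connection";

    /** Returns acquire attempts minus releases; a positive value hints at a connection that was not returned. */
    static int outstanding(List<String> logLines) {
        int balance = 0;
        for (String line : logLines) {
            if (line.contains(OBTAIN)) {
                balance++;
            } else if (line.contains(RELEASE)) {
                balance--;
            }
        }
        return balance;
    }

    public static void main(String[] args) {
        // Shortened sample lines; the real log lines carry timestamps and thread names.
        List<String> sample = List.of(
            "DEBUG io.r2dbc.pool.ConnectionPool : Obtaining new connection from the pool",
            "DEBUG io.r2dbc.pool.PooledConnection : Releasing connection",
            "DEBUG io.r2dbc.pool.ConnectionPool : Obtaining new connection from the pool");
        System.out.println("outstanding = " + outstanding(sample)); // prints "outstanding = 1"
    }
}
```

Applied to the output sample above, the last "Obtaining new connection" entry on parallel-8 is the one left without a matching release.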

jivkodobrev commented 7 months ago

@agorbachenko Hi Alexei - I ran your repro application but was not able to reproduce the issue. It ran for about 10 minutes locally on my machine, built with Java 18 and started via gradle bootRun. Is there something specific that needs to be done to reproduce it?

We have an issue which seems similar (#206) and I was hoping to observe the repro to understand the problem. Thanks in advance!

UPDATE: I actually was able to reproduce the issue, it ran for about 25 minutes and ran into the problem. I will investigate and will keep you posted here.

cc @SimoneGiusso

SimoneGiusso commented 6 months ago

I actually was able to reproduce the issue, it ran for about 25 minutes and ran into the problem.

Btw, if you change the delaySubscription in the runSomethingBadPeriodically method to 100ms or even 10ms, the issue manifests after a few seconds. I also propose removing the Spring DefaultDatabaseClient, just to make the reproducer lighter and to rule out issues coming from Spring. I also used a single thread:

package com.example.r2dbc;

import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.List;

import static io.r2dbc.pool.PoolingConnectionFactoryProvider.INITIAL_SIZE;
import static io.r2dbc.pool.PoolingConnectionFactoryProvider.MAX_SIZE;
import static io.r2dbc.spi.ConnectionFactoryOptions.*;

@Component
public class R2dbcConnectionLeakDemo {

    private final ConnectionFactory pooledConnectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
        .option(DRIVER, "pool")
        .option(PROTOCOL, "postgresql") // driver identifier, PROTOCOL is delegated as DRIVER by the pool.
        .option(HOST, "127.0.0.1")
        .option(PORT, 5432)
        .option(USER, "postgres")
        .option(PASSWORD, "demo")
        .option(DATABASE, "postgres")
        .option(MAX_SIZE, 1)
        .option(INITIAL_SIZE, 1)
        .build());

    @EventListener(ApplicationReadyEvent.class)
    private void runGoodQueryPeriodically() {
        Mono.usingWhen(pooledConnectionFactory.create(),
                connection -> Mono.from(connection.createStatement("SELECT demo.* FROM demo WHERE demo.id = 1 LIMIT 2")
                    .execute()),
                Connection::close)
            .delaySubscription(Duration.ofMillis(1000), Schedulers.single())
            .repeat()
            .subscribe();
    }

    @EventListener(ApplicationReadyEvent.class)
    private void runSomethingBadPeriodically() {
        getDemoEntitiesAsList()
            .zipWith(runSomethingFailing())
            .delaySubscription(Duration.ofMillis(10), Schedulers.single())
            // resume on error to repeat
            .onErrorResume(e -> Mono.empty())
            .repeat()
            .subscribe();
    }

    private Mono<List<Integer>> getDemoEntitiesAsList() {
        return Mono.usingWhen(pooledConnectionFactory.create(),
                connection -> Mono.from(connection.createStatement("SELECT demo.id FROM demo")
                    .execute()),
                Connection::close)
            .map(result -> result.map(readable -> readable.get(0, Integer.class)))
            .flatMapMany(Flux::from)
            .collectList();
    }

    private static Mono<?> runSomethingFailing() {
        return Mono.error(RuntimeException::new)
            .delaySubscription(Duration.ofMillis(10), Schedulers.single());
    }

}

Of course I confirm that the issue is still present:

...
2024-02-09T12:30:49.868+01:00 DEBUG 94597 --- [       single-1] io.r2dbc.pool.PooledConnection           : Releasing connection
2024-02-09T12:30:49.880+01:00 DEBUG 94597 --- [       single-1] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2024-02-09T12:30:49.891+01:00 DEBUG 94597 --- [       single-1] io.r2dbc.pool.PooledConnection           : Releasing connection
2024-02-09T12:30:49.904+01:00 DEBUG 94597 --- [       single-1] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2024-02-09T12:30:49.917+01:00 DEBUG 94597 --- [       single-1] io.r2dbc.pool.PooledConnection           : Releasing connection
2024-02-09T12:30:49.923+01:00 DEBUG 94597 --- [       single-1] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
// After a while it cannot obtain a connection anymore and it hangs here

jivkodobrev commented 6 months ago

Thanks! I haven't had the chance to investigate yet; hopefully I can get back to this at the beginning of March.

mp911de commented 6 months ago

You can add a bit more debugging logging via Mono.log(), ideally on the Mono producing usingWhen to detect whether a cancellation signal is being propagated into usingWhen.

SimoneGiusso commented 6 months ago

You can add a bit more debugging logging via Mono.log(), ideally on the Mono producing usingWhen to detect whether a cancellation signal is being propagated into usingWhen.

It seems to be propagated?

I updated the code to use different threads as well, which makes the log clearer:

package com.example.r2dbc;

import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.List;

import static io.r2dbc.pool.PoolingConnectionFactoryProvider.INITIAL_SIZE;
import static io.r2dbc.pool.PoolingConnectionFactoryProvider.MAX_SIZE;
import static io.r2dbc.spi.ConnectionFactoryOptions.*;

@Component
public class R2dbcConnectionLeakDemo {

    private final ConnectionFactory pooledConnectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
        .option(DRIVER, "pool")
        .option(PROTOCOL, "postgresql") // driver identifier, PROTOCOL is delegated as DRIVER by the pool.
        .option(HOST, "127.0.0.1")
        .option(PORT, 5432)
        .option(USER, "postgres")
        .option(PASSWORD, "demo")
        .option(DATABASE, "postgres")
        .option(MAX_SIZE, 1)
        .option(INITIAL_SIZE, 1)
        .build());

    @EventListener(ApplicationReadyEvent.class)
    private void runGoodQueryPeriodically() {
        Mono.usingWhen(pooledConnectionFactory.create(),
                connection -> Mono.from(connection.createStatement("SELECT demo.* FROM demo WHERE demo.id = 1 LIMIT 2")
                    .execute()),
                Connection::close)
            .log()
            .delaySubscription(Duration.ofMillis(1000), Schedulers.newSingle("GoodThread"))
            .repeat()
            .subscribe();
    }

    @EventListener(ApplicationReadyEvent.class)
    private void runSomethingBadPeriodically() {
        getDemoEntitiesAsList()
            .zipWith(runSomethingFailing())
            .delaySubscription(Duration.ofMillis(10), Schedulers.newSingle("LeakingThread"))
            // resume on error to repeat
            .onErrorResume(e -> Mono.empty())
            .repeat()
            .subscribe();
    }

    private Mono<List<Integer>> getDemoEntitiesAsList() {
        return Mono.usingWhen(pooledConnectionFactory.create(),
                connection -> Mono.from(connection.createStatement("SELECT demo.id FROM demo")
                    .execute()),
                Connection::close)
            .log()
            .map(result -> result.map(readable -> readable.get(0, Integer.class)))
            .flatMapMany(Flux::from)
            .collectList();
    }

    private static Mono<?> runSomethingFailing() {
        return Mono.error(RuntimeException::new)
            .delaySubscription(Duration.ofMillis(10), Schedulers.newSingle("BadThread"));
    }

}

2024-02-12T12:58:33.743+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T12:58:33.743+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : request(unbounded)
2024-02-12T12:58:33.743+01:00 DEBUG 24946 --- [LeakingThread-1] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2024-02-12T12:58:33.755+01:00  INFO 24946 --- [    BadThread-4] reactor.Mono.UsingWhen.1                 : cancel()
2024-02-12T12:58:33.756+01:00 DEBUG 24946 --- [    BadThread-4] io.r2dbc.pool.PooledConnection           : Releasing connection
2024-02-12T12:58:33.768+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T12:58:33.768+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : request(unbounded)
2024-02-12T12:58:33.769+01:00 DEBUG 24946 --- [LeakingThread-1] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2024-02-12T12:58:33.781+01:00  INFO 24946 --- [    BadThread-4] reactor.Mono.UsingWhen.1                 : cancel()
2024-02-12T12:58:33.782+01:00 DEBUG 24946 --- [    BadThread-4] io.r2dbc.pool.PooledConnection           : Releasing connection
2024-02-12T12:58:33.794+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T12:58:33.794+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : request(unbounded)
2024-02-12T12:58:33.795+01:00 DEBUG 24946 --- [LeakingThread-1] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2024-02-12T12:58:33.807+01:00  INFO 24946 --- [   GoodThread-2] reactor.Mono.UsingWhen.2                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T12:58:33.807+01:00  INFO 24946 --- [   GoodThread-2] reactor.Mono.UsingWhen.2                 : request(unbounded)
2024-02-12T12:58:33.807+01:00  INFO 24946 --- [    BadThread-4] reactor.Mono.UsingWhen.1                 : cancel()
2024-02-12T12:58:33.807+01:00 DEBUG 24946 --- [    BadThread-4] io.r2dbc.pool.PooledConnection           : Releasing connection
2024-02-12T12:58:33.807+01:00 DEBUG 24946 --- [    BadThread-4] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2024-02-12T12:58:33.821+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T12:58:33.821+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : request(unbounded)
2024-02-12T12:58:33.833+01:00  INFO 24946 --- [    BadThread-4] reactor.Mono.UsingWhen.1                 : cancel()
2024-02-12T12:58:33.845+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T12:58:33.845+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : request(unbounded)
2024-02-12T12:58:33.858+01:00  INFO 24946 --- [    BadThread-4] reactor.Mono.UsingWhen.1                 : cancel()
2024-02-12T12:58:33.871+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T12:58:33.871+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : request(unbounded)
2024-02-12T12:58:33.883+01:00  INFO 24946 --- [    BadThread-4] reactor.Mono.UsingWhen.1                 : cancel()
2024-02-12T12:58:33.896+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T12:58:33.896+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : request(unbounded)
2024-02-12T12:58:33.907+01:00  INFO 24946 --- [    BadThread-4] reactor.Mono.UsingWhen.1                 : cancel()

It seems that the cancellation signal on the BadThread causes the connection to be released. However, something goes wrong when there is a concurrent query, i.e. when the GoodThread tries to acquire a connection while the BadThread is releasing the connection and acquiring one again right after. From this point on everything is stuck. The GoodThread is probably waiting for the connection (there are no other log entries from this thread).

The odd thing is that the LeakingThread + BadThread cycle keeps running (why?), but apparently without trying to get a connection from the pool; otherwise I would not see it logging repeatedly:

2024-02-12T12:58:35.134+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T12:58:35.134+01:00  INFO 24946 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : request(unbounded)
2024-02-12T12:58:35.146+01:00  INFO 24946 --- [    BadThread-4] reactor.Mono.UsingWhen.1                 : cancel()

I changed this line:

.delaySubscription(Duration.ofMillis(100), Schedulers.newSingle("LeakingThread")) // delay increased 10 times

and the log changes slightly:

2024-02-12T13:11:01.839+01:00  INFO 25171 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T13:11:01.840+01:00  INFO 25171 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : request(unbounded)
2024-02-12T13:11:01.840+01:00 DEBUG 25171 --- [LeakingThread-1] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2024-02-12T13:11:01.851+01:00  INFO 25171 --- [    BadThread-4] reactor.Mono.UsingWhen.1                 : cancel()
2024-02-12T13:11:01.852+01:00 DEBUG 25171 --- [    BadThread-4] io.r2dbc.pool.PooledConnection           : Releasing connection
2024-02-12T13:11:01.923+01:00  INFO 25171 --- [   GoodThread-2] reactor.Mono.UsingWhen.2                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T13:11:01.924+01:00  INFO 25171 --- [   GoodThread-2] reactor.Mono.UsingWhen.2                 : request(unbounded)
2024-02-12T13:11:01.924+01:00 DEBUG 25171 --- [   GoodThread-2] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2024-02-12T13:11:01.957+01:00  INFO 25171 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T13:11:01.957+01:00  INFO 25171 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : request(unbounded)
2024-02-12T13:11:01.970+01:00  INFO 25171 --- [    BadThread-4] reactor.Mono.UsingWhen.1                 : cancel()
2024-02-12T13:11:02.073+01:00  INFO 25171 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T13:11:02.074+01:00  INFO 25171 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : request(unbounded)
2024-02-12T13:11:02.086+01:00  INFO 25171 --- [    BadThread-4] reactor.Mono.UsingWhen.1                 : cancel()
2024-02-12T13:11:02.192+01:00  INFO 25171 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T13:11:02.192+01:00  INFO 25171 --- [LeakingThread-1] reactor.Mono.UsingWhen.1                 : request(unbounded)
2024-02-12T13:11:02.205+01:00  INFO 25171 --- [    BadThread-4] reactor.Mono.UsingWhen.1                 : cancel()

Now the GoodThread also logs its attempt to obtain a new connection, which was not the case before. Moreover, the BadThread seems to have already released the connection. The leak occurs nonetheless.

Finally, I decided to increase the delay to 1250ms, so that the bad query runs less frequently than the good query, which runs every 1000ms:

.delaySubscription(Duration.ofMillis(1250), Schedulers.newSingle("LeakingThread"))

At least at the beginning everything seems to work, and the log looks completely different:

2024-02-12T13:20:49.931+01:00 DEBUG 25449 --- [LeakingThread-2] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2024-02-12T13:20:49.936+01:00 DEBUG 25449 --- [actor-tcp-nio-1] io.r2dbc.pool.PooledConnection           : Releasing connection
2024-02-12T13:20:49.936+01:00  INFO 25449 --- [actor-tcp-nio-1] reactor.Mono.UsingWhen.2                 : onNext(PostgresqlResult{context=ConnectionContext{client=io.r2dbc.postgresql.client.ReactorNettyClient@10a36d1b, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@157ae3d9, connection=PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@10a36d1b, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@157ae3d9}, configuration=PostgresqlConnectionConfiguration{applicationName='r2dbc-postgresql', autodetectExtensions='true', compatibilityMode=false, connectTimeout=null, errorResponseLogLevel=DEBUG, database='postgres', extensions=[], fetchSize=io.r2dbc.postgresql.PostgresqlConnectionConfiguration$Builder$$Lambda$642/0x00000008004bc240@34d713a2, forceBinary='false', lockWaitTimeout='null, loopResources='null', multiHostConfiguration='null', noticeLogLevel='DEBUG', options='{}', password='****', preferAttachedBuffers=false, singleHostConfiguration=SingleHostConfiguration{host='127.0.0.1', port=5432, socket='null'}, statementTimeout=null, tcpKeepAlive=false, tcpNoDelay=true, timeZone=sun.util.calendar.ZoneInfo[id="Europe/Zurich",offset=3600000,dstSavings=3600000,useDaylight=true,transitions=119,lastRule=java.util.SimpleTimeZone[id=Europe/Zurich,offset=3600000,dstSavings=3600000,useDaylight=true,startYear=0,startMode=2,startMonth=2,startDay=-1,startDayOfWeek=1,startTime=3600000,startTimeMode=2,endMode=2,endMonth=9,endDay=-1,endDayOfWeek=1,endTime=3600000,endTimeMode=2]], username='postgres'}, portalNameSupplier=io.r2dbc.postgresql.DefaultPortalNameSupplier@ce2d296, statementCache=IndefiniteStatementCache{cache={}, counter=0}}, messages=WindowFlux})
2024-02-12T13:20:49.936+01:00  INFO 25449 --- [actor-tcp-nio-1] reactor.Mono.UsingWhen.2                 : onComplete()
2024-02-12T13:20:49.946+01:00  INFO 25449 --- [    BadThread-4] reactor.Mono.UsingWhen.2                 : cancel()
2024-02-12T13:20:50.791+01:00  INFO 25449 --- [   GoodThread-1] reactor.Mono.UsingWhen.1                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T13:20:50.792+01:00  INFO 25449 --- [   GoodThread-1] reactor.Mono.UsingWhen.1                 : request(unbounded)
2024-02-12T13:20:50.793+01:00 DEBUG 25449 --- [   GoodThread-1] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2024-02-12T13:20:50.800+01:00 DEBUG 25449 --- [actor-tcp-nio-1] io.r2dbc.pool.PooledConnection           : Releasing connection
2024-02-12T13:20:50.801+01:00  INFO 25449 --- [actor-tcp-nio-1] reactor.Mono.UsingWhen.1                 : onNext(PostgresqlResult{context=ConnectionContext{client=io.r2dbc.postgresql.client.ReactorNettyClient@10a36d1b, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@157ae3d9, connection=PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@10a36d1b, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@157ae3d9}, configuration=PostgresqlConnectionConfiguration{applicationName='r2dbc-postgresql', autodetectExtensions='true', compatibilityMode=false, connectTimeout=null, errorResponseLogLevel=DEBUG, database='postgres', extensions=[], fetchSize=io.r2dbc.postgresql.PostgresqlConnectionConfiguration$Builder$$Lambda$642/0x00000008004bc240@34d713a2, forceBinary='false', lockWaitTimeout='null, loopResources='null', multiHostConfiguration='null', noticeLogLevel='DEBUG', options='{}', password='****', preferAttachedBuffers=false, singleHostConfiguration=SingleHostConfiguration{host='127.0.0.1', port=5432, socket='null'}, statementTimeout=null, tcpKeepAlive=false, tcpNoDelay=true, timeZone=sun.util.calendar.ZoneInfo[id="Europe/Zurich",offset=3600000,dstSavings=3600000,useDaylight=true,transitions=119,lastRule=java.util.SimpleTimeZone[id=Europe/Zurich,offset=3600000,dstSavings=3600000,useDaylight=true,startYear=0,startMode=2,startMonth=2,startDay=-1,startDayOfWeek=1,startTime=3600000,startTimeMode=2,endMode=2,endMonth=9,endDay=-1,endDayOfWeek=1,endTime=3600000,endTimeMode=2]], username='postgres'}, portalNameSupplier=io.r2dbc.postgresql.DefaultPortalNameSupplier@ce2d296, statementCache=IndefiniteStatementCache{cache={}, counter=0}}, messages=WindowFlux})
2024-02-12T13:20:50.801+01:00  INFO 25449 --- [actor-tcp-nio-1] reactor.Mono.UsingWhen.1                 : onComplete()
2024-02-12T13:20:51.202+01:00  INFO 25449 --- [LeakingThread-2] reactor.Mono.UsingWhen.2                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T13:20:51.203+01:00  INFO 25449 --- [LeakingThread-2] reactor.Mono.UsingWhen.2                 : request(unbounded)
2024-02-12T13:20:51.203+01:00 DEBUG 25449 --- [LeakingThread-2] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2024-02-12T13:20:51.209+01:00 DEBUG 25449 --- [actor-tcp-nio-1] io.r2dbc.pool.PooledConnection           : Releasing connection
2024-02-12T13:20:51.209+01:00  INFO 25449 --- [actor-tcp-nio-1] reactor.Mono.UsingWhen.2                 : onNext(PostgresqlResult{context=ConnectionContext{client=io.r2dbc.postgresql.client.ReactorNettyClient@10a36d1b, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@157ae3d9, connection=PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@10a36d1b, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@157ae3d9}, configuration=PostgresqlConnectionConfiguration{applicationName='r2dbc-postgresql', autodetectExtensions='true', compatibilityMode=false, connectTimeout=null, errorResponseLogLevel=DEBUG, database='postgres', extensions=[], fetchSize=io.r2dbc.postgresql.PostgresqlConnectionConfiguration$Builder$$Lambda$642/0x00000008004bc240@34d713a2, forceBinary='false', lockWaitTimeout='null, loopResources='null', multiHostConfiguration='null', noticeLogLevel='DEBUG', options='{}', password='****', preferAttachedBuffers=false, singleHostConfiguration=SingleHostConfiguration{host='127.0.0.1', port=5432, socket='null'}, statementTimeout=null, tcpKeepAlive=false, tcpNoDelay=true, timeZone=sun.util.calendar.ZoneInfo[id="Europe/Zurich",offset=3600000,dstSavings=3600000,useDaylight=true,transitions=119,lastRule=java.util.SimpleTimeZone[id=Europe/Zurich,offset=3600000,dstSavings=3600000,useDaylight=true,startYear=0,startMode=2,startMonth=2,startDay=-1,startDayOfWeek=1,startTime=3600000,startTimeMode=2,endMode=2,endMonth=9,endDay=-1,endDayOfWeek=1,endTime=3600000,endTimeMode=2]], username='postgres'}, portalNameSupplier=io.r2dbc.postgresql.DefaultPortalNameSupplier@ce2d296, statementCache=IndefiniteStatementCache{cache={}, counter=0}}, messages=WindowFlux})
2024-02-12T13:20:51.209+01:00  INFO 25449 --- [actor-tcp-nio-1] reactor.Mono.UsingWhen.2                 : onComplete()
2024-02-12T13:20:51.217+01:00  INFO 25449 --- [    BadThread-4] reactor.Mono.UsingWhen.2                 : cancel()
2024-02-12T13:20:51.805+01:00  INFO 25449 --- [   GoodThread-1] reactor.Mono.UsingWhen.1                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T13:20:51.806+01:00  INFO 25449 --- [   GoodThread-1] reactor.Mono.UsingWhen.1                 : request(unbounded)
2024-02-12T13:20:51.806+01:00 DEBUG 25449 --- [   GoodThread-1] io.r2dbc.pool.ConnectionPool             : Obtaining new connection from the pool
2024-02-12T13:20:51.812+01:00 DEBUG 25449 --- [actor-tcp-nio-1] io.r2dbc.pool.PooledConnection           : Releasing connection
2024-02-12T13:20:51.813+01:00  INFO 25449 --- [actor-tcp-nio-1] reactor.Mono.UsingWhen.1                 : onNext(PostgresqlResult{context=ConnectionContext{client=io.r2dbc.postgresql.client.ReactorNettyClient@10a36d1b, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@157ae3d9, connection=PostgresqlConnection{client=io.r2dbc.postgresql.client.ReactorNettyClient@10a36d1b, codecs=io.r2dbc.postgresql.codec.DefaultCodecs@157ae3d9}, configuration=PostgresqlConnectionConfiguration{applicationName='r2dbc-postgresql', autodetectExtensions='true', compatibilityMode=false, connectTimeout=null, errorResponseLogLevel=DEBUG, database='postgres', extensions=[], fetchSize=io.r2dbc.postgresql.PostgresqlConnectionConfiguration$Builder$$Lambda$642/0x00000008004bc240@34d713a2, forceBinary='false', lockWaitTimeout='null, loopResources='null', multiHostConfiguration='null', noticeLogLevel='DEBUG', options='{}', password='****', preferAttachedBuffers=false, singleHostConfiguration=SingleHostConfiguration{host='127.0.0.1', port=5432, socket='null'}, statementTimeout=null, tcpKeepAlive=false, tcpNoDelay=true, timeZone=sun.util.calendar.ZoneInfo[id="Europe/Zurich",offset=3600000,dstSavings=3600000,useDaylight=true,transitions=119,lastRule=java.util.SimpleTimeZone[id=Europe/Zurich,offset=3600000,dstSavings=3600000,useDaylight=true,startYear=0,startMode=2,startMonth=2,startDay=-1,startDayOfWeek=1,startTime=3600000,startTimeMode=2,endMode=2,endMonth=9,endDay=-1,endDayOfWeek=1,endTime=3600000,endTimeMode=2]], username='postgres'}, portalNameSupplier=io.r2dbc.postgresql.DefaultPortalNameSupplier@ce2d296, statementCache=IndefiniteStatementCache{cache={}, counter=0}}, messages=WindowFlux})
2024-02-12T13:20:51.813+01:00  INFO 25449 --- [actor-tcp-nio-1] reactor.Mono.UsingWhen.1                 : onComplete()
2024-02-12T13:20:52.473+01:00  INFO 25449 --- [LeakingThread-2] reactor.Mono.UsingWhen.2                 : onSubscribe(MonoUsingWhen.ResourceSubscriber)
2024-02-12T13:20:52.474+01:00  INFO 25449 --- [LeakingThread-2] reactor.Mono.UsingWhen.2                 : request(unbounded)

I'm not sure why, but the BadThread seems to kick in after the LeakingThread has finished running the query (onComplete()). Moreover, this time the query is actually executed, I suppose, since the reactor-tcp-nio-1 thread shows up in the log.

mp911de commented 6 months ago

Releasing connection is logged upon subscription to the Connection.close() publisher. This indicates that usingWhen isn't subscribing to the publisher and in that regard, it is more likely a Reactor Core issue.
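To illustrate why a missing "Releasing connection" line points at a missing subscription, here is a minimal sketch using the JDK's java.util.concurrent.Flow interfaces as a stand-in for Reactive Streams. LazyCloseDemo and its close() method are hypothetical and are not the r2dbc-pool implementation; the sketch only shows the general property of a lazy publisher, namely that calling the method has no side effect until someone subscribes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Hypothetical stand-in for a pooled connection; not the r2dbc-pool implementation. */
final class LazyCloseDemo {

    /** Collects "log" messages so the effect of subscribing is observable. */
    static final List<String> LOG = new ArrayList<>();

    /** Returns a lazy publisher: the release side effect runs only when someone subscribes. */
    static Flow.Publisher<Void> close() {
        return subscriber -> {
            LOG.add("Releasing connection");
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { }
                @Override public void cancel() { }
            });
            subscriber.onComplete();
        };
    }

    /** Subscriber that ignores every signal; it exists only to trigger the subscription. */
    static final class NoOpSubscriber implements Flow.Subscriber<Void> {
        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(Void item) { }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
    }

    public static void main(String[] args) {
        Flow.Publisher<Void> release = close();   // assembling the publisher logs nothing
        System.out.println(LOG.size());           // prints 0
        release.subscribe(new NoOpSubscriber());  // the message is recorded on subscription
        System.out.println(LOG);                  // prints [Releasing connection]
    }
}
```

By the same reasoning, a cycle in the logs above that shows cancel() without a following "Releasing connection" suggests that nothing subscribed to the close publisher in that cycle.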

chemicL commented 6 months ago

@mp911de Hey :) I was summoned here from https://github.com/reactor/reactor-core/issues/3695. Please note that the original reporter is not using the usingWhen operator. I am trying to understand the issues that users run into with the API and javadoc of that operator and I am failing to see where it might fail to deliver on its promises or where improvements are needed. This example seems at first to involve the operator; however, as just noted, the problem reported also manifests when this operator is not involved at all. Perhaps the issue lies elsewhere (either in other parts of reactor-core or in r2dbc)? I'd suggest investigating the problem that doesn't involve the usingWhen operator first, and once that is fixed we can see if usingWhen is still a source of issues or perhaps behaves as expected.

agorbachenko commented 6 months ago

@chemicL, hmm... As I understand it, my example uses usingWhen implicitly via DefaultDatabaseClient's org.springframework.r2dbc.core.DefaultDatabaseClient#inConnection: SimpleR2dbcRepository -> R2dbcEntityTemplate implements R2dbcEntityOperations -> DatabaseClient. Doesn't it?

chemicL commented 6 months ago

@agorbachenko thanks for the quick response. I was not aware of the internals in Spring, I just had a look at r2dbc sources. I'll have a look then!

SimoneGiusso commented 6 months ago

I was not aware of the internals in Spring, I just had a look at r2dbc sources. I'll have a look then!

Btw @chemicL, I revised the author's example, keeping the test logic and its goal of showing the issue. However, I removed the "Spring" layer, hoping that makes it easier to understand. I don't think you have to look at the Spring internals if you just look at the revised example here.

chemicL commented 6 months ago

Thanks @SimoneGiusso. @mp911de and I experimented with both examples today. Thanks for providing your version of it! What I can say is that usingWhen is definitely not at fault here; the problem is the handling of cancellation in the phase before the connection is acquired. When the connection is abandoned before being handed over, it ends up in the pool. However, the execution never picks up the allocated connection, nor does it clean it up going forward. So I'll try to investigate a bit further, and together with @mp911de we can see whether something elsewhere in reactor-core is at fault, or the use of it in r2dbc, or something in reactor-pool...

mp911de commented 6 months ago

Thanks to @chemicL's investigation it seems that the underlying Reactor Pool causes the issue. The related issue is https://github.com/reactor/reactor-pool/issues/124. I think the problem is going to be addressed there so closing the ticket here.

mp911de commented 5 months ago

@chemicL and I ran some investigations. For now, it seems that the issue only happens when there is an actual driver involved. So far, we tested only against Postgres. Using R2DBC Mocks doesn't reproduce the problem so we need to widen our scope again.

Anyone willing to help look into the cause is welcome.

chemicL commented 5 months ago

What I have found is that the connection acquisition is working properly and the connection is not stuck in the pool, but is being held by an actor that fails to consume the responses for an ongoing query. The scenario is as follows:

  1. bad actor acquires the connection
  2. bad actor starts an exchange and sends the request
  3. a response is being generated and the bad actor starts consuming
  4. bad actor cancels before consuming the response entirely
  5. bad actor returns the connection to the pool
  6. good actor acquires the connection
  7. good actor sends a request
  8. good actor never receives the response
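The eight steps above can be mimicked with a toy model in plain Java. Everything in it is hypothetical: `ToyConnection`, the three-frame response, and above all the rule that a new exchange cannot start while frames of the previous one are still unread. That rule is an assumption made purely so the sketch has something to show; the thread has not established why step 8 happens, so this is not a diagnosis of the driver or the pool.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Optional;

class BadActorScenarioSketch {

    /** Toy connection: hypothetical, not a real driver or pool class. */
    static class ToyConnection {
        private final Deque<String> inbound = new ArrayDeque<>(); // unread frames of the current exchange
        private String pendingRequest;                            // request queued behind the current exchange

        /** Sends a request; the toy server answers every request with three frames. */
        void send(String request) {
            if (!inbound.isEmpty()) {
                // ASSUMPTION (illustration only): exchanges are serialised, so a new one
                // cannot start until the previous response has been fully consumed.
                pendingRequest = request;
                return;
            }
            for (int i = 1; i <= 3; i++) {
                inbound.add(request + "#row" + i);
            }
        }

        /** Returns the next frame belonging to the given request, if one is available. */
        Optional<String> receive(String request) {
            String head = inbound.peek();
            if (head != null && head.startsWith(request + "#")) {
                return Optional.of(inbound.poll());
            }
            return Optional.empty();
        }

        /** Discards leftover frames and starts the queued exchange, if any. */
        void drain() {
            inbound.clear();
            if (pendingRequest != null) {
                String queued = pendingRequest;
                pendingRequest = null;
                send(queued);
            }
        }
    }

    /** Walks through steps 1-8 and returns what the good actor receives. */
    static Optional<String> run(boolean drainOnRelease) {
        Deque<ToyConnection> pool = new ArrayDeque<>();
        pool.add(new ToyConnection());

        ToyConnection bad = pool.poll();   // 1. bad actor acquires the connection
        bad.send("bad");                   // 2. bad actor sends the request
        bad.receive("bad");                // 3. bad actor starts consuming the response
                                           // 4. bad actor cancels: two frames stay unread
        if (drainOnRelease) {
            bad.drain();                   // contrast case only, not a proposed fix
        }
        pool.add(bad);                     // 5. bad actor returns the connection to the pool

        ToyConnection good = pool.poll();  // 6. good actor acquires the same connection
        good.send("good");                 // 7. good actor sends a request
        return good.receive("good");       // 8. does the good actor get a response?
    }

    public static void main(String[] args) {
        System.out.println("leftover frames kept:    " + run(false));
        System.out.println("leftover frames dropped: " + run(true));
    }
}
```

In this model `run(false)` returns an empty `Optional`, matching step 8, while `run(true)` returns the first frame of the good actor's response. The `drainOnRelease` flag exists only to contrast the two outcomes inside the toy model.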

I replicated that scenario, as observed with the Postgres driver, in order to check just the integration; the code is in https://github.com/chemicL/r2dbc-lab. However, all the tests pass in those mocked scenarios. Further investigation is needed.

It looks as if the reactor-pool issue mentioned above is not the root cause here.

cyberluke commented 4 months ago

@chemicL Hello Dariusz, greetings to Krakow! Yesterday I checked your r2dbc-lab in order to reproduce the problem. But your current example is using H2, not Postgres. I had to rewrite it for Postgres. Anyway, I cannot reproduce the problem with the bad actor. Can we help each other?

chemicL commented 4 months ago

Hey, @cyberluke 👋 I'm afraid I don't understand: r2dbc-lab has no actual dependency on either H2 or PostgreSQL. It uses only mocks with r2dbc-pool, r2dbc-spi, and r2dbc-spi-test. Can you elaborate? Also, r2dbc-lab in fact shows that there is no issue when using the primitives from the SPI: the problem is not reproducible in isolation, only with the examples provided by @SimoneGiusso and @agorbachenko.

cyberluke commented 4 months ago

@chemicL You're right, too much work. The H2 database was used in an R2DBC test case to reproduce the bug from another issue here.

blangenberg commented 2 weeks ago

Is there a status update on this issue?