r2dbc / r2dbc-pool

Connection Pooling for Reactive Relational Database Connectivity
https://r2dbc.io
Apache License 2.0

Use `Schedulers.single()` to avoid accidental thread co-location #190

Closed dstepanov closed 1 year ago

dstepanov commented 1 year ago

Bug Report

This issue was originally reported at https://github.com/micronaut-projects/micronaut-data/issues/2136 and I debugged it a bit. It looks like some calculations aren't correct: when min and max are the same, the pool only allows one connection. The example project has a reproducible example; setting maxSize bigger than initialSize makes the connections run in parallel. This also explains some bad performance in TechEmpower/FrameworkBenchmarks, because I configured both settings with the same value.

mp911de commented 1 year ago

I wasn't able to reproduce any computation issue regarding the pool size. The pool itself properly creates the configured number of connections:

ConnectionPoolConfiguration configuration = ConnectionPoolConfiguration.builder(connectionFactoryMock)
        .initialSize(5)
        .maxSize(5)
        .build();
ConnectionPool pool = new ConnectionPool(configuration);

List<Connection> connections = new ArrayList<>();

for (int i = 0; i < 5; i++) {
    pool.create().as(StepVerifier::create).consumeNextWith(connections::add).verifyComplete();
}
assertThat(connections).hasSize(5);

The event loop co-location is indeed an issue. It is the default setting in Reactor Netty, likely, this came from allocating channels for HTTP requests.

Let me know whether I missed something regarding the sizing.

dstepanov commented 1 year ago

It might have the correct size, but it only provides one connection.

You can run the example attached to the Micronaut Data issue and see that the queries are run sequentially. I changed the code a bit to reproduce it:

List<Flux<Item>> fluxes = List.of(
        Flux.from(itemsRepository.findAll()).doOnComplete(() -> log.info("all loaded1")),
        Flux.from(itemsRepository.findAll()).doOnComplete(() -> log.info("all loaded2")),
        Flux.from(itemsRepository.findAll()).doOnComplete(() -> log.info("all loaded3")),
        Flux.from(itemsRepository.findAll()).doOnComplete(() -> log.info("all loaded4")),
        Flux.from(itemsRepository.findAll()).doOnComplete(() -> log.info("all loaded5"))
);
List<Mono<Long>> next = new ArrayList<>();
for (Flux<Item> flux : fluxes) {
    next.add(
            flux.count().doOnNext(number -> log.info("num:" + number))
    );
}

Flux.fromIterable(next).flatMap(x -> x).collectList().block();

mp911de commented 1 year ago

And this makes total sense: if connections are co-located on the same event-loop thread, then that single thread can only run things sequentially. If you warm up the pool (ConnectionPool.warmup().block()), you see more parallel behavior:

15:45:53.640 [Test worker] DEBUG io.r2dbc.pool.ConnectionPool - Obtaining new connection from the pool
15:45:53.645 [Test worker] DEBUG io.r2dbc.pool.ConnectionPool - Obtaining new connection from the pool
15:45:53.645 [Test worker] DEBUG io.r2dbc.pool.ConnectionPool - Obtaining new connection from the pool
15:45:53.645 [Test worker] DEBUG io.r2dbc.pool.ConnectionPool - Obtaining new connection from the pool
15:45:53.645 [Test worker] DEBUG io.r2dbc.pool.ConnectionPool - Obtaining new connection from the pool
15:45:53.653 [reactor-tcp-nio-1] DEBUG io.r2dbc.pool.PooledConnection - Releasing connection
15:45:53.655 [reactor-tcp-nio-1] INFO  com.example.R2dbctestTest - all loaded3
15:45:53.655 [reactor-tcp-nio-1] INFO  com.example.R2dbctestTest - num:0
15:45:53.656 [reactor-tcp-nio-1] DEBUG io.r2dbc.pool.PooledConnection - Releasing connection
15:45:53.656 [reactor-tcp-nio-1] INFO  com.example.R2dbctestTest - all loaded2

Without warmup:

15:46:49.414 [reactor-tcp-nio-1] DEBUG io.r2dbc.pool.ConnectionPool - Obtaining new connection from the pool
15:46:49.431 [reactor-tcp-nio-1] DEBUG io.r2dbc.pool.PooledConnection - Releasing connection
15:46:49.432 [reactor-tcp-nio-1] INFO  com.example.R2dbctestTest - all loaded1
15:46:49.433 [reactor-tcp-nio-1] INFO  com.example.R2dbctestTest - num:0
15:46:49.433 [reactor-tcp-nio-1] DEBUG io.r2dbc.pool.ConnectionPool - Obtaining new connection from the pool
15:46:49.435 [reactor-tcp-nio-1] DEBUG io.r2dbc.pool.PooledConnection - Releasing connection
15:46:49.435 [reactor-tcp-nio-1] INFO  com.example.R2dbctestTest - all loaded2
15:46:49.435 [reactor-tcp-nio-1] INFO  com.example.R2dbctestTest - num:0

dstepanov commented 1 year ago

I didn't know that the pool needs to be initialized somehow; there is no mention of warmup in the readme. Do you suggest that warmup should be called when the pool is built by the container?

PiotrDuz commented 1 year ago

Let me just add a thought here: even if warmup helps here, I do not see it as a solution. We will still have more sockets than available threads, so some queries might get congested while other cores are available. What do you think of creating a slightly modified DefaultLoopResources class that, instead of applying .colocate in the onClient() method, returns a new Nio/Epoll EventLoopGroup with maxThreads = maxSize of the connection pool? Regards

mp911de commented 1 year ago

Warmup can be a slight workaround, but as mentioned by @PiotrDuz, it's by no means a fix. In an ideal world, the pool would be event-loop aware and hand out a connection that runs on the same thread.

In reality, we do not have the event loop information being exposed from the connections. Additionally, pools of that size would negatively affect database servers.

Disabling colocation is the only viable solution for long-lived connections. I already reached out to the Reactor Netty team for guidance and will share it once I have more details.

mp911de commented 1 year ago

Since Reactor Netty 1.0.28, LoopResources can be created without co-location (see https://github.com/reactor/reactor-netty/pull/2655). The Postgres driver already allows configuring LoopResources, which helps to disable co-location, while other drivers (like SQL Server) do not yet have that configuration option.

Setting LoopResources creates additional EventLoopGroups increasing the system resource usage, however.

mp911de commented 1 year ago

While investigating with a customized LoopResources instance, I found that the empty pool still behaves sequentially. I have no idea why this is; allocating a connection or warming the pool leads to the desired concurrency profile.

PiotrDuz commented 1 year ago

What does your custom loop resource look like? Here is the one that works for us:

@Override
public EventLoopGroup onClient(boolean useNative) {
    return cacheLoops();
}

EventLoopGroup cacheLoops() {
    EventLoopGroup eventLoopGroup = loops.get();
    if (null == eventLoopGroup) {
        EventLoopGroup newEventLoopGroup = createNewEventLoopGroup();
        if (!loops.compareAndSet(null, newEventLoopGroup)) {
            //"FutureReturnValueIgnored" this is deliberate
            newEventLoopGroup.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
        }
        eventLoopGroup = cacheLoops();
    }
    return eventLoopGroup;
}

private NioEventLoopGroup createNewEventLoopGroup() {
    return new NioEventLoopGroup(threads, new ThreadPerTaskExecutor(new DefaultThreadFactory(THREAD_PREFIX)));
}

where "threads" = pool max size

1528110566 commented 1 year ago

During investigation with a customized LoopResources instance I found that the empty pool still behaves sequentially. I have no idea why this is, allocating a connection or warming the pool leads to the desired concurrency profile.

@mp911de I found a problem when calling warmup().subscribe() and findAll().block() in parallel: https://github.com/reactor/reactor-pool/issues/170 Is this the cause of your problem?

mp911de commented 1 year ago

@PiotrDuz there's a LoopResources.create(…) method taking a colocate boolean flag, introduced with Reactor Netty 1.0.28. It removes the need to implement LoopResources.

I additionally filed a ticket to reuse the default event loop groups with a LoopResources wrapper that doesn't apply colocation.
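
A minimal sketch of how that flag might be wired into a pooled Postgres connection factory. The thread prefix, host, database name, credentials and pool sizes are placeholders, and it assumes Reactor Netty 1.0.28 or later plus the r2dbc-postgresql and r2dbc-pool artifacts on the classpath. Results reported in this thread are mixed, so treat it as something to measure rather than a confirmed fix:

```java
import io.r2dbc.pool.ConnectionPool;
import io.r2dbc.pool.ConnectionPoolConfiguration;
import io.r2dbc.postgresql.PostgresqlConnectionConfiguration;
import io.r2dbc.postgresql.PostgresqlConnectionFactory;
import reactor.netty.resources.LoopResources;

final class NonColocatedPoolConfig {

    static ConnectionPool createPool() {
        // Arguments: prefix, selectCount, workerCount, daemon, colocate.
        // selectCount = -1 means no separate selector threads; colocate = false disables co-location.
        LoopResources loopResources = LoopResources.create(
                "r2dbc-pg", -1, Runtime.getRuntime().availableProcessors(), true, false);

        // Placeholder connection settings, for illustration only.
        PostgresqlConnectionConfiguration postgres = PostgresqlConnectionConfiguration.builder()
                .host("localhost")
                .database("app")
                .username("app")
                .password("secret")
                .loopResources(loopResources)
                .build();

        ConnectionPoolConfiguration poolConfiguration = ConnectionPoolConfiguration
                .builder(new PostgresqlConnectionFactory(postgres))
                .initialSize(5)
                .maxSize(10)
                .build();

        return new ConnectionPool(poolConfiguration);
    }
}
```

As noted earlier in the thread, a custom LoopResources creates its own event loop groups, so it is probably worth creating one instance and sharing it rather than building one per connection factory.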

dstepanov commented 1 year ago

What is the best way to inject that LoopResources.create?

mp911de commented 1 year ago

For the Postgres driver, either via PostgresqlConnectionConfiguration.Builder#loopResources(…) or via the R2DBC config SPI:

ConnectionFactoryOptions.builder()
 .option(Option.valueOf("loopResources"), loopResources). …
 .build();

samueldlightfoot commented 1 year ago

@mp911de Is there an agreed fix R2DBC Postgres users can apply to avoid this? I've had a read through this thread but have found it difficult to piece together what exactly is needed.

artemik commented 1 year ago

@dstepanov Big thanks for spotting the area of the cause! I've been struggling a lot (looking down to PG TCP packets) to find it until I noticed R2DBC is really using only a single thread like you said.

@mp911de I confirm that I experience the same. This makes the R2DBC Postgres driver 3 times slower than the blocking JDBC driver. Setting different maxSize and initialSize values makes it faster (a very weird effect), but it is still ~1.6 times slower than JDBC. @mp911de This is a serious, major issue that makes R2DBC drivers basically unusable for any production usage. This needs serious attention. I'm honestly surprised that the r2dbc-pool library made it to 1.0.0.RELEASE over 3 years without this being noticed by maintainers or users. The benchmarks were even already there on TechEmpower, as @dstepanov mentioned, and it's very easy to reproduce anyway. I assume it affects not only the Postgres driver (which I was testing), since the issue is in r2dbc-pool/netty.

My Reproduction

I have a 4-core (8-thread) i7-7700K CPU. I make a single request to a WebFlux endpoint. Inside, it executes a very simple SELECT (returning 1 record) 40 000 times concurrently, i.e. I create a list of 40 000 Monos like the one below:

databaseClient.sql("SELECT * FROM companies WHERE company_id = $1")
    .bind(0, currentId)
    .fetch()
    .first()
    .map(stringObjectMap -> ((Number) stringObjectMap.get("company_id")).longValue());

and execute them concurrently, calculating time spent once all Monos finish:

long timeStartNs = System.nanoTime();
return Mono.zip(dbCallMonos, objects -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - timeStartNs));

The R2DBC pool has a maxSize of 100.

For JDBC, I do the above in a fixed thread pool, with the same size of 100 (matches the connection pool size in Hikari), with blocking Callable calls, waiting for all Futures to finish at the end.
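
For reference, the blocking side of such a comparison can be sketched with plain java.util.concurrent. This is not the code from the benchmark; the class name, the `runAll` helper and the sleeping placeholder query are assumptions made for illustration, and a real run would call JDBC through a Hikari pool instead:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class BlockingBenchmarkSketch {

    /** Runs `calls` copies of `query` on `poolSize` threads and returns the elapsed milliseconds. */
    static long runAll(int calls, int poolSize, Callable<Long> query) {
        ExecutorService executor = Executors.newFixedThreadPool(poolSize);
        try {
            List<Callable<Long>> tasks = new ArrayList<>();
            for (int i = 0; i < calls; i++) {
                tasks.add(query);
            }
            long startNs = System.nanoTime();
            // invokeAll blocks until every task has finished; get() surfaces any failure.
            for (Future<Long> result : executor.invokeAll(tasks)) {
                result.get();
            }
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
        } catch (Exception e) {
            throw new IllegalStateException("benchmark run failed", e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        // Placeholder "query": sleeps 1 ms and returns a fake company_id.
        Callable<Long> fakeQuery = () -> {
            Thread.sleep(1);
            return 42L;
        };
        System.out.println("elapsed ms: " + runAll(2_000, 100, fakeQuery));
    }
}
```

Sizing the thread pool to match the connection pool keeps every connection busy without queueing extra blocked threads behind it.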

Results:

The above correlates with the TPS I see in the Postgres dashboard. JVM warmup was done beforehand. Everything is on the latest version (Ubuntu, Spring, drivers, Postgres). Postgres is on a separate machine. The absolute latency values don't matter, since they will differ on each machine; only the relative difference is important.

The 40 000 calls are made inside the endpoint (as opposed to calling the endpoint 40 000 times) to eliminate any framework/HTTP latencies. However, I have another test in which Spring MVC (JDBC) and WebFlux (R2DBC) run the same SELECT once per endpoint call, and the endpoints are bombarded by a benchmarking tool from another machine on the local network with a concurrency of 1024. In this setup, there are 8 Spring threads and 8 reactor-tcp-epoll threads. I observe the same speeds as above (even a bit worse). As for the threads, here is the WebFlux R2DBC graph: reactor-tcp-epoll-1 does 15% of the work, with the rest doing only ~3.5%; the Spring HTTP threads are evenly spread.

[Image: different-sizes-1024-concurrency-(faster)]

In Spring MVC JDBC there is Hibernate on top, but it makes no difference - I've done tests without it as well, same results. All records returned from DB are unique, so Hibernate has no advantage (instead, even makes things a bit slower, probably).

Setting colocate to false in LoopResources.create(…) didn't have any effect. warmup() didn't help either.

I am able to provide the full reproducibles with .jfr's, but it's very easy to reproduce.

1528110566 commented 1 year ago

@artemik Thanks for your test. In my opinion, a single thread cannot support high TPS if a single job (one DB query/update) is heavy. For example, when we use R2dbcRepository, the framework performs serialization/deserialization or reflection to inject values for us, and that is heavy work. Setting colocate to false is a necessary way to improve performance in some circumstances. Sorry, I can't provide any evidence for now.

PiotrDuz commented 1 year ago

Hello, I am the original author of the issue posted on the micronaut-data project. It has been escalated from there by the library's maintainers. The problem is explained here: https://github.com/micronaut-projects/micronaut-data/issues/2136

So what is the fix for this problem? Having a separate thread per connection socket. Does colocate = false achieve that? Looking at the source, it is not easy to tell. There is no clear, separate eventLoopGroup created for the client. Also, the created loop has a somewhat complicated thread factory model.

I have proposed a custom, simple solution. During our tests it seemed to fix the issue. A picture is worth more than words, so here is the code:

import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.DatagramChannel;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.ThreadPerTaskExecutor;
import reactor.core.publisher.Mono;
import reactor.netty.FutureMono;
import reactor.netty.resources.LoopResources;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

class NioClientEventLoopResources implements LoopResources {
    public static final String THREAD_PREFIX = "prefix-";
    final int threads;
    final AtomicReference<EventLoopGroup> loops = new AtomicReference<>();
    final AtomicBoolean running;

    NioClientEventLoopResources(int threads) {
        this.running = new AtomicBoolean(true);
        this.threads = threads;
    }

    @Override
    @SuppressWarnings("unchecked")
    public Mono<Void> disposeLater(Duration quietPeriod, Duration timeout) {
        return Mono.defer(() -> {
            long quietPeriodMillis = quietPeriod.toMillis();
            long timeoutMillis = timeout.toMillis();
            EventLoopGroup serverLoopsGroup = loops.get();
            Mono<?> slMono = Mono.empty();
            if (running.compareAndSet(true, false)) {
                if (serverLoopsGroup != null) {
                    slMono = FutureMono.from((Future) serverLoopsGroup.shutdownGracefully(
                            quietPeriodMillis, timeoutMillis, TimeUnit.MILLISECONDS));
                }
            }
            return Mono.when(slMono);
        });
    }

    @Override
    public boolean isDisposed() {
        return !running.get();
    }

    @Override
    public EventLoopGroup onClient(boolean useNative) {
        return cacheLoops();
    }

    @Override
    public EventLoopGroup onServer(boolean useNative) {
        throw new UnsupportedOperationException("This event loop is designed only for client DB calls.");
    }

    @Override
    public EventLoopGroup onServerSelect(boolean useNative) {
        throw new UnsupportedOperationException("This event loop is designed only for client DB calls.");
    }

    @Override
    public <CHANNEL extends Channel> CHANNEL onChannel(Class<CHANNEL> channelType, EventLoopGroup group) {
        if (channelType.equals(SocketChannel.class)) {
            return (CHANNEL) new NioSocketChannel();
        }
        if (channelType.equals(ServerSocketChannel.class)) {
            return (CHANNEL) new NioServerSocketChannel();
        }
        if (channelType.equals(DatagramChannel.class)) {
            return (CHANNEL) new NioDatagramChannel();
        }
        throw new IllegalArgumentException("Unsupported channel type: " + channelType.getSimpleName());
    }

    @Override
    public <CHANNEL extends Channel> Class<? extends CHANNEL> onChannelClass(Class<CHANNEL> channelType,
            EventLoopGroup group) {
        if (channelType.equals(SocketChannel.class)) {
            return (Class<? extends CHANNEL>) NioSocketChannel.class;
        }
        if (channelType.equals(ServerSocketChannel.class)) {
            return (Class<? extends CHANNEL>) NioServerSocketChannel.class;
        }
        if (channelType.equals(DatagramChannel.class)) {
            return (Class<? extends CHANNEL>) NioDatagramChannel.class;
        }
        throw new IllegalArgumentException("Unsupported channel type: " + channelType.getSimpleName());
    }

    @SuppressWarnings("FutureReturnValueIgnored")
    EventLoopGroup cacheLoops() {
        EventLoopGroup eventLoopGroup = loops.get();
        if (null == eventLoopGroup) {
            EventLoopGroup newEventLoopGroup = createNewEventLoopGroup();
            if (!loops.compareAndSet(null, newEventLoopGroup)) {
                //"FutureReturnValueIgnored" this is deliberate
                newEventLoopGroup.shutdownGracefully(0, 0, TimeUnit.MILLISECONDS);
            }
            eventLoopGroup = cacheLoops();
        }
        return eventLoopGroup;
    }

    private NioEventLoopGroup createNewEventLoopGroup() {
        return new NioEventLoopGroup(threads, new ThreadPerTaskExecutor(new DefaultThreadFactory(THREAD_PREFIX)));
    }
}

This class should be passed via ConnectionFactoryOptions.option(PostgresqlConnectionFactoryProvider.LOOP_RESOURCES, new NioClientEventLoopResources(threads)) to replace the default eventLoopGroup, where the number of threads = max connections in the r2dbc-pool config.

@artemik can you run your tests once again, using above class? Your tests seem more detailed so I am curious whether this example will work fine here too.

Of course, this is theoretically slower than a single thread handling many sockets, as we are inducing CPU context switching for each thread. But load balancing the eventLoopGroup seems too complicated to implement, and the performance cost is negligible compared to the random slowness introduced by an overloaded thread.

Regards

artemik commented 1 year ago

@PiotrDuz Thanks, I've included it in my tests. I didn't check your class in detail, but afaik it forces R2DBC to use the specified number of threads?

@mp911de I've cleaned up and prepared the reproducer: https://github.com/artemik/r2dbc-jdbc-vertx-issue-190

It tests more than just initialSize vs maxSize - it tests JDBC vs R2DBC vs Vertx (as another reactive driver example based on Netty) in standalone and WebFlux environments. Different environments produce different interesting results. Please check readme, it has all the details. I'll duplicate the key parts here.


Benchmark

There are 6 applications doing the same - run 500 000 SELECTs and measure the total time spent. Each SELECT returns the same (for max stability) single record by ID. All SELECTs are executed with concurrency equal to max DB connection pool size:

Standalone

| App | Duration |
|---|---|
| JDBC | 18 sec (baseline) |
| R2DBC Connection | 18.3 sec (+1.5%) |
| R2DBC DatabaseClient | 19.5 sec (+8.3%) |
| Vertx | 18.1 sec (+0.5%) |

Web App

| App | Duration |
|---|---|
| Spring MVC, JDBC | 18 sec (baseline) |
| Spring WebFlux, R2DBC Connection | 25.5 sec (+42% = 1.42 times)* |
| Spring WebFlux, R2DBC DatabaseClient | 31.5 sec (+75% = 1.75 times)* |
| Spring WebFlux, Vertx | 19 sec (+5.5%) |

*Spring WebFlux, R2DBC Connection:
- 41.5 sec (+130% = 2.3 times) (without custom LoopResources; with (!) ConnectionPool.warmup())
- 41.5 sec (+130% = 2.3 times) (without custom LoopResources; without ConnectionPool.warmup(); equal initialSize and maxSize)

*Spring WebFlux, R2DBC DatabaseClient:
- 51 sec (+183% = 2.83 times) (without custom LoopResources; with (!) ConnectionPool.warmup())
- 51 sec (+183% = 2.83 times) (without custom LoopResources; without ConnectionPool.warmup(); equal initialSize and maxSize)

Conclusions

  1. First of all, in WebFlux, all R2DBC setups are slower than their standalone counterparts by at least ~40% (25.5 sec vs 18.3 sec).
  2. In WebFlux, without custom LoopResources, you MUST NOT run warmup(), otherwise things are slow.
  3. In WebFlux, the choice of equal vs non-equal initialSize and maxSize matters only when without custom LoopResources and without warmup - must be non-equal, otherwise things are slow. In all other cases, this choice doesn't matter.
  4. In standalone, all R2DBC settings perform the same.
  5. R2DBC is slower than JDBC by at least 42% in WebFlux, and by 1.5% in standalone.
  6. Vertx driver performs great in both WebFlux and standalone environments, and close to JDBC (especially in standalone). It doesn't seem to be affected by WebFlux like R2DBC.

Apparently, because Vertx also uses Netty but doesn't suffer from issues like R2DBC under WebFlux (and doesn't require any tricky settings), there should be a way to fix R2DBC.
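
The percentages and multipliers in the tables above follow from dividing each duration by the baseline. A small sketch of that arithmetic (the class and method names are made up for illustration; the tables round the results):

```java
import java.util.Locale;

final class RelativeSlowdown {

    /** Duration divided by the baseline, e.g. 1.75 means 1.75 times the baseline. */
    static double ratio(double seconds, double baselineSeconds) {
        return seconds / baselineSeconds;
    }

    /** Overhead over the baseline in percent, e.g. 75.0 means +75%. */
    static double overheadPercent(double seconds, double baselineSeconds) {
        return (ratio(seconds, baselineSeconds) - 1.0) * 100.0;
    }

    public static void main(String[] args) {
        double baseline = 18.0; // Spring MVC + JDBC, from the Web App table
        for (double seconds : new double[] {25.5, 31.5, 41.5, 51.0}) {
            System.out.println(String.format(Locale.ROOT,
                    "%.1f sec = %.2f times the baseline (+%.1f%%)",
                    seconds, ratio(seconds, baseline), overheadPercent(seconds, baseline)));
        }
    }
}
```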

PiotrDuz commented 1 year ago

@artemik in this post: https://github.com/r2dbc/r2dbc-pool/issues/190#issuecomment-1566333238 you mentioned that in one test only one thread was heavily utilised and colocate didn't help. Can you confirm that the custom LoopResources I posted results in even thread utilisation?

Answering your question: yes, it forces r2dbc to use exactly one thread per open socket. Since we would have maxSize sockets open for the connection pool, we need maxSize threads.

Regards

artemik commented 1 year ago

@PiotrDuz I used number_of_cores threads for your LoopResources, because otherwise making it equal to max connection pool size breaks the reactive sense, and in theory shouldn't give much more performance. I tried it - same performance as with number_of_cores.

Anyway, to your question - I confirm your LoopResources class forces the number of threads specified, and the utilization is quite even.

However:

  1. Based on the results, as you can see, the presence of LoopResources doesn't make things faster.
  2. In the standalone version, R2DBC works fine even with a single thread. Even more so, all R2DBC cases perform equally well in standalone.
  3. Vertx performs great in both standalone and WebFlux, and it used a single thread. (I mean, I think I saw it using multiple threads sometimes; it's probably smart enough to increase them automatically when needed, but even with a single thread it performed as fast as in the benchmarks.)

It means that the number of threads R2DBC uses doesn't seem to matter: R2DBC is just slow with WebFlux for some unclear reason. And maybe colocation, which was originally assumed to be the cause in this ticket, is not the issue.


As for initialSize vs maxSize equality, which was also the original issue raised in this ticket, it seems to be true. Your default setup is: without custom LoopResources; without ConnectionPool.warmup(); equal initialSize and maxSize. This is exactly the case where R2DBC shows the worst performance, as per the benchmarks. The workaround is to use any other setup, for example call warmup, or use LoopResources, etc. Just don't use the combination "without custom LoopResources; with (!) ConnectionPool.warmup()", because surprisingly it performs the worst as well. But let me highlight here: equality of initialSize and maxSize by itself doesn't make things slow; for example, the case "without LoopResources, with warmup(), with equal sizes" works fine... And lastly, all these workarounds give you better performance, but still much slower than JDBC or Vertx.


So two things to be fixed here:

PiotrDuz commented 1 year ago

I think your benchmarks do not address the original problem of this issue. You have many small queries, which leave plenty of time in between to overlap with others. But when you start a heavy query, another colocated query might be heavily impacted. This shows up as a huge drop in performance while other cores are idling. This is why I found the thread-per-socket approach to be a solution. I think users of colocation simply must be aware of its drawbacks.
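
A database-free sketch of that effect: a "heavy query" that blocks its thread is followed by a "light query" on the same executor. The class and method names are hypothetical, the executor only stands in for an event loop group, and the heavy task blocks on a latch purely to make the outcome repeatable:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class HeadOfLineSketch {

    /**
     * Submits a blocked "heavy query" and then a "light query" to `threads` threads.
     * Returns true if the light query finished while the heavy one was still running.
     */
    static boolean lightFinishesFirst(int threads) {
        ExecutorService loops = Executors.newFixedThreadPool(threads);
        CountDownLatch heavyMayFinish = new CountDownLatch(1);
        try {
            // The heavy task occupies one thread until the latch is released.
            loops.submit(() -> {
                heavyMayFinish.await();
                return null;
            });
            Future<String> light = loops.submit(() -> "light result");
            try {
                light.get(500, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException stuckBehindHeavy) {
                return false;
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            heavyMayFinish.countDown();
            loops.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("1 thread:  light finishes first = " + lightFinishesFirst(1));
        System.out.println("2 threads: light finishes first = " + lightFinishesFirst(2));
    }
}
```

With one thread the light task waits behind the heavy one; with two threads it completes independently, which is the behaviour the thread-per-socket approach aims for.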

Your tests are also helpful, however, as they show other problems with r2dbc. But at this point it may be worth splitting the topic in two: here we could focus on colocation forcing queries onto the same thread while other cores could be idle. Your findings seem to be connected with other causes.

Am I getting it right? Regards

artemik commented 1 year ago

@PiotrDuz you're right that many small queries don't clearly show how concurrently connections are used. I've added additional tests.

Results (SELECT 100 records)

Standalone

Not tested. WebFlux results show it's not needed.

Web App

Both MVC JDBC and WebFlux Vertx: 51 sec (baseline).

R2DBC results are more diverse, so I provide a table specifically for it with all setups. I only tested DatabaseClient. Note: for readability, + means with, - means without.

| WebFlux R2DBC DatabaseClient Setup | Duration |
|---|---|
| + LoopResources, + warmup(), initialSize != maxSize | 51 sec |
| + LoopResources, + warmup(), initialSize == maxSize | 51 sec |
| + LoopResources, - warmup(), initialSize != maxSize | 51 sec |
| + LoopResources, - warmup(), initialSize == maxSize | 51 sec |
| - LoopResources, + warmup(), initialSize != maxSize | 111 sec (+117% = 2.17 times) |
| - LoopResources, - warmup(), initialSize != maxSize | 111 sec (+117% = 2.17 times) |
| - LoopResources, + warmup(), initialSize == maxSize | 120 sec (+135% = 2.35 times) |
| - LoopResources, - warmup(), initialSize == maxSize | 120 sec (+135% = 2.35 times) |

How to Interpret R2DBC Results

  1. Unlike with fast single-record selects, longer DB processing time of multi-record SELECTs probably makes R2DBC slowness itself unnoticeable, compared to the total processing time. So R2DBC has 4 setups where it performs normally (similarly to JDBC and Vertx) with 51 secs.
  2. Lack of custom LoopResources drops performance ~2.17 times:
    • and additionally having initialSize == maxSize drops it a bit more: ~2.35 times.
  3. R2DBC performs the worst in the default setup (-LoopResources, -warmup(), initialSize==maxSize).

Results (Connections concurrency / Threading)

In all the tests above, it was visible on DB side monitoring that R2DBC established all maxSize connections just fine, but it wasn't really clear how many of them were concurrently active, because the queries were too fast to build up concurrent processing on DB side.

You saw 4 cases with multi-record SELECTs where R2DBC performed as fast as JDBC and Vertx, but from single-record SELECT tests we know R2DBC is slower, so longer DB processing time of multi-record SELECTs just helps to hide R2DBC slowness itself. Moreover, those multi-record SELECTs were still too fast to simulate some heavy long query.

All of this means we need a test specifically for concurrent connection usage. To do that, I modified the single-record SELECT to also select pg_sleep(2), to simulate 2 seconds of processing time. In the ideal case, with a connection pool size of 100, for example, all 100 sessions should show up as active, proving that all connections are used. Let's see.
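
The measurement idea can be tried without a database. In this sketch a Semaphore stands in for the connection pool, Thread.sleep stands in for pg_sleep, and blocking worker threads stand in for whatever drives the queries; all names are hypothetical and it does not model R2DBC internals:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

final class PeakConnectionsSketch {

    /** Returns the highest number of "connections" that were busy at the same time. */
    static int peakActive(int poolSize, int workerThreads, int queries, long queryMillis) {
        Semaphore pool = new Semaphore(poolSize);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService workers = Executors.newFixedThreadPool(workerThreads);
        try {
            List<Callable<Void>> tasks = new ArrayList<>();
            for (int i = 0; i < queries; i++) {
                tasks.add(() -> {
                    pool.acquire(); // borrow a connection
                    try {
                        peak.accumulateAndGet(active.incrementAndGet(), Math::max);
                        Thread.sleep(queryMillis); // simulated server-side processing time
                    } finally {
                        active.decrementAndGet();
                        pool.release(); // return the connection
                    }
                    return null;
                });
            }
            for (Future<Void> done : workers.invokeAll(tasks)) {
                done.get();
            }
            return peak.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("100 workers: peak = " + peakActive(100, 100, 200, 50));
        System.out.println("1 worker:    peak = " + peakActive(100, 1, 20, 5));
    }
}
```

A peak well below the pool size, as in the single-worker run, is the kind of underutilization the pg_sleep test is meant to expose.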

I also included observations on threading usage. It corresponds to what I saw in single/multi record SELECT tests as well.

Standalone and Web App

Vertx - all 100 connections were active in parallel. As for threading, it used only 1 thread, like in all previous benchmarks above as well, but it doesn't seem to cause performance issues.

R2DBC

How to Interpret R2DBC Results

  1. Because R2DBC underutilizes connections on first runs, it's a problem, for example, in specific cases where a cron job starts your app to run a batch of Mono queries in one go and then shuts down; you must use warmup() in this case.
  2. Otherwise, connections usage is fine and I didn't see, for example, R2DBC being stuck with a single connection (regardless of how many threads were used).
  3. Threading - not clear whether it's an issue, because even with LoopResources R2DBC performed slow in single-record SELECTs for example. And also Vertx, for example, was able to perform fine even with 1 thread in all tests.
  4. It's not in the benchmarks, but in addition to triggering SELECTs from the single http request, I also separately tested triggering them from multiple http requests bombarded from a load testing tool, to simulate query calls triggered from different WebFlux threads - R2DBC showed the same threading usage like mentioned above.
    • Vertx, however, used multiple threads (~ CPU cores) just fine.

So to summarize:

dstepanov commented 1 year ago

@artemik Would you also mind including Micronaut V4 (latest milestone or RC) into the benchmark and see how it does behave vs. WebFlux? We may need some optimization too.

mp911de commented 1 year ago

After a few iterations, the best we can do here is using a dedicated thread pool for subscribing (Schedulers.single) to the connection factory to prevent accidental co-location. In combination with the optimized pool warmup, the single-threaded issue should be gone.

There is no good place to solve this issue, as Reactor Netty wants to keep its colocated default. R2DBC Pool isn't opinionated about the underlying driver or its threading model, yet it doesn't make sense to have connections with colocated threads in a pool, so I guess we're the ones now compensating for a default that only makes sense in certain client-request scenarios.

artemik commented 1 year ago

@mp911de I think performance results (before/after) need to be provided from you here, before officially closing this issue.

P.S. @dstepanov I'll try checking Micronaut, if I have time.

samueldlightfoot commented 1 year ago

@mp911de Once we pick up the pool parallelism changes from reactor-pool, does it make sense to add this parallelism as a config property to ConnectionPoolConfiguration?

mp911de commented 1 year ago

It makes sense to do something. Can you file a new ticket?

samueldlightfoot commented 1 year ago

https://github.com/r2dbc/r2dbc-pool/issues/195

agorbachenko commented 1 year ago

It is probably time to replace my custom solution with an out-of-the-box one!

My custom fix was:

public class PostgresqlConnectionFactoryProvider implements ConnectionFactoryProvider {

    private final io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider delegate =
        new io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider();

    @Override
    public ConnectionFactory create(ConnectionFactoryOptions connectionFactoryOptions) {
        return new DelegatingConnectionFactory(delegate.create(connectionFactoryOptions)) {
            @Override
            public Mono<? extends Connection> create() {
                /* IMPORTANT: We have to create connection on dedicated thread, to ensure that connection
                wouldn't be created on eventLoop's reactor-tcp-<TRANSPORT>-X thread and bound to this
                thread because of this. Without this fix, database connections will be bound to the same
                eventLoop's thread causing performance bottleneck. See details at:
                https://gitter.im/reactor/reactor-netty?at=6145cdac99b7d9752817ad74.
                 */
                return super.create().publishOn(Schedulers.single());
            }
        };
    }

    @Override
    public boolean supports(ConnectionFactoryOptions connectionFactoryOptions) {
        return delegate.supports(connectionFactoryOptions);
    }

    @Override
    public String getDriver() {
        return delegate.getDriver();
    }
}

artemik commented 1 year ago

Hi @PiotrDuz , @mp911de , while I understand the low performance cause (or one of them) is threads colocation, any idea why I didn't see any performance degradation in the standalone driver tests (exactly the same tests, but just without WebFlux)? Why does it become slow only when the series of queries is launched from a WebFlux thread? What kind of relation between r2dbc-pool and WebFlux might be causing that?

mp911de commented 1 year ago

Colocation takes effect if a connection is obtained from a process that runs on a thread which is part of the colocated event loop.

Since WebFlux and most drivers use the same default event loop, any WebFlux thread requesting a database connection will create a connection that uses the request thread.

Does that make sense?
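
A toy model of that selection rule may help. It is not Reactor Netty's implementation: the class, its fields and the `open` method are invented for illustration, single-thread executors stand in for event loops, and the "dedicated" executor stands in for a scheduler such as Schedulers.single(). The blocking calls are only there to keep the model short; real code should not block an event loop thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class ColocationModel {
    private final List<ExecutorService> loops = new ArrayList<>();
    private final ThreadLocal<Integer> currentLoop = new ThreadLocal<>();
    private final AtomicInteger roundRobin = new AtomicInteger();
    // A thread that is not part of the loop group.
    private final ExecutorService dedicated = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "dedicated");
        t.setDaemon(true);
        return t;
    });

    ColocationModel(int loopCount) {
        for (int i = 0; i < loopCount; i++) {
            final int index = i;
            loops.add(Executors.newSingleThreadExecutor(r -> {
                // Each loop thread remembers which loop it belongs to.
                Thread t = new Thread(() -> { currentLoop.set(index); r.run(); }, "loop-" + index);
                t.setDaemon(true);
                return t;
            }));
        }
    }

    /** Loop for a new connection: the caller's own loop if it runs on one, else round-robin. */
    private int assignLoop() {
        Integer own = currentLoop.get();
        return own != null ? own : Math.floorMod(roundRobin.getAndIncrement(), loops.size());
    }

    private static <T> T await(Future<T> future) {
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Opens connections and returns the loops they landed on; callerLoop < 0 means a non-loop caller. */
    Set<Integer> open(int connections, int callerLoop, boolean hopToDedicatedThread) {
        Supplier<Set<Integer>> openAll = () -> {
            Set<Integer> used = new TreeSet<>();
            for (int i = 0; i < connections; i++) {
                used.add(assignLoop());
            }
            return used;
        };
        if (callerLoop < 0) {
            return openAll.get();
        }
        Callable<Set<Integer>> direct = openAll::get;
        Callable<Set<Integer>> viaDedicated = () -> await(dedicated.submit(direct));
        return await(loops.get(callerLoop).submit(hopToDedicatedThread ? viaDedicated : direct));
    }

    public static void main(String[] args) {
        ColocationModel model = new ColocationModel(4);
        System.out.println("non-loop caller:        " + model.open(8, -1, false));
        System.out.println("caller on loop 2:       " + model.open(8, 2, false));
        System.out.println("loop 2 + dedicated hop: " + model.open(8, 2, true));
    }
}
```

In this model a caller outside the group spreads its connections over all four loops, a caller running on loop 2 pins every connection to loop 2, and hopping to the dedicated thread first restores the spread.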

artemik commented 1 year ago

@mp911de, "any WebFlux thread requesting a database connection will create a connection that uses the request thread" - From what I saw, the threads were created new (reactor-tcp-*) in addition to the WebFlux thread that started the database queries (reactor-http-*). But if you mean the cause is that all of them were on the same event loop causing the colocation, then it probably makes sense.

Thanks for clarification.

artemik commented 1 year ago

Though @mp911de, if colocation is the default behaviour, I'm still confused why it doesn't happen in a standalone environment. Wouldn't we similarly have 8 threads handling all 100 connections (them being colocated as well)?