reactor / reactor-core

Non-Blocking Reactive Foundation for the JVM
http://projectreactor.io
Apache License 2.0

Parallelism limitation of 100 VirtualThread in Flux #3857

Closed: expe-elenigen closed this issue 2 months ago

expe-elenigen commented 2 months ago

Expected Behavior

With Java 21 virtual threads, I was expecting to be able to run a huge number of them in parallel, since they are much lighter to handle. I tried to run a Flux on a Schedulers.boundedElastic() scheduler with virtual threads enabled: -Dreactor.schedulers.defaultBoundedElasticOnVirtualThreads=true.

The test is simple: in a loop, I run a Thread.sleep of 1 sec per task. At first it worked. With 100 tasks, I can see threads loomBoundedElastic-1 to loomBoundedElastic-100 being executed, and the total elapsed time is ~1 sec. If I double this number to 200, I would still expect to see ~1 sec.

Actual Behavior

With 200, I see a different pattern: the first 100 virtual threads are executed in the same ~1 sec, but the remaining 100 take ~100 sec, for a total of 101542 ms.

Steps to Reproduce

Here's the code:

package com.my.test

import mu.KotlinLogging
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers

class Main

private val log = KotlinLogging.logger {}

const val ONE_SEC = 1_000L
const val TOTAL = 200

fun main(args: Array<String>) {
    val steps = (1..TOTAL).toList()
    val scheduler = Schedulers.boundedElastic()
    val startTime = System.currentTimeMillis()
    log.info { "=== START ===" }
    Flux
        .fromIterable(steps)
        .flatMap { step ->
            Mono
                .fromCallable {
                    Thread.sleep(ONE_SEC)
                }.log()
                .subscribeOn(scheduler)
        }.collectList()
        .block()
    log.info { "=== END ===" }
    log.info { "Time taken: ${System.currentTimeMillis() - startTime} ms" }
}
Logs TOTAL = 200
```sh
10:47:10.905 [main] INFO com.my.test.Main -- === START ===
10:47:10.957 [loomBoundedElastic-6] INFO reactor.Mono.Callable.6 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:10.957 [loomBoundedElastic-4] INFO reactor.Mono.Callable.4 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:10.959 [loomBoundedElastic-11] INFO reactor.Mono.Callable.11 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:10.957 [loomBoundedElastic-9] INFO reactor.Mono.Callable.9 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:10.957 [loomBoundedElastic-2] INFO reactor.Mono.Callable.2 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:10.959 [loomBoundedElastic-11] INFO reactor.Mono.Callable.11 -- | request(32)
10:47:10.957 [loomBoundedElastic-1] INFO reactor.Mono.Callable.1 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:10.959 [loomBoundedElastic-1] INFO reactor.Mono.Callable.1 -- | request(32)
10:47:10.960 [loomBoundedElastic-6] INFO reactor.Mono.Callable.6 -- | request(32)
10:47:10.957 [loomBoundedElastic-7] INFO reactor.Mono.Callable.7 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
...
10:47:11.969 [loomBoundedElastic-73] INFO reactor.Mono.Callable.73 -- | onComplete()
10:47:12.975 [loomBoundedElastic-101] INFO reactor.Mono.Callable.101 -- | onNext(kotlin.Unit)
10:47:12.976 [loomBoundedElastic-101] INFO reactor.Mono.Callable.101 -- | onComplete()
10:47:12.976 [loomBoundedElastic-102] INFO reactor.Mono.Callable.102 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:12.977 [loomBoundedElastic-102] INFO reactor.Mono.Callable.102 -- | request(32)
10:47:13.978 [loomBoundedElastic-102] INFO reactor.Mono.Callable.102 -- | onNext(kotlin.Unit)
10:47:13.979 [loomBoundedElastic-102] INFO reactor.Mono.Callable.102 -- | onComplete()
10:47:13.980 [loomBoundedElastic-103] INFO reactor.Mono.Callable.103 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:13.980 [loomBoundedElastic-103] INFO reactor.Mono.Callable.103 -- | request(32)
10:47:14.985 [loomBoundedElastic-103] INFO reactor.Mono.Callable.103 -- | onNext(kotlin.Unit)
10:47:14.986 [loomBoundedElastic-103] INFO reactor.Mono.Callable.103 -- | onComplete()
10:47:14.986 [loomBoundedElastic-104] INFO reactor.Mono.Callable.104 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:14.987 [loomBoundedElastic-104] INFO reactor.Mono.Callable.104 -- | request(32)
10:47:15.991 [loomBoundedElastic-104] INFO reactor.Mono.Callable.104 -- | onNext(kotlin.Unit)
10:47:15.992 [loomBoundedElastic-104] INFO reactor.Mono.Callable.104 -- | onComplete()
10:47:15.992 [loomBoundedElastic-105] INFO reactor.Mono.Callable.105 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:15.993 [loomBoundedElastic-105] INFO reactor.Mono.Callable.105 -- | request(32)
10:47:16.995 [loomBoundedElastic-105] INFO reactor.Mono.Callable.105 -- | onNext(kotlin.Unit)
10:47:16.995 [loomBoundedElastic-105] INFO reactor.Mono.Callable.105 -- | onComplete()
10:47:16.996 [loomBoundedElastic-106] INFO reactor.Mono.Callable.106 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:16.996 [loomBoundedElastic-106] INFO reactor.Mono.Callable.106 -- | request(32)
10:47:17.998 [loomBoundedElastic-106] INFO reactor.Mono.Callable.106 -- | onNext(kotlin.Unit)
...
10:48:51.440 [loomBoundedElastic-200] INFO reactor.Mono.Callable.200 -- | request(32)
10:48:52.443 [loomBoundedElastic-200] INFO reactor.Mono.Callable.200 -- | onNext(kotlin.Unit)
10:48:52.443 [loomBoundedElastic-200] INFO reactor.Mono.Callable.200 -- | onComplete()
10:48:52.445 [main] INFO com.my.test.Main -- === END ===
10:48:52.448 [main] INFO com.my.test.Main -- Time taken: 101542 ms
```

Logs TOTAL = 100
```sh
10:59:05.849 [main] INFO com.my.test.Main -- === START ===
10:59:05.905 [loomBoundedElastic-7] INFO reactor.Mono.Callable.7 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:59:05.905 [loomBoundedElastic-2] INFO reactor.Mono.Callable.2 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:59:05.905 [loomBoundedElastic-5] INFO reactor.Mono.Callable.5 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
...
10:59:06.918 [loomBoundedElastic-30] INFO reactor.Mono.Callable.30 -- | onComplete()
10:59:06.919 [loomBoundedElastic-63] INFO reactor.Mono.Callable.63 -- | onComplete()
10:59:06.919 [loomBoundedElastic-65] INFO reactor.Mono.Callable.65 -- | onComplete()
10:59:06.919 [loomBoundedElastic-72] INFO reactor.Mono.Callable.72 -- | onComplete()
10:59:06.919 [loomBoundedElastic-70] INFO reactor.Mono.Callable.70 -- | onComplete()
10:59:06.919 [loomBoundedElastic-86] INFO reactor.Mono.Callable.86 -- | onComplete()
10:59:06.920 [main] INFO com.my.test.Main -- === END ===
10:59:06.922 [main] INFO com.my.test.Main -- Time taken: 1073 ms
```

Possible Solution

I didn't see anywhere mentioning a limitation of 100 virtual threads and I'm curious also if the threads are supposed to be reused or they are simply disposable? I was also thinking, if there's a cap of 100 threads, once the first batch is processed, the second 100 tasks should be done within 1 sec, so a total of ~2 sec.

Your Environment

MacBook Pro M1

chemicL commented 2 months ago

Hey @expe-elenigen! Thank you for the report.

This Virtual Thread-based Schedulers.boundedElastic() has the same limitation of 10 * availableCPUs as the regular one.

You can change that default with the reactor.schedulers.defaultBoundedElasticSize System property.
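As a rough illustration of the cap described above, the sketch below mirrors the stated rule: the system property wins if it is set, otherwise the cap is 10 times the available CPUs. The helper name `resolveBoundedElasticCap` is hypothetical and this is not Reactor's own code; only the property name and the 10 * availableCPUs rule come from this thread. The 10-CPU figure is an assumption that would explain the 100 threads seen in the report.

```kotlin
// Hypothetical helper, not part of Reactor: models the rule described in this thread.
fun resolveBoundedElasticCap(
    propertyValue: String? = System.getProperty("reactor.schedulers.defaultBoundedElasticSize"),
    availableCpus: Int = Runtime.getRuntime().availableProcessors(),
): Int = propertyValue?.toIntOrNull() ?: (10 * availableCpus)

fun main() {
    // Assumed 10 CPUs and no override: the cap resolves to 10 * 10.
    println(resolveBoundedElasticCap(propertyValue = null, availableCpus = 10))
    // With an override of 200, the property value is used instead.
    println(resolveBoundedElasticCap(propertyValue = "200", availableCpus = 10))
}
```

In practice the override would typically be passed as a JVM flag next to the one from the report, for example -Dreactor.schedulers.defaultBoundedElasticSize=200.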

I will investigate the other effects you are seeing but please consider the above config for the time being.

chemicL commented 2 months ago

> I didn't see anywhere mentioning a limitation of 100 virtual threads

This aspect is explained in my above comment.

> I'm curious also if the threads are supposed to be reused or they are simply disposable?

The Virtual Threads are not reused. The scheduler uses a thread-per-task model with in-order processing within a Worker. A particular operator/chain (like Mono#subscribeOn) allocates a Worker, and because tasks scheduled on that Worker must keep their order, a Virtual Thread is created for each task and its completion is awaited before the next one starts.
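The model described above can be sketched in a few lines. This is a simplified illustration only, not Reactor's implementation: `SequentialWorker` is a hypothetical class, and it uses plain platform threads so that it runs on any JDK, where the real scheduler would use virtual threads.

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical model of one Worker: a fresh thread per task, tasks run strictly in order.
class SequentialWorker(private val name: String) {
    fun runAll(tasks: List<() -> Unit>) {
        tasks.forEachIndexed { index, task ->
            // A new thread is created for every task and never reused.
            val thread = Thread(Runnable { task() }, "$name-task-$index")
            thread.start()
            // The next task starts only after this one has completed.
            thread.join()
        }
    }
}

fun main() {
    val log = ConcurrentLinkedQueue<String>()
    val tasks = (0 until 3).map { i ->
        val task: () -> Unit = { log.add("task-$i ran on ${Thread.currentThread().name}") }
        task
    }
    SequentialWorker("worker-1").runAll(tasks)
    log.forEach(::println)
}
```

Because each task is joined before the next one starts, N tasks of one second each on a single Worker take about N seconds, no matter how cheap the threads are.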

> I was also thinking, if there's a cap of 100 threads, once the first batch is processed, the second 100 tasks should be done within 1 sec, so a total of ~2 sec.

Absolutely correct. flatMap does not impose any ordering on the completion of the individual Monos that you create. You have encountered a bug which schedules all Mono subscriptions to the same Worker. Given what I explained above, it appears that all the Callables after the first batch are executed sequentially, which is not correct.
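The arithmetic behind the two timings can be written out as a small sketch. The function names are hypothetical, and the model ignores scheduling overhead, so it only approximates the 101542 ms from the report.

```kotlin
// Hypothetical helpers for the timing arithmetic in this thread; values are in milliseconds.

// Expected behavior: tasks run in parallel batches of at most `cap`.
fun expectedMillis(total: Int, cap: Int, taskMillis: Long): Long =
    ((total + cap - 1) / cap) * taskMillis

// Behavior seen in the report: the first `cap` tasks run in parallel,
// then every remaining task runs one after another.
fun observedMillis(total: Int, cap: Int, taskMillis: Long): Long =
    if (total <= cap) taskMillis else taskMillis + (total - cap) * taskMillis

fun main() {
    println(expectedMillis(200, 100, 1_000L)) // 2000
    println(observedMillis(200, 100, 1_000L)) // 101000
}
```

With 200 tasks, a cap of 100 and 1 sec per task, the expected total is about 2 sec, while the sequential tail gives about 101 sec, in line with the logged result.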

I will push a fix soon.