spring-projects / spring-framework

Spring Framework
https://spring.io/projects/spring-framework
Apache License 2.0

Explore leveraging Virtual Thread Coroutine dispatcher #33788

Open MarcinMoskala opened 4 weeks ago

MarcinMoskala commented 4 weeks ago

Currently, the default dispatcher used for suspending controller functions is Dispatchers.Unconfined, which is a dangerous and poor choice. I believe it was chosen due to the common misconception that Dispatchers.Unconfined runs on the thread that was used to start it. That is true, but only until the first suspension point; after that, it runs on the thread that was used to resume it. This is dangerous because libraries are designed to use a minimal number of threads in their suspending APIs, and they do not control which thread is used to resume, as they assume a dispatcher will change it anyway (of all dispatchers, only Dispatchers.Unconfined does not change it).

Take a look at this example from my book Kotlin Coroutines: Deep Dive:

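import kotlin.concurrent.thread
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.suspendCancellableCoroutine

fun main() {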
    var continuation: Continuation<Unit>? = null

    thread(name = "Thread1") {
        CoroutineScope(Dispatchers.Unconfined).launch {
            println(Thread.currentThread().name) // Thread1

            suspendCancellableCoroutine {
                continuation = it
            }

            println(Thread.currentThread().name) // Thread2

            delay(1000)

            println(Thread.currentThread().name) // kotlinx.coroutines.DefaultExecutor
        }
    }

    thread(name = "Thread2") {
        Thread.sleep(1000)
        continuation?.resume(Unit)
    }

    Thread.sleep(3000)
}

As you can see, after suspension, the coroutine runs on the thread that resumed it, and after delay it runs on DefaultExecutor. This poor thread is only supposed to be used to schedule the resumption of coroutines, not to run their bodies. Above all, there is only one such thread for the whole application.

Consider this simplified controller:

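import kotlinx.coroutines.delay
import org.springframework.http.HttpStatus
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController

@RestController
@RequestMapping
class PingController {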
    @GetMapping("/ping")
    suspend fun ping(): ResponseEntity<Map<String, Boolean>> {
        delay(1000)
        Thread.sleep(1000)
        return ResponseEntity(mapOf("success" to true), HttpStatus.OK)
    }
}

If you make 1000 concurrent requests, it should take at least 1001 seconds: the delays overlap, but all the sleeps happen one after another on DefaultExecutor (my experiments confirm that). That is no good. If we used Dispatchers.IO, it would need ceil(1000 / 64) + 1 = 17 seconds (due to the IO limit of 64 threads). Of course, in a real-life example we would have some db or network request instead of delay, and some processing instead of sleep, but the essential problem remains the same.

@RestController
@RequestMapping
class PingController(){
    @GetMapping("/ping")
    suspend fun ping(): ResponseEntity<Map<String, Boolean>> {
        val data = fetchData()
        complexProcessing(data)
        return ResponseEntity(mapOf("success" to true), HttpStatus.OK)
    }
}
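The two estimates for the /ping example above can be reproduced with a small back-of-the-envelope calculation. This is only a sketch, not a benchmark: the helper name estimateSeconds and its parameters are hypothetical, and it assumes every request first suspends for one second (all requests concurrently) and then blocks a thread for one second.

```kotlin
// Hypothetical helper: rough lower bound for handling `requests` calls when
// each call suspends for `delaySeconds` (all concurrently) and then blocks a
// thread for `blockSeconds`, with at most `threads` threads available to run
// the blocking part.
fun estimateSeconds(
    requests: Int,
    threads: Int,
    delaySeconds: Int = 1,
    blockSeconds: Int = 1,
): Int {
    val batches = (requests + threads - 1) / threads // ceil(requests / threads)
    return delaySeconds + batches * blockSeconds
}

fun main() {
    // One resuming thread, as with Dispatchers.Unconfined after delay
    println(estimateSeconds(requests = 1000, threads = 1))  // 1001
    // Default Dispatchers.IO limit of 64 threads
    println(estimateSeconds(requests = 1000, threads = 64)) // 17
}
```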

Most suspending network clients are optimized to use a minimal number of threads. In Ktor Client, for instance, most engines use only one thread to resume coroutines, so delay actually mimics that pretty well. Consider the following example. On my computer, it takes 30 seconds with Dispatchers.Unconfined, but only 5 seconds with Dispatchers.Default:

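import kotlin.system.measureTimeMillis
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext

suspend fun main() = measureTimeMillis {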
    withContext(Dispatchers.Unconfined) {
        repeat(1000) {
            launch {
                val data = fetchData()
                complexProcessing(data)
            }
        }
    }
}.let { println("Took $it") }

suspend fun fetchData(): Data {
    delay(1000)
    return Data()
}
class Data()

private val list = List(200_000) { it }.shuffled()
fun complexProcessing(data: Data) {
    list.map { it % 10_000 }.sorted()
}

So what dispatcher should be used? In theory, if we never block threads, Dispatchers.Default is the best option, but relying on that would be wishful thinking. There are too many blocking APIs on the backend, and Dispatchers.Default is not a good fit if you have blocking calls. Dispatchers.IO is what Ktor Server uses, and it would be a better option. It is not perfect, though, as it has one global limit of 64 threads. The danger is that Dispatchers.IO is also used to wrap blocking calls, and if one process needs to make a lot of blocking calls (consider a job that sends a newsletter using a blocking SendGrid API), then controller handlers might wait in a queue for an available thread.

I believe the perfect option would be to use:

sdeleuze commented 4 weeks ago

Hey @MarcinMoskala, thanks for your feedback. I have asked the Kotlin team for feedback on this so that we can make an educated choice.

dkhalanskyjb commented 4 weeks ago

The information in the post is accurate: Unconfined just runs the computations wherever they happen to be resumed. I don't understand why one would call delay in a web server, but if this does happen on Dispatchers.Unconfined, then indeed, after the delay resumes, the thread responsible for all delays has to execute the code further.

Dispatchers.IO or Dispatchers.IO.limitedParallelism(howManyParallelBlockingTasksYouExpect) is a solid choice if you want to run many tasks in parallel and expect most of them to run blocking code. Regarding the limit of 64: it can be configured via a JVM system property (see https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-i-o.html). Dispatchers.Default only creates as many threads as there are CPU cores, which would leave the system severely underutilized if blocking tasks are dispatched on it.
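As a minimal sketch of the limitedParallelism suggestion above: the value 100, the name blockingDispatcher, and the blockingCall function are placeholders chosen for illustration, not anything Spring or Ktor uses. The @OptIn annotation is only there in case the kotlinx.coroutines version in use still marks limitedParallelism as experimental.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlin.system.measureTimeMillis

// Assumption: 100 stands in for "how many parallel blocking tasks you expect".
@OptIn(ExperimentalCoroutinesApi::class)
val blockingDispatcher = Dispatchers.IO.limitedParallelism(100)

// Hypothetical blocking call standing in for JDBC or a blocking HTTP client.
fun blockingCall() = Thread.sleep(1000)

suspend fun main() {
    val elapsed = measureTimeMillis {
        // coroutineScope waits for all launched children to complete
        coroutineScope {
            repeat(100) {
                launch(blockingDispatcher) { blockingCall() }
            }
        }
    }
    println("Took $elapsed ms")
}
```

The global limit itself can be raised with the JVM system property described on the linked page, for example -Dkotlinx.coroutines.io.parallelism=128 (the value 128 is arbitrary).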

sdeleuze commented 4 weeks ago

I am pretty sure I discussed the usage of Dispatchers.Unconfined with the Kotlin team back in the day. Coroutines support in Spring is built on top of the Reactive stack, where you are not supposed to use blocking code, so the spirit was that switching threads was not a requirement. That said, I was not aware that after delay(), for example, the coroutine runs on DefaultExecutor. Happy to refine if the Kotlin team agrees with some of the proposals made in this issue (the Loom dispatcher looks like an appealing option).

MarcinMoskala commented 3 weeks ago

I have a sense that the initial intention was to have a dispatcher like the one used by runBlocking, which always uses the thread that was used to start the coroutine. However, such a dispatcher is not appropriate, as it blocks this thread, which negates the benefits of using coroutines in a backend application.

The perfect solution should:

  1. Start coroutines on a dispatcher that uses threads effectively (like the dispatchers I listed in the original post)
  2. Await completion of those coroutines in a non-blocking manner (I am not sure how it works right now, but if we block a thread while waiting for those coroutines to complete, it would negate the benefits of using coroutines).

About the second point, I see coroutines use the callback API of Mono, so it might work fine, but to confirm that, I would need to dive deeper into how Mono works: https://github.com/Kotlin/kotlinx.coroutines/blob/6c6df2b850382887462eeaf51f21f58bd982491d/reactive/kotlinx-coroutines-reactor/src/Mono.kt#L91

sdeleuze commented 3 weeks ago

Mono and its integration with Coroutines is non-blocking, otherwise we would have chosen another dispatcher. That's why I was confused by your usage of Thread.sleep(1000) in one of the samples.

dkhalanskyjb commented 3 weeks ago

> I am pretty sure I discussed the usage of Dispatchers.Unconfined back in the days with the Kotlin team

This could well be, just before my time. I've tried to look for any logs of such a discussion, but didn't manage to find any.

> Coroutines support in Spring is built on top of the Reactive stack

I can guess why Unconfined was suggested: it's consistent with how Reactive Streams typically deal with threading.

    Flux.concat<Long>(
        Mono.defer {
            println("Creating 3 on ${Thread.currentThread()}")
            Mono.just(3L)
        },
        Mono.delay(java.time.Duration.ofMillis(100)),
        Mono.defer {
            println("Creating 4 on ${Thread.currentThread()}")
            Mono.just(4L)
        }
    ).blockLast()

prints

Creating 3 on Thread[#1,main,5,main]
Creating 4 on Thread[#33,parallel-1,5,main]

delay happens on a different scheduler (in this case, parallel), and after switching to another thread, Reactor happily stays there for the other computations, just like Dispatchers.Unconfined does. So, Unconfined may be a good choice if you want uniform behavior with Reactor's threading. If you don't need to inherit this behavior of Reactor and are certain that blocking tasks will not enter the queue, Dispatchers.Default should be a good option. That's what we use by default in our Mono integration: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-reactor/kotlinx.coroutines.reactor/mono.html
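To compare the two options mentioned above on the coroutine side, here is a minimal sketch using the mono builder from kotlinx-coroutines-reactor. It assumes that module is on the classpath; the variable names are made up for this example, and the thread names printed depend on the library version, so no output is shown.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.reactor.mono

fun main() {
    // With Unconfined, the code after delay runs on whichever thread resumes it.
    val unconfined = mono(Dispatchers.Unconfined) {
        delay(100)
        Thread.currentThread().name
    }
    // With Default, the code after delay is dispatched back to the Default pool.
    val default = mono(Dispatchers.Default) {
        delay(100)
        Thread.currentThread().name
    }
    // block() is used here only to keep the demo short; it is a blocking call.
    println("Unconfined resumed on: ${unconfined.block()}")
    println("Default resumed on: ${default.block()}")
}
```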

> Happy to refine if the Kotlin team agree with some proposals done in this issue

I unfortunately don't know how Spring in particular uses coroutines (where the computations come from, on which thread we want to execute them, etc.) and can't say anything more conclusive right now. If you have a description of the Spring/Mono/coroutines integration or at least an example of its typical usage, it will be easier for me to give you advice suitable for your specific case.

> (the Loom dispatcher looks like an appealing option)

I'm not sure what you'd get from that if you don't have blocking calls in the computations: if we simplify things a bit, the point of Loom is, in essence, to treat Thread.sleep(1000) as delay(1000) automatically.

MarcinMoskala commented 2 weeks ago

I collected all my reflections in this article: https://kt.academy/article/dispatcher-for-backend

dkhalanskyjb commented 2 weeks ago

@MarcinMoskala, the point about blocking calls in Dispatchers.Default being difficult to find can be mitigated by using BlockHound together with kotlinx-coroutines-debug. Using 64 threads like you suggest when blocking calls are typically not expected is not advised, because this introduces additional thread-switching overhead: the more threads you have, the larger the portion of CPU work that is spent ensuring that all threads get a chance to perform their tasks.

sdeleuze commented 2 weeks ago

@dkhalanskyjb Thanks for your feedback, makes sense. Spring support for Coroutines indeed tries to stay as close as possible to our Reactive arrangement.

@MarcinMoskala I value your feedback, but I am not sure that writing a blog post with bold statements like "Why Dispatchers.Unconfined is not a good choice", without taking into account most of the nuances discussed here, really helps, as it may be confusing and misleading for some people. Also, unless I am missing something, it contains outdated statements like "However, currently to use Project Loom is in preview. You can use it if you use JVM 19+ and enable preview features".

Based on the various points discussed here, I am leaning towards keeping Dispatchers.Unconfined as the default on Java < 21.

For Java 21+, I am open to making it easier to use Executors.newVirtualThreadPerTaskExecutor().asCoroutineDispatcher() instead of Dispatchers.Unconfined in order to better support blocking code, but I am not sure yet how far we should go. For example, how should this VT-friendly dispatcher be enabled (default at Framework level, property at Framework level, Boot level depending on the value of spring.threads.virtual.enabled=true, on demand)? Also, we likely need to explore how consistent Spring Coroutines support will be with the related Reactor support.
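For readers who want to try the virtual-thread dispatcher mentioned above outside of Spring, a minimal standalone sketch could look like the following. It assumes Java 21+ and kotlinx-coroutines-core on the classpath; the variable name loom and the workload of 1000 sleeping tasks are made up for this example, and it does not show how Spring would wire such a dispatcher in.

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import java.util.concurrent.Executors
import kotlin.system.measureTimeMillis

suspend fun main() {
    // Requires Java 21+. Every dispatched task runs on its own virtual thread.
    // `use` closes the underlying executor once the block completes.
    Executors.newVirtualThreadPerTaskExecutor().asCoroutineDispatcher().use { loom ->
        val elapsed = measureTimeMillis {
            coroutineScope {
                repeat(1000) {
                    launch(loom) {
                        // A blocking call here parks only the virtual thread.
                        Thread.sleep(1000)
                    }
                }
            }
        }
        println("Took $elapsed ms")
    }
}
```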

I will refine the title of this issue accordingly, and tentatively plan a related exploration for Spring Framework 7.

@chemicL @simonbasle @rstoyanchev Feedback welcome.

chemicL commented 2 weeks ago

Thanks for the mention @sdeleuze. I generally agree with the sentiment stated by @dkhalanskyjb:

> Using 64 threads like you suggest when blocking calls are typically not expected is not advised, because this introduces additional thread-switching overhead

I think it's worth noting that I am not well versed with coroutines in Kotlin and dispatchers, but having the above points and the understanding that the unconfined dispatcher:

It feels like Dispatchers.Unconfined follows precisely the same conceptual model as reactive-streams, and for that usage in Spring it sounds like a natural choice that does not limit performance in non-blocking scenarios.

@MarcinMoskala 👋 The examples with Thread.sleep() suggest you are considering using blocking APIs in your coroutines or even in reactive scenarios. For such usages, in Reactor, we have specific Schedulers to offload the blocking work onto. We also introduced support for Loom's Virtual Threads as it has been pointed out. It looks like Dispatchers allow you to achieve the same conceptual goal of offloading in case you know you're going to use blocking APIs.

The Virtual Thread-based Schedulers and Dispatchers sound rather appealing but should be used carefully and with proper understanding of their limitations. One aspect is performance (unnecessary thread hops) and another is scalability. An important nuance today is that Virtual Threads run on the default ForkJoinPool, which by default is using as many platform threads as the number of cores the JVM can see. There is a well known and described problem with thread pinning, which can limit the scalability significantly if your blocking APIs are not properly aligned with the requirements of VTs (no JNI calls, no blocking calls in synchronized blocks) - one of these will only be resolved in JDK24 per the recently updated JEP, others perhaps over time.

I agree there are cases where the "continue on whichever thread is resuming the processing" behavior is problematic for these blocking scenarios, so I agree with @sdeleuze that it should be a different discussion, where a correct model for controlling the offloading to a dedicated Dispatcher is proposed. It should not come at the expense of the default non-blocking mode though, as for instance in reactor-netty the end goal is to make as few thread hops as possible - e.g. sharing the event loop between the server and the client. (cc @violetagg).

rstoyanchev commented 4 days ago

If we're talking about CoroutinesUtils#invokeSuspendingFunction, it returns Publisher and that's intended for non-blocking handling by design (Reactive Streams spec). Specifically for controller method invocations, the return value is handled by the web framework, so we know exactly what happens. WebFlux is non-blocking throughout by design. WebMvc is not but it handles reactive types carefully to avoid blocking for Servlet response writes.

I think the original decision to use Dispatchers.Unconfined is fine given the Reactive Streams contract, and consistent with non-blocking handling in a reactive pipeline. Blocking is possible, but it is not expected, and in any case that is a more general issue that is mitigated by BlockHound.

I don't think it makes sense to use virtual threads where non-blocking execution is assumed. We should consider Dispatchers.Default, which would align with Kotlin's Mono integration.