quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0

Kotlin Coroutine suspend function Integration uses single thread dispatcher #41288

Open Legion2 opened 3 months ago

Legion2 commented 3 months ago

Describe the bug

When working with Kotlin coroutines and suspend functions in Quarkus, the code is executed sequentially by a single thread by default. One has to explicitly change the coroutine dispatcher to get the behavior expected from Dispatchers.Default, which uses a thread pool.

Expected behavior

By default, when using suspend functions with Quarkus, Dispatchers.Default or Vertx.vertx().dispatcher() should be used as the dispatcher. Alternatively, it should be possible to configure which dispatcher is used.

Actual behavior

Some single-threaded dispatcher is used (maybe runBlocking is used) to run the suspend functions.

How to Reproduce?

Add the following code to a RESTEasy resource and call the endpoint:

    @GET
    @Path("test")
    suspend fun test() {
        val start = System.currentTimeMillis()
        coroutineScope {
            repeat(10) {
                launch {
                    val timestamp = System.currentTimeMillis()
                    while (System.currentTimeMillis() < timestamp + 1000) {
                        // Busy wait
                    }
                    println("Hello, world!")
                }
            }
        }
        println("Total time: ${System.currentTimeMillis() - start}")
    }

Output: total time 10 seconds

    @GET
    @Path("test")
    suspend fun test() {
        val start = System.currentTimeMillis()
        coroutineScope {
            repeat(10) {
                launch(Vertx.vertx().dispatcher()) {
                    val timestamp = System.currentTimeMillis()
                    while (System.currentTimeMillis() < timestamp + 1000) {
                        // Busy wait
                    }
                    println("Hello, world!")
                }
            }
        }
        println("Total time: ${System.currentTimeMillis() - start}")
    }

Output: total time 1 second

    @GET
    @Path("test")
    suspend fun test() {
        val start = System.currentTimeMillis()
        coroutineScope {
            repeat(10) {
                launch(Dispatchers.Default) { //Dispatchers.IO
                    val timestamp = System.currentTimeMillis()
                    while (System.currentTimeMillis() < timestamp + 1000) {
                        // Busy wait
                    }
                    println("Hello, world!")
                }
            }
        }
        println("Total time: ${System.currentTimeMillis() - start}")
    }

Output: total time 1 second
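For comparison, the same effect can be sketched outside Quarkus. This is only an illustration under assumptions: a single-thread executor stands in for whatever dispatcher Quarkus actually installs (not confirmed here), and `busyWait` and `runJobs` are hypothetical helper names. It assumes kotlinx-coroutines is on the classpath.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import kotlin.coroutines.CoroutineContext

// Busy-waits for the given number of milliseconds, like the loop in the reproducer.
fun busyWait(millis: Long) {
    val start = System.currentTimeMillis()
    while (System.currentTimeMillis() < start + millis) { /* spin */ }
}

// Hypothetical helper: launches `jobs` busy-waiting children in `context` and
// returns (elapsed milliseconds, number of distinct threads that ran them).
suspend fun runJobs(context: CoroutineContext, jobs: Int = 4, millis: Long = 200): Pair<Long, Int> {
    val threads = ConcurrentHashMap.newKeySet<Thread>()
    val start = System.currentTimeMillis()
    // withContext only returns once all launched children have completed.
    withContext(context) {
        repeat(jobs) {
            launch {
                threads.add(Thread.currentThread())
                busyWait(millis)
            }
        }
    }
    return (System.currentTimeMillis() - start) to threads.size
}

fun main(): Unit = runBlocking {
    // Assumption: a single-thread executor imitates the observed Quarkus behavior.
    val executor = Executors.newSingleThreadExecutor()
    try {
        val (singleMs, singleThreads) = runJobs(executor.asCoroutineDispatcher())
        val (poolMs, poolThreads) = runJobs(Dispatchers.Default)
        println("single thread: $singleMs ms on $singleThreads thread(s)")
        println("Dispatchers.Default: $poolMs ms on $poolThreads thread(s)")
    } finally {
        executor.shutdown()
    }
}
```

With the single-thread dispatcher the children can only run one after another, so the elapsed time grows with the number of jobs. With Dispatchers.Default they can overlap when more than one core is available.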

Output of uname -a or ver

Linux leon-pc 6.5.0-35-generic #35~22.04.1-Ubuntu SMP PREEMPT_DYNAMIC Tue May 7 09:00:52 UTC 2 x86_64 x86_64 x86_64 GNU/Linux

Output of java -version

openjdk version "17.0.11" 2024-04-16 OpenJDK Runtime Environment (build 17.0.11+9-Ubuntu-122.04.1) OpenJDK 64-Bit Server VM (build 17.0.11+9-Ubuntu-122.04.1, mixed mode, sharing)

Quarkus version or git rev

3.7.3

Build tool (ie. output of mvnw --version or gradlew --version)

Gradle 8.5

Additional information

No response

quarkus-bot[bot] commented 3 months ago

/cc @geoand (kotlin)

Legion2 commented 3 months ago

Can someone point me to the source code of the coroutine dispatcher implementation? Then I can take a look and try to debug the problem.

Legion2 commented 3 months ago

I found this dispatcher implementation which is used by the resteasy reactive client.

https://github.com/quarkusio/quarkus/blob/32d0c2dd4d935149c71f6e81524fb3a46fe89515/extensions/resteasy-reactive/rest-kotlin/runtime/src/main/kotlin/org/jboss/resteasy/reactive/server/runtime/kotlin/ApplicationCoroutineScope.kt#L34-L51

There is also a similar implementation in the vertx extension https://github.com/quarkusio/quarkus/blob/ea2c6a4090d1eccb4b39250198601f7da9454833/extensions/vertx/kotlin/runtime/src/main/kotlin/io/quarkus/vertx/kotlin/runtime/VertxDispatcher.kt

Both implementations do not use the existing vertx Dispatcher; instead they implement a very basic execution of coroutines on the vertx event loop.

mschorsch commented 3 months ago

I am also interested in the outcome of the analysis of this issue.

geoand commented 3 months ago

Thanks for opening this!

Just so I understand this, you are saying that when you are manually handling the launch of coroutines, the dispatcher is single-threaded?

Also, some comments on things mentioned above:

> I found this dispatcher implementation which is used by the resteasy reactive client.

The code you have linked to is for the Quarkus REST server part, not the client.

> Both implementations do not use the existing vertx Dispatcher

Which one are you referring to as the existing vertx Dispatcher?

mschorsch commented 3 months ago

> Which one are you referring to as the existing vertx Dispatcher?

The vertx.dispatcher() from vertx-lang-kotlin-coroutines (https://vertx.io/docs/vertx-lang-kotlin-coroutines/kotlin/#_running_a_coroutine_from_a_vert_x_context)

geoand commented 3 months ago

Oh, I was not aware of that one at all...

I wonder if we should start using it elsewhere too. Never mind, that would not work in Quarkus.

Legion2 commented 3 months ago

@geoand why can the Dispatcher from vertx-lang-kotlin-coroutines not be used in Quarkus? If it cannot be reused, what changes are needed in Quarkus to port the Dispatcher from vertx-lang-kotlin-coroutines?

geoand commented 3 months ago

See the code in RESTEasy Reactive that uses its own dispatcher

Legion2 commented 3 months ago

@geoand you mean this code?

            requestScope.activate()
            CurrentRequestManager.set(rrContext)
            try {
                block.run()
            } finally {
                requestScope.deactivate()
            }

I don't know what this code is used for, but it looks like it wraps the actual request code. I think there are two options:

  1. instead of having this in the dispatcher, move it to CoroutineInvocationHandler
  2. or, create a wrapper for the Dispatcher of vertx-lang-kotlin-coroutines

I see you also implemented coroutine canceling for canceled requests, which is really cool. Can't wait to have this in my projects.
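To make the second option concrete, here is a rough sketch of a wrapping dispatcher. It is not the Quarkus implementation: `ContextWrappingDispatcher`, `scopeActive` and `isScopeActiveInside` are hypothetical names, a plain thread-local flag stands in for the request scope, and Dispatchers.Default stands in for the delegate dispatcher.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlin.coroutines.CoroutineContext

// Hypothetical stand-in for the request scope: a per-thread "active" flag.
val scopeActive: ThreadLocal<Boolean> = ThreadLocal.withInitial { false }

// Delegates scheduling to another dispatcher, but runs every dispatched block
// between an activate and a deactivate callback on the executing thread.
class ContextWrappingDispatcher(
    private val delegate: CoroutineDispatcher,
    private val activate: () -> Unit,
    private val deactivate: () -> Unit,
) : CoroutineDispatcher() {
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        delegate.dispatch(context, Runnable {
            activate()
            try {
                block.run()
            } finally {
                deactivate()
            }
        })
    }
}

// Runs a block on the wrapped dispatcher and reports whether the flag was set
// on the thread that executed it.
suspend fun isScopeActiveInside(): Boolean {
    val dispatcher = ContextWrappingDispatcher(
        delegate = Dispatchers.Default,
        activate = { scopeActive.set(true) },
        deactivate = { scopeActive.set(false) },
    )
    return withContext(dispatcher) { scopeActive.get() }
}

fun main(): Unit = runBlocking {
    println(isScopeActiveInside()) // prints "true"
}
```

The wrapper only covers code that is dispatched through it. Anything launched with an explicit dispatcher, as in the reproducer above, would bypass the activate/deactivate pair.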

geoand commented 3 months ago

> I don't know what this code is used for

You essentially need to propagate a lot of context from the thread where the request is initiated to whatever thread is going to be used for running each stage of the suspend(able) function.

> I see you also implemented coroutine canceling for canceled requests, which is really cool. Can't wait to have this in my projects.

Thanks. I was actually surprised we didn't have this already...

Legion2 commented 3 months ago

> You essentially need to propagate a lot of context from the thread where the request is initiated to whatever thread is going to be used for running each stage of the suspend(able) function.

Have a look at ThreadContextElement
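A minimal sketch of what that could look like, assuming the request state lives in a thread-local. `currentRequest`, `RequestContextElement` and `readRequestOnDefault` are hypothetical names for illustration, not Quarkus APIs; only `ThreadContextElement` itself comes from kotlinx-coroutines.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ThreadContextElement
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

// Hypothetical stand-in for per-request state kept in a thread-local.
val currentRequest = ThreadLocal<String?>()

class RequestContextElement(private val requestId: String) :
    ThreadContextElement<String?>,
    AbstractCoroutineContextElement(Key) {

    companion object Key : CoroutineContext.Key<RequestContextElement>

    // Called on the thread that is about to run (or resume) the coroutine.
    override fun updateThreadContext(context: CoroutineContext): String? {
        val previous = currentRequest.get()
        currentRequest.set(requestId)
        return previous
    }

    // Called on that thread when the coroutine suspends or completes.
    override fun restoreThreadContext(context: CoroutineContext, oldState: String?) {
        currentRequest.set(oldState)
    }
}

// Reads the thread-local from a Dispatchers.Default worker thread.
suspend fun readRequestOnDefault(requestId: String): String? =
    withContext(Dispatchers.Default + RequestContextElement(requestId)) {
        currentRequest.get()
    }

fun main(): Unit = runBlocking {
    println(readRequestOnDefault("req-42")) // prints "req-42"
}
```

Because the element travels in the coroutine context, the state follows the coroutine to whichever thread resumes it, independent of the dispatcher in use.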

geoand commented 3 months ago

I was not aware of that, thanks