quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0

In a @Scheduled block Kotlin Coroutines run on the same vert.x worker, or eventloop threads, or DefaultDispatcher-worker with inconsistent behaviours #42236

Open laurentperez opened 1 month ago

laurentperez commented 1 month ago

Describe the bug

Hi

I'd like to run the coroutines asynchronously on different worker threads, either Vert.x worker threads or DefaultDispatcher ones.

Using the TCCL workaround with Quarkus 3.9, the asyncs run fine on different DefaultDispatcher-worker threads, but I'd like to avoid the TCCL workaround.
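The thread does not show the TCCL workaround itself. As a rough sketch only, assuming the underlying problem is that DefaultDispatcher-worker threads do not carry the application's thread context class loader, one generic way to propagate a loader into coroutine threads is a `ThreadContextElement` from kotlinx.coroutines. The names `TcclElement` and `tcclSeenOnDefaultDispatcher` are hypothetical and are not part of Quarkus or of the reproducer.

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ThreadContextElement
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking

// Hypothetical helper: installs `loader` as the TCCL while a coroutine runs on a thread
// and restores that thread's previous loader when the coroutine suspends or completes.
class TcclElement(private val loader: ClassLoader?) : ThreadContextElement<ClassLoader?> {
    companion object Key : CoroutineContext.Key<TcclElement>

    override val key: CoroutineContext.Key<TcclElement> get() = Key

    override fun updateThreadContext(context: CoroutineContext): ClassLoader? {
        val thread = Thread.currentThread()
        val previous = thread.contextClassLoader
        thread.contextClassLoader = loader
        return previous
    }

    override fun restoreThreadContext(context: CoroutineContext, oldState: ClassLoader?) {
        Thread.currentThread().contextClassLoader = oldState
    }
}

// Returns the TCCL observed inside a coroutine running on Dispatchers.Default.
fun tcclSeenOnDefaultDispatcher(loader: ClassLoader?): ClassLoader? = runBlocking {
    async(Dispatchers.Default + TcclElement(loader)) {
        Thread.currentThread().contextClassLoader
    }.await()
}

fun main() {
    val callerLoader = Thread.currentThread().contextClassLoader
    // Compares the loader seen on the worker thread with the caller's loader.
    println(tcclSeenOnDefaultDispatcher(callerLoader) === callerLoader)
}
```

Whether this matches the workaround referred to above, or is sufficient inside Quarkus, is not confirmed by the thread.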

The full reproducer is attached below. Am I doing something wrong?

It's OK for me to block the @Scheduled method, but then I can't manage to use different vert.x-worker threads. And when not using vert.x-worker threads, the DefaultDispatcher-worker threads don't reach the injected (@Inject) client code at all.

The core code block from the reproducer:

    @Scheduled(cron = "0 0 * * * ? *")
    fun schedule() { // n.b. add the suspend keyword if withContext() is used
        val ids = (1..128).toList()

        // when NOT using suspend on parent method
        // using runBlocking(Dispatchers.Unconfined) all run on same vert.x-worker-thread-XX
        // using runBlocking(Dispatchers.Default) only one DefaultDispatcher-worker is used, only one "in thread" print stmt is shown, doSomething() is never called
        // using runBlocking(Dispatchers.IO) only one DefaultDispatcher-worker is used, only one "in thread" print stmt is shown, doSomething() is never called
        // withContext can't be called if 'suspend' is not used on parent method

        // when using suspend on parent method
        // using withContext(Dispatchers.Unconfined) all run on same vert.x-eventloop-thread-XXX and we get an expected warning "has been blocked for 3478 ms, time limit is 2000 ms: io.vertx.core.VertxException: Thread blocked"
        // using withContext(Dispatchers.Default) all run on different DefaultDispatcher-worker-XXX threads, doSomething() is never called
        // using withContext(Dispatchers.IO) only one DefaultDispatcher-worker is used, only one "in thread" print stmt is shown, doSomething() is never called

        // using runBlocking(Dispatchers.Unconfined) all run on same vert.x-eventloop-thread-XXX and we get "has been blocked for 3478 ms, time limit is 2000 ms: io.vertx.core.VertxException: Thread blocked"
        // using runBlocking(Dispatchers.Default) all run on different DefaultDispatcher-worker-XXX threads, doSomething() is never called
        // using runBlocking(Dispatchers.IO) all run on different DefaultDispatcher-worker-XXX threads, doSomething() is never called

        runBlocking {
            ids.map { id ->
                async {
                    println("in thread ${Thread.currentThread().name}#${Thread.currentThread().threadId()}")
                    service.doSomething(id.toString())
                }
            }.awaitAll()
        }
    }
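For comparison, here is a minimal sketch in plain Kotlin, outside Quarkus and Vert.x, of how the dispatcher is chosen for each `async`. It assumes only kotlinx.coroutines on the classpath; `doSomething` here is a hypothetical stand-in for the injected service call, and the two `threadsWith...` helpers are illustrative names. It does not reproduce the class loading or CDI behaviour described in the comments above.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for service.doSomething(): blocks briefly, then reports its thread.
fun doSomething(id: String): Thread {
    Thread.sleep(20)
    return Thread.currentThread()
}

// async without arguments inherits runBlocking's event loop, which is bound to the
// calling thread, so every blocking call runs one after another on that thread.
fun threadsWithInheritedContext(ids: List<Int>): Set<Thread> = runBlocking {
    ids.map { id -> async { doSomething(id.toString()) } }.awaitAll().toSet()
}

// Passing a dispatcher to each async moves the blocking call onto that dispatcher's threads.
fun threadsWithIoDispatcher(ids: List<Int>): Set<Thread> = runBlocking {
    ids.map { id -> async(Dispatchers.IO) { doSomething(id.toString()) } }.awaitAll().toSet()
}

fun main() {
    val ids = (1..8).toList()
    println(threadsWithInheritedContext(ids).map { it.name })
    println(threadsWithIoDispatcher(ids).map { it.name })
}
```

In a bare `runBlocking { async { ... } }`, staying on the caller's thread is therefore expected coroutine behaviour rather than something specific to @Scheduled. The cases where doSomething() is never called on DefaultDispatcher-worker threads are a separate problem that this sketch does not cover.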

Expected behavior

Actual behavior

How to Reproduce?

Output of uname -a or ver

No response

Output of java -version

21

Quarkus version or git rev

3.12.3

Build tool (ie. output of mvnw --version or gradlew --version)

No response

Additional information

No response

quarkus-bot[bot] commented 1 month ago

/cc @geoand (kotlin), @manovotn (scheduler), @mkouba (scheduler)

mkouba commented 1 month ago

Does it work if you run the same logic outside a @Scheduled method? For example, directly in a blocking JAX-RS resource method? I'm no Kotlin expert but I would guess that this use case is not supported.

laurentperez commented 1 month ago

I updated the reproducer repository with a REST endpoint that displays the issues above.

laurentperez commented 1 month ago

To clarify my expected behaviour above:

Every deferred id of the ids.map { id -> async { ... } } block runs either on a different vert.x-worker thread or on a different DefaultDispatcher-worker thread.

Is this the expected way, i.e. directly using the vertx instance?

    @Inject
    lateinit var vertx: Vertx

    @GET
    suspend fun foo(): String = executeOnWorkerPool()

    private suspend fun executeOnWorkerPool(): String {
        return vertx.executeBlocking { promise: Promise<String> ->
            runBlocking(Dispatchers.Default) {
                // service.doSomething(id) is blocking
                val result = ids.map { id -> service.doSomething(id) }
                // assumption: the results are joined into a single String
                promise.complete(result.joinToString())
            }
        }.await()
    }

mkouba commented 1 month ago

Is this the expected way, i.e. directly using the vertx instance?

This could work, but the Vert.x duplicated context is not used; see https://quarkus.io/guides/duplicated-context for some basic info. Also, things like the CDI request context are not activated.