Kotlin / kotlinx.coroutines

Library support for Kotlin coroutines

Thread local is not cleaned up sometimes #2930

Open frost13it opened 2 years ago

frost13it commented 2 years ago

I'm using a ThreadContextElement that sets the value of a ThreadLocal. After #985 was resolved, it worked perfectly. But after upgrading to 1.5.0 I've run into a similar problem: sometimes the last value of the thread local gets stuck in a worker thread. Equivalent code:

while(true) {
    someCode {
        // here the thread local may already have a value from previous iteration
        withContext(threadLocal.asContextElement("foo")) {
            someOtherCode()
        }
    }
}

Actual code of the ThreadContextElement implementation is here.

It is hard to reproduce the issue, but I'm facing it periodically in production (it may take hours or days to arise). Tested 1.5.0 and 1.5.2; both behave the same. Running it with -ea.
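To make the symptom concrete, here is a minimal sketch of how a thread-local value can "stick" to a pooled worker thread when nothing restores it. It uses only the JDK and is not the reporter's code; the names `leaky` and `observeLeak` are hypothetical, and a single-thread executor is assumed so that both tasks are guaranteed to share one worker.

```kotlin
import java.util.concurrent.Callable
import java.util.concurrent.Executors

// Hypothetical thread local, standing in for the one set by a ThreadContextElement.
val leaky = ThreadLocal<String?>()

// Returns what a second, unrelated task observes on the same worker thread.
fun observeLeak(): String? {
    val pool = Executors.newSingleThreadExecutor()
    try {
        // First task sets the value and never restores the previous one.
        pool.submit(Runnable { leaky.set("foo") }).get()
        // Second task runs on the same worker and sees the stale value.
        return pool.submit(Callable<String?> { leaky.get() }).get()
    } finally {
        pool.shutdown()
    }
}

fun main() {
    println(observeLeak()) // foo
}
```

In the issue itself the value is set and restored by the coroutine machinery rather than by hand, so the leak only appears when the restore step is skipped.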

qwwdfsad commented 2 years ago

It's hard to tell what exactly is wrong without seeing the whole coroutine's hierarchy.

What I suspect may be the root cause is a 3rd-party implementation of a coroutine builder that either does not implement CoroutineStackFrame or completely rewrites the coroutine context instead of overwriting only the required elements with +.

The fix for #985 uses stack-walking capabilities, leveraging CoroutineStackFrame and the fact that all suspending coroutine builders implement it; it also relies on the context being properly propagated.
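The difference between overwriting elements with + and replacing the whole context can be sketched with the standard library alone. The `Marker` and `Tag` elements below are hypothetical stand-ins (`Marker` plays the role of an element that must survive propagation); they are not kotlinx.coroutines classes.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

// Hypothetical element that a builder is expected to propagate untouched.
class Marker : AbstractCoroutineContextElement(Marker) {
    companion object Key : CoroutineContext.Key<Marker>
}

// Hypothetical element that a builder legitimately wants to override.
class Tag(val value: String) : AbstractCoroutineContextElement(Tag) {
    companion object Key : CoroutineContext.Key<Tag>
}

fun main() {
    val parent: CoroutineContext = Marker() + Tag("outer")

    // Proper propagation: + replaces only the element with the same key.
    val merged = parent + Tag("inner")
    println(merged[Marker] != null) // true
    println(merged[Tag]?.value)     // inner

    // Rewriting the context from scratch silently drops everything else.
    val replaced: CoroutineContext = Tag("inner")
    println(replaced[Marker] != null) // false
}
```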

When the exception is thrown, can you please check if the coroutine context in the most nested coroutine contains UndispatchedMarker?
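One possible way to perform such a check is to list the elements of the current context by class name. This is only a sketch: it assumes UndispatchedMarker cannot be referenced directly from user code and therefore matches on the simple class name, and the helpers `elementNames` and `hasElementNamed` as well as the `Demo` element are hypothetical.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

// Collects the simple class names of all elements in this context.
fun CoroutineContext.elementNames(): List<String> =
    fold(emptyList<String>()) { acc, element -> acc + element.javaClass.simpleName }

// True if some element's class has the given simple name.
fun CoroutineContext.hasElementNamed(simpleName: String): Boolean =
    simpleName in elementNames()

// Hypothetical element used only to demonstrate the helpers.
class Demo : AbstractCoroutineContextElement(Demo) {
    companion object Key : CoroutineContext.Key<Demo>
}

fun main() {
    val context: CoroutineContext = Demo()
    println(context.elementNames())                        // [Demo]
    println(context.hasElementNamed("UndispatchedMarker")) // false
}
```

Inside a suspend function the same helper could be applied to `coroutineContext`, for example `coroutineContext.hasElementNamed("UndispatchedMarker")`.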

frost13it commented 2 years ago

Thanks for the response. I'll check that.

frost13it commented 2 years ago

I've checked for the presence of UndispatchedMarker. It is present inside withContext(threadLocal) { } (but not outside it) and at the most nested suspension points (which are CompletableFuture.await()). Besides CompletableFuture.await(), the only coroutine-related things in the project are basic coroutine builders (withContext { }, coroutineScope { }, launch { }, withTimeout { }). There are no 3rd-party builders or anything like that.

michail-nikolaev commented 2 years ago

Hello.

We are seeing something like this after a few days in production...

We have a loop like this:


class RequestContextsStorage()
val threadLocalForRequestContext = ThreadLocal<RequestContextsStorage>()

class RequestContextThreadContextElement(private val storage: RequestContextsStorage) :
    ThreadContextElement<RequestContextsStorage> {

    // Key for CoroutineContext key-value storage
    private object Key : CoroutineContext.Key<RequestContextThreadContextElement>

    override val key: CoroutineContext.Key<*> get() = Key

    override fun updateThreadContext(context: CoroutineContext): RequestContextsStorage {
        val oldState = threadLocalForRequestContext.get()
        threadLocalForRequestContext.set(storage)
        return oldState
    }

    override fun restoreThreadContext(context: CoroutineContext, oldState: RequestContextsStorage) {
        threadLocalForRequestContext.set(oldState)
    }
}

private var otherThreadLocal = ThreadLocal<String?>()
private val scope = CoroutineScope(Dispatchers.IO)

scope.launch(otherThreadLocal.asContextElement("x")) {
    while (isActive) {
        delay(100)
        // sometimes here we could see some value in **threadLocalForRequestContext**
        someStuff()
    }
}

Also, all builders are pretty standard. Maybe some tricks with cancellation\exceptions\etc...

1.5.1 version.

And of course, we have a lot of code like this:

runBlocking(Dispatchers.IO) {
    withContext(RequestContextThreadContextElement(someValue) + otherThreadLocal.asContextElement("x")) {
         // everything seems to be fine here
    }
}

michail-nikolaev commented 2 years ago

Workaround like this:

scope.launch(otherThreadLocal.asContextElement("x")) {
    while (isActive) {
        // the added line: wrap each iteration in withContext with an empty storage
        withContext(RequestContextThreadContextElement(empty)) {
            delay(100)
            // sometimes here we could see some value in threadLocalForRequestContext
            someStuff()
        }
    }
}

fixed the issue in our case.

frost13it commented 2 years ago

Still reproducible on 1.6.0.

frost13it commented 2 years ago

Finally I've managed to write a small reproducer:

val threadLocal = ThreadLocal<String>()

suspend fun main() {
    while (true) {
        coroutineScope {
            repeat(100) {
                launch {
                    doSomeJob()
                }
            }
        }
    }
}

private suspend fun doSomeJob() {
    check(threadLocal.get() == null)
    withContext(threadLocal.asContextElement("foo")) {
        val semaphore = Semaphore(1, 1)
        suspendCancellableCoroutine<Unit> { cont ->
            Dispatchers.Default.asExecutor().execute {
                cont.resume(Unit)
            }
        }
        cancel()
        semaphore.acquire()
    }
}

It completes almost instantly on my machine and takes some time on play.kotlinlang.org.

qwwdfsad commented 2 years ago

Great job with a reproducer! Verified it reproduces, we'll fix it in 1.6.1

frost13it commented 2 years ago

Is there a planned release date for 1.6.1?

frost13it commented 2 years ago

Unfortunately, the issue seems to be still there.

The following code throws an exception on versions from 1.5.0 through the current develop branch (262876ba):

val threadLocal = ThreadLocal<String>()

suspend fun main() {
    doSomeJob()
    doSomeJob()
}

private suspend fun doSomeJob() {
    check(threadLocal.get() == null)
    withContext(threadLocal.asContextElement("foo")) {
        try {
            coroutineScope {
                val semaphore = Semaphore(1, 1)
                dummyAwait()
                cancel()
                semaphore.acquire()
            }
        } catch (e: CancellationException) {
            println("cancelled")
        }
    }
}

private suspend fun dummyAwait() {
    CompletableFuture.runAsync({ }, Dispatchers.Default.asExecutor()).await()
}

qwwdfsad commented 2 years ago

Could you please recheck on 1.6.1?

I cannot reproduce it as is; I will give it a few more tries a bit later to see if it still reproduces. Anyway, 1.6.1 fixes at least one serious bug in thread locals, so it's worth upgrading.

frost13it commented 2 years ago

The same on 1.6.1, every time. Checked on Liberica JDK 11.0.14 and some build of OpenJDK 17. Tried kotlinx.coroutines.scheduler.core.pool.size of 1-8, no changes. Is there something else that could depend on environment?

qwwdfsad commented 2 years ago

Aha, I see, it only reproduces with a non-kotlinx.coroutines entry point, namely suspend fun main!

I'll fix it separately. Meanwhile, it would be nice to see if you are still affected in the production environment, as it's unlikely that someone has a suspend provider without integration with kotlinx.coroutines.

Depending on that, we'll decide on the urgency of the fix.

frost13it commented 2 years ago

Indeed, suspend fun main() is not a production case. But it is pretty sad (not fatal, though) when a "quick and dirty" piece of code fails with such an exception. I faced this situation using one of our support tools.

The potential production case is an application based on Ktor 1.6.8. When using the Netty engine and kotlinx.coroutines 1.6.1, it fails in exactly the same way. I don't know if it is a Ktor issue, but I achieved the same effect without it. Since this bug scares the hell out of me now, I can't leave it unattended. When using the CIO engine, everything is OK, but who knows where it will strike again without any chance of a quick fix.

We have some hooks in the infrastructure to ensure that code runs with an initial value of ThreadLocal. This approach could work even on pre-1.4.3 kotlinx.coroutines and let us work around this bug in most cases. But still every GlobalScope.launch { } is a potential bomb.
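A hook of that kind could look roughly like the sketch below. It is an assumption about the general shape, not the reporter's infrastructure: `requestContext` and `withCleanThreadLocal` are hypothetical names, and the helper is only meaningful for code that stays on the calling thread for its whole duration.

```kotlin
// Hypothetical thread local standing in for the application's own one.
val requestContext = ThreadLocal<String?>()

// Runs `block` with the thread local cleared first and clears it again afterwards,
// so a stale value left by an earlier task cannot leak in or out.
inline fun <R> withCleanThreadLocal(local: ThreadLocal<*>, block: () -> R): R {
    local.remove()
    try {
        return block()
    } finally {
        local.remove()
    }
}

fun main() {
    requestContext.set("stale value left by an earlier task")
    val seen = withCleanThreadLocal(requestContext) { requestContext.get() }
    println(seen)                 // null
    println(requestContext.get()) // null
}
```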

frost13it commented 2 years ago

The reproducer for Ktor does not differ much:

val threadLocal = ThreadLocal<String>()

fun main() {
    val engine = embeddedServer(Netty, port = 8080) {
        routing {
            get {
                doSomeJob()
                doSomeJob()
            }
        }
    }
    engine.start()
}

private suspend fun doSomeJob() {
    check(threadLocal.get() == null)
    withContext(threadLocal.asContextElement("foo")) {
        try {
            coroutineScope {
                val semaphore = Semaphore(1, 1)
                dummyAwait()
                cancel()
                semaphore.acquire()
            }
        } catch (e: CancellationException) {
            println("cancelled")
        }
    }
}

private suspend fun dummyAwait() {
    CompletableFuture.runAsync({ }, Dispatchers.Default.asExecutor()).await()
}

qwwdfsad commented 2 years ago

Thanks for both the Ktor and the regular reproducer!

The source of the issue is indeed a non-kotlinx.coroutines entry point that Ktor leverages in order to optimize its internal machinery (SuspendFunctionGun). #3155 fixed a completely different bug that happened to be reproducible with the very same snippet :)

I have a potential solution in mind (#3252) and also a future-proof plan to avoid similar problems (#3253). I believe this issue alone is enough to release 1.6.2 with a fix, though I cannot give you a strict timeline here.

michail-nikolaev commented 1 year ago

Just want to notify you: Ktor users are still affected by the issue even on the latest 1.6.4 version (SuspendFunctionGun).

https://youtrack.jetbrains.com/issue/KTOR-2644/Seems-like-Restore-thread-context-elements-when-directly-resuming-to-parent-is-still-broken#focus=Comments-27-6530727.0-0

qwwdfsad commented 1 year ago

@michail-nikolaev could you please share a reproducer?

michail-nikolaev commented 1 year ago

@qwwdfsad I have updated the repo with new versions and a new test based on the one Aleksei Tirman provided.

https://github.com/michail-nikolaev/kotlin-coroutines-thread-local/blob/master/test/SuspendFunctionGunTest.kt (and a copy-paste of the Ktor code in https://github.com/michail-nikolaev/kotlin-coroutines-thread-local/blob/master/test/CopyPast.kt)

qwwdfsad commented 1 year ago

I'll investigate it, thanks

qwwdfsad commented 1 year ago

Preliminary findings are that the problem is caused by the SuspendFunctionGun implementation from Ktor.

qwwdfsad commented 1 year ago

Here is the patch to make the reproducer from https://github.com/michail-nikolaev/kotlin-coroutines-thread-local/ work (NB: the reproducer is non-deterministic and happens to break only under a debugger due to its timing-sensitive nature):

Subject: [PATCH] 1
---
Index: test/ApplicationTest.kt
===================================================================
diff --git a/test/ApplicationTest.kt b/test/ApplicationTest.kt
deleted file mode 100644
--- a/test/ApplicationTest.kt   (revision efb68c79e32d8beddd416b48a73eee8141274f1d)
+++ /dev/null   (revision efb68c79e32d8beddd416b48a73eee8141274f1d)
@@ -1,37 +0,0 @@
-package bug.reproduce
-
-import io.ktor.http.HttpMethod
-import io.ktor.http.HttpStatusCode
-import io.ktor.server.testing.handleRequest
-import io.ktor.server.testing.withTestApplication
-import kotlin.test.Test
-import kotlin.test.assertEquals
-
-class ApplicationTest {
-    @Test
-    fun testWorks() {
-        withTestApplication({ module(testing = true) }) {
-            handleRequest(HttpMethod.Get, "/works").apply {
-                assertEquals(HttpStatusCode.OK, response.status())
-            }
-        }
-    }
-
-    @Test
-    fun testBroken() {
-        withTestApplication({ module(testing = true) }) {
-            handleRequest(HttpMethod.Get, "/broken").apply {
-                assertEquals(HttpStatusCode.OK, response.status())
-            }
-        }
-    }
-
-    @Test
-    fun testAlsoBroken() {
-        withTestApplication({ module(testing = true) }) {
-            handleRequest(HttpMethod.Get, "/also-broken").apply {
-                assertEquals(HttpStatusCode.InternalServerError, response.status())
-            }
-        }
-    }
-}
Index: test/CopyPast.kt
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/test/CopyPast.kt b/test/CopyPast.kt
--- a/test/CopyPast.kt  (revision efb68c79e32d8beddd416b48a73eee8141274f1d)
+++ b/test/CopyPast.kt  (date 1667320099390)
@@ -1,6 +1,5 @@
-package io.ktor.util.pipeline
+package foo

-import io.ktor.util.KtorDsl
 import kotlinx.coroutines.CoroutineScope
 import kotlin.coroutines.Continuation
 import kotlin.coroutines.CoroutineContext
@@ -30,7 +29,6 @@
     }
 }

-@KtorDsl
 public abstract class PipelineContext<TSubject : Any, TContext : Any>(
     public val context: TContext
 ) : CoroutineScope {
@@ -62,7 +60,7 @@
     initial: TSubject,
     context: TContext,
     private val blocks: List<PipelineInterceptorFunction<TSubject, TContext>>
-) : io.ktor.util.pipeline.PipelineContext<TSubject, TContext>(context) {
+) : PipelineContext<TSubject, TContext>(context) {

     override val coroutineContext: CoroutineContext get() = continuation.context

@@ -89,11 +87,11 @@
             // and simply return StackWalkingFailedFrame on any unfortunate accident

             try {
-                val result = suspensions[currentIndex] ?: return io.ktor.util.pipeline.StackWalkingFailedFrame
+                val result = suspensions[currentIndex] ?: return StackWalkingFailedFrame
                 currentIndex -= 1
                 return result
             } catch (_: Throwable) {
-                return io.ktor.util.pipeline.StackWalkingFailedFrame
+                return StackWalkingFailedFrame
             }
         }

Index: test/SuspendFunctionGunTest.kt
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/test/SuspendFunctionGunTest.kt b/test/SuspendFunctionGunTest.kt
--- a/test/SuspendFunctionGunTest.kt    (revision efb68c79e32d8beddd416b48a73eee8141274f1d)
+++ b/test/SuspendFunctionGunTest.kt    (date 1667406449175)
@@ -1,36 +1,62 @@
-package io.ktor.util.pipeline
+package foo

-import io.ktor.util.cio.readChannel
-import kotlinx.coroutines.asContextElement
-import kotlinx.coroutines.runBlocking
-import kotlinx.coroutines.withContext
+import foo.*
+import io.ktor.util.cio.*
+import kotlinx.coroutines.*
 import java.io.File
-import kotlin.test.Test
-import kotlin.test.assertNull
+import kotlin.concurrent.*
+import kotlin.coroutines.*
+import kotlin.coroutines.intrinsics.*
+import kotlin.test.*

-private val requestIdThreadLocal = ThreadLocal<Int?>()
+private val requestIdThreadLocal = ThreadLocal.withInitial { 72 }
+public typealias PipelineInterceptor<TSubject, TContext> =
+        suspend PipelineContext<TSubject, TContext>.(TSubject) -> Unit

 class SuspendFunctionGunTest {
+
     @Test
     fun test() = runBlocking {
         val interceptors = listOf<PipelineInterceptor<Unit, Unit>>(
             {
+                // 1
                 withContext(requestIdThreadLocal.asContextElement(123)) {
                     proceed()
+                    val a = 2
                 }
-
                 println(requestIdThreadLocal.get())
-                assertNull(requestIdThreadLocal.get(), "Thread local's context should be restored")
+                assertEquals(72, requestIdThreadLocal.get(), "Thread local's context should be restored")
             },

             {
                 // file has more than 4088 bytes
                 val channel = File(object {}.javaClass.getResource("/file").file).readChannel()
                 val result = ByteArray(4089)
-                channel.readFully(result, 0, 4089)
+//                channel.readFully(result, 0, 4089)
+
+                println("?")
+                val f = suspend { channel.readFully(result, 0, 4089) }
+                suspendCoroutine<Unit> { cont ->
+                    f.startCoroutine(Continuation(cont.context) {
+//                        println("Invoked with $it in " + Thread.c)
+                        cont.resumeWith(Result.success(Unit))
+                    })
+                }
+                println("I'm here")
+
+//
+//                val r = suspendCoroutineUninterceptedOrReturn<Int> { c ->
+//                    c.resumeWith(Result.success(42))
+//                    COROUTINE_SUSPENDED
+//                }
+//                println("Resumed " + r)
             }
         )

-        SuspendFunctionGun(Unit, Unit, interceptors as List<PipelineInterceptorFunction<Unit, Unit>>).execute(Unit)
+        try {
+            SuspendFunctionGun(Unit, Unit, interceptors as List<PipelineInterceptorFunction<Unit, Unit>>).execute(Unit)
+        } finally {
+            println(requestIdThreadLocal.get())
+        }
     }
 }
Index: build.gradle.kts
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/build.gradle.kts b/build.gradle.kts
--- a/build.gradle.kts  (revision efb68c79e32d8beddd416b48a73eee8141274f1d)
+++ b/build.gradle.kts  (date 1667302275554)
@@ -34,7 +34,7 @@
     implementation("io.ktor:ktor-client-cio:$ktor_version")
     implementation("io.ktor:ktor-server-core:$ktor_version")
     testImplementation("io.ktor:ktor-server-tests:$ktor_version")
-    testImplementation("org.jetbrains.kotlin:kotlin-test:$ktor_version")
+    testImplementation("org.jetbrains.kotlin:kotlin-test-junit:$kotlin_version")
 }

 kotlin.sourceSets["main"].kotlin.srcDirs("src")

We discussed it internally and figured out that this is a Ktor-side problem (https://youtrack.jetbrains.com/issue/KTOR-2644/) that will be taken care of by the Ktor team. The only known workaround for Ktor for now is to enable "developer mode", which turns off SuspendFunctionGun. TL;DR: SuspendFunctionGun (SFG) has a reentrancy problem that breaks linear code flow and prevents the finally block in the code that restores thread-locals from kicking in.
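The "linear code flow" that the restoration relies on can be illustrated with a plain save/restore helper. This is a simplified sketch and not the kotlinx.coroutines implementation; `current` and `withThreadLocal` are hypothetical names. If control never returns through the `finally` block on the thread that set the value, the old value is never put back.

```kotlin
// Hypothetical thread local used for the illustration.
val current = ThreadLocal<String?>()

// Installs `value` for the duration of `block` and restores the previous value afterwards.
inline fun <R> withThreadLocal(value: String, block: () -> R): R {
    val old = current.get() // remember the old value (compare updateThreadContext)
    current.set(value)
    try {
        return block()
    } finally {
        current.set(old)    // always put it back (compare restoreThreadContext)
    }
}

fun main() {
    println(withThreadLocal("foo") { current.get() }) // foo
    println(current.get())                            // null

    // The old value is restored even when the block fails.
    runCatching { withThreadLocal("bar") { error("boom") } }
    println(current.get())                            // null
}
```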

Closing as a third-party problem.

frost13it commented 1 year ago

Hi! Here I am again. The following code fails on 1.6.4:

val threadLocal = ThreadLocal<String>()

suspend fun main() {
    doSomeJob()
    doSomeJob()
    doSomeJob()
}

private suspend fun doSomeJob() {
    check(threadLocal.get() == null)
    withContext(threadLocal.asContextElement("foo")) {
        withTimeoutOrNull<Any>(100) {
            withContext(CoroutineName("foo")) {
                awaitCancellation()
            }
        }
    }
    println("done")
}

playground

qwwdfsad commented 1 year ago

Thanks!

More deterministic repro:

val threadLocal = ThreadLocal<String>()

suspend fun main() {
    doSomeJob(1)
}

private suspend fun doSomeJob(id: Int) {
    check(threadLocal.get() == null)
    withContext(threadLocal.asContextElement("foo")) {
        withContext<Any>(Dispatchers.Default) {

        }
        println("$id " + Thread.currentThread() + " " + threadLocal.get())
    }
    println("$id " + Thread.currentThread() + " " + threadLocal.get())
    check(threadLocal.get() == null)
}

qwwdfsad commented 1 year ago

I'm working on that; the ETA is 1.7.0-Beta|RC. The fix is unfortunately far from trivial and basically boils down to #3253.

qwwdfsad commented 1 year ago

A small update: there are two bugs: one in kotlinx.coroutines that affects only the code launched directly without kotlinx.coroutines (e.g. startCoroutineUnintercepted, suspend fun main etc.) and one in Ktor.

The bug in Ktor is fixed in 2.2.0-eap-553 (that is going to be 2.2.0 during the next two weeks or so); the coroutines one is not [yet]. So if the problem reproduces with Ktor handlers, the best workaround is to update the Ktor version.
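For readers unfamiliar with the first category, "code launched directly without kotlinx.coroutines" means a coroutine started through the standard library's intrinsics, with a context that contains no dispatcher or other kotlinx.coroutines elements. A minimal sketch using only `kotlin.coroutines` follows; the helper name `runBare` is hypothetical.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.coroutineContext
import kotlin.coroutines.startCoroutine

// Starts `block` with a bare continuation and an empty context.
// Returns the result if the block completed without really suspending, otherwise null.
fun <T : Any> runBare(block: suspend () -> T): T? {
    var outcome: Result<T>? = null
    block.startCoroutine(Continuation<T>(EmptyCoroutineContext) { outcome = it })
    return outcome?.getOrThrow()
}

fun main() {
    // No interceptor (dispatcher) is present in such a context.
    println(runBare { coroutineContext[ContinuationInterceptor] == null }) // true
    println(runBare { coroutineContext.toString() })                       // EmptyCoroutineContext
}
```

`suspend fun main` is started by the compiler in a comparable way, which is why the reproducers in this thread behave differently from code started with runBlocking or launch.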

michail-nikolaev commented 1 year ago

Just to inform you: we still see the issue in production with Ktor 2.2.2. But -Dio.ktor.internal.disable.sfg=true (which disables SuspendFunctionGun) resolves the issue in our case.
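For a Gradle-based project, one way to pass that flag is sketched below. This is a build.gradle.kts fragment under the assumption that the application plugin is used; only the property name comes from the comment above, the rest is ordinary Gradle configuration.

```kotlin
// build.gradle.kts (fragment)
plugins {
    application
}

application {
    // Passed to the JVM when the app is started via `gradle run` or the generated start scripts.
    applicationDefaultJvmArgs = listOf("-Dio.ktor.internal.disable.sfg=true")
}

tasks.withType<Test> {
    // The same switch for the JVM that runs the tests.
    systemProperty("io.ktor.internal.disable.sfg", "true")
}
```

When the application is launched some other way (for example from a container entry point), the same `-D` argument would need to be added to that `java` command line instead.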

nkey0 commented 1 year ago

@qwwdfsad Could you please provide some insight about the state of the issue and plans? We have to avoid coroutines in our server-side code because of it.

qwwdfsad commented 1 year ago

Thanks for the reminder, no updates yet. We'll see if it's manageable in the scope of 1.8.0

cbruegg commented 4 months ago

Looks related to these bugs: