Thread local is not cleaned up sometimes #2930

Open
frost13it opened this issue Sep 13, 2021 · 29 comments
@frost13it

I'm using a ThreadContextElement that sets the value of a ThreadLocal. After #985 was resolved, it worked perfectly.
But after upgrading to 1.5.0 I've run into a similar problem: sometimes the last value of the thread local gets stuck in a worker thread.
Equivalent code:

while(true) {
    someCode {
        // here the thread local may already have a value from previous iteration
        withContext(threadLocal.asContextElement("foo")) {
            someOtherCode()
        }
    }
}

Actual code of the ThreadContextElement implementation is here.

It is hard to reproduce the issue, but I'm facing it periodically in production (it may take hours or days to arise).
Tested 1.5.0 and 1.5.2; both behave the same. Running it with -ea.

@qwwdfsad qwwdfsad added the bug label Sep 27, 2021
@qwwdfsad
Member

qwwdfsad commented Sep 27, 2021

It's hard to tell what exactly is wrong without seeing the whole coroutine's hierarchy.

What I suspect may be the root cause is a 3rd-party implementation of a coroutine builder that either does not implement CoroutineStackFrame or completely rewrites the coroutine context instead of overriding only the required elements with +.

The fix for #985 uses stack-walking capabilities: it leverages CoroutineStackFrame and the fact that all suspending coroutine builders implement it, and it also relies on the context being properly propagated.
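To illustrate the difference between overriding with + and rewriting the context, here is a minimal sketch that uses only the kotlin.coroutines standard library. Marker and Label are hypothetical elements invented for this example; they are not the real UndispatchedMarker or any kotlinx.coroutines type.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

// Hypothetical elements, defined only for this illustration.
class Marker : AbstractCoroutineContextElement(Marker) {
    companion object Key : CoroutineContext.Key<Marker>
}

class Label(val value: String) : AbstractCoroutineContextElement(Label) {
    companion object Key : CoroutineContext.Key<Label>
}

// Overrides only the Label element; everything else in `parent` is kept.
fun overrideWithPlus(parent: CoroutineContext): CoroutineContext =
    parent + Label("child")

// Rebuilds the context from scratch; the other elements of `parent` are lost.
@Suppress("UNUSED_PARAMETER")
fun rewriteFromScratch(parent: CoroutineContext): CoroutineContext =
    Label("child")

fun main() {
    val parent = Marker() + Label("parent")
    println(overrideWithPlus(parent)[Marker] != null)    // true
    println(overrideWithPlus(parent)[Label]?.value)      // child
    println(rewriteFromScratch(parent)[Marker] != null)  // false
}
```

A builder that behaves like rewriteFromScratch would silently drop any marker element that the parent context carried, which is the kind of breakage suspected here.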

When the exception is thrown, can you please check if the coroutine context in the most nested coroutine contains UndispatchedMarker?

@frost13it
Author

Thanks for the response. I'll check that.

@frost13it
Author

I've checked for the presence of the UndispatchedMarker. It is present inside withContext(threadLocal) { } (but not outside it) and in the most nested suspension points (which are CompletableFuture.await()).
Besides CompletableFuture.await(), the only coroutine-related things in the project are basic coroutine builders (withContext { }, coroutineScope { }, launch { }, withTimeout { }). There are no 3rd-party builders or anything like that.

@michail-nikolaev

michail-nikolaev commented Oct 12, 2021

Hello.

We are getting something like this after a few days in production...

We have a loop like this:


class RequestContextsStorage()
val threadLocalForRequestContext = ThreadLocal<RequestContextsStorage>()

class RequestContextThreadContextElement(private val storage: RequestContextsStorage) :
    ThreadContextElement<RequestContextsStorage> {

    // Key for CoroutineContext key-value storage
    private object Key : CoroutineContext.Key<RequestContextThreadContextElement>

    override val key: CoroutineContext.Key<*> get() = Key

    override fun updateThreadContext(context: CoroutineContext): RequestContextsStorage {
        val oldState = threadLocalForRequestContext.get()
        threadLocalForRequestContext.set(storage)
        return oldState
    }

    override fun restoreThreadContext(context: CoroutineContext, oldState: RequestContextsStorage) {
        threadLocalForRequestContext.set(oldState)
    }
}

private var otherThreadLocal = ThreadLocal<String?>()
private val scope = CoroutineScope(Dispatchers.IO)

scope.launch(otherThreadLocal.asContextElement("x")) {
    while (isActive) {
        delay(100)
        // sometimes here we could see some value in threadLocalForRequestContext
        someStuff()
    }
}

Also, all builders are pretty standard. Maybe some tricks with cancellation\exceptions\etc...

1.5.1 version.

And of course, we have a lot of code like this:

runBlocking(Dispatchers.IO) {
    withContext(RequestContextThreadContextElement(someValue) + otherThreadLocal.asContextElement("x")) {
        // everything seems to be fine here
    }
}

@michail-nikolaev

A workaround like this:

scope.launch(otherThreadLocal.asContextElement("x")) {
    while (isActive) {
        // the added wrapper: explicitly install an empty storage for the loop body
        withContext(RequestContextThreadContextElement(empty)) {
            delay(100)
            // without the wrapper, we could sometimes see some value in threadLocalForRequestContext here
            someStuff()
        }
    }
}

fixed the issue in our case.

@frost13it
Author

Still reproducible on 1.6.0.

@frost13it
Author

frost13it commented Jan 21, 2022

Finally I've managed to write a small reproducer:

val threadLocal = ThreadLocal<String>()

suspend fun main() {
    while (true) {
        coroutineScope {
            repeat(100) {
                launch {
                    doSomeJob()
                }
            }
        }
    }
}

private suspend fun doSomeJob() {
    check(threadLocal.get() == null)
    withContext(threadLocal.asContextElement("foo")) {
        val semaphore = Semaphore(1, 1)
        suspendCancellableCoroutine<Unit> { cont ->
            Dispatchers.Default.asExecutor().execute {
                cont.resume(Unit)
            }
        }
        cancel()
        semaphore.acquire()
    }
}

It completes almost instantly on my machine and takes some time on play.kotlinlang.org.

@qwwdfsad
Member

Great job with a reproducer! Verified it reproduces, we'll fix it in 1.6.1

qwwdfsad added a commit that referenced this issue Jan 25, 2022
… in order to avoid state interference when the coroutine is updated concurrently.

Concurrency is inevitable in this scenario: when the coroutine that has UndispatchedCoroutine as its completion suspends, we have to clear the thread context, but while we are doing so, concurrent resume of the coroutine could've happened that also ends up in save/clear/update context

Fixes #2930
@frost13it
Author

Is there a planned release date for 1.6.1?

@frost13it
Author

Unfortunately, the issue seems to be still there.

The following code throws an exception on versions from 1.5.0 up to the current develop branch (262876b):

val threadLocal = ThreadLocal<String>()

suspend fun main() {
    doSomeJob()
    doSomeJob()
}

private suspend fun doSomeJob() {
    check(threadLocal.get() == null)
    withContext(threadLocal.asContextElement("foo")) {
        try {
            coroutineScope {
                val semaphore = Semaphore(1, 1)
                dummyAwait()
                cancel()
                semaphore.acquire()
            }
        } catch (e: CancellationException) {
            println("cancelled")
        }
    }
}

private suspend fun dummyAwait() {
    CompletableFuture.runAsync({ }, Dispatchers.Default.asExecutor()).await()
}

@qwwdfsad
Member

qwwdfsad commented Apr 4, 2022

Could you please recheck on 1.6.1?

I cannot reproduce it as is, I will give a few tries a bit later to see if it still reproduces. Anyway, 1.6.1 fixes at least one serious bug in thread locals, so it's worth upgrading

@frost13it
Author

The same on 1.6.1, every time. Checked on Liberica JDK 11.0.14 and some build of OpenJDK 17.
Tried kotlinx.coroutines.scheduler.core.pool.size of 1-8, no changes. Is there something else that could depend on environment?

@qwwdfsad qwwdfsad reopened this Apr 4, 2022
@qwwdfsad
Member

qwwdfsad commented Apr 4, 2022

Aha, I see: it only reproduces with an entry point that is not related to kotlinx.coroutines, namely suspend fun main!

I'll fix it separately. Meanwhile, it would be nice to see if you are still affected in the production environment, as it's unlikely that someone has a suspend provider without integration with kotlinx.coroutines.

Depending on that, we'll decide on the urgency of the fix.

@frost13it
Author

Indeed, suspend fun main() is not a production case. But it is pretty sad (not fatal, though) when a "quick and dirty" piece of code fails with such an exception. I faced this situation using one of our support tools.

The potential production case is an application based on Ktor 1.6.8. When using the Netty engine and kotlinx.coroutines 1.6.1, it fails in exactly the same way. I don't know if it is a Ktor issue, but I achieved the same effect without it.
Since this bug scares the hell out of me now, I can't leave it unattended. When using the CIO engine, everything is OK, but who knows where it will strike again without any chance of a quick fix.

We have some hooks in the infrastructure to ensure that code runs with an initial value of ThreadLocal. This approach could work even on pre-1.4.3 kotlinx.coroutines and let us work around this bug in most cases. But still every GlobalScope.launch { } is a potential bomb.
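As a rough illustration of such a hook, the sketch below clears a thread local before and after running a block. runWithInitialValue is a hypothetical helper written for this example, not the actual infrastructure code and not a kotlinx.coroutines API. It assumes a non-suspending block that stays on the calling thread, so it does not cover code that resumes on another thread.

```kotlin
// Hypothetical helper for non-suspending entry points; not part of kotlinx.coroutines.
fun <T, R> runWithInitialValue(threadLocal: ThreadLocal<T>, block: () -> R): R {
    threadLocal.remove()      // discard anything left on this thread by earlier work
    try {
        return block()
    } finally {
        threadLocal.remove()  // do not leak this block's value to later work either
    }
}

fun main() {
    val threadLocal = ThreadLocal<String>()
    threadLocal.set("stale")  // simulate a value leaked by previous work
    val seen = runWithInitialValue(threadLocal) { threadLocal.get() }
    println(seen)             // null
}
```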

@frost13it
Author

The reproducer for Ktor does not differ much:

val threadLocal = ThreadLocal<String>()

fun main() {
    val engine = embeddedServer(Netty, port = 8080) {
        routing {
            get {
                doSomeJob()
                doSomeJob()
            }
        }
    }
    engine.start()
}

private suspend fun doSomeJob() {
    check(threadLocal.get() == null)
    withContext(threadLocal.asContextElement("foo")) {
        try {
            coroutineScope {
                val semaphore = Semaphore(1, 1)
                dummyAwait()
                cancel()
                semaphore.acquire()
            }
        } catch (e: CancellationException) {
            println("cancelled")
        }
    }
}

private suspend fun dummyAwait() {
    CompletableFuture.runAsync({ }, Dispatchers.Default.asExecutor()).await()
}

@qwwdfsad
Member

qwwdfsad commented Apr 13, 2022

Thanks for both the Ktor and the regular reproducers!

The source of the issue is indeed the non-kotlinx.coroutines-related entry point that Ktor leverages in order to optimize its internal machinery (SuspendFunctionGun). #3155 fixed a completely different bug that happened to be reproducible with the very same snippet :)

I have a potential solution in mind (#3252) and also a future-proof plan to avoid similar problems (#3253). I believe this issue by itself is enough to release 1.6.2 with a fix, though I cannot give you a strict timeline here.

qwwdfsad added a commit that referenced this issue Apr 14, 2022
dee-tree pushed a commit to dee-tree/kotlinx.coroutines that referenced this issue Jul 21, 2022
Kotlin#3155)

* Confine context-specific state to the thread in UndispatchedCoroutine in order to avoid state interference when the coroutine is updated concurrently.

Concurrency is inevitable in this scenario: when the coroutine that has UndispatchedCoroutine as its completion suspends, we have to clear the thread context, but while we are doing so, concurrent resume of the coroutine could've happened that also ends up in save/clear/update context

Fixes Kotlin#2930
pablobaxter pushed a commit to pablobaxter/kotlinx.coroutines that referenced this issue Sep 14, 2022
…ercepted with DispatchedContinuation (Kotlin#3252)

* Properly preserve thread local values for coroutines that are not intercepted with DispatchedContinuation

Fixes Kotlin#2930
@michail-nikolaev

Just want to let you know: Ktor users are still affected by the issue even on the latest 1.6.4 version (SuspendFunctionGun).

https://youtrack.jetbrains.com/issue/KTOR-2644/Seems-like-Restore-thread-context-elements-when-directly-resuming-to-parent-is-still-broken#focus=Comments-27-6530727.0-0

@qwwdfsad
Member

@michail-nikolaev could you please share a reproducer?

@michail-nikolaev

@qwwdfsad I have updated the repo with new versions and a new test based on the one Aleksei Tirman provided.

https://github.com/michail-nikolaev/kotlin-coroutines-thread-local/blob/master/test/SuspendFunctionGunTest.kt (and a copy-paste of the Ktor code in https://github.com/michail-nikolaev/kotlin-coroutines-thread-local/blob/master/test/CopyPast.kt)

@qwwdfsad qwwdfsad reopened this Oct 18, 2022
@qwwdfsad
Member

I'll investigate it, thanks

@qwwdfsad qwwdfsad self-assigned this Nov 1, 2022
@qwwdfsad
Member

qwwdfsad commented Nov 1, 2022

Preliminary findings are that the problem is caused by the SuspendFunctionGun implementation from Ktor.

@qwwdfsad
Member

qwwdfsad commented Nov 2, 2022

Here is the patch that makes the reproducer from https://github.com/michail-nikolaev/kotlin-coroutines-thread-local/ work (NB: the reproducer is non-deterministic and happens to break only under a debugger due to its timing-sensitive nature)

Subject: [PATCH] 1
---
Index: test/ApplicationTest.kt
===================================================================
diff --git a/test/ApplicationTest.kt b/test/ApplicationTest.kt
deleted file mode 100644
--- a/test/ApplicationTest.kt	(revision efb68c79e32d8beddd416b48a73eee8141274f1d)
+++ /dev/null	(revision efb68c79e32d8beddd416b48a73eee8141274f1d)
@@ -1,37 +0,0 @@
-package bug.reproduce
-
-import io.ktor.http.HttpMethod
-import io.ktor.http.HttpStatusCode
-import io.ktor.server.testing.handleRequest
-import io.ktor.server.testing.withTestApplication
-import kotlin.test.Test
-import kotlin.test.assertEquals
-
-class ApplicationTest {
-    @Test
-    fun testWorks() {
-        withTestApplication({ module(testing = true) }) {
-            handleRequest(HttpMethod.Get, "/works").apply {
-                assertEquals(HttpStatusCode.OK, response.status())
-            }
-        }
-    }
-
-    @Test
-    fun testBroken() {
-        withTestApplication({ module(testing = true) }) {
-            handleRequest(HttpMethod.Get, "/broken").apply {
-                assertEquals(HttpStatusCode.OK, response.status())
-            }
-        }
-    }
-
-    @Test
-    fun testAlsoBroken() {
-        withTestApplication({ module(testing = true) }) {
-            handleRequest(HttpMethod.Get, "/also-broken").apply {
-                assertEquals(HttpStatusCode.InternalServerError, response.status())
-            }
-        }
-    }
-}
Index: test/CopyPast.kt
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/test/CopyPast.kt b/test/CopyPast.kt
--- a/test/CopyPast.kt	(revision efb68c79e32d8beddd416b48a73eee8141274f1d)
+++ b/test/CopyPast.kt	(date 1667320099390)
@@ -1,6 +1,5 @@
-package io.ktor.util.pipeline
+package foo
 
-import io.ktor.util.KtorDsl
 import kotlinx.coroutines.CoroutineScope
 import kotlin.coroutines.Continuation
 import kotlin.coroutines.CoroutineContext
@@ -30,7 +29,6 @@
     }
 }
 
-@KtorDsl
 public abstract class PipelineContext<TSubject : Any, TContext : Any>(
     public val context: TContext
 ) : CoroutineScope {
@@ -62,7 +60,7 @@
     initial: TSubject,
     context: TContext,
     private val blocks: List<PipelineInterceptorFunction<TSubject, TContext>>
-) : io.ktor.util.pipeline.PipelineContext<TSubject, TContext>(context) {
+) : PipelineContext<TSubject, TContext>(context) {
 
     override val coroutineContext: CoroutineContext get() = continuation.context
 
@@ -89,11 +87,11 @@
             // and simply return StackWalkingFailedFrame on any unfortunate accident
 
             try {
-                val result = suspensions[currentIndex] ?: return io.ktor.util.pipeline.StackWalkingFailedFrame
+                val result = suspensions[currentIndex] ?: return StackWalkingFailedFrame
                 currentIndex -= 1
                 return result
             } catch (_: Throwable) {
-                return io.ktor.util.pipeline.StackWalkingFailedFrame
+                return StackWalkingFailedFrame
             }
         }
 
Index: test/SuspendFunctionGunTest.kt
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/test/SuspendFunctionGunTest.kt b/test/SuspendFunctionGunTest.kt
--- a/test/SuspendFunctionGunTest.kt	(revision efb68c79e32d8beddd416b48a73eee8141274f1d)
+++ b/test/SuspendFunctionGunTest.kt	(date 1667406449175)
@@ -1,36 +1,62 @@
-package io.ktor.util.pipeline
+package foo
 
-import io.ktor.util.cio.readChannel
-import kotlinx.coroutines.asContextElement
-import kotlinx.coroutines.runBlocking
-import kotlinx.coroutines.withContext
+import foo.*
+import io.ktor.util.cio.*
+import kotlinx.coroutines.*
 import java.io.File
-import kotlin.test.Test
-import kotlin.test.assertNull
+import kotlin.concurrent.*
+import kotlin.coroutines.*
+import kotlin.coroutines.intrinsics.*
+import kotlin.test.*
 
-private val requestIdThreadLocal = ThreadLocal<Int?>()
+private val requestIdThreadLocal = ThreadLocal.withInitial { 72 }
+public typealias PipelineInterceptor<TSubject, TContext> =
+        suspend PipelineContext<TSubject, TContext>.(TSubject) -> Unit
 
 class SuspendFunctionGunTest {
+
     @Test
     fun test() = runBlocking {
         val interceptors = listOf<PipelineInterceptor<Unit, Unit>>(
             {
+                // 1
                 withContext(requestIdThreadLocal.asContextElement(123)) {
                     proceed()
+                    val a = 2
                 }
-
                 println(requestIdThreadLocal.get())
-                assertNull(requestIdThreadLocal.get(), "Thread local's context should be restored")
+                assertEquals(72, requestIdThreadLocal.get(), "Thread local's context should be restored")
             },
 
             {
                 // file has more than 4088 bytes
                 val channel = File(object {}.javaClass.getResource("/file").file).readChannel()
                 val result = ByteArray(4089)
-                channel.readFully(result, 0, 4089)
+//                channel.readFully(result, 0, 4089)
+
+                println("?")
+                val f = suspend { channel.readFully(result, 0, 4089) }
+                suspendCoroutine<Unit> { cont ->
+                    f.startCoroutine(Continuation(cont.context) {
+//                        println("Invoked with $it in " + Thread.c)
+                        cont.resumeWith(Result.success(Unit))
+                    })
+                }
+                println("I'm here")
+
+//
+//                val r = suspendCoroutineUninterceptedOrReturn<Int> { c ->
+//                    c.resumeWith(Result.success(42))
+//                    COROUTINE_SUSPENDED
+//                }
+//                println("Resumed " + r)
             }
         )
 
-        SuspendFunctionGun(Unit, Unit, interceptors as List<PipelineInterceptorFunction<Unit, Unit>>).execute(Unit)
+        try {
+            SuspendFunctionGun(Unit, Unit, interceptors as List<PipelineInterceptorFunction<Unit, Unit>>).execute(Unit)
+        } finally {
+            println(requestIdThreadLocal.get())
+        }
     }
 }
Index: build.gradle.kts
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/build.gradle.kts b/build.gradle.kts
--- a/build.gradle.kts	(revision efb68c79e32d8beddd416b48a73eee8141274f1d)
+++ b/build.gradle.kts	(date 1667302275554)
@@ -34,7 +34,7 @@
     implementation("io.ktor:ktor-client-cio:$ktor_version")
     implementation("io.ktor:ktor-server-core:$ktor_version")
     testImplementation("io.ktor:ktor-server-tests:$ktor_version")
-    testImplementation("org.jetbrains.kotlin:kotlin-test:$ktor_version")
+    testImplementation("org.jetbrains.kotlin:kotlin-test-junit:$kotlin_version")
 }
 
 kotlin.sourceSets["main"].kotlin.srcDirs("src")

We discussed it internally and figured out that this is a Ktor-side problem (https://youtrack.jetbrains.com/issue/KTOR-2644/) that will be taken care of by the Ktor team.
The only known workaround for Ktor now is to enable "developer mode", which turns off SuspendFunctionGun (SFG).
TL;DR: SFG has a reentrancy problem that breaks linear code flow and prevents the finally block in the code that restores thread-locals from kicking in.

Closing as a third-party problem.
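For readers unfamiliar with the pattern the TL;DR refers to, here is a deliberately simplified model of "set the thread local, run the code, restore it in finally". It is a sketch written for this thread, not the actual kotlinx.coroutines or Ktor implementation; requestId and withThreadLocalValue are hypothetical names.

```kotlin
val requestId = ThreadLocal<String?>()

// Simplified model of update/restore around a block of linear code.
fun <R> withThreadLocalValue(value: String, block: () -> R): R {
    val old = requestId.get()
    requestId.set(value)
    try {
        return block()
    } finally {
        // Runs only if control returns through this frame on this thread.
        requestId.set(old)
    }
}

fun main() {
    println(withThreadLocalValue("foo") { requestId.get() })  // foo
    println(requestId.get())                                  // null
}
```

The restore step is tied to control returning through the same frame. If the flow is not linear, so that execution continues elsewhere without unwinding through the finally, the previous value is never put back and the thread keeps the stale one.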

@qwwdfsad qwwdfsad closed this as completed Nov 2, 2022
@frost13it
Author

Hi! Here I am again. The following code fails on 1.6.4:

val threadLocal = ThreadLocal<String>()

suspend fun main() {
    doSomeJob()
    doSomeJob()
    doSomeJob()
}

private suspend fun doSomeJob() {
    check(threadLocal.get() == null)
    withContext(threadLocal.asContextElement("foo")) {
        withTimeoutOrNull<Any>(100) {
            withContext(CoroutineName("foo")) {
                awaitCancellation()
            }
        }
    }
    println("done")
}

playground

@qwwdfsad qwwdfsad reopened this Nov 29, 2022
@qwwdfsad
Member

qwwdfsad commented Nov 29, 2022

Thanks!

More deterministic repro:

val threadLocal = ThreadLocal<String>()

suspend fun main() {
    doSomeJob(1)
}

private suspend fun doSomeJob(id: Int) {
    check(threadLocal.get() == null)
    withContext(threadLocal.asContextElement("foo")) {
        withContext<Any>(Dispatchers.Default) {
           
        }
        println("$id " + Thread.currentThread() + " " + threadLocal.get())
    }
    println("$id " + Thread.currentThread() + " " + threadLocal.get())
    check(threadLocal.get() == null)
}

@qwwdfsad
Member

I'm working on that; the ETA is 1.7.0-Beta|RC. The fix is unfortunately far from trivial and basically boils down to #3253.

@qwwdfsad
Member

A small update: there are two bugs. One is in kotlinx.coroutines and affects only code launched directly without kotlinx.coroutines (e.g. startCoroutineUnintercepted, suspend fun main, etc.); the other is in Ktor.

The bug in Ktor is fixed in 2.2.0-eap-553 (which is going to become 2.2.0 during the next two weeks or so); the coroutines one is not [yet]. So if the problem reproduces with Ktor handlers, the best workaround is to update the Ktor version.

@michail-nikolaev

michail-nikolaev commented Jan 13, 2023

Just want to let you know: we still see the issue in production with Ktor 2.2.2. But -Dio.ktor.internal.disable.sfg=true (which disables SuspendFunctionGun) resolves the issue in our case.
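One possible way to pass that flag, assuming the server is started through Gradle's `application` plugin (an assumption; the thread does not say how the service is launched), is a build.gradle.kts fragment like this:

```kotlin
// build.gradle.kts fragment; assumes the `application` plugin is applied.
application {
    // JVM system property quoted in the comment above; applies to `gradle run`
    // and to the generated start scripts.
    applicationDefaultJvmArgs = listOf("-Dio.ktor.internal.disable.sfg=true")
}
```

Deployments that start the JVM some other way would need to add the same -D option to their own launch command.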

@nkey0

nkey0 commented Aug 9, 2023

@qwwdfsad Could you please provide some insight about the state of the issue and plans? We have to avoid coroutines in our server-side code because of it.

@qwwdfsad
Member

qwwdfsad commented Aug 9, 2023

Thanks for the reminder, no updates yet. We'll see if it's manageable in the scope of 1.8.0
