Skip to content

Commit

Permalink
Restore thread context elements when directly resuming to parent
Browse files Browse the repository at this point in the history
This fix solves the problem of restoring thread-context when
returning to another context in undispatched way. It impacts
suspend/resume performance of coroutines that use ThreadContextElement
since we have to walk up the coroutine completion stack in search for
parent UndispatchedCoroutine. However, there is a fast-path to ensure
that there is no performance impact in cases when ThreadContextElement
is not used by a coroutine.

Fixes #985
  • Loading branch information
elizarov authored and qwwdfsad committed Dec 1, 2020
1 parent 167c44e commit 5d0298f
Show file tree
Hide file tree
Showing 9 changed files with 299 additions and 21 deletions.
14 changes: 3 additions & 11 deletions kotlinx-coroutines-core/common/src/Builders.common.kt
Expand Up @@ -207,25 +207,17 @@ private class LazyStandaloneCoroutine(
}

// Used by withContext when context changes, but dispatcher stays the same
private class UndispatchedCoroutine<in T>(
internal expect class UndispatchedCoroutine<in T>(
context: CoroutineContext,
uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
override fun afterResume(state: Any?) {
// resume undispatched -- update context by stay on the same dispatcher
val result = recoverResult(state, uCont)
withCoroutineContext(uCont.context, null) {
uCont.resumeWith(result)
}
}
}
) : ScopeCoroutine<T>

private const val UNDECIDED = 0
private const val SUSPENDED = 1
private const val RESUMED = 2

// Used by withContext when context dispatcher changes
private class DispatchedCoroutine<in T>(
internal class DispatchedCoroutine<in T>(
context: CoroutineContext,
uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
Expand Down
Expand Up @@ -19,5 +19,6 @@ internal expect val DefaultDelay: Delay

// countOrElement -- pre-cached value for ThreadContext.kt
internal expect inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T
internal expect inline fun <T> withContinuationContext(continuation: Continuation<*>, countOrElement: Any?, block: () -> T): T
internal expect fun Continuation<*>.toDebugString(): String
internal expect val CoroutineContext.coroutineName: String?
Expand Up @@ -23,7 +23,7 @@ internal class DispatchedContinuation<in T>(
@JvmField
@Suppress("PropertyName")
internal var _state: Any? = UNDEFINED
override val callerFrame: CoroutineStackFrame? = continuation as? CoroutineStackFrame
override val callerFrame: CoroutineStackFrame? get() = continuation as? CoroutineStackFrame
override fun getStackTraceElement(): StackTraceElement? = null
@JvmField // pre-cached value to avoid ctx.fold on every resumption
internal val countOrElement = threadContextElements(context)
Expand Down Expand Up @@ -235,7 +235,7 @@ internal class DispatchedContinuation<in T>(

@Suppress("NOTHING_TO_INLINE") // we need it inline to save us an entry on the stack
inline fun resumeUndispatchedWith(result: Result<T>) {
withCoroutineContext(context, countOrElement) {
withContinuationContext(continuation, countOrElement) {
continuation.resumeWith(result)
}
}
Expand Down
6 changes: 3 additions & 3 deletions kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt
Expand Up @@ -85,9 +85,9 @@ internal abstract class DispatchedTask<in T>(
try {
val delegate = delegate as DispatchedContinuation<T>
val continuation = delegate.continuation
val context = continuation.context
val state = takeState() // NOTE: Must take state in any case, even if cancelled
withCoroutineContext(context, delegate.countOrElement) {
withContinuationContext(continuation, delegate.countOrElement) {
val context = continuation.context
val state = takeState() // NOTE: Must take state in any case, even if cancelled
val exception = getExceptionalResult(state)
/*
* Check whether continuation was originally resumed with an exception.
Expand Down
9 changes: 9 additions & 0 deletions kotlinx-coroutines-core/js/src/CoroutineContext.kt
Expand Up @@ -4,6 +4,7 @@

package kotlinx.coroutines

import kotlinx.coroutines.internal.*
import kotlin.browser.*
import kotlin.coroutines.*

Expand Down Expand Up @@ -49,5 +50,13 @@ public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext):

// No debugging facilities on JS
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T = block()
internal actual inline fun <T> withContinuationContext(continuation: Continuation<*>, countOrElement: Any?, block: () -> T): T = block()
internal actual fun Continuation<*>.toDebugString(): String = toString()
internal actual val CoroutineContext.coroutineName: String? get() = null // not supported on JS

internal actual class UndispatchedCoroutine<in T> actual constructor(
context: CoroutineContext,
uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
override fun afterResume(state: Any?) = uCont.resumeWith(recoverResult(state, uCont))
}
70 changes: 69 additions & 1 deletion kotlinx-coroutines-core/jvm/src/CoroutineContext.kt
Expand Up @@ -2,12 +2,14 @@
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

@file:Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")

package kotlinx.coroutines

import kotlinx.coroutines.internal.*
import kotlinx.coroutines.scheduling.*
import java.util.concurrent.atomic.*
import kotlin.coroutines.*
import kotlin.coroutines.jvm.internal.*

internal const val COROUTINES_SCHEDULER_PROPERTY_NAME = "kotlinx.coroutines.scheduler"

Expand Down Expand Up @@ -48,6 +50,72 @@ internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, c
}
}

/**
* Executes a block using a context of a given continuation.
*/
internal actual inline fun <T> withContinuationContext(continuation: Continuation<*>, countOrElement: Any?, block: () -> T): T {
val context = continuation.context
val oldValue = updateThreadContext(context, countOrElement)
val undispatchedCompletion = if (oldValue !== NO_THREAD_ELEMENTS) {
// Only if some values were replaced we'll go to the slow path of figuring out where/how to restore them
continuation.undispatchedCompletion()
} else
null // fast path -- don't even try to find undispatchedCompletion as there's nothing to restore in the context
undispatchedCompletion?.saveThreadContext(context, oldValue)
try {
return block()
} finally {
if (undispatchedCompletion == null || undispatchedCompletion.clearThreadContext())
restoreThreadContext(context, oldValue)
}
}

internal tailrec fun Continuation<*>.undispatchedCompletion(): UndispatchedCoroutine<*>? {
// Find direct completion of this continuation
val completion: Continuation<*> = when (this) {
is BaseContinuationImpl -> completion ?: return null // regular suspending function -- direct resume
is DispatchedCoroutine -> return null // dispatches on resume
is ScopeCoroutine -> uCont // other scoped coroutine -- direct resume
else -> return null // something else -- not supported
}
if (completion is UndispatchedCoroutine<*>) return completion // found UndispatchedCoroutine!
return completion.undispatchedCompletion() // walk up the call stack with tail call
}

// Used by withContext when context changes, but dispatcher stays the same
internal actual class UndispatchedCoroutine<in T> actual constructor(
context: CoroutineContext,
uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
private var savedContext: CoroutineContext? = null
private var savedOldValue: Any? = null

fun saveThreadContext(context: CoroutineContext, oldValue: Any?) {
savedContext = context
savedOldValue = oldValue
}

fun clearThreadContext(): Boolean {
if (savedContext == null) return false
savedContext = null
savedOldValue = null
return true
}

override fun afterResume(state: Any?) {
savedContext?.let { context ->
restoreThreadContext(context, savedOldValue)
savedContext = null
savedOldValue = null
}
// resume undispatched -- update context but stay on the same dispatcher
val result = recoverResult(state, uCont)
withContinuationContext(uCont, null) {
uCont.resumeWith(result)
}
}
}

internal actual val CoroutineContext.coroutineName: String? get() {
if (!DEBUG) return null
val coroutineId = this[CoroutineId] ?: return null
Expand Down
9 changes: 5 additions & 4 deletions kotlinx-coroutines-core/jvm/src/internal/ThreadContext.kt
Expand Up @@ -7,8 +7,8 @@ package kotlinx.coroutines.internal
import kotlinx.coroutines.*
import kotlin.coroutines.*


private val ZERO = Symbol("ZERO")
@JvmField
internal val NO_THREAD_ELEMENTS = Symbol("NO_THREAD_ELEMENTS")

// Used when there are >= 2 active elements in the context
private class ThreadState(val context: CoroutineContext, n: Int) {
Expand Down Expand Up @@ -60,12 +60,13 @@ private val restoreState =
internal actual fun threadContextElements(context: CoroutineContext): Any = context.fold(0, countAll)!!

// countOrElement is pre-cached in dispatched continuation
// returns NO_THREAD_ELEMENTS if the contest does not have any ThreadContextElements
internal fun updateThreadContext(context: CoroutineContext, countOrElement: Any?): Any? {
@Suppress("NAME_SHADOWING")
val countOrElement = countOrElement ?: threadContextElements(context)
@Suppress("IMPLICIT_BOXING_IN_IDENTITY_EQUALS")
return when {
countOrElement === 0 -> ZERO // very fast path when there are no active ThreadContextElements
countOrElement === 0 -> NO_THREAD_ELEMENTS // very fast path when there are no active ThreadContextElements
// ^^^ identity comparison for speed, we know zero always has the same identity
countOrElement is Int -> {
// slow path for multiple active ThreadContextElements, allocates ThreadState for multiple old values
Expand All @@ -82,7 +83,7 @@ internal fun updateThreadContext(context: CoroutineContext, countOrElement: Any?

internal fun restoreThreadContext(context: CoroutineContext, oldState: Any?) {
when {
oldState === ZERO -> return // very fast path when there are no ThreadContextElements
oldState === NO_THREAD_ELEMENTS -> return // very fast path when there are no ThreadContextElements
oldState is ThreadState -> {
// slow path with multiple stored ThreadContextElements
oldState.start()
Expand Down

0 comments on commit 5d0298f

Please sign in to comment.