Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CopyableThreadContextElement implementation #3227

Merged
merged 26 commits into from Apr 4, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
fe34623
Prototype of merge and eager copy
qwwdfsad Jan 31, 2022
091a3d7
CopyableThreadContextElement -- second iteration
qwwdfsad Feb 1, 2022
1857404
Speed-up newCoroutineContext for 'withContext' which is the biggest p…
qwwdfsad Feb 15, 2022
774031e
Better naming and @Delicate coroutines API
qwwdfsad Feb 15, 2022
8563d46
Update binary API dump
qwwdfsad Feb 15, 2022
7757b06
~documentation tweaks
qwwdfsad Mar 30, 2022
027e267
Update kotlinx-coroutines-core/common/src/CoroutineContext.common.kt
qwwdfsad Apr 1, 2022
05479c9
Update kotlinx-coroutines-core/common/src/CoroutineContext.common.kt
qwwdfsad Apr 1, 2022
1f7fa0b
Update kotlinx-coroutines-core/common/src/CoroutineContext.common.kt
qwwdfsad Apr 1, 2022
e9f31d9
Update kotlinx-coroutines-core/jvm/src/CoroutineContext.kt
qwwdfsad Apr 1, 2022
be8161a
Update kotlinx-coroutines-core/common/src/CoroutineContext.common.kt
qwwdfsad Apr 1, 2022
a85e8fb
Update kotlinx-coroutines-core/jvm/src/CoroutineContext.kt
qwwdfsad Apr 1, 2022
f572e8b
Update kotlinx-coroutines-core/jvm/src/CoroutineContext.kt
qwwdfsad Apr 1, 2022
ff5083b
Update kotlinx-coroutines-core/jvm/src/CoroutineContext.kt
qwwdfsad Apr 1, 2022
31b57ad
Update kotlinx-coroutines-core/jvm/src/ThreadContextElement.kt
qwwdfsad Apr 1, 2022
db0d613
Update kotlinx-coroutines-core/jvm/test/ThreadContextMutableCopiesTes…
qwwdfsad Apr 1, 2022
20bedf9
Update kotlinx-coroutines-core/jvm/src/ThreadContextElement.kt
qwwdfsad Apr 1, 2022
479481e
Update kotlinx-coroutines-core/jvm/src/CoroutineContext.kt
qwwdfsad Apr 1, 2022
a32036c
Update kotlinx-coroutines-core/jvm/src/ThreadContextElement.kt
qwwdfsad Apr 1, 2022
64163c1
Update kotlinx-coroutines-core/jvm/src/CoroutineContext.kt
qwwdfsad Apr 1, 2022
9750b59
~small tweaks
qwwdfsad Apr 1, 2022
5360968
Update kotlinx-coroutines-core/jvm/src/ThreadContextElement.kt
qwwdfsad Apr 1, 2022
20a0da8
~clarify doc sentence
qwwdfsad Apr 2, 2022
e9f02f1
Update kotlinx-coroutines-core/jvm/src/ThreadContextElement.kt
qwwdfsad Apr 4, 2022
9dfb868
~clarify doc sentence
qwwdfsad Apr 4, 2022
60c43c9
~
qwwdfsad Apr 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -141,7 +141,8 @@ public final class kotlinx/coroutines/CompletionHandlerException : java/lang/Run
}

public abstract interface class kotlinx/coroutines/CopyableThreadContextElement : kotlinx/coroutines/ThreadContextElement {
public abstract fun copyForChildCoroutine ()Lkotlinx/coroutines/CopyableThreadContextElement;
public abstract fun copyForChild ()Lkotlinx/coroutines/CopyableThreadContextElement;
public abstract fun mergeForChild (Lkotlin/coroutines/CoroutineContext$Element;)Lkotlin/coroutines/CoroutineContext;
}

public final class kotlinx/coroutines/CopyableThreadContextElement$DefaultImpls {
Expand All @@ -156,6 +157,7 @@ public abstract interface class kotlinx/coroutines/CopyableThrowable {
}

public final class kotlinx/coroutines/CoroutineContextKt {
public static final fun newCoroutineContext (Lkotlin/coroutines/CoroutineContext;Lkotlin/coroutines/CoroutineContext;)Lkotlin/coroutines/CoroutineContext;
public static final fun newCoroutineContext (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;)Lkotlin/coroutines/CoroutineContext;
}

Expand Down
3 changes: 2 additions & 1 deletion kotlinx-coroutines-core/common/src/Builders.common.kt
Expand Up @@ -148,7 +148,8 @@ public suspend fun <T> withContext(
return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
// compute new context
val oldContext = uCont.context
val newContext = oldContext + context
// Copy CopyableThreadContextElement if necessary
val newContext = oldContext.newCoroutineContext(context)
// always check for cancellation of new context
newContext.ensureActive()
// FAST PATH #1 -- new context is the same as the old one
Expand Down
10 changes: 9 additions & 1 deletion kotlinx-coroutines-core/common/src/CoroutineContext.common.kt
Expand Up @@ -8,10 +8,18 @@ import kotlin.coroutines.*

/**
* Creates a context for the new coroutine. It installs [Dispatchers.Default] when no other dispatcher or
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
* [ContinuationInterceptor] is specified, and adds optional support for debugging facilities (when turned on).
* [ContinuationInterceptor] is specified, and adds optional support for debugging facilities (when turned on)
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
* and copyable thread local facilities on JVM.
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
*/
public expect fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext

/**
* Creates context for coroutine builder functions that do not launch a new coroutine, namely [withContext].
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
* @suppress
*/
@InternalCoroutinesApi
public expect fun CoroutineContext.newCoroutineContext(addedContext: CoroutineContext): CoroutineContext

@PublishedApi
@Suppress("PropertyName")
internal expect val DefaultDelay: Delay
Expand Down
4 changes: 4 additions & 0 deletions kotlinx-coroutines-core/js/src/CoroutineContext.kt
Expand Up @@ -42,6 +42,10 @@ public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext):
combined + Dispatchers.Default else combined
}

public actual fun CoroutineContext.newCoroutineContext(addedContext: CoroutineContext): CoroutineContext {
return this + addedContext
}

// No debugging facilities on JS
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T = block()
internal actual inline fun <T> withContinuationContext(continuation: Continuation<*>, countOrElement: Any?, block: () -> T): T = block()
Expand Down
79 changes: 64 additions & 15 deletions kotlinx-coroutines-core/jvm/src/CoroutineContext.kt
Expand Up @@ -10,35 +10,84 @@ import kotlin.coroutines.jvm.internal.CoroutineStackFrame

/**
* Creates context for the new coroutine. It installs [Dispatchers.Default] when no other dispatcher nor
* [ContinuationInterceptor] is specified, and adds optional support for debugging facilities (when turned on).
* [ContinuationInterceptor] is specified, and adds optional support for
* copyable thread context [elements][CopyableThreadContextElement] and debugging facilities (when turned on).
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
*
* See [DEBUG_PROPERTY_NAME] for description of debugging facilities on JVM.
*/
@ExperimentalCoroutinesApi
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = coroutineContext.foldCopiesForChildCoroutine() + context
val combined = foldCopies(coroutineContext, context, true)
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}

/**
* Returns the [CoroutineContext] for a child coroutine to inherit.
*
* If any [CopyableThreadContextElement] is in the [this], calls
* [CopyableThreadContextElement.copyForChildCoroutine] on each, returning a new [CoroutineContext]
* by folding the returned copied elements into [this].
*
* Returns [this] if `this` has zero [CopyableThreadContextElement] in it.
* Creates context for coroutine builder functions that do not launch a new coroutine, namely [withContext].
* @suppress
dkhalanskyjb marked this conversation as resolved.
Show resolved Hide resolved
*/
private fun CoroutineContext.foldCopiesForChildCoroutine(): CoroutineContext {
val hasToCopy = fold(false) { result, it ->
result || it is CopyableThreadContextElement<*>
@InternalCoroutinesApi
public actual fun CoroutineContext.newCoroutineContext(addedContext: CoroutineContext): CoroutineContext {
/*
* Fast-path: we only have to copy/merge if 'addedContext' (which typically has one or two elements)
* contains copyable element.
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
*/
if (!addedContext.fold(false, hasCopyableElements)) return this + addedContext
return foldCopies(this, addedContext, false)
}

private val hasCopyableElements: (Boolean, CoroutineContext.Element) -> Boolean = { result, it ->
result || it is CopyableThreadContextElement<*>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name of this function (as well as the recurring pattern it's in) suggests these signature and implementation:

private fun CoroutineContext.hasCopyableElements(): Boolean =
    fold(false) { result, it -> result || it is CopyableThreadContextElement<*> }

(or a val, no opinion on that)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestion, thanks! I've originally extracted the function to save the number of classfiles, but this is even better

}

/**
* Folds two contexts properly applying [CopyableThreadContextElement] rules when necessary.
* The rules are the following:
* * If both context do not have CTCE, the sum of two contexts is returned
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
* * Every CTCE from left-hand side context that does not have matching (by key) element from right-hand side context
* is [copied][CopyableThreadContextElement.copyForChild]
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
* * Every CTCE from left-hand side context that has matching element in right-hand side context is [merged][CopyableThreadContextElement.mergeForChild]
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
* * Every CTCE from right-hand side context that hasn't been merged is copied
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
* * Everything else is added to the resulting context as is.
*/
private fun foldCopies(originalContext: CoroutineContext, appendContext: CoroutineContext, isNewCoroutine: Boolean): CoroutineContext {
// Do we have something to copy left-hand side?
val hasElementsLeft = originalContext.fold(false, hasCopyableElements)
val hasElementsRight = appendContext.fold(false, hasCopyableElements)

// Nothing to fold, so just return the sum of contexts
if (!hasElementsLeft && !hasElementsRight) {
return originalContext + appendContext
}
if (!hasToCopy) return this
return fold<CoroutineContext>(EmptyCoroutineContext) { combined, it ->
combined + if (it is CopyableThreadContextElement<*>) it.copyForChildCoroutine() else it

var leftoverContext = appendContext
val folded = originalContext.fold<CoroutineContext>(EmptyCoroutineContext) { result, element ->
if (element !is CopyableThreadContextElement<*>) return@fold result + element
// Will this element be overwritten?
val newElement = leftoverContext[element.key]
// No, just copy it
if (newElement == null) {
// For 'withContext'-like builders we do not copy as the element is not shared
return@fold result + if (isNewCoroutine) element.copyForChild() else element
}
// Yes, then first remove the element from append context
leftoverContext = leftoverContext.minusKey(element.key)
// Return the sum
@Suppress("UNCHECKED_CAST")
return@fold result + (element as CopyableThreadContextElement<Any?>).mergeForChild(newElement)
}

if (hasElementsRight) {
leftoverContext = leftoverContext.fold<CoroutineContext>(EmptyCoroutineContext) { result, element ->
// We're appending new context element -- we have to copy it, otherwise it may be shared with others
if (element is CopyableThreadContextElement<*>) {
return@fold result + element.copyForChild()
}
return@fold result + element
Comment on lines +82 to +85
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (element is CopyableThreadContextElement<*>) {
return@fold result + element.copyForChild()
}
return@fold result + element
result + if (element is CopyableThreadContextElement<*>) { element.copyForChild() } else { element }

Copy link
Member Author

@qwwdfsad qwwdfsad Apr 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the original version is more readable (and debuggable)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, for some reason I can't place a breakpoint on block subexpressions here. How about this then?

Suggested change
if (element is CopyableThreadContextElement<*>) {
return@fold result + element.copyForChild()
}
return@fold result + element
result + if (element is CopyableThreadContextElement<*>) {
element.copyForChild()
} else {
element
}

If not, feel free to just disregard this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, I prefer

if (condition) {
    return foo
}
doOrReturnOtherFoo

over

return if (condition) {
    foo
} else {
   otherFoo
}

to keep the number of nested blocks minimal. This is, of course, opinionated, so we can stick to the most "concise" one or to the proposed middle-ground if you do object

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, I'm the opposite and will typically prefer the other way because it highlights the symmetry between the cases and simplifies mental parsing of the control flow. In particular, in this case, I certainly don't find nesting to be an issue.

In any case, we're splitting hairs here. Both versions are perfectly fine, I think.

}
}
return folded + leftoverContext
}

/**
Expand Down
28 changes: 24 additions & 4 deletions kotlinx-coroutines-core/jvm/src/ThreadContextElement.kt
Expand Up @@ -97,7 +97,7 @@ public interface ThreadContextElement<S> : CoroutineContext.Element {
* is in a coroutine:
*
* ```
* class TraceContextElement(private val traceData: TraceData?) : CopyableThreadContextElement<TraceData?> {
* class TraceContextElement(private val traceData: TraceData?) : CopyableThreadContextElement<TraceData?, TraceContextElement> {
* companion object Key : CoroutineContext.Key<TraceContextElement>
* override val key: CoroutineContext.Key<TraceContextElement> = Key
*
Expand All @@ -111,32 +111,52 @@ public interface ThreadContextElement<S> : CoroutineContext.Element {
* traceThreadLocal.set(oldState)
* }
*
* override fun copyForChildCoroutine(): CopyableThreadContextElement<TraceData?> {
* override fun copyForChild(): TraceContextElement {
* // Copy from the ThreadLocal source of truth at child coroutine launch time. This makes
* // ThreadLocal writes between resumption of the parent coroutine and the launch of the
* // child coroutine visible to the child.
* return TraceContextElement(traceThreadLocal.get()?.copy())
* }
*
* override fun mergeForChild(overwritingElement: CoroutineContext.Element): CoroutineContext
* // Merge operation defines how to handle situation when both
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
* // parent coroutine has the element in the context and it was also
* // explicitly passed to a child coroutine.
* // If merge is not defined, the copy of the element can be returned.
* return TraceContextElement(traceThreadLocal.get()?.copy())
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
* }
* }
* ```
*
* A coroutine using this mechanism can safely call Java code that assumes it's called using a
* `Thread`.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand the meaning here. Is it possible to call any Java code without using a Thread?

*/
@DelicateCoroutinesApi
dkhalanskyjb marked this conversation as resolved.
Show resolved Hide resolved
@ExperimentalCoroutinesApi
public interface CopyableThreadContextElement<S> : ThreadContextElement<S> {

/**
* Returns a [CopyableThreadContextElement] to replace `this` `CopyableThreadContextElement` in the child
* coroutine's context that is under construction.
* coroutine's context that is under construction if the child coroutine's context does not contain
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
* an element with the same [key].
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
*
* This function is called on the element each time a new coroutine inherits a context containing it,
* and the returned value is folded into the context given to the child.
*
* Since this method is called whenever a new coroutine is launched in a context containing this
* [CopyableThreadContextElement], implementations are performance-sensitive.
*/
public fun copyForChildCoroutine(): CopyableThreadContextElement<S>
public fun copyForChild(): CopyableThreadContextElement<S>

/**
* Returns a [CopyableThreadContextElement] to replace `this` `CopyableThreadContextElement` in the child
* coroutine's context when the child coroutine's context contains an element with the same [key] as the
* current one.
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
*
* This function is called on current element, supplied with an element retrieved from child's
* coroutine context by the current [key].
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
*/
public fun mergeForChild(overwritingElement: CoroutineContext.Element): CoroutineContext
}

/**
Expand Down
13 changes: 8 additions & 5 deletions kotlinx-coroutines-core/jvm/test/ThreadContextElementTest.kt
Expand Up @@ -126,8 +126,7 @@ class ThreadContextElementTest : TestBase() {
@Test
fun testCopyableThreadContextElementImplementsWriteVisibility() = runTest {
newFixedThreadPoolContext(nThreads = 4, name = "withContext").use {
val startData = MyData()
withContext(it + CopyForChildCoroutineElement(startData)) {
withContext(it + CopyForChildCoroutineElement(MyData())) {
val forBlockData = MyData()
myThreadLocal.setForBlock(forBlockData) {
assertSame(myThreadLocal.get(), forBlockData)
Expand All @@ -153,7 +152,7 @@ class ThreadContextElementTest : TestBase() {
assertSame(myThreadLocal.get(), forBlockData)
}
}
assertSame(myThreadLocal.get(), startData) // Asserts value was restored.
assertNull(myThreadLocal.get()) // Asserts value was restored to its origin
}
}
}
Expand Down Expand Up @@ -187,7 +186,7 @@ class MyElement(val data: MyData) : ThreadContextElement<MyData?> {
}

/**
* A [ThreadContextElement] that implements copy semantics in [copyForChildCoroutine].
* A [ThreadContextElement] that implements copy semantics in [copyForChild].
*/
class CopyForChildCoroutineElement(val data: MyData?) : CopyableThreadContextElement<MyData?> {
companion object Key : CoroutineContext.Key<CopyForChildCoroutineElement>
Expand All @@ -201,6 +200,10 @@ class CopyForChildCoroutineElement(val data: MyData?) : CopyableThreadContextEle
return oldState
}

override fun mergeForChild(overwritingElement: CoroutineContext.Element): CopyForChildCoroutineElement {
TODO("Not used in tests")
}

override fun restoreThreadContext(context: CoroutineContext, oldState: MyData?) {
myThreadLocal.set(oldState)
}
Expand All @@ -216,7 +219,7 @@ class CopyForChildCoroutineElement(val data: MyData?) : CopyableThreadContextEle
* will be reflected in the parent coroutine's [CopyForChildCoroutineElement] when it yields the
* thread and calls [restoreThreadContext].
*/
override fun copyForChildCoroutine(): CopyableThreadContextElement<MyData?> {
override fun copyForChild(): CopyForChildCoroutineElement {
return CopyForChildCoroutineElement(myThreadLocal.get())
}
}
Expand Down