Skip to content

Commit

Permalink
Simplify internal coroutines machinery (Kotlin#2512)
Browse files Browse the repository at this point in the history
* Merge onStartInternal and onStart to reduce the number of methods and make code a bit simpler
* Rework initParentJob
* Always establish a parent-child relationship when creating a subclass of AbstractCoroutine. That's our own internal class that we have full control of and it never has a chance to leak to the user-code (so cancellation handlers will be installed etc.).  Force implementors of AbstractCoroutine deliberately choose whether parent-child relationship should be established
 * As a consequence, get rid of parentContext in all our coroutine classes that are not ScopeCoroutine
* Remove some dead code
* Get rid of an additional parent field from ScopeCoroutine
 Leverage already presented information in our implementation, just expose it via an already present internal interface
  • Loading branch information
qwwdfsad authored and pablobaxter committed Sep 14, 2022
1 parent 15ee7f5 commit 40fb826
Show file tree
Hide file tree
Showing 37 changed files with 259 additions and 109 deletions.
Expand Up @@ -299,7 +299,7 @@ private class ToContinuation<T>(
*/
private class ListenableFutureCoroutine<T>(
context: CoroutineContext
) : AbstractCoroutine<T>(context) {
) : AbstractCoroutine<T>(context, initParentJob = true, active = true) {

// JobListenableFuture propagates external cancellation to `this` coroutine. See JobListenableFuture.
@JvmField
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.guava
Expand Down Expand Up @@ -747,4 +747,12 @@ class ListenableFutureTest : TestBase() {
latch.countDown()
return future
}

@Test
fun testCancelledParent() = runTest({ it is CancellationException }) {
cancel()
future { expectUnreached() }
future(start = CoroutineStart.ATOMIC) { }
future(start = CoroutineStart.UNDISPATCHED) { }
}
}
2 changes: 1 addition & 1 deletion integration/kotlinx-coroutines-jdk8/src/future/Future.kt
Expand Up @@ -48,7 +48,7 @@ public fun <T> CoroutineScope.future(
private class CompletableFutureCoroutine<T>(
context: CoroutineContext,
private val future: CompletableFuture<T>
) : AbstractCoroutine<T>(context), BiConsumer<T?, Throwable?> {
) : AbstractCoroutine<T>(context, initParentJob = true, active = true), BiConsumer<T?, Throwable?> {
override fun accept(value: T?, exception: Throwable?) {
cancel()
}
Expand Down
10 changes: 9 additions & 1 deletion integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.future
Expand Down Expand Up @@ -567,4 +567,12 @@ class FutureTest : TestBase() {
assertFailsWith<CancellationException> { stage.await() }
finish(4)
}

@Test
fun testCancelledParent() = runTest({ it is java.util.concurrent.CancellationException }) {
cancel()
future { expectUnreached() }
future(start = CoroutineStart.ATOMIC) { }
future(start = CoroutineStart.UNDISPATCHED) { }
}
}
10 changes: 5 additions & 5 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
@@ -1,7 +1,5 @@
public abstract class kotlinx/coroutines/AbstractCoroutine : kotlinx/coroutines/JobSupport, kotlin/coroutines/Continuation, kotlinx/coroutines/CoroutineScope, kotlinx/coroutines/Job {
protected final field parentContext Lkotlin/coroutines/CoroutineContext;
public fun <init> (Lkotlin/coroutines/CoroutineContext;Z)V
public synthetic fun <init> (Lkotlin/coroutines/CoroutineContext;ZILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun <init> (Lkotlin/coroutines/CoroutineContext;ZZ)V
protected fun afterResume (Ljava/lang/Object;)V
protected fun cancellationExceptionMessage ()Ljava/lang/String;
public final fun getContext ()Lkotlin/coroutines/CoroutineContext;
Expand All @@ -10,10 +8,8 @@ public abstract class kotlinx/coroutines/AbstractCoroutine : kotlinx/coroutines/
protected fun onCancelled (Ljava/lang/Throwable;Z)V
protected fun onCompleted (Ljava/lang/Object;)V
protected final fun onCompletionInternal (Ljava/lang/Object;)V
protected fun onStart ()V
public final fun resumeWith (Ljava/lang/Object;)V
public final fun start (Lkotlinx/coroutines/CoroutineStart;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)V
public final fun start (Lkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function1;)V
}

public final class kotlinx/coroutines/AwaitKt {
Expand Down Expand Up @@ -89,6 +85,7 @@ public final class kotlinx/coroutines/CancellableContinuationKt {

public abstract interface class kotlinx/coroutines/ChildHandle : kotlinx/coroutines/DisposableHandle {
public abstract fun childCancelled (Ljava/lang/Throwable;)Z
public abstract fun getParent ()Lkotlinx/coroutines/Job;
}

public abstract interface class kotlinx/coroutines/ChildJob : kotlinx/coroutines/Job {
Expand Down Expand Up @@ -420,6 +417,7 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin
public final fun getKey ()Lkotlin/coroutines/CoroutineContext$Key;
public final fun getOnJoin ()Lkotlinx/coroutines/selects/SelectClause0;
protected fun handleJobException (Ljava/lang/Throwable;)Z
protected final fun initParentJob (Lkotlinx/coroutines/Job;)V
public final fun invokeOnCompletion (Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/DisposableHandle;
public final fun invokeOnCompletion (ZZLkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/DisposableHandle;
public fun isActive ()Z
Expand All @@ -431,6 +429,7 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin
public fun minusKey (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext;
protected fun onCancelling (Ljava/lang/Throwable;)V
protected fun onCompletionInternal (Ljava/lang/Object;)V
protected fun onStart ()V
public final fun parentCancelled (Lkotlinx/coroutines/ParentJob;)V
public fun plus (Lkotlin/coroutines/CoroutineContext;)Lkotlin/coroutines/CoroutineContext;
public fun plus (Lkotlinx/coroutines/Job;)Lkotlinx/coroutines/Job;
Expand Down Expand Up @@ -473,6 +472,7 @@ public final class kotlinx/coroutines/NonDisposableHandle : kotlinx/coroutines/C
public static final field INSTANCE Lkotlinx/coroutines/NonDisposableHandle;
public fun childCancelled (Ljava/lang/Throwable;)Z
public fun dispose ()V
public fun getParent ()Lkotlinx/coroutines/Job;
public fun toString ()Ljava/lang/String;
}

Expand Down
68 changes: 18 additions & 50 deletions kotlinx-coroutines-core/common/src/AbstractCoroutine.kt
Expand Up @@ -8,7 +8,6 @@ package kotlinx.coroutines
import kotlinx.coroutines.CoroutineStart.*
import kotlinx.coroutines.intrinsics.*
import kotlin.coroutines.*
import kotlin.jvm.*

/**
* Abstract base class for implementation of coroutines in coroutine builders.
Expand All @@ -26,20 +25,32 @@ import kotlin.jvm.*
* * [onCancelled] in invoked when the coroutine completes with an exception (cancelled).
*
* @param parentContext the context of the parent coroutine.
* @param initParentJob specifies whether the parent-child relationship should be instantiated directly
* in `AbstractCoroutine` constructor. If set to `false`, it's the responsibility of the child class
* to invoke [initParentJob] manually.
* @param active when `true` (by default), the coroutine is created in the _active_ state, otherwise it is created in the _new_ state.
* See [Job] for details.
*
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
public abstract class AbstractCoroutine<in T>(
/**
* The context of the parent coroutine.
*/
@JvmField
protected val parentContext: CoroutineContext,
active: Boolean = true
parentContext: CoroutineContext,
initParentJob: Boolean,
active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {

init {
/*
* Setup parent-child relationship between the parent in the context and the current coroutine.
* It may cause this coroutine to become _cancelling_ if the parent is already cancelled.
* It is dangerous to install parent-child relationship here if the coroutine class
* operates its state from within onCancelled or onCancelling
* (with exceptions for rx integrations that can't have any parent)
*/
if (initParentJob) initParentJob(parentContext[Job])
}

/**
* The context of this coroutine that includes this coroutine as a [Job].
*/
Expand All @@ -53,28 +64,6 @@ public abstract class AbstractCoroutine<in T>(

override val isActive: Boolean get() = super.isActive

/**
* Initializes the parent job from the `parentContext` of this coroutine that was passed to it during construction.
* It shall be invoked at most once after construction after all other initialization.
*
* Invocation of this function may cause this coroutine to become cancelled if the parent is already cancelled,
* in which case it synchronously invokes all the corresponding handlers.
* @suppress **This is unstable API and it is subject to change.**
*/
internal fun initParentJob() {
initParentJobInternal(parentContext[Job])
}

/**
* This function is invoked once when a non-active coroutine (constructed with `active` set to `false)
* is [started][start].
*/
protected open fun onStart() {}

internal final override fun onStartInternal() {
onStart()
}

/**
* This function is invoked once when the job was completed normally with the specified [value],
* right before all the waiters for the coroutine's completion are notified.
Expand Down Expand Up @@ -127,34 +116,13 @@ public abstract class AbstractCoroutine<in T>(
/**
* Starts this coroutine with the given code [block] and [start] strategy.
* This function shall be invoked at most once on this coroutine.
*
* First, this function initializes parent job from the `parentContext` of this coroutine that was passed to it
* during construction. Second, it starts the coroutine based on [start] parameter:
*
* * [DEFAULT] uses [startCoroutineCancellable].
* * [ATOMIC] uses [startCoroutine].
* * [UNDISPATCHED] uses [startCoroutineUndispatched].
* * [LAZY] does nothing.
*/
public fun start(start: CoroutineStart, block: suspend () -> T) {
initParentJob()
start(block, this)
}

/**
* Starts this coroutine with the given code [block] and [start] strategy.
* This function shall be invoked at most once on this coroutine.
*
* First, this function initializes parent job from the `parentContext` of this coroutine that was passed to it
* during construction. Second, it starts the coroutine based on [start] parameter:
*
* * [DEFAULT] uses [startCoroutineCancellable].
* * [ATOMIC] uses [startCoroutine].
* * [UNDISPATCHED] uses [startCoroutineUndispatched].
* * [LAZY] does nothing.
*/
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
start(block, receiver, this)
}
}
5 changes: 2 additions & 3 deletions kotlinx-coroutines-core/common/src/Builders.common.kt
Expand Up @@ -96,7 +96,7 @@ public fun <T> CoroutineScope.async(
private open class DeferredCoroutine<T>(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> {
) : AbstractCoroutine<T>(parentContext, true, active = active), Deferred<T>, SelectClause1<T> {
override fun getCompleted(): T = getCompletedInternal() as T
override suspend fun await(): T = awaitInternal() as T
override val onAwait: SelectClause1<T> get() = this
Expand Down Expand Up @@ -167,7 +167,6 @@ public suspend fun <T> withContext(
}
// SLOW PATH -- use new dispatcher
val coroutine = DispatchedCoroutine(newContext, uCont)
coroutine.initParentJob()
block.startCoroutineCancellable(coroutine, coroutine)
coroutine.getResult()
}
Expand All @@ -188,7 +187,7 @@ public suspend inline operator fun <T> CoroutineDispatcher.invoke(
private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
override fun handleJobException(exception: Throwable): Boolean {
handleCoroutineException(context, exception)
return true
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/CompletableDeferred.kt
Expand Up @@ -80,7 +80,7 @@ public fun <T> CompletableDeferred(value: T): CompletableDeferred<T> = Completab
private class CompletableDeferredImpl<T>(
parent: Job?
) : JobSupport(true), CompletableDeferred<T>, SelectClause1<T> {
init { initParentJobInternal(parent) }
init { initParentJob(parent) }
override val onCancelComplete get() = true
override fun getCompleted(): T = getCompletedInternal() as T
override suspend fun await(): T = awaitInternal() as T
Expand Down
11 changes: 11 additions & 0 deletions kotlinx-coroutines-core/common/src/Job.kt
Expand Up @@ -457,6 +457,14 @@ public interface ParentJob : Job {
@InternalCoroutinesApi
@Deprecated(level = DeprecationLevel.ERROR, message = "This is internal API and may be removed in the future releases")
public interface ChildHandle : DisposableHandle {

/**
* Returns the parent of the current parent-child relationship.
* @suppress **This is unstable API and it is subject to change.**
*/
@InternalCoroutinesApi
public val parent: Job?

/**
* Child is cancelling its parent by invoking this method.
* This method is invoked by the child twice. The first time child report its root cause as soon as possible,
Expand Down Expand Up @@ -650,6 +658,9 @@ private fun Throwable?.orCancellation(job: Job): Throwable = this ?: JobCancella
*/
@InternalCoroutinesApi
public object NonDisposableHandle : DisposableHandle, ChildHandle {

override val parent: Job? get() = null

/**
* Does not do anything.
* @suppress
Expand Down
13 changes: 7 additions & 6 deletions kotlinx-coroutines-core/common/src/JobSupport.kt
Expand Up @@ -96,7 +96,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
~ waits for start
>> start / join / await invoked
## ACTIVE: state == EMPTY_ACTIVE | is JobNode | is NodeList
+ onStartInternal / onStart (lazy coroutine is started)
+ onStart (lazy coroutine is started)
~ active coroutine is working (or scheduled to execution)
>> childCancelled / cancelImpl invoked
## CANCELLING: state is Finishing, state.rootCause != null
Expand Down Expand Up @@ -139,7 +139,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
* Initializes parent job.
* It shall be invoked at most once after construction after all other initialization.
*/
internal fun initParentJobInternal(parent: Job?) {
protected fun initParentJob(parent: Job?) {
assert { parentHandle == null }
if (parent == null) {
parentHandle = NonDisposableHandle
Expand Down Expand Up @@ -393,12 +393,12 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
is Empty -> { // EMPTY_X state -- no completion handlers
if (state.isActive) return FALSE // already active
if (!_state.compareAndSet(state, EMPTY_ACTIVE)) return RETRY
onStartInternal()
onStart()
return TRUE
}
is InactiveNodeList -> { // LIST state -- inactive with a list of completion handlers
if (!_state.compareAndSet(state, state.list)) return RETRY
onStartInternal()
onStart()
return TRUE
}
else -> return FALSE // not a new state
Expand All @@ -409,7 +409,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
* Override to provide the actual [start] action.
* This function is invoked exactly once when non-active coroutine is [started][start].
*/
internal open fun onStartInternal() {}
protected open fun onStart() {}

public final override fun getCancellationException(): CancellationException =
when (val state = this.state) {
Expand Down Expand Up @@ -1311,7 +1311,7 @@ private class Empty(override val isActive: Boolean) : Incomplete {
}

internal open class JobImpl(parent: Job?) : JobSupport(true), CompletableJob {
init { initParentJobInternal(parent) }
init { initParentJob(parent) }
override val onCancelComplete get() = true
/*
* Check whether parent is able to handle exceptions as well.
Expand Down Expand Up @@ -1459,6 +1459,7 @@ private class InvokeOnCancelling(
internal class ChildHandleNode(
@JvmField val childJob: ChildJob
) : JobCancellingNode(), ChildHandle {
override val parent: Job get() = job
override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
}
Expand Down
8 changes: 7 additions & 1 deletion kotlinx-coroutines-core/common/src/channels/Broadcast.kt
Expand Up @@ -127,7 +127,13 @@ private open class BroadcastCoroutine<E>(
parentContext: CoroutineContext,
protected val _channel: BroadcastChannel<E>,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active), ProducerScope<E>, BroadcastChannel<E> by _channel {
) : AbstractCoroutine<Unit>(parentContext, initParentJob = false, active = active),
ProducerScope<E>, BroadcastChannel<E> by _channel {

init {
initParentJob(parentContext[Job])
}

override val isActive: Boolean get() = super.isActive

override val channel: SendChannel<E>
Expand Down
Expand Up @@ -11,8 +11,10 @@ import kotlin.coroutines.*
internal open class ChannelCoroutine<E>(
parentContext: CoroutineContext,
protected val _channel: Channel<E>,
initParentJob: Boolean,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active), Channel<E> by _channel {
) : AbstractCoroutine<Unit>(parentContext, initParentJob, active), Channel<E> by _channel {

val channel: Channel<E> get() = this

override fun cancel() {
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/channels/Produce.kt
Expand Up @@ -139,7 +139,7 @@ internal fun <E> CoroutineScope.produce(

internal open class ProducerCoroutine<E>(
parentContext: CoroutineContext, channel: Channel<E>
) : ChannelCoroutine<E>(parentContext, channel, active = true), ProducerScope<E> {
) : ChannelCoroutine<E>(parentContext, channel, true, active = true), ProducerScope<E> {
override val isActive: Boolean
get() = super.isActive

Expand Down
7 changes: 4 additions & 3 deletions kotlinx-coroutines-core/common/src/internal/Scopes.kt
Expand Up @@ -15,12 +15,13 @@ import kotlin.jvm.*
internal open class ScopeCoroutine<in T>(
context: CoroutineContext,
@JvmField val uCont: Continuation<T> // unintercepted continuation
) : AbstractCoroutine<T>(context, true), CoroutineStackFrame {
) : AbstractCoroutine<T>(context, true, true), CoroutineStackFrame {

final override val callerFrame: CoroutineStackFrame? get() = uCont as? CoroutineStackFrame
final override fun getStackTraceElement(): StackTraceElement? = null
final override val isScopedCoroutine: Boolean get() = true

internal val parent: Job? get() = parentContext[Job]
final override val isScopedCoroutine: Boolean get() = true
internal val parent: Job? get() = parentHandle?.parent

override fun afterCompletion(state: Any?) {
// Resume in a cancellable way by default when resuming from another context
Expand Down

0 comments on commit 40fb826

Please sign in to comment.