Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify internal machinery #2512

Merged
merged 4 commits into from Mar 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -299,7 +299,7 @@ private class ToContinuation<T>(
*/
private class ListenableFutureCoroutine<T>(
context: CoroutineContext
) : AbstractCoroutine<T>(context) {
) : AbstractCoroutine<T>(context, initParentJob = true, active = true) {

// JobListenableFuture propagates external cancellation to `this` coroutine. See JobListenableFuture.
@JvmField
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.guava
Expand Down Expand Up @@ -747,4 +747,12 @@ class ListenableFutureTest : TestBase() {
latch.countDown()
return future
}

@Test
fun testCancelledParent() = runTest({ it is CancellationException }) {
cancel()
future { expectUnreached() }
future(start = CoroutineStart.ATOMIC) { }
future(start = CoroutineStart.UNDISPATCHED) { }
}
}
2 changes: 1 addition & 1 deletion integration/kotlinx-coroutines-jdk8/src/future/Future.kt
Expand Up @@ -48,7 +48,7 @@ public fun <T> CoroutineScope.future(
private class CompletableFutureCoroutine<T>(
context: CoroutineContext,
private val future: CompletableFuture<T>
) : AbstractCoroutine<T>(context), BiConsumer<T?, Throwable?> {
) : AbstractCoroutine<T>(context, initParentJob = true, active = true), BiConsumer<T?, Throwable?> {
override fun accept(value: T?, exception: Throwable?) {
cancel()
}
Expand Down
10 changes: 9 additions & 1 deletion integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.future
Expand Down Expand Up @@ -567,4 +567,12 @@ class FutureTest : TestBase() {
assertFailsWith<CancellationException> { stage.await() }
finish(4)
}

@Test
fun testCancelledParent() = runTest({ it is java.util.concurrent.CancellationException }) {
cancel()
future { expectUnreached() }
future(start = CoroutineStart.ATOMIC) { }
future(start = CoroutineStart.UNDISPATCHED) { }
}
}
10 changes: 5 additions & 5 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
@@ -1,7 +1,5 @@
public abstract class kotlinx/coroutines/AbstractCoroutine : kotlinx/coroutines/JobSupport, kotlin/coroutines/Continuation, kotlinx/coroutines/CoroutineScope, kotlinx/coroutines/Job {
protected final field parentContext Lkotlin/coroutines/CoroutineContext;
public fun <init> (Lkotlin/coroutines/CoroutineContext;Z)V
public synthetic fun <init> (Lkotlin/coroutines/CoroutineContext;ZILkotlin/jvm/internal/DefaultConstructorMarker;)V
public fun <init> (Lkotlin/coroutines/CoroutineContext;ZZ)V
protected fun afterResume (Ljava/lang/Object;)V
protected fun cancellationExceptionMessage ()Ljava/lang/String;
public final fun getContext ()Lkotlin/coroutines/CoroutineContext;
Expand All @@ -10,10 +8,8 @@ public abstract class kotlinx/coroutines/AbstractCoroutine : kotlinx/coroutines/
protected fun onCancelled (Ljava/lang/Throwable;Z)V
protected fun onCompleted (Ljava/lang/Object;)V
protected final fun onCompletionInternal (Ljava/lang/Object;)V
protected fun onStart ()V
public final fun resumeWith (Ljava/lang/Object;)V
public final fun start (Lkotlinx/coroutines/CoroutineStart;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)V
public final fun start (Lkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function1;)V
}

public final class kotlinx/coroutines/AwaitKt {
Expand Down Expand Up @@ -89,6 +85,7 @@ public final class kotlinx/coroutines/CancellableContinuationKt {

public abstract interface class kotlinx/coroutines/ChildHandle : kotlinx/coroutines/DisposableHandle {
public abstract fun childCancelled (Ljava/lang/Throwable;)Z
public abstract fun getParent ()Lkotlinx/coroutines/Job;
}

public abstract interface class kotlinx/coroutines/ChildJob : kotlinx/coroutines/Job {
Expand Down Expand Up @@ -420,6 +417,7 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin
public final fun getKey ()Lkotlin/coroutines/CoroutineContext$Key;
public final fun getOnJoin ()Lkotlinx/coroutines/selects/SelectClause0;
protected fun handleJobException (Ljava/lang/Throwable;)Z
protected final fun initParentJob (Lkotlinx/coroutines/Job;)V
public final fun invokeOnCompletion (Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/DisposableHandle;
public final fun invokeOnCompletion (ZZLkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/DisposableHandle;
public fun isActive ()Z
Expand All @@ -431,6 +429,7 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin
public fun minusKey (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext;
protected fun onCancelling (Ljava/lang/Throwable;)V
protected fun onCompletionInternal (Ljava/lang/Object;)V
protected fun onStart ()V
public final fun parentCancelled (Lkotlinx/coroutines/ParentJob;)V
public fun plus (Lkotlin/coroutines/CoroutineContext;)Lkotlin/coroutines/CoroutineContext;
public fun plus (Lkotlinx/coroutines/Job;)Lkotlinx/coroutines/Job;
Expand Down Expand Up @@ -473,6 +472,7 @@ public final class kotlinx/coroutines/NonDisposableHandle : kotlinx/coroutines/C
public static final field INSTANCE Lkotlinx/coroutines/NonDisposableHandle;
public fun childCancelled (Ljava/lang/Throwable;)Z
public fun dispose ()V
public fun getParent ()Lkotlinx/coroutines/Job;
public fun toString ()Ljava/lang/String;
}

Expand Down
68 changes: 18 additions & 50 deletions kotlinx-coroutines-core/common/src/AbstractCoroutine.kt
Expand Up @@ -8,7 +8,6 @@ package kotlinx.coroutines
import kotlinx.coroutines.CoroutineStart.*
import kotlinx.coroutines.intrinsics.*
import kotlin.coroutines.*
import kotlin.jvm.*

/**
* Abstract base class for implementation of coroutines in coroutine builders.
Expand All @@ -26,20 +25,32 @@ import kotlin.jvm.*
* * [onCancelled] in invoked when the coroutine completes with an exception (cancelled).
*
* @param parentContext the context of the parent coroutine.
* @param initParentJob specifies whether the parent-child relationship should be instantiated directly
* in `AbstractCoroutine` constructor. If set to `false`, it's the responsibility of the child class
* to invoke [initParentJob] manually.
* @param active when `true` (by default), the coroutine is created in the _active_ state, otherwise it is created in the _new_ state.
* See [Job] for details.
*
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
public abstract class AbstractCoroutine<in T>(
/**
* The context of the parent coroutine.
*/
@JvmField
protected val parentContext: CoroutineContext,
active: Boolean = true
parentContext: CoroutineContext,
initParentJob: Boolean,
active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {

init {
/*
* Setup parent-child relationship between the parent in the context and the current coroutine.
* It may cause this coroutine to become _cancelling_ if the parent is already cancelled.
* It is dangerous to install parent-child relationship here if the coroutine class
* operates its state from within onCancelled or onCancelling
* (with exceptions for rx integrations that can't have any parent)
*/
if (initParentJob) initParentJob(parentContext[Job])
}

/**
* The context of this coroutine that includes this coroutine as a [Job].
*/
Expand All @@ -53,28 +64,6 @@ public abstract class AbstractCoroutine<in T>(

override val isActive: Boolean get() = super.isActive

/**
* Initializes the parent job from the `parentContext` of this coroutine that was passed to it during construction.
* It shall be invoked at most once after construction after all other initialization.
*
* Invocation of this function may cause this coroutine to become cancelled if the parent is already cancelled,
* in which case it synchronously invokes all the corresponding handlers.
* @suppress **This is unstable API and it is subject to change.**
*/
internal fun initParentJob() {
initParentJobInternal(parentContext[Job])
}

/**
* This function is invoked once when a non-active coroutine (constructed with `active` set to `false)
* is [started][start].
*/
protected open fun onStart() {}

internal final override fun onStartInternal() {
onStart()
}

/**
* This function is invoked once when the job was completed normally with the specified [value],
* right before all the waiters for the coroutine's completion are notified.
Expand Down Expand Up @@ -127,34 +116,13 @@ public abstract class AbstractCoroutine<in T>(
/**
* Starts this coroutine with the given code [block] and [start] strategy.
* This function shall be invoked at most once on this coroutine.
*
* First, this function initializes parent job from the `parentContext` of this coroutine that was passed to it
* during construction. Second, it starts the coroutine based on [start] parameter:
*
* * [DEFAULT] uses [startCoroutineCancellable].
* * [ATOMIC] uses [startCoroutine].
* * [UNDISPATCHED] uses [startCoroutineUndispatched].
* * [LAZY] does nothing.
*/
public fun start(start: CoroutineStart, block: suspend () -> T) {
initParentJob()
start(block, this)
}

/**
* Starts this coroutine with the given code [block] and [start] strategy.
* This function shall be invoked at most once on this coroutine.
*
* First, this function initializes parent job from the `parentContext` of this coroutine that was passed to it
* during construction. Second, it starts the coroutine based on [start] parameter:
*
* * [DEFAULT] uses [startCoroutineCancellable].
* * [ATOMIC] uses [startCoroutine].
* * [UNDISPATCHED] uses [startCoroutineUndispatched].
* * [LAZY] does nothing.
*/
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
start(block, receiver, this)
}
}
5 changes: 2 additions & 3 deletions kotlinx-coroutines-core/common/src/Builders.common.kt
Expand Up @@ -96,7 +96,7 @@ public fun <T> CoroutineScope.async(
private open class DeferredCoroutine<T>(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> {
) : AbstractCoroutine<T>(parentContext, true, active = active), Deferred<T>, SelectClause1<T> {
override fun getCompleted(): T = getCompletedInternal() as T
override suspend fun await(): T = awaitInternal() as T
override val onAwait: SelectClause1<T> get() = this
Expand Down Expand Up @@ -167,7 +167,6 @@ public suspend fun <T> withContext(
}
// SLOW PATH -- use new dispatcher
val coroutine = DispatchedCoroutine(newContext, uCont)
coroutine.initParentJob()
block.startCoroutineCancellable(coroutine, coroutine)
coroutine.getResult()
}
Expand All @@ -188,7 +187,7 @@ public suspend inline operator fun <T> CoroutineDispatcher.invoke(
private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
) : AbstractCoroutine<Unit>(parentContext, initParentJob = true, active = active) {
override fun handleJobException(exception: Throwable): Boolean {
handleCoroutineException(context, exception)
return true
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/CompletableDeferred.kt
Expand Up @@ -80,7 +80,7 @@ public fun <T> CompletableDeferred(value: T): CompletableDeferred<T> = Completab
private class CompletableDeferredImpl<T>(
parent: Job?
) : JobSupport(true), CompletableDeferred<T>, SelectClause1<T> {
init { initParentJobInternal(parent) }
init { initParentJob(parent) }
override val onCancelComplete get() = true
override fun getCompleted(): T = getCompletedInternal() as T
override suspend fun await(): T = awaitInternal() as T
Expand Down
11 changes: 11 additions & 0 deletions kotlinx-coroutines-core/common/src/Job.kt
Expand Up @@ -466,6 +466,14 @@ public interface ParentJob : Job {
@InternalCoroutinesApi
@Deprecated(level = DeprecationLevel.ERROR, message = "This is internal API and may be removed in the future releases")
public interface ChildHandle : DisposableHandle {

/**
* Returns the parent of the current parent-child relationship.
* @suppress **This is unstable API and it is subject to change.**
*/
@InternalCoroutinesApi
public val parent: Job?

/**
* Child is cancelling its parent by invoking this method.
* This method is invoked by the child twice. The first time child report its root cause as soon as possible,
Expand Down Expand Up @@ -659,6 +667,9 @@ private fun Throwable?.orCancellation(job: Job): Throwable = this ?: JobCancella
*/
@InternalCoroutinesApi
public object NonDisposableHandle : DisposableHandle, ChildHandle {

override val parent: Job? get() = null

/**
* Does not do anything.
* @suppress
Expand Down
13 changes: 7 additions & 6 deletions kotlinx-coroutines-core/common/src/JobSupport.kt
Expand Up @@ -96,7 +96,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
~ waits for start
>> start / join / await invoked
## ACTIVE: state == EMPTY_ACTIVE | is JobNode | is NodeList
+ onStartInternal / onStart (lazy coroutine is started)
+ onStart (lazy coroutine is started)
~ active coroutine is working (or scheduled to execution)
>> childCancelled / cancelImpl invoked
## CANCELLING: state is Finishing, state.rootCause != null
Expand Down Expand Up @@ -139,7 +139,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
* Initializes parent job.
* It shall be invoked at most once after construction after all other initialization.
*/
internal fun initParentJobInternal(parent: Job?) {
protected fun initParentJob(parent: Job?) {
assert { parentHandle == null }
if (parent == null) {
parentHandle = NonDisposableHandle
Expand Down Expand Up @@ -393,12 +393,12 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
is Empty -> { // EMPTY_X state -- no completion handlers
if (state.isActive) return FALSE // already active
if (!_state.compareAndSet(state, EMPTY_ACTIVE)) return RETRY
onStartInternal()
onStart()
return TRUE
}
is InactiveNodeList -> { // LIST state -- inactive with a list of completion handlers
if (!_state.compareAndSet(state, state.list)) return RETRY
onStartInternal()
onStart()
return TRUE
}
else -> return FALSE // not a new state
Expand All @@ -409,7 +409,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren
* Override to provide the actual [start] action.
* This function is invoked exactly once when non-active coroutine is [started][start].
*/
internal open fun onStartInternal() {}
protected open fun onStart() {}

public final override fun getCancellationException(): CancellationException =
when (val state = this.state) {
Expand Down Expand Up @@ -1311,7 +1311,7 @@ private class Empty(override val isActive: Boolean) : Incomplete {
}

internal open class JobImpl(parent: Job?) : JobSupport(true), CompletableJob {
init { initParentJobInternal(parent) }
init { initParentJob(parent) }
override val onCancelComplete get() = true
/*
* Check whether parent is able to handle exceptions as well.
Expand Down Expand Up @@ -1459,6 +1459,7 @@ private class InvokeOnCancelling(
internal class ChildHandleNode(
@JvmField val childJob: ChildJob
) : JobCancellingNode(), ChildHandle {
override val parent: Job get() = job
override fun invoke(cause: Throwable?) = childJob.parentCancelled(job)
override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause)
}
Expand Down
8 changes: 7 additions & 1 deletion kotlinx-coroutines-core/common/src/channels/Broadcast.kt
Expand Up @@ -127,7 +127,13 @@ private open class BroadcastCoroutine<E>(
parentContext: CoroutineContext,
protected val _channel: BroadcastChannel<E>,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active), ProducerScope<E>, BroadcastChannel<E> by _channel {
) : AbstractCoroutine<Unit>(parentContext, initParentJob = false, active = active),
ProducerScope<E>, BroadcastChannel<E> by _channel {

init {
initParentJob(parentContext[Job])
}

override val isActive: Boolean get() = super.isActive

override val channel: SendChannel<E>
Expand Down
Expand Up @@ -11,8 +11,10 @@ import kotlin.coroutines.*
internal open class ChannelCoroutine<E>(
parentContext: CoroutineContext,
protected val _channel: Channel<E>,
initParentJob: Boolean,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active), Channel<E> by _channel {
) : AbstractCoroutine<Unit>(parentContext, initParentJob, active), Channel<E> by _channel {

val channel: Channel<E> get() = this

override fun cancel() {
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/channels/Produce.kt
Expand Up @@ -139,7 +139,7 @@ internal fun <E> CoroutineScope.produce(

internal open class ProducerCoroutine<E>(
parentContext: CoroutineContext, channel: Channel<E>
) : ChannelCoroutine<E>(parentContext, channel, active = true), ProducerScope<E> {
) : ChannelCoroutine<E>(parentContext, channel, true, active = true), ProducerScope<E> {
override val isActive: Boolean
get() = super.isActive

Expand Down
7 changes: 4 additions & 3 deletions kotlinx-coroutines-core/common/src/internal/Scopes.kt
Expand Up @@ -15,12 +15,13 @@ import kotlin.jvm.*
internal open class ScopeCoroutine<in T>(
context: CoroutineContext,
@JvmField val uCont: Continuation<T> // unintercepted continuation
) : AbstractCoroutine<T>(context, true), CoroutineStackFrame {
) : AbstractCoroutine<T>(context, true, true), CoroutineStackFrame {

final override val callerFrame: CoroutineStackFrame? get() = uCont as? CoroutineStackFrame
final override fun getStackTraceElement(): StackTraceElement? = null
final override val isScopedCoroutine: Boolean get() = true

internal val parent: Job? get() = parentContext[Job]
final override val isScopedCoroutine: Boolean get() = true
internal val parent: Job? get() = parentHandle?.parent

override fun afterCompletion(state: Any?) {
// Resume in a cancellable way by default when resuming from another context
Expand Down