From 405491095c000875fe42c1cdcc7faa39284e0029 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Mon, 1 Feb 2021 18:47:38 +0300 Subject: [PATCH 1/4] Merge onStartInternal and onStart to reduce the number of methods and make code a bit simpler --- .../api/kotlinx-coroutines-core.api | 2 +- .../common/src/AbstractCoroutine.kt | 10 ---------- kotlinx-coroutines-core/common/src/JobSupport.kt | 8 ++++---- 3 files changed, 5 insertions(+), 15 deletions(-) diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 1d16d31086..4d5948ccdd 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -10,7 +10,6 @@ public abstract class kotlinx/coroutines/AbstractCoroutine : kotlinx/coroutines/ protected fun onCancelled (Ljava/lang/Throwable;Z)V protected fun onCompleted (Ljava/lang/Object;)V protected final fun onCompletionInternal (Ljava/lang/Object;)V - protected fun onStart ()V public final fun resumeWith (Ljava/lang/Object;)V public final fun start (Lkotlinx/coroutines/CoroutineStart;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)V public final fun start (Lkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function1;)V @@ -431,6 +430,7 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin public fun minusKey (Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext; protected fun onCancelling (Ljava/lang/Throwable;)V protected fun onCompletionInternal (Ljava/lang/Object;)V + protected fun onStart ()V public final fun parentCancelled (Lkotlinx/coroutines/ParentJob;)V public fun plus (Lkotlin/coroutines/CoroutineContext;)Lkotlin/coroutines/CoroutineContext; public fun plus (Lkotlinx/coroutines/Job;)Lkotlinx/coroutines/Job; diff --git a/kotlinx-coroutines-core/common/src/AbstractCoroutine.kt b/kotlinx-coroutines-core/common/src/AbstractCoroutine.kt index af392b63f5..f16ed056bd 100644 --- a/kotlinx-coroutines-core/common/src/AbstractCoroutine.kt +++ b/kotlinx-coroutines-core/common/src/AbstractCoroutine.kt @@ -65,16 +65,6 @@ public abstract class AbstractCoroutine( initParentJobInternal(parentContext[Job]) } - /** - * This function is invoked once when a non-active coroutine (constructed with `active` set to `false) - * is [started][start]. - */ - protected open fun onStart() {} - - internal final override fun onStartInternal() { - onStart() - } - /** * This function is invoked once when the job was completed normally with the specified [value], * right before all the waiters for the coroutine's completion are notified. diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index 5b516ae27f..fca3ee5d84 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -96,7 +96,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren ~ waits for start >> start / join / await invoked ## ACTIVE: state == EMPTY_ACTIVE | is JobNode | is NodeList - + onStartInternal / onStart (lazy coroutine is started) + + onStart (lazy coroutine is started) ~ active coroutine is working (or scheduled to execution) >> childCancelled / cancelImpl invoked ## CANCELLING: state is Finishing, state.rootCause != null @@ -393,12 +393,12 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren is Empty -> { // EMPTY_X state -- no completion handlers if (state.isActive) return FALSE // already active if (!_state.compareAndSet(state, EMPTY_ACTIVE)) return RETRY - onStartInternal() + onStart() return TRUE } is InactiveNodeList -> { // LIST state -- inactive with a list of completion handlers if (!_state.compareAndSet(state, state.list)) return RETRY - onStartInternal() + onStart() return TRUE } else -> return FALSE // not a new state @@ -409,7 +409,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren * Override to provide the actual [start] action. * This function is invoked exactly once when non-active coroutine is [started][start]. */ - internal open fun onStartInternal() {} + protected open fun onStart() {} public final override fun getCancellationException(): CancellationException = when (val state = this.state) { From d7a56eb0c3c4bbfa724dad5be61bc2608d537071 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Mon, 1 Feb 2021 19:33:52 +0300 Subject: [PATCH 2/4] Rework initParentJob * Always establish parent-child relationship when creating a subclass of AbstractCoroutine. That's our own internal class that we have full control of and it never has a chance to leak to the user-code (so cancellation handlers will be installed etc.) * As a consequence, get rid of parentContext in all our coroutine classes that are not ScopeCoroutine * Remove some dead code The whole late-binding of initParentJob seems the be the legacy of days when AbstractCoroutine was public and CancellableContinuation implemented a Job --- .../api/kotlinx-coroutines-core.api | 2 - .../common/src/AbstractCoroutine.kt | 45 ++++--------------- .../common/src/Builders.common.kt | 1 - .../common/src/internal/Scopes.kt | 5 ++- .../common/src/intrinsics/Undispatched.kt | 8 ++-- .../jvm/test/exceptions/SuppressionTests.kt | 6 +-- 6 files changed, 18 insertions(+), 49 deletions(-) diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 4d5948ccdd..5632634c37 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -1,5 +1,4 @@ public abstract class kotlinx/coroutines/AbstractCoroutine : kotlinx/coroutines/JobSupport, kotlin/coroutines/Continuation, kotlinx/coroutines/CoroutineScope, kotlinx/coroutines/Job { - protected final field parentContext Lkotlin/coroutines/CoroutineContext; public fun (Lkotlin/coroutines/CoroutineContext;Z)V public synthetic fun (Lkotlin/coroutines/CoroutineContext;ZILkotlin/jvm/internal/DefaultConstructorMarker;)V protected fun afterResume (Ljava/lang/Object;)V @@ -12,7 +11,6 @@ public abstract class kotlinx/coroutines/AbstractCoroutine : kotlinx/coroutines/ protected final fun onCompletionInternal (Ljava/lang/Object;)V public final fun resumeWith (Ljava/lang/Object;)V public final fun start (Lkotlinx/coroutines/CoroutineStart;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)V - public final fun start (Lkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function1;)V } public final class kotlinx/coroutines/AwaitKt { diff --git a/kotlinx-coroutines-core/common/src/AbstractCoroutine.kt b/kotlinx-coroutines-core/common/src/AbstractCoroutine.kt index f16ed056bd..f1fdfacde6 100644 --- a/kotlinx-coroutines-core/common/src/AbstractCoroutine.kt +++ b/kotlinx-coroutines-core/common/src/AbstractCoroutine.kt @@ -8,7 +8,6 @@ package kotlinx.coroutines import kotlinx.coroutines.CoroutineStart.* import kotlinx.coroutines.intrinsics.* import kotlin.coroutines.* -import kotlin.jvm.* /** * Abstract base class for implementation of coroutines in coroutine builders. @@ -36,8 +35,7 @@ public abstract class AbstractCoroutine( /** * The context of the parent coroutine. */ - @JvmField - protected val parentContext: CoroutineContext, + parentContext: CoroutineContext, active: Boolean = true ) : JobSupport(active), Job, Continuation, CoroutineScope { /** @@ -46,6 +44,14 @@ public abstract class AbstractCoroutine( @Suppress("LeakingThis") public final override val context: CoroutineContext = parentContext + this + init { + /* + * Setup parent-child relationship between the parent in the context + * and the current coroutine. + * It may cause this coroutine to become cancelled if the parent is already cancelled + */ + initParentJobInternal(parentContext[Job]) + } /** * The context of this scope which is the same as the [context] of this coroutine. */ @@ -53,18 +59,6 @@ public abstract class AbstractCoroutine( override val isActive: Boolean get() = super.isActive - /** - * Initializes the parent job from the `parentContext` of this coroutine that was passed to it during construction. - * It shall be invoked at most once after construction after all other initialization. - * - * Invocation of this function may cause this coroutine to become cancelled if the parent is already cancelled, - * in which case it synchronously invokes all the corresponding handlers. - * @suppress **This is unstable API and it is subject to change.** - */ - internal fun initParentJob() { - initParentJobInternal(parentContext[Job]) - } - /** * This function is invoked once when the job was completed normally with the specified [value], * right before all the waiters for the coroutine's completion are notified. @@ -117,26 +111,6 @@ public abstract class AbstractCoroutine( /** * Starts this coroutine with the given code [block] and [start] strategy. * This function shall be invoked at most once on this coroutine. - * - * First, this function initializes parent job from the `parentContext` of this coroutine that was passed to it - * during construction. Second, it starts the coroutine based on [start] parameter: - * - * * [DEFAULT] uses [startCoroutineCancellable]. - * * [ATOMIC] uses [startCoroutine]. - * * [UNDISPATCHED] uses [startCoroutineUndispatched]. - * * [LAZY] does nothing. - */ - public fun start(start: CoroutineStart, block: suspend () -> T) { - initParentJob() - start(block, this) - } - - /** - * Starts this coroutine with the given code [block] and [start] strategy. - * This function shall be invoked at most once on this coroutine. - * - * First, this function initializes parent job from the `parentContext` of this coroutine that was passed to it - * during construction. Second, it starts the coroutine based on [start] parameter: * * * [DEFAULT] uses [startCoroutineCancellable]. * * [ATOMIC] uses [startCoroutine]. @@ -144,7 +118,6 @@ public abstract class AbstractCoroutine( * * [LAZY] does nothing. */ public fun start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) { - initParentJob() start(block, receiver, this) } } diff --git a/kotlinx-coroutines-core/common/src/Builders.common.kt b/kotlinx-coroutines-core/common/src/Builders.common.kt index 93b3ee4849..8351dc3f5a 100644 --- a/kotlinx-coroutines-core/common/src/Builders.common.kt +++ b/kotlinx-coroutines-core/common/src/Builders.common.kt @@ -167,7 +167,6 @@ public suspend fun withContext( } // SLOW PATH -- use new dispatcher val coroutine = DispatchedCoroutine(newContext, uCont) - coroutine.initParentJob() block.startCoroutineCancellable(coroutine, coroutine) coroutine.getResult() } diff --git a/kotlinx-coroutines-core/common/src/internal/Scopes.kt b/kotlinx-coroutines-core/common/src/internal/Scopes.kt index 98db37de8d..9a31c4230c 100644 --- a/kotlinx-coroutines-core/common/src/internal/Scopes.kt +++ b/kotlinx-coroutines-core/common/src/internal/Scopes.kt @@ -16,11 +16,12 @@ internal open class ScopeCoroutine( context: CoroutineContext, @JvmField val uCont: Continuation // unintercepted continuation ) : AbstractCoroutine(context, true), CoroutineStackFrame { + final override val callerFrame: CoroutineStackFrame? get() = uCont as? CoroutineStackFrame final override fun getStackTraceElement(): StackTraceElement? = null - final override val isScopedCoroutine: Boolean get() = true - internal val parent: Job? get() = parentContext[Job] + final override val isScopedCoroutine: Boolean get() = true + internal val parent: Job? = context[Job] override fun afterCompletion(state: Any?) { // Resume in a cancellable way by default when resuming from another context diff --git a/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt b/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt index 1273634efa..38e870ef9c 100644 --- a/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt +++ b/kotlinx-coroutines-core/common/src/intrinsics/Undispatched.kt @@ -82,11 +82,9 @@ private inline fun startDirect(completion: Continuation, block: (Continua * This function shall be invoked at most once on this coroutine. * This function checks cancellation of the outer [Job] on fast-path. * - * First, this function initializes the parent job from the `parentContext` of this coroutine that was passed to it - * during construction. Second, it starts the coroutine using [startCoroutineUninterceptedOrReturn]. + * It starts the coroutine using [startCoroutineUninterceptedOrReturn]. */ internal fun ScopeCoroutine.startUndispatchedOrReturn(receiver: R, block: suspend R.() -> T): Any? { - initParentJob() return undispatchedResult({ true }) { block.startCoroutineUninterceptedOrReturn(receiver, this) } @@ -96,8 +94,8 @@ internal fun ScopeCoroutine.startUndispatchedOrReturn(receiver: R, blo * Same as [startUndispatchedOrReturn], but ignores [TimeoutCancellationException] on fast-path. */ internal fun ScopeCoroutine.startUndispatchedOrReturnIgnoreTimeout( - receiver: R, block: suspend R.() -> T): Any? { - initParentJob() + receiver: R, block: suspend R.() -> T +): Any? { return undispatchedResult({ e -> !(e is TimeoutCancellationException && e.coroutine === this) }) { block.startCoroutineUninterceptedOrReturn(receiver, this) } diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/SuppressionTests.kt b/kotlinx-coroutines-core/jvm/test/exceptions/SuppressionTests.kt index 6034fccbca..c7ada2d5b2 100644 --- a/kotlinx-coroutines-core/jvm/test/exceptions/SuppressionTests.kt +++ b/kotlinx-coroutines-core/jvm/test/exceptions/SuppressionTests.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.exceptions @@ -15,7 +15,7 @@ class SuppressionTests : TestBase() { @Test fun testNotificationsWithException() = runTest { expect(1) - val coroutineContext = kotlin.coroutines.coroutineContext // workaround for KT-22984 + val coroutineContext = kotlin.coroutines.coroutineContext + NonCancellable // workaround for KT-22984 val coroutine = object : AbstractCoroutine(coroutineContext, false) { override fun onStart() { expect(3) @@ -82,4 +82,4 @@ class SuppressionTests : TestBase() { assertTrue(e.cause!!.suppressed.isEmpty()) } } -} \ No newline at end of file +} From 45e065202a6f4eeef0efc36a005b90919807e2b3 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Mon, 1 Feb 2021 19:50:45 +0300 Subject: [PATCH 3/4] Get rid of an additional parent field from ScopeCoroutine * Leverage already presented information in our implementation, just expose it via already present internal interface --- .../api/kotlinx-coroutines-core.api | 2 ++ kotlinx-coroutines-core/common/src/Job.kt | 11 +++++++++++ kotlinx-coroutines-core/common/src/JobSupport.kt | 1 + kotlinx-coroutines-core/common/src/internal/Scopes.kt | 2 +- 4 files changed, 15 insertions(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 5632634c37..f6010c3d88 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -86,6 +86,7 @@ public final class kotlinx/coroutines/CancellableContinuationKt { public abstract interface class kotlinx/coroutines/ChildHandle : kotlinx/coroutines/DisposableHandle { public abstract fun childCancelled (Ljava/lang/Throwable;)Z + public abstract fun getParent ()Lkotlinx/coroutines/Job; } public abstract interface class kotlinx/coroutines/ChildJob : kotlinx/coroutines/Job { @@ -471,6 +472,7 @@ public final class kotlinx/coroutines/NonDisposableHandle : kotlinx/coroutines/C public static final field INSTANCE Lkotlinx/coroutines/NonDisposableHandle; public fun childCancelled (Ljava/lang/Throwable;)Z public fun dispose ()V + public fun getParent ()Lkotlinx/coroutines/Job; public fun toString ()Ljava/lang/String; } diff --git a/kotlinx-coroutines-core/common/src/Job.kt b/kotlinx-coroutines-core/common/src/Job.kt index c80463f61c..f203d8a685 100644 --- a/kotlinx-coroutines-core/common/src/Job.kt +++ b/kotlinx-coroutines-core/common/src/Job.kt @@ -466,6 +466,14 @@ public interface ParentJob : Job { @InternalCoroutinesApi @Deprecated(level = DeprecationLevel.ERROR, message = "This is internal API and may be removed in the future releases") public interface ChildHandle : DisposableHandle { + + /** + * Returns the parent of the current parent-child relationship. + * @suppress **This is unstable API and it is subject to change.** + */ + @InternalCoroutinesApi + public val parent: Job? + /** * Child is cancelling its parent by invoking this method. * This method is invoked by the child twice. The first time child report its root cause as soon as possible, @@ -659,6 +667,9 @@ private fun Throwable?.orCancellation(job: Job): Throwable = this ?: JobCancella */ @InternalCoroutinesApi public object NonDisposableHandle : DisposableHandle, ChildHandle { + + override val parent: Job? get() = null + /** * Does not do anything. * @suppress diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index fca3ee5d84..310610d90f 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -1459,6 +1459,7 @@ private class InvokeOnCancelling( internal class ChildHandleNode( @JvmField val childJob: ChildJob ) : JobCancellingNode(), ChildHandle { + override val parent: Job get() = job override fun invoke(cause: Throwable?) = childJob.parentCancelled(job) override fun childCancelled(cause: Throwable): Boolean = job.childCancelled(cause) } diff --git a/kotlinx-coroutines-core/common/src/internal/Scopes.kt b/kotlinx-coroutines-core/common/src/internal/Scopes.kt index 9a31c4230c..0a9d24f15f 100644 --- a/kotlinx-coroutines-core/common/src/internal/Scopes.kt +++ b/kotlinx-coroutines-core/common/src/internal/Scopes.kt @@ -21,7 +21,7 @@ internal open class ScopeCoroutine( final override fun getStackTraceElement(): StackTraceElement? = null final override val isScopedCoroutine: Boolean get() = true - internal val parent: Job? = context[Job] + internal val parent: Job? get() = parentHandle?.parent override fun afterCompletion(state: Any?) { // Resume in a cancellable way by default when resuming from another context From 523638fd64ac4f0616f778b0f7e8d850a031d077 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Mon, 22 Mar 2021 18:00:34 +0300 Subject: [PATCH 4/4] Force implementors of AbstractCoroutine deliberately choose whether parent-child relationship should be established * Test all coroutine builders * Fix FlowSubscription --- .../src/ListenableFuture.kt | 2 +- .../test/ListenableFutureTest.kt | 10 ++- .../src/future/Future.kt | 2 +- .../test/future/FutureTest.kt | 10 ++- .../api/kotlinx-coroutines-core.api | 4 +- .../common/src/AbstractCoroutine.kt | 29 ++++--- .../common/src/Builders.common.kt | 4 +- .../common/src/CompletableDeferred.kt | 2 +- .../common/src/JobSupport.kt | 4 +- .../common/src/channels/Broadcast.kt | 8 +- .../common/src/channels/ChannelCoroutine.kt | 4 +- .../common/src/channels/Produce.kt | 2 +- .../common/src/internal/Scopes.kt | 2 +- .../common/test/AbstractCoroutineTest.kt | 8 +- .../common/test/CancelledParentAttachTest.kt | 85 +++++++++++++++++++ kotlinx-coroutines-core/jvm/src/Builders.kt | 3 +- .../jvm/src/channels/Actor.kt | 6 +- .../{JoinStrTest.kt => JoinStressTest.kt} | 2 +- .../jvm/test/RunBlockingTest.kt | 13 ++- .../jvm/test/channels/ActorLazyTest.kt | 14 ++- .../jvm/test/channels/ActorTest.kt | 17 +++- .../jvm/test/exceptions/SuppressionTests.kt | 2 +- .../native/src/Builders.kt | 2 +- .../src/Publish.kt | 2 +- .../src/ReactiveFlow.kt | 6 +- .../test/CancelledParentAttachTest.kt | 20 +++++ .../kotlinx-coroutines-reactor/src/Mono.kt | 2 +- .../src/RxCompletable.kt | 2 +- .../kotlinx-coroutines-rx2/src/RxMaybe.kt | 2 +- .../src/RxObservable.kt | 4 +- .../kotlinx-coroutines-rx2/src/RxSingle.kt | 2 +- .../src/RxCompletable.kt | 2 +- .../kotlinx-coroutines-rx3/src/RxMaybe.kt | 2 +- .../src/RxObservable.kt | 2 +- .../kotlinx-coroutines-rx3/src/RxSingle.kt | 2 +- 35 files changed, 230 insertions(+), 53 deletions(-) create mode 100644 kotlinx-coroutines-core/common/test/CancelledParentAttachTest.kt rename kotlinx-coroutines-core/jvm/test/{JoinStrTest.kt => JoinStressTest.kt} (97%) create mode 100644 reactive/kotlinx-coroutines-reactive/test/CancelledParentAttachTest.kt diff --git a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt index 53019c4bbe..35e0aeb379 100644 --- a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt +++ b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt @@ -299,7 +299,7 @@ private class ToContinuation( */ private class ListenableFutureCoroutine( context: CoroutineContext -) : AbstractCoroutine(context) { +) : AbstractCoroutine(context, initParentJob = true, active = true) { // JobListenableFuture propagates external cancellation to `this` coroutine. See JobListenableFuture. @JvmField diff --git a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt index 9dca9e9b46..c463174a8d 100644 --- a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt +++ b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.guava @@ -747,4 +747,12 @@ class ListenableFutureTest : TestBase() { latch.countDown() return future } + + @Test + fun testCancelledParent() = runTest({ it is CancellationException }) { + cancel() + future { expectUnreached() } + future(start = CoroutineStart.ATOMIC) { } + future(start = CoroutineStart.UNDISPATCHED) { } + } } diff --git a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt index b3b45e9dbc..7e9c349c66 100644 --- a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt +++ b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt @@ -48,7 +48,7 @@ public fun CoroutineScope.future( private class CompletableFutureCoroutine( context: CoroutineContext, private val future: CompletableFuture -) : AbstractCoroutine(context), BiConsumer { +) : AbstractCoroutine(context, initParentJob = true, active = true), BiConsumer { override fun accept(value: T?, exception: Throwable?) { cancel() } diff --git a/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt b/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt index 998aaa0835..08e5cdad93 100644 --- a/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt +++ b/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.future @@ -567,4 +567,12 @@ class FutureTest : TestBase() { assertFailsWith { stage.await() } finish(4) } + + @Test + fun testCancelledParent() = runTest({ it is java.util.concurrent.CancellationException }) { + cancel() + future { expectUnreached() } + future(start = CoroutineStart.ATOMIC) { } + future(start = CoroutineStart.UNDISPATCHED) { } + } } diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index f6010c3d88..8a35d18687 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -1,6 +1,5 @@ public abstract class kotlinx/coroutines/AbstractCoroutine : kotlinx/coroutines/JobSupport, kotlin/coroutines/Continuation, kotlinx/coroutines/CoroutineScope, kotlinx/coroutines/Job { - public fun (Lkotlin/coroutines/CoroutineContext;Z)V - public synthetic fun (Lkotlin/coroutines/CoroutineContext;ZILkotlin/jvm/internal/DefaultConstructorMarker;)V + public fun (Lkotlin/coroutines/CoroutineContext;ZZ)V protected fun afterResume (Ljava/lang/Object;)V protected fun cancellationExceptionMessage ()Ljava/lang/String; public final fun getContext ()Lkotlin/coroutines/CoroutineContext; @@ -418,6 +417,7 @@ public class kotlinx/coroutines/JobSupport : kotlinx/coroutines/ChildJob, kotlin public final fun getKey ()Lkotlin/coroutines/CoroutineContext$Key; public final fun getOnJoin ()Lkotlinx/coroutines/selects/SelectClause0; protected fun handleJobException (Ljava/lang/Throwable;)Z + protected final fun initParentJob (Lkotlinx/coroutines/Job;)V public final fun invokeOnCompletion (Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/DisposableHandle; public final fun invokeOnCompletion (ZZLkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/DisposableHandle; public fun isActive ()Z diff --git a/kotlinx-coroutines-core/common/src/AbstractCoroutine.kt b/kotlinx-coroutines-core/common/src/AbstractCoroutine.kt index f1fdfacde6..439a9ac7a5 100644 --- a/kotlinx-coroutines-core/common/src/AbstractCoroutine.kt +++ b/kotlinx-coroutines-core/common/src/AbstractCoroutine.kt @@ -25,6 +25,9 @@ import kotlin.coroutines.* * * [onCancelled] in invoked when the coroutine completes with an exception (cancelled). * * @param parentContext the context of the parent coroutine. + * @param initParentJob specifies whether the parent-child relationship should be instantiated directly + * in `AbstractCoroutine` constructor. If set to `false`, it's the responsibility of the child class + * to invoke [initParentJob] manually. * @param active when `true` (by default), the coroutine is created in the _active_ state, otherwise it is created in the _new_ state. * See [Job] for details. * @@ -32,26 +35,28 @@ import kotlin.coroutines.* */ @InternalCoroutinesApi public abstract class AbstractCoroutine( - /** - * The context of the parent coroutine. - */ parentContext: CoroutineContext, - active: Boolean = true + initParentJob: Boolean, + active: Boolean ) : JobSupport(active), Job, Continuation, CoroutineScope { + + init { + /* + * Setup parent-child relationship between the parent in the context and the current coroutine. + * It may cause this coroutine to become _cancelling_ if the parent is already cancelled. + * It is dangerous to install parent-child relationship here if the coroutine class + * operates its state from within onCancelled or onCancelling + * (with exceptions for rx integrations that can't have any parent) + */ + if (initParentJob) initParentJob(parentContext[Job]) + } + /** * The context of this coroutine that includes this coroutine as a [Job]. */ @Suppress("LeakingThis") public final override val context: CoroutineContext = parentContext + this - init { - /* - * Setup parent-child relationship between the parent in the context - * and the current coroutine. - * It may cause this coroutine to become cancelled if the parent is already cancelled - */ - initParentJobInternal(parentContext[Job]) - } /** * The context of this scope which is the same as the [context] of this coroutine. */ diff --git a/kotlinx-coroutines-core/common/src/Builders.common.kt b/kotlinx-coroutines-core/common/src/Builders.common.kt index 8351dc3f5a..09ae9b685d 100644 --- a/kotlinx-coroutines-core/common/src/Builders.common.kt +++ b/kotlinx-coroutines-core/common/src/Builders.common.kt @@ -96,7 +96,7 @@ public fun CoroutineScope.async( private open class DeferredCoroutine( parentContext: CoroutineContext, active: Boolean -) : AbstractCoroutine(parentContext, active), Deferred, SelectClause1 { +) : AbstractCoroutine(parentContext, true, active = active), Deferred, SelectClause1 { override fun getCompleted(): T = getCompletedInternal() as T override suspend fun await(): T = awaitInternal() as T override val onAwait: SelectClause1 get() = this @@ -187,7 +187,7 @@ public suspend inline operator fun CoroutineDispatcher.invoke( private open class StandaloneCoroutine( parentContext: CoroutineContext, active: Boolean -) : AbstractCoroutine(parentContext, active) { +) : AbstractCoroutine(parentContext, initParentJob = true, active = active) { override fun handleJobException(exception: Throwable): Boolean { handleCoroutineException(context, exception) return true diff --git a/kotlinx-coroutines-core/common/src/CompletableDeferred.kt b/kotlinx-coroutines-core/common/src/CompletableDeferred.kt index c80737968e..5e76593df2 100644 --- a/kotlinx-coroutines-core/common/src/CompletableDeferred.kt +++ b/kotlinx-coroutines-core/common/src/CompletableDeferred.kt @@ -80,7 +80,7 @@ public fun CompletableDeferred(value: T): CompletableDeferred = Completab private class CompletableDeferredImpl( parent: Job? ) : JobSupport(true), CompletableDeferred, SelectClause1 { - init { initParentJobInternal(parent) } + init { initParentJob(parent) } override val onCancelComplete get() = true override fun getCompleted(): T = getCompletedInternal() as T override suspend fun await(): T = awaitInternal() as T diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index 310610d90f..a7dbbf8b31 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -139,7 +139,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren * Initializes parent job. * It shall be invoked at most once after construction after all other initialization. */ - internal fun initParentJobInternal(parent: Job?) { + protected fun initParentJob(parent: Job?) { assert { parentHandle == null } if (parent == null) { parentHandle = NonDisposableHandle @@ -1311,7 +1311,7 @@ private class Empty(override val isActive: Boolean) : Incomplete { } internal open class JobImpl(parent: Job?) : JobSupport(true), CompletableJob { - init { initParentJobInternal(parent) } + init { initParentJob(parent) } override val onCancelComplete get() = true /* * Check whether parent is able to handle exceptions as well. diff --git a/kotlinx-coroutines-core/common/src/channels/Broadcast.kt b/kotlinx-coroutines-core/common/src/channels/Broadcast.kt index 07e7597627..3ed4bc7fff 100644 --- a/kotlinx-coroutines-core/common/src/channels/Broadcast.kt +++ b/kotlinx-coroutines-core/common/src/channels/Broadcast.kt @@ -127,7 +127,13 @@ private open class BroadcastCoroutine( parentContext: CoroutineContext, protected val _channel: BroadcastChannel, active: Boolean -) : AbstractCoroutine(parentContext, active), ProducerScope, BroadcastChannel by _channel { +) : AbstractCoroutine(parentContext, initParentJob = false, active = active), + ProducerScope, BroadcastChannel by _channel { + + init { + initParentJob(parentContext[Job]) + } + override val isActive: Boolean get() = super.isActive override val channel: SendChannel diff --git a/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt b/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt index b2b257def2..57b2797de6 100644 --- a/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt +++ b/kotlinx-coroutines-core/common/src/channels/ChannelCoroutine.kt @@ -11,8 +11,10 @@ import kotlin.coroutines.* internal open class ChannelCoroutine( parentContext: CoroutineContext, protected val _channel: Channel, + initParentJob: Boolean, active: Boolean -) : AbstractCoroutine(parentContext, active), Channel by _channel { +) : AbstractCoroutine(parentContext, initParentJob, active), Channel by _channel { + val channel: Channel get() = this override fun cancel() { diff --git a/kotlinx-coroutines-core/common/src/channels/Produce.kt b/kotlinx-coroutines-core/common/src/channels/Produce.kt index 3c183587f3..3342fb6ec9 100644 --- a/kotlinx-coroutines-core/common/src/channels/Produce.kt +++ b/kotlinx-coroutines-core/common/src/channels/Produce.kt @@ -139,7 +139,7 @@ internal fun CoroutineScope.produce( internal open class ProducerCoroutine( parentContext: CoroutineContext, channel: Channel -) : ChannelCoroutine(parentContext, channel, active = true), ProducerScope { +) : ChannelCoroutine(parentContext, channel, true, active = true), ProducerScope { override val isActive: Boolean get() = super.isActive diff --git a/kotlinx-coroutines-core/common/src/internal/Scopes.kt b/kotlinx-coroutines-core/common/src/internal/Scopes.kt index 0a9d24f15f..ad8d86ed5a 100644 --- a/kotlinx-coroutines-core/common/src/internal/Scopes.kt +++ b/kotlinx-coroutines-core/common/src/internal/Scopes.kt @@ -15,7 +15,7 @@ import kotlin.jvm.* internal open class ScopeCoroutine( context: CoroutineContext, @JvmField val uCont: Continuation // unintercepted continuation -) : AbstractCoroutine(context, true), CoroutineStackFrame { +) : AbstractCoroutine(context, true, true), CoroutineStackFrame { final override val callerFrame: CoroutineStackFrame? get() = uCont as? CoroutineStackFrame final override fun getStackTraceElement(): StackTraceElement? = null diff --git a/kotlinx-coroutines-core/common/test/AbstractCoroutineTest.kt b/kotlinx-coroutines-core/common/test/AbstractCoroutineTest.kt index ce20837e25..ebe88ce156 100644 --- a/kotlinx-coroutines-core/common/test/AbstractCoroutineTest.kt +++ b/kotlinx-coroutines-core/common/test/AbstractCoroutineTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -13,7 +13,7 @@ class AbstractCoroutineTest : TestBase() { fun testNotifications() = runTest { expect(1) val coroutineContext = coroutineContext // workaround for KT-22984 - val coroutine = object : AbstractCoroutine(coroutineContext, false) { + val coroutine = object : AbstractCoroutine(coroutineContext, true, false) { override fun onStart() { expect(3) } @@ -53,7 +53,7 @@ class AbstractCoroutineTest : TestBase() { fun testNotificationsWithException() = runTest { expect(1) val coroutineContext = coroutineContext // workaround for KT-22984 - val coroutine = object : AbstractCoroutine(coroutineContext + NonCancellable, false) { + val coroutine = object : AbstractCoroutine(coroutineContext + NonCancellable, true, false) { override fun onStart() { expect(3) } @@ -91,4 +91,4 @@ class AbstractCoroutineTest : TestBase() { coroutine.resumeWithException(TestException2()) finish(10) } -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/common/test/CancelledParentAttachTest.kt b/kotlinx-coroutines-core/common/test/CancelledParentAttachTest.kt new file mode 100644 index 0000000000..749bbfc921 --- /dev/null +++ b/kotlinx-coroutines-core/common/test/CancelledParentAttachTest.kt @@ -0,0 +1,85 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.internal.* +import kotlin.test.* + +class CancelledParentAttachTest : TestBase() { + + @Test + fun testAsync() = CoroutineStart.values().forEach(::testAsyncCancelledParent) + + private fun testAsyncCancelledParent(start: CoroutineStart) = + runTest({ it is CancellationException }) { + cancel() + expect(1) + val d = async(start = start) { 42 } + expect(2) + d.invokeOnCompletion { + finish(3) + reset() + } + } + + @Test + fun testLaunch() = CoroutineStart.values().forEach(::testLaunchCancelledParent) + + private fun testLaunchCancelledParent(start: CoroutineStart) = + runTest({ it is CancellationException }) { + cancel() + expect(1) + val d = launch(start = start) { } + expect(2) + d.invokeOnCompletion { + finish(3) + reset() + } + } + + @Test + fun testProduce() = + runTest({ it is CancellationException }) { + cancel() + expect(1) + val d = produce { } + expect(2) + (d as Job).invokeOnCompletion { + finish(3) + reset() + } + } + + @Test + fun testBroadcast() = CoroutineStart.values().forEach(::testBroadcastCancelledParent) + + private fun testBroadcastCancelledParent(start: CoroutineStart) = + runTest({ it is CancellationException }) { + cancel() + expect(1) + val bc = broadcast(start = start) {} + expect(2) + (bc as Job).invokeOnCompletion { + finish(3) + reset() + } + } + + @Test + fun testScopes() { + testScope { coroutineScope { } } + testScope { supervisorScope { } } + testScope { flowScope { } } + testScope { withTimeout(Long.MAX_VALUE) { } } + testScope { withContext(Job()) { } } + testScope { withContext(CoroutineName("")) { } } + } + + private inline fun testScope(crossinline block: suspend () -> Unit) = runTest({ it is CancellationException }) { + cancel() + block() + } +} diff --git a/kotlinx-coroutines-core/jvm/src/Builders.kt b/kotlinx-coroutines-core/jvm/src/Builders.kt index c1b878ce2c..edb4303198 100644 --- a/kotlinx-coroutines-core/jvm/src/Builders.kt +++ b/kotlinx-coroutines-core/jvm/src/Builders.kt @@ -63,7 +63,8 @@ private class BlockingCoroutine( parentContext: CoroutineContext, private val blockedThread: Thread, private val eventLoop: EventLoop? -) : AbstractCoroutine(parentContext, true) { +) : AbstractCoroutine(parentContext, true, true) { + override val isScopedCoroutine: Boolean get() = true override fun afterCompletion(state: Any?) { diff --git a/kotlinx-coroutines-core/jvm/src/channels/Actor.kt b/kotlinx-coroutines-core/jvm/src/channels/Actor.kt index 0212d740bb..3a208c0f70 100644 --- a/kotlinx-coroutines-core/jvm/src/channels/Actor.kt +++ b/kotlinx-coroutines-core/jvm/src/channels/Actor.kt @@ -127,7 +127,11 @@ private open class ActorCoroutine( parentContext: CoroutineContext, channel: Channel, active: Boolean -) : ChannelCoroutine(parentContext, channel, active), ActorScope { +) : ChannelCoroutine(parentContext, channel, initParentJob = false, active = active), ActorScope { + + init { + initParentJob(parentContext[Job]) + } override fun onCancelling(cause: Throwable?) { _channel.cancel(cause?.let { diff --git a/kotlinx-coroutines-core/jvm/test/JoinStrTest.kt b/kotlinx-coroutines-core/jvm/test/JoinStressTest.kt similarity index 97% rename from kotlinx-coroutines-core/jvm/test/JoinStrTest.kt rename to kotlinx-coroutines-core/jvm/test/JoinStressTest.kt index 5090e7c06c..6d47418564 100644 --- a/kotlinx-coroutines-core/jvm/test/JoinStrTest.kt +++ b/kotlinx-coroutines-core/jvm/test/JoinStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines diff --git a/kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt b/kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt index e20362ff93..de38df6b26 100644 --- a/kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt +++ b/kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines @@ -171,4 +171,15 @@ class RunBlockingTest : TestBase() { } rb.hashCode() // unused } + + @Test + fun testCancelledParent() { + val job = Job() + job.cancel() + assertFailsWith { + runBlocking(job) { + expectUnreached() + } + } + } } diff --git a/kotlinx-coroutines-core/jvm/test/channels/ActorLazyTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ActorLazyTest.kt index ae95e694cd..d3b2ff1265 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ActorLazyTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ActorLazyTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -78,4 +78,14 @@ class ActorLazyTest : TestBase() { job.join() finish(5) } -} \ No newline at end of file + + @Test + fun testCancelledParent() = runTest({ it is CancellationException }) { + cancel() + expect(1) + actor(start = CoroutineStart.LAZY) { + expectUnreached() + } + finish(2) + } +} diff --git a/kotlinx-coroutines-core/jvm/test/channels/ActorTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ActorTest.kt index bdca5039d2..1d7613eded 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ActorTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ActorTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -173,11 +173,24 @@ class ActorTest(private val capacity: Int) : TestBase() { fun testCloseFreshActor() = runTest { for (start in CoroutineStart.values()) { val job = launch { - val actor = actor(start = start) { for (i in channel) {} } + val actor = actor(start = start) { + for (i in channel) { + } + } actor.close() } job.join() } } + + @Test + fun testCancelledParent() = runTest({ it is CancellationException }) { + cancel() + expect(1) + actor { + expectUnreached() + } + finish(2) + } } diff --git a/kotlinx-coroutines-core/jvm/test/exceptions/SuppressionTests.kt b/kotlinx-coroutines-core/jvm/test/exceptions/SuppressionTests.kt index c7ada2d5b2..edce175d4a 100644 --- a/kotlinx-coroutines-core/jvm/test/exceptions/SuppressionTests.kt +++ b/kotlinx-coroutines-core/jvm/test/exceptions/SuppressionTests.kt @@ -16,7 +16,7 @@ class SuppressionTests : TestBase() { fun testNotificationsWithException() = runTest { expect(1) val coroutineContext = kotlin.coroutines.coroutineContext + NonCancellable // workaround for KT-22984 - val coroutine = object : AbstractCoroutine(coroutineContext, false) { + val coroutine = object : AbstractCoroutine(coroutineContext, true, false) { override fun onStart() { expect(3) } diff --git a/kotlinx-coroutines-core/native/src/Builders.kt b/kotlinx-coroutines-core/native/src/Builders.kt index 7425a05542..30c187fa96 100644 --- a/kotlinx-coroutines-core/native/src/Builders.kt +++ b/kotlinx-coroutines-core/native/src/Builders.kt @@ -53,7 +53,7 @@ public fun runBlocking(context: CoroutineContext = EmptyCoroutineContext, bl private class BlockingCoroutine( parentContext: CoroutineContext, private val eventLoop: EventLoop? -) : AbstractCoroutine(parentContext, true) { +) : AbstractCoroutine(parentContext, true, true) { override val isScopedCoroutine: Boolean get() = true @Suppress("UNCHECKED_CAST") diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt index 35878b04b2..156205624e 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt @@ -80,7 +80,7 @@ public class PublisherCoroutine( parentContext: CoroutineContext, private val subscriber: Subscriber, private val exceptionOnCancelHandler: (Throwable, CoroutineContext) -> Unit -) : AbstractCoroutine(parentContext, true), ProducerScope, Subscription, SelectClause2> { +) : AbstractCoroutine(parentContext, false, true), ProducerScope, Subscription, SelectClause2> { override val channel: SendChannel get() = this // Mutex is locked when either nRequested == 0 or while subscriber.onXXX is being invoked diff --git a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt index 614e9ead1e..1f04ea538c 100644 --- a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt +++ b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt @@ -184,7 +184,11 @@ public class FlowSubscription( @JvmField public val flow: Flow, @JvmField public val subscriber: Subscriber, context: CoroutineContext -) : Subscription, AbstractCoroutine(context, true) { +) : Subscription, AbstractCoroutine(context, initParentJob = false, true) { + /* + * We deliberately set initParentJob to false and do not establish parent-child + * relationship because FlowSubscription doesn't support it + */ private val requested = atomic(0L) private val producer = atomic?>(createInitialContinuation()) diff --git a/reactive/kotlinx-coroutines-reactive/test/CancelledParentAttachTest.kt b/reactive/kotlinx-coroutines-reactive/test/CancelledParentAttachTest.kt new file mode 100644 index 0000000000..f846882ab2 --- /dev/null +++ b/reactive/kotlinx-coroutines-reactive/test/CancelledParentAttachTest.kt @@ -0,0 +1,20 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.reactive + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.junit.* + + +class CancelledParentAttachTest : TestBase() { + + @Test + fun testFlow() = runTest { + val f = flowOf(1, 2, 3).cancellable() + val j = Job().also { it.cancel() } + f.asPublisher(j).asFlow().collect() + } +} diff --git a/reactive/kotlinx-coroutines-reactor/src/Mono.kt b/reactive/kotlinx-coroutines-reactor/src/Mono.kt index 2d595c961f..e146dca9f6 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Mono.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Mono.kt @@ -58,7 +58,7 @@ private fun monoInternal( private class MonoCoroutine( parentContext: CoroutineContext, private val sink: MonoSink -) : AbstractCoroutine(parentContext, true), Disposable { +) : AbstractCoroutine(parentContext, false, true), Disposable { @Volatile private var disposed = false diff --git a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt index d0a43fb1a4..3400278cb3 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxCompletable.kt @@ -53,7 +53,7 @@ private fun rxCompletableInternal( private class RxCompletableCoroutine( parentContext: CoroutineContext, private val subscriber: CompletableEmitter -) : AbstractCoroutine(parentContext, true) { +) : AbstractCoroutine(parentContext, false, true) { override fun onCompleted(value: Unit) { try { subscriber.onComplete() diff --git a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt index f5ed48b9a9..c4ca54cf48 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxMaybe.kt @@ -54,7 +54,7 @@ private fun rxMaybeInternal( private class RxMaybeCoroutine( parentContext: CoroutineContext, private val subscriber: MaybeEmitter -) : AbstractCoroutine(parentContext, true) { +) : AbstractCoroutine(parentContext, false, true) { override fun onCompleted(value: T) { try { if (value == null) subscriber.onComplete() else subscriber.onSuccess(value) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt index 6d11cb9c35..4f4706d8ad 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxObservable.kt @@ -68,10 +68,10 @@ private const val OPEN = 0 // open channel, still working private const val CLOSED = -1 // closed, but have not signalled onCompleted/onError yet private const val SIGNALLED = -2 // already signalled subscriber onCompleted/onError -private class RxObservableCoroutine( +private class RxObservableCoroutine( parentContext: CoroutineContext, private val subscriber: ObservableEmitter -) : AbstractCoroutine(parentContext, true), ProducerScope, SelectClause2> { +) : AbstractCoroutine(parentContext, false, true), ProducerScope, SelectClause2> { override val channel: SendChannel get() = this // Mutex is locked when while subscriber.onXXX is being invoked diff --git a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt index b8012b6de9..3b81445312 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxSingle.kt @@ -53,7 +53,7 @@ private fun rxSingleInternal( private class RxSingleCoroutine( parentContext: CoroutineContext, private val subscriber: SingleEmitter -) : AbstractCoroutine(parentContext, true) { +) : AbstractCoroutine(parentContext, false, true) { override fun onCompleted(value: T) { try { subscriber.onSuccess(value) diff --git a/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt b/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt index f4c5d7e768..88137675d8 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxCompletable.kt @@ -39,7 +39,7 @@ private fun rxCompletableInternal( private class RxCompletableCoroutine( parentContext: CoroutineContext, private val subscriber: CompletableEmitter -) : AbstractCoroutine(parentContext, true) { +) : AbstractCoroutine(parentContext, false, true) { override fun onCompleted(value: Unit) { try { subscriber.onComplete() diff --git a/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt b/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt index ca1d5b59ee..1c10266470 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxMaybe.kt @@ -40,7 +40,7 @@ private fun rxMaybeInternal( private class RxMaybeCoroutine( parentContext: CoroutineContext, private val subscriber: MaybeEmitter -) : AbstractCoroutine(parentContext, true) { +) : AbstractCoroutine(parentContext, false, true) { override fun onCompleted(value: T) { try { if (value == null) subscriber.onComplete() else subscriber.onSuccess(value) diff --git a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt index 7bd07750fe..bd9239e7db 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxObservable.kt @@ -57,7 +57,7 @@ private const val SIGNALLED = -2 // already signalled subscriber onCompleted/on private class RxObservableCoroutine( parentContext: CoroutineContext, private val subscriber: ObservableEmitter -) : AbstractCoroutine(parentContext, true), ProducerScope, SelectClause2> { +) : AbstractCoroutine(parentContext, false, true), ProducerScope, SelectClause2> { override val channel: SendChannel get() = this // Mutex is locked when while subscriber.onXXX is being invoked diff --git a/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt b/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt index f4d07fb40f..fb6020eab3 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxSingle.kt @@ -39,7 +39,7 @@ private fun rxSingleInternal( private class RxSingleCoroutine( parentContext: CoroutineContext, private val subscriber: SingleEmitter -) : AbstractCoroutine(parentContext, true) { +) : AbstractCoroutine(parentContext, false, true) { override fun onCompleted(value: T) { try { subscriber.onSuccess(value)