From 293ee497d4538bae933d73a5806c8b94a1f222fa Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Mon, 27 Apr 2020 21:12:33 +0300 Subject: [PATCH] Setup cancellation eagerly in suspendCancellableCoroutine to properly integrate with APIs that may block the current thread, but react on cancellation (#1680) Fixes #1671 --- .../api/kotlinx-coroutines-core.api | 4 +- .../common/src/CancellableContinuation.kt | 27 ++++----- .../common/src/CancellableContinuationImpl.kt | 10 ++-- .../common/test/CancellableResumeTest.kt | 21 ++++++- .../test/CancellableContinuationJvmTest.kt | 59 +++++++++++++++++++ 5 files changed, 98 insertions(+), 23 deletions(-) diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 08f46e5108..f908f964d4 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -37,7 +37,7 @@ public final class kotlinx/coroutines/BuildersKt { public abstract interface class kotlinx/coroutines/CancellableContinuation : kotlin/coroutines/Continuation { public abstract fun cancel (Ljava/lang/Throwable;)Z public abstract fun completeResume (Ljava/lang/Object;)V - public abstract synthetic fun initCancellability ()V + public abstract fun initCancellability ()V public abstract fun invokeOnCancellation (Lkotlin/jvm/functions/Function1;)V public abstract fun isActive ()Z public abstract fun isCancelled ()Z @@ -63,7 +63,7 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/ public fun getContinuationCancellationCause (Lkotlinx/coroutines/Job;)Ljava/lang/Throwable; public final fun getResult ()Ljava/lang/Object; public fun getStackTraceElement ()Ljava/lang/StackTraceElement; - public synthetic fun initCancellability ()V + public fun initCancellability ()V public fun invokeOnCancellation (Lkotlin/jvm/functions/Function1;)V public fun isActive ()Z public fun isCancelled ()Z diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt index e01dc82521..0d3fe847dc 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt @@ -100,14 +100,8 @@ public interface CancellableContinuation : Continuation { * Legacy function that turned on cancellation behavior in [suspendCancellableCoroutine] before kotlinx.coroutines 1.1.0. * This function does nothing and is left only for binary compatibility with old compiled code. * - * @suppress **Deprecated**: This function is no longer used. - * It is left for binary compatibility with code compiled before kotlinx.coroutines 1.1.0. + * @suppress **This is unstable API and it is subject to change.** */ - @Deprecated( - level = DeprecationLevel.HIDDEN, - message = "This function is no longer used. " + - "It is left for binary compatibility with code compiled before kotlinx.coroutines 1.1.0. " - ) @InternalCoroutinesApi public fun initCancellability() @@ -183,7 +177,7 @@ public interface CancellableContinuation : Continuation { * It can be invoked concurrently with the surrounding code. * There is no guarantee on the execution context of its invocation. */ - @ExperimentalCoroutinesApi // since 1.2.0, tentatively graduates in 1.3.0 + @ExperimentalCoroutinesApi // since 1.2.0 public fun resume(value: T, onCancellation: (cause: Throwable) -> Unit) } @@ -196,9 +190,12 @@ public suspend inline fun suspendCancellableCoroutine( ): T = suspendCoroutineUninterceptedOrReturn { uCont -> val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE) - // NOTE: Before version 1.1.0 the following invocation was inlined here, so invocation of this - // method indicates that the code was compiled by kotlinx.coroutines < 1.1.0 - // cancellable.initCancellability() + /* + * For non-atomic cancellation we setup parent-child relationship immediately + * in case when `block` blocks the current thread (e.g. Rx2 with trampoline scheduler), but + * properly supports cancellation. + */ + cancellable.initCancellability() block(cancellable) cancellable.getResult() } @@ -229,10 +226,10 @@ public suspend inline fun suspendAtomicCancellableCoroutine( internal suspend inline fun suspendAtomicCancellableCoroutineReusable( crossinline block: (CancellableContinuation) -> Unit ): T = suspendCoroutineUninterceptedOrReturn { uCont -> - val cancellable = getOrCreateCancellableContinuation(uCont.intercepted()) - block(cancellable) - cancellable.getResult() - } + val cancellable = getOrCreateCancellableContinuation(uCont.intercepted()) + block(cancellable) + cancellable.getResult() +} internal fun getOrCreateCancellableContinuation(delegate: Continuation): CancellableContinuationImpl { // If used outside of our dispatcher diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt index 1f67dd3c6c..e25ebd3a37 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt @@ -32,8 +32,8 @@ internal open class CancellableContinuationImpl( /* * Implementation notes * - * AbstractContinuation is a subset of Job with following limitations: - * 1) It can have only cancellation listeners + * CancellableContinuationImpl is a subset of Job with following limitations: + * 1) It can have only cancellation listener (no "on cancelling") * 2) It always invokes cancellation listener if it's cancelled (no 'invokeImmediately') * 3) It can have at most one cancellation listener * 4) Its cancellation listeners cannot be deregistered @@ -82,7 +82,7 @@ internal open class CancellableContinuationImpl( public override val isCancelled: Boolean get() = state is CancelledContinuation public override fun initCancellability() { - // This method does nothing. Leftover for binary compatibility with old compiled code + setupCancellation() } private fun isReusable(): Boolean = delegate is DispatchedContinuation<*> && delegate.isReusable(this) @@ -107,7 +107,8 @@ internal open class CancellableContinuationImpl( /** * Setups parent cancellation and checks for postponed cancellation in the case of reusable continuations. - * It is only invoked from an internal [getResult] function. + * It is only invoked from an internal [getResult] function for reusable continuations + * and from [suspendCancellableCoroutine] to establish a cancellation before registering CC anywhere. */ private fun setupCancellation() { if (checkCompleted()) return @@ -453,4 +454,3 @@ private class CompletedWithCancellation( ) { override fun toString(): String = "CompletedWithCancellation[$result]" } - diff --git a/kotlinx-coroutines-core/common/test/CancellableResumeTest.kt b/kotlinx-coroutines-core/common/test/CancellableResumeTest.kt index b2cde6b978..035e2fb55d 100644 --- a/kotlinx-coroutines-core/common/test/CancellableResumeTest.kt +++ b/kotlinx-coroutines-core/common/test/CancellableResumeTest.kt @@ -30,7 +30,7 @@ class CancellableResumeTest : TestBase() { expected = { it is TestException } ) { expect(1) - val ok = suspendCancellableCoroutine { cont -> + suspendCancellableCoroutine { cont -> expect(2) cont.invokeOnCancellation { expect(3) } cont.cancel(TestException("FAIL")) @@ -44,6 +44,25 @@ class CancellableResumeTest : TestBase() { expectUnreached() } + @Test + fun testResumeImmediateAfterIndirectCancel() = runTest( + expected = { it is CancellationException } + ) { + expect(1) + val ctx = coroutineContext + suspendCancellableCoroutine { cont -> + expect(2) + cont.invokeOnCancellation { expect(3) } + ctx.cancel() + expect(4) + cont.resume("OK") { cause -> + expect(5) + } + finish(6) + } + expectUnreached() + } + @Test fun testResumeLaterNormally() = runTest { expect(1) diff --git a/kotlinx-coroutines-core/jvm/test/CancellableContinuationJvmTest.kt b/kotlinx-coroutines-core/jvm/test/CancellableContinuationJvmTest.kt index 4e25da96f5..f1a957adca 100644 --- a/kotlinx-coroutines-core/jvm/test/CancellableContinuationJvmTest.kt +++ b/kotlinx-coroutines-core/jvm/test/CancellableContinuationJvmTest.kt @@ -20,4 +20,63 @@ class CancellableContinuationJvmTest : TestBase() { } suspend {}() // Eliminate tail-call optimization } + + @Test + fun testExceptionIsNotReported() = runTest({ it is CancellationException }) { + val ctx = coroutineContext + suspendCancellableCoroutine { + ctx.cancel() + it.resumeWith(Result.failure(TestException())) + } + } + + @Test + fun testBlockingIntegration() = runTest { + val source = BlockingSource() + val job = launch(Dispatchers.Default) { + source.await() + } + source.cancelAndJoin(job) + } + + @Test + fun testBlockingIntegrationAlreadyCancelled() = runTest { + val source = BlockingSource() + val job = launch(Dispatchers.Default) { + cancel() + source.await() + } + source.cancelAndJoin(job) + } + + private suspend fun BlockingSource.cancelAndJoin(job: Job) { + while (!hasSubscriber) { + Thread.sleep(10) + } + job.cancelAndJoin() + } + + private suspend fun BlockingSource.await() = suspendCancellableCoroutine { + it.invokeOnCancellation { this.cancel() } + subscribe() + } + + private class BlockingSource { + @Volatile + private var isCancelled = false + + @Volatile + public var hasSubscriber = false + + public fun subscribe() { + hasSubscriber = true + while (!isCancelled) { + Thread.sleep(10) + } + } + + public fun cancel() { + isCancelled = true + } + } }