From d680cbac60bd2b2f75a1e12d52665ef3b7111b8f Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Thu, 28 Nov 2019 16:32:51 +0300 Subject: [PATCH] Setup cancellation eagerly in suspendCancellableCoroutine to properly integrate with APIs that may block the current thread, but react on cancellation Fixes #1671 --- .../kotlinx-coroutines-core.txt | 1 + .../common/src/CancellableContinuation.kt | 16 +++-- .../common/src/CancellableContinuationImpl.kt | 11 ++-- .../common/test/CancellableResumeTest.kt | 21 ++++++- .../test/CancellableContinuationJvmTest.kt | 59 +++++++++++++++++++ 5 files changed, 97 insertions(+), 11 deletions(-) diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index 5b7955c499..e3c17c1025 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -73,6 +73,7 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/ public fun resumeUndispatched (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Object;)V public fun resumeUndispatchedWithException (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Throwable;)V public fun resumeWith (Ljava/lang/Object;)V + public final fun setupCancellation ()V public fun toString ()Ljava/lang/String; public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object; public fun tryResumeWithException (Ljava/lang/Throwable;)Ljava/lang/Object; diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt index d1e99529a5..686c4fb5b1 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt @@ -183,7 +183,7 @@ public interface CancellableContinuation : Continuation { * It can be invoked concurrently with the surrounding code. * There is no guarantee on the execution context of its invocation. */ - @ExperimentalCoroutinesApi // since 1.2.0, tentatively graduates in 1.3.0 + @ExperimentalCoroutinesApi // since 1.2.0 public fun resume(value: T, onCancellation: (cause: Throwable) -> Unit) } @@ -199,6 +199,12 @@ public suspend inline fun suspendCancellableCoroutine( // NOTE: Before version 1.1.0 the following invocation was inlined here, so invocation of this // method indicates that the code was compiled by kotlinx.coroutines < 1.1.0 // cancellable.initCancellability() + /* + * For non-atomic cancellation we setup parent-child relationship immediately + * in case when `block` blocks the current thread (e.g. Rx2 with trampoline scheduler), but + * properly supports cancellation. + */ + cancellable.setupCancellation() block(cancellable) cancellable.getResult() } @@ -229,10 +235,10 @@ public suspend inline fun suspendAtomicCancellableCoroutine( internal suspend inline fun suspendAtomicCancellableCoroutineReusable( crossinline block: (CancellableContinuation) -> Unit ): T = suspendCoroutineUninterceptedOrReturn { uCont -> - val cancellable = getOrCreateCancellableContinuation(uCont.intercepted()) - block(cancellable) - cancellable.getResult() - } + val cancellable = getOrCreateCancellableContinuation(uCont.intercepted()) + block(cancellable) + cancellable.getResult() +} internal fun getOrCreateCancellableContinuation(delegate: Continuation): CancellableContinuationImpl { // If used outside of our dispatcher diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt index f5b5900cb6..1869797d07 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt @@ -31,8 +31,8 @@ internal open class CancellableContinuationImpl( /* * Implementation notes * - * AbstractContinuation is a subset of Job with following limitations: - * 1) It can have only cancellation listeners + * CancellableContinuationImpl is a subset of Job with following limitations: + * 1) It can have only cancellation listener (no "on cancelling") * 2) It always invokes cancellation listener if it's cancelled (no 'invokeImmediately') * 3) It can have at most one cancellation listener * 4) Its cancellation listeners cannot be deregistered @@ -105,9 +105,11 @@ internal open class CancellableContinuationImpl( /** * Setups parent cancellation and checks for postponed cancellation in the case of reusable continuations. - * It is only invoked from an internal [getResult] function. + * It is only invoked from an internal [getResult] function for reusable continuations + * and from [suspendCancellableCoroutine] to establish a cancellation before registering CC anywhere. */ - private fun setupCancellation() { + @PublishedApi + internal fun setupCancellation() { if (checkCompleted()) return if (parentHandle !== null) return // fast path 2 -- was already initialized val parent = delegate.context[Job] ?: return // fast path 3 -- don't do anything without parent @@ -451,4 +453,3 @@ private class CompletedWithCancellation( ) { override fun toString(): String = "CompletedWithCancellation[$result]" } - diff --git a/kotlinx-coroutines-core/common/test/CancellableResumeTest.kt b/kotlinx-coroutines-core/common/test/CancellableResumeTest.kt index b2cde6b978..035e2fb55d 100644 --- a/kotlinx-coroutines-core/common/test/CancellableResumeTest.kt +++ b/kotlinx-coroutines-core/common/test/CancellableResumeTest.kt @@ -30,7 +30,7 @@ class CancellableResumeTest : TestBase() { expected = { it is TestException } ) { expect(1) - val ok = suspendCancellableCoroutine { cont -> + suspendCancellableCoroutine { cont -> expect(2) cont.invokeOnCancellation { expect(3) } cont.cancel(TestException("FAIL")) @@ -44,6 +44,25 @@ class CancellableResumeTest : TestBase() { expectUnreached() } + @Test + fun testResumeImmediateAfterIndirectCancel() = runTest( + expected = { it is CancellationException } + ) { + expect(1) + val ctx = coroutineContext + suspendCancellableCoroutine { cont -> + expect(2) + cont.invokeOnCancellation { expect(3) } + ctx.cancel() + expect(4) + cont.resume("OK") { cause -> + expect(5) + } + finish(6) + } + expectUnreached() + } + @Test fun testResumeLaterNormally() = runTest { expect(1) diff --git a/kotlinx-coroutines-core/jvm/test/CancellableContinuationJvmTest.kt b/kotlinx-coroutines-core/jvm/test/CancellableContinuationJvmTest.kt index 4e25da96f5..f1a957adca 100644 --- a/kotlinx-coroutines-core/jvm/test/CancellableContinuationJvmTest.kt +++ b/kotlinx-coroutines-core/jvm/test/CancellableContinuationJvmTest.kt @@ -20,4 +20,63 @@ class CancellableContinuationJvmTest : TestBase() { } suspend {}() // Eliminate tail-call optimization } + + @Test + fun testExceptionIsNotReported() = runTest({ it is CancellationException }) { + val ctx = coroutineContext + suspendCancellableCoroutine { + ctx.cancel() + it.resumeWith(Result.failure(TestException())) + } + } + + @Test + fun testBlockingIntegration() = runTest { + val source = BlockingSource() + val job = launch(Dispatchers.Default) { + source.await() + } + source.cancelAndJoin(job) + } + + @Test + fun testBlockingIntegrationAlreadyCancelled() = runTest { + val source = BlockingSource() + val job = launch(Dispatchers.Default) { + cancel() + source.await() + } + source.cancelAndJoin(job) + } + + private suspend fun BlockingSource.cancelAndJoin(job: Job) { + while (!hasSubscriber) { + Thread.sleep(10) + } + job.cancelAndJoin() + } + + private suspend fun BlockingSource.await() = suspendCancellableCoroutine { + it.invokeOnCancellation { this.cancel() } + subscribe() + } + + private class BlockingSource { + @Volatile + private var isCancelled = false + + @Volatile + public var hasSubscriber = false + + public fun subscribe() { + hasSubscriber = true + while (!isCancelled) { + Thread.sleep(10) + } + } + + public fun cancel() { + isCancelled = true + } + } }