Skip to content

Commit

Permalink
Setup cancellation eagerly in suspendCancellableCoroutine to properly…
Browse files Browse the repository at this point in the history
… integrate with APIs that may block the current thread, but react on cancellation

Fixes #1671
  • Loading branch information
qwwdfsad committed Apr 27, 2020
1 parent eb4e7d3 commit 8fb8d60
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 23 deletions.
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -37,7 +37,7 @@ public final class kotlinx/coroutines/BuildersKt {
public abstract interface class kotlinx/coroutines/CancellableContinuation : kotlin/coroutines/Continuation {
public abstract fun cancel (Ljava/lang/Throwable;)Z
public abstract fun completeResume (Ljava/lang/Object;)V
public abstract synthetic fun initCancellability ()V
public abstract fun initCancellability ()V
public abstract fun invokeOnCancellation (Lkotlin/jvm/functions/Function1;)V
public abstract fun isActive ()Z
public abstract fun isCancelled ()Z
Expand All @@ -63,7 +63,7 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/
public fun getContinuationCancellationCause (Lkotlinx/coroutines/Job;)Ljava/lang/Throwable;
public final fun getResult ()Ljava/lang/Object;
public fun getStackTraceElement ()Ljava/lang/StackTraceElement;
public synthetic fun initCancellability ()V
public fun initCancellability ()V
public fun invokeOnCancellation (Lkotlin/jvm/functions/Function1;)V
public fun isActive ()Z
public fun isCancelled ()Z
Expand Down
27 changes: 12 additions & 15 deletions kotlinx-coroutines-core/common/src/CancellableContinuation.kt
Expand Up @@ -100,14 +100,8 @@ public interface CancellableContinuation<in T> : Continuation<T> {
* Legacy function that turned on cancellation behavior in [suspendCancellableCoroutine] before kotlinx.coroutines 1.1.0.
* This function does nothing and is left only for binary compatibility with old compiled code.
*
* @suppress **Deprecated**: This function is no longer used.
* It is left for binary compatibility with code compiled before kotlinx.coroutines 1.1.0.
* @suppress **This is unstable API and it is subject to change.**
*/
@Deprecated(
level = DeprecationLevel.HIDDEN,
message = "This function is no longer used. " +
"It is left for binary compatibility with code compiled before kotlinx.coroutines 1.1.0. "
)
@InternalCoroutinesApi
public fun initCancellability()

Expand Down Expand Up @@ -183,7 +177,7 @@ public interface CancellableContinuation<in T> : Continuation<T> {
* It can be invoked concurrently with the surrounding code.
* There is no guarantee on the execution context of its invocation.
*/
@ExperimentalCoroutinesApi // since 1.2.0, tentatively graduates in 1.3.0
@ExperimentalCoroutinesApi // since 1.2.0
public fun resume(value: T, onCancellation: (cause: Throwable) -> Unit)
}

Expand All @@ -196,9 +190,12 @@ public suspend inline fun <T> suspendCancellableCoroutine(
): T =
suspendCoroutineUninterceptedOrReturn { uCont ->
val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE)
// NOTE: Before version 1.1.0 the following invocation was inlined here, so invocation of this
// method indicates that the code was compiled by kotlinx.coroutines < 1.1.0
// cancellable.initCancellability()
/*
* For non-atomic cancellation we setup parent-child relationship immediately
* in case when `block` blocks the current thread (e.g. Rx2 with trampoline scheduler), but
* properly supports cancellation.
*/
cancellable.initCancellability()
block(cancellable)
cancellable.getResult()
}
Expand Down Expand Up @@ -229,10 +226,10 @@ public suspend inline fun <T> suspendAtomicCancellableCoroutine(
internal suspend inline fun <T> suspendAtomicCancellableCoroutineReusable(
crossinline block: (CancellableContinuation<T>) -> Unit
): T = suspendCoroutineUninterceptedOrReturn { uCont ->
val cancellable = getOrCreateCancellableContinuation(uCont.intercepted())
block(cancellable)
cancellable.getResult()
}
val cancellable = getOrCreateCancellableContinuation(uCont.intercepted())
block(cancellable)
cancellable.getResult()
}

internal fun <T> getOrCreateCancellableContinuation(delegate: Continuation<T>): CancellableContinuationImpl<T> {
// If used outside of our dispatcher
Expand Down
Expand Up @@ -32,8 +32,8 @@ internal open class CancellableContinuationImpl<in T>(
/*
* Implementation notes
*
* AbstractContinuation is a subset of Job with following limitations:
* 1) It can have only cancellation listeners
* CancellableContinuationImpl is a subset of Job with following limitations:
* 1) It can have only cancellation listener (no "on cancelling")
* 2) It always invokes cancellation listener if it's cancelled (no 'invokeImmediately')
* 3) It can have at most one cancellation listener
* 4) Its cancellation listeners cannot be deregistered
Expand Down Expand Up @@ -82,7 +82,7 @@ internal open class CancellableContinuationImpl<in T>(
public override val isCancelled: Boolean get() = state is CancelledContinuation

public override fun initCancellability() {
// This method does nothing. Leftover for binary compatibility with old compiled code
setupCancellation()
}

private fun isReusable(): Boolean = delegate is DispatchedContinuation<*> && delegate.isReusable(this)
Expand All @@ -107,7 +107,8 @@ internal open class CancellableContinuationImpl<in T>(

/**
* Setups parent cancellation and checks for postponed cancellation in the case of reusable continuations.
* It is only invoked from an internal [getResult] function.
* It is only invoked from an internal [getResult] function for reusable continuations
* and from [suspendCancellableCoroutine] to establish a cancellation before registering CC anywhere.
*/
private fun setupCancellation() {
if (checkCompleted()) return
Expand Down Expand Up @@ -453,4 +454,3 @@ private class CompletedWithCancellation(
) {
override fun toString(): String = "CompletedWithCancellation[$result]"
}

21 changes: 20 additions & 1 deletion kotlinx-coroutines-core/common/test/CancellableResumeTest.kt
Expand Up @@ -30,7 +30,7 @@ class CancellableResumeTest : TestBase() {
expected = { it is TestException }
) {
expect(1)
val ok = suspendCancellableCoroutine<String> { cont ->
suspendCancellableCoroutine<String> { cont ->
expect(2)
cont.invokeOnCancellation { expect(3) }
cont.cancel(TestException("FAIL"))
Expand All @@ -44,6 +44,25 @@ class CancellableResumeTest : TestBase() {
expectUnreached()
}

@Test
fun testResumeImmediateAfterIndirectCancel() = runTest(
expected = { it is CancellationException }
) {
expect(1)
val ctx = coroutineContext
suspendCancellableCoroutine<String> { cont ->
expect(2)
cont.invokeOnCancellation { expect(3) }
ctx.cancel()
expect(4)
cont.resume("OK") { cause ->
expect(5)
}
finish(6)
}
expectUnreached()
}

@Test
fun testResumeLaterNormally() = runTest {
expect(1)
Expand Down
59 changes: 59 additions & 0 deletions kotlinx-coroutines-core/jvm/test/CancellableContinuationJvmTest.kt
Expand Up @@ -20,4 +20,63 @@ class CancellableContinuationJvmTest : TestBase() {
}
suspend {}() // Eliminate tail-call optimization
}

@Test
fun testExceptionIsNotReported() = runTest({ it is CancellationException }) {
val ctx = coroutineContext
suspendCancellableCoroutine<Unit> {
ctx.cancel()
it.resumeWith(Result.failure(TestException()))
}
}

@Test
fun testBlockingIntegration() = runTest {
val source = BlockingSource()
val job = launch(Dispatchers.Default) {
source.await()
}
source.cancelAndJoin(job)
}

@Test
fun testBlockingIntegrationAlreadyCancelled() = runTest {
val source = BlockingSource()
val job = launch(Dispatchers.Default) {
cancel()
source.await()
}
source.cancelAndJoin(job)
}

private suspend fun BlockingSource.cancelAndJoin(job: Job) {
while (!hasSubscriber) {
Thread.sleep(10)
}
job.cancelAndJoin()
}

private suspend fun BlockingSource.await() = suspendCancellableCoroutine<Unit> {
it.invokeOnCancellation { this.cancel() }
subscribe()
}

private class BlockingSource {
@Volatile
private var isCancelled = false

@Volatile
public var hasSubscriber = false

public fun subscribe() {
hasSubscriber = true
while (!isCancelled) {
Thread.sleep(10)
}
}

public fun cancel() {
isCancelled = true
}
}
}

0 comments on commit 8fb8d60

Please sign in to comment.