Skip to content

Commit

Permalink
Setup cancellation eagerly in suspendCancellableCoroutine to properly…
Browse files Browse the repository at this point in the history
… integrate with APIs that may block the current thread, but react on cancellation

Fixes #1671
  • Loading branch information
qwwdfsad committed Nov 28, 2019
1 parent 7e895fc commit 7793e96
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 6 deletions.
Expand Up @@ -73,6 +73,7 @@ public class kotlinx/coroutines/CancellableContinuationImpl : kotlin/coroutines/
public fun resumeUndispatched (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Object;)V
public fun resumeUndispatchedWithException (Lkotlinx/coroutines/CoroutineDispatcher;Ljava/lang/Throwable;)V
public fun resumeWith (Ljava/lang/Object;)V
public final fun setupCancellation ()V
public fun toString ()Ljava/lang/String;
public fun tryResume (Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;
public fun tryResumeWithException (Ljava/lang/Throwable;)Ljava/lang/Object;
Expand Down
Expand Up @@ -31,8 +31,8 @@ internal open class CancellableContinuationImpl<in T>(
/*
* Implementation notes
*
* AbstractContinuation is a subset of Job with following limitations:
* 1) It can have only cancellation listeners
* CancellableContinuationImpl is a subset of Job with following limitations:
* 1) It can have only cancellation listener (no "on cancelling")
* 2) It always invokes cancellation listener if it's cancelled (no 'invokeImmediately')
* 3) It can have at most one cancellation listener
* 4) Its cancellation listeners cannot be deregistered
Expand Down Expand Up @@ -105,9 +105,11 @@ internal open class CancellableContinuationImpl<in T>(

/**
* Setups parent cancellation and checks for postponed cancellation in the case of reusable continuations.
* It is only invoked from an internal [getResult] function.
* It is only invoked from an internal [getResult] function for reusable continuations
* and from [suspendCancellableCoroutine] to establish a cancellation before registering CC anywhere.
*/
private fun setupCancellation() {
@PublishedApi
internal fun setupCancellation() {
if (checkCompleted()) return
if (parentHandle !== null) return // fast path 2 -- was already initialized
val parent = delegate.context[Job] ?: return // fast path 3 -- don't do anything without parent
Expand Down Expand Up @@ -451,4 +453,3 @@ private class CompletedWithCancellation(
) {
override fun toString(): String = "CompletedWithCancellation[$result]"
}

21 changes: 20 additions & 1 deletion kotlinx-coroutines-core/common/test/CancellableResumeTest.kt
Expand Up @@ -30,7 +30,7 @@ class CancellableResumeTest : TestBase() {
expected = { it is TestException }
) {
expect(1)
val ok = suspendCancellableCoroutine<String> { cont ->
suspendCancellableCoroutine<String> { cont ->
expect(2)
cont.invokeOnCancellation { expect(3) }
cont.cancel(TestException("FAIL"))
Expand All @@ -44,6 +44,25 @@ class CancellableResumeTest : TestBase() {
expectUnreached()
}

@Test
fun testResumeImmediateAfterIndirectCancel() = runTest(
expected = { it is CancellationException }
) {
expect(1)
val ctx = coroutineContext
suspendCancellableCoroutine<String> { cont ->
expect(2)
cont.invokeOnCancellation { expect(3) }
ctx.cancel()
expect(4)
cont.resume("OK") { cause ->
expect(5)
}
finish(6)
}
expectUnreached()
}

@Test
fun testResumeLaterNormally() = runTest {
expect(1)
Expand Down
59 changes: 59 additions & 0 deletions kotlinx-coroutines-core/jvm/test/CancellableContinuationJvmTest.kt
Expand Up @@ -20,4 +20,63 @@ class CancellableContinuationJvmTest : TestBase() {
}
suspend {}() // Eliminate tail-call optimization
}

@Test
fun testExceptionIsNotReported() = runTest({ it is CancellationException }) {
val ctx = coroutineContext
suspendCancellableCoroutine<Unit> {
ctx.cancel()
it.resumeWith(Result.failure(TestException()))
}
}

@Test
fun testBlockingIntegration() = runTest {
val source = BlockingSource()
val job = launch(Dispatchers.Default) {
source.await()
}
source.cancelAndJoin(job)
}

@Test
fun testBlockingIntegrationAlreadyCancelled() = runTest {
val source = BlockingSource()
val job = launch(Dispatchers.Default) {
cancel()
source.await()
}
source.cancelAndJoin(job)
}

private suspend fun BlockingSource.cancelAndJoin(job: Job) {
while (!hasSubscriber) {
Thread.sleep(10)
}
job.cancelAndJoin()
}

private suspend fun BlockingSource.await() = suspendCancellableCoroutine<Unit> {
it.invokeOnCancellation { this.cancel() }
subscribe()
}

private class BlockingSource {
@Volatile
private var isCancelled = false

@Volatile
public var hasSubscriber = false

public fun subscribe() {
hasSubscriber = true
while (!isCancelled) {
Thread.sleep(10)
}
}

public fun cancel() {
isCancelled = true
}
}
}

0 comments on commit 7793e96

Please sign in to comment.