Skip to content

Commit

Permalink
Complete mono { } on cancellation (Kotlin#2606)
Browse files Browse the repository at this point in the history
  • Loading branch information
dkhalanskyjb authored and pablobaxter committed Sep 14, 2022
1 parent 6757b76 commit cb682f5
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 13 deletions.
27 changes: 14 additions & 13 deletions reactive/kotlinx-coroutines-reactor/src/Mono.kt
Expand Up @@ -7,21 +7,23 @@
package kotlinx.coroutines.reactor

import kotlinx.coroutines.*
import kotlinx.coroutines.reactive.*
import org.reactivestreams.*
import reactor.core.*
import reactor.core.publisher.*
import kotlin.coroutines.*
import kotlin.internal.*

/**
* Creates cold [mono][Mono] that will run a given [block] in a coroutine and emits its result.
* Creates a cold [mono][Mono] that runs a given [block] in a coroutine and emits its result.
* Every time the returned mono is subscribed, it starts a new coroutine.
* If [block] result is `null`, [MonoSink.success] is invoked without a value.
* Unsubscribing cancels running coroutine.
* If the result of [block] is `null`, [MonoSink.success] is invoked without a value.
* Unsubscribing cancels the running coroutine.
*
* Coroutine context can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
*
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
* @throws IllegalArgumentException if the provided [context] contains a [Job] instance.
*/
public fun <T> mono(
context: CoroutineContext = EmptyCoroutineContext,
Expand Down Expand Up @@ -67,17 +69,16 @@ private class MonoCoroutine<in T>(
}

override fun onCancelled(cause: Throwable, handled: Boolean) {
try {
/*
* sink.error handles exceptions on its own and, by default, handling of undeliverable exceptions is a no-op.
* Guard potentially non-empty handlers against meaningless cancellation exceptions
*/
if (getCancellationException() !== cause) {
/** Cancellation exceptions that were caused by [dispose], that is, came from downstream, are not errors. */
if (getCancellationException() !== cause || !disposed) {
try {
/** If [sink] turns out to already be in a terminal state, this exception will be passed through the
* [Hooks.onOperatorError] hook, which is the way to signal undeliverable exceptions in Reactor. */
sink.error(cause)
} catch (e: Throwable) {
// In case of improper error implementation or fatal exceptions
handleCoroutineException(context, cause)
}
} catch (e: Throwable) {
// In case of improper error implementation or fatal exceptions
handleCoroutineException(context, cause)
}
}

Expand Down
93 changes: 93 additions & 0 deletions reactive/kotlinx-coroutines-reactor/test/MonoTest.kt
Expand Up @@ -5,6 +5,7 @@
package kotlinx.coroutines.reactor

import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import org.junit.*
Expand All @@ -21,6 +22,7 @@ class MonoTest : TestBase() {
@Before
fun setup() {
ignoreLostThreads("timer-", "parallel-")
Hooks.onErrorDropped { expectUnreached() }
}

@Test
Expand Down Expand Up @@ -285,4 +287,95 @@ class MonoTest : TestBase() {
.collect { }
}
}

/** Test that cancelling a [mono] due to a timeout does throw an exception. */
@Test
fun testTimeout() {
val mono = mono {
withTimeout(1) { delay(100) }
}
try {
mono.doOnSubscribe { expect(1) }
.doOnNext { expectUnreached() }
.doOnSuccess { expectUnreached() }
.doOnError { expect(2) }
.doOnCancel { expectUnreached() }
.block()
} catch (e: CancellationException) {
expect(3)
}
finish(4)
}

/** Test that when the reason for cancellation of a [mono] is that the downstream doesn't want its results anymore,
* this is considered normal behavior and exceptions are not propagated. */
@Test
fun testDownstreamCancellationDoesNotThrow() = runTest {
/** Attach a hook that handles exceptions from publishers that are known to be disposed of. We don't expect it
* to be fired in this case, as the reason for the publisher in this test to accept an exception is simply
* cancellation from the downstream. */
Hooks.onOperatorError("testDownstreamCancellationDoesNotThrow") { t, a ->
expectUnreached()
t
}
/** A Mono that doesn't emit a value and instead waits indefinitely. */
val mono = mono { expect(3); delay(Long.MAX_VALUE) }
.doOnSubscribe { expect(2) }
.doOnNext { expectUnreached() }
.doOnSuccess { expectUnreached() }
.doOnError { expectUnreached() }
.doOnCancel { expect(4) }
expect(1)
mono.awaitCancelAndJoin()
finish(5)
Hooks.resetOnOperatorError("testDownstreamCancellationDoesNotThrow")
}

/** Test that, when [Mono] is cancelled by the downstream and throws during handling the cancellation, the resulting
* error is propagated to [Hooks.onOperatorError]. */
@Test
fun testRethrowingDownstreamCancellation() = runTest {
/** Attach a hook that handles exceptions from publishers that are known to be disposed of. We expect it
* to be fired in this case. */
Hooks.onOperatorError("testDownstreamCancellationDoesNotThrow") { t, a ->
expect(5)
t
}
/** A Mono that doesn't emit a value and instead waits indefinitely, and, when cancelled, throws. */
val mono = mono {
expect(3);
try {
delay(Long.MAX_VALUE)
} catch (e: CancellationException) {
throw TestException()
}
}
.doOnSubscribe { expect(2) }
.doOnNext { expectUnreached() }
.doOnSuccess { expectUnreached() }
.doOnError { expectUnreached() }
.doOnCancel { expect(4) }
expect(1)
mono.awaitCancelAndJoin()
finish(6) /** if this line fails, see the comment for [awaitCancelAndJoin] */
Hooks.resetOnOperatorError("testDownstreamCancellationDoesNotThrow")
}

/** Run the given [Mono], cancel it, wait for the cancellation handler to finish, and *return only then*.
*
* There are no guarantees about the execution context in which the cancellation handler will run, but we have
* to wait for it to finish to check its behavior. The contraption below seems to ensure that everything works out.
* If it stops giving that guarantee, then [testRethrowingDownstreamCancellation] should fail more or less
* consistently because the hook won't have enough time to fire before a call to [finish].
*/
private suspend fun <T> Mono<T>.awaitCancelAndJoin() = coroutineScope {
val job = async(start = CoroutineStart.UNDISPATCHED) {
awaitFirstOrNull()
}
newSingleThreadContext("monoCancellationCleanup").use { pool ->
launch(pool) {
job.cancelAndJoin()
}
}.join()
}
}

0 comments on commit cb682f5

Please sign in to comment.