From cb682f5d0249851d5aae02146fe54c4f3f9ed966 Mon Sep 17 00:00:00 2001 From: dkhalanskyjb <52952525+dkhalanskyjb@users.noreply.github.com> Date: Wed, 7 Apr 2021 11:44:14 +0300 Subject: [PATCH] Complete mono { } on cancellation (#2606) Fixes https://github.com/Kotlin/kotlinx.coroutines/issues/2262 --- .../kotlinx-coroutines-reactor/src/Mono.kt | 27 +++--- .../test/MonoTest.kt | 93 +++++++++++++++++++ 2 files changed, 107 insertions(+), 13 deletions(-) diff --git a/reactive/kotlinx-coroutines-reactor/src/Mono.kt b/reactive/kotlinx-coroutines-reactor/src/Mono.kt index e146dca9f6..6786f1abab 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Mono.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Mono.kt @@ -7,21 +7,23 @@ package kotlinx.coroutines.reactor import kotlinx.coroutines.* +import kotlinx.coroutines.reactive.* +import org.reactivestreams.* import reactor.core.* import reactor.core.publisher.* import kotlin.coroutines.* import kotlin.internal.* /** - * Creates cold [mono][Mono] that will run a given [block] in a coroutine and emits its result. + * Creates a cold [mono][Mono] that runs a given [block] in a coroutine and emits its result. * Every time the returned mono is subscribed, it starts a new coroutine. - * If [block] result is `null`, [MonoSink.success] is invoked without a value. - * Unsubscribing cancels running coroutine. + * If the result of [block] is `null`, [MonoSink.success] is invoked without a value. + * Unsubscribing cancels the running coroutine. * * Coroutine context can be specified with [context] argument. * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. * - * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. + * @throws IllegalArgumentException if the provided [context] contains a [Job] instance. */ public fun mono( context: CoroutineContext = EmptyCoroutineContext, @@ -67,17 +69,16 @@ private class MonoCoroutine( } override fun onCancelled(cause: Throwable, handled: Boolean) { - try { - /* - * sink.error handles exceptions on its own and, by default, handling of undeliverable exceptions is a no-op. - * Guard potentially non-empty handlers against meaningless cancellation exceptions - */ - if (getCancellationException() !== cause) { + /** Cancellation exceptions that were caused by [dispose], that is, came from downstream, are not errors. */ + if (getCancellationException() !== cause || !disposed) { + try { + /** If [sink] turns out to already be in a terminal state, this exception will be passed through the + * [Hooks.onOperatorError] hook, which is the way to signal undeliverable exceptions in Reactor. */ sink.error(cause) + } catch (e: Throwable) { + // In case of improper error implementation or fatal exceptions + handleCoroutineException(context, cause) } - } catch (e: Throwable) { - // In case of improper error implementation or fatal exceptions - handleCoroutineException(context, cause) } } diff --git a/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt b/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt index 0271483fc1..97b195c517 100644 --- a/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt @@ -5,6 +5,7 @@ package kotlinx.coroutines.reactor import kotlinx.coroutines.* +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.flow.* import kotlinx.coroutines.reactive.* import org.junit.* @@ -21,6 +22,7 @@ class MonoTest : TestBase() { @Before fun setup() { ignoreLostThreads("timer-", "parallel-") + Hooks.onErrorDropped { expectUnreached() } } @Test @@ -285,4 +287,95 @@ class MonoTest : TestBase() { .collect { } } } + + /** Test that cancelling a [mono] due to a timeout does throw an exception. */ + @Test + fun testTimeout() { + val mono = mono { + withTimeout(1) { delay(100) } + } + try { + mono.doOnSubscribe { expect(1) } + .doOnNext { expectUnreached() } + .doOnSuccess { expectUnreached() } + .doOnError { expect(2) } + .doOnCancel { expectUnreached() } + .block() + } catch (e: CancellationException) { + expect(3) + } + finish(4) + } + + /** Test that when the reason for cancellation of a [mono] is that the downstream doesn't want its results anymore, + * this is considered normal behavior and exceptions are not propagated. */ + @Test + fun testDownstreamCancellationDoesNotThrow() = runTest { + /** Attach a hook that handles exceptions from publishers that are known to be disposed of. We don't expect it + * to be fired in this case, as the reason for the publisher in this test to accept an exception is simply + * cancellation from the downstream. */ + Hooks.onOperatorError("testDownstreamCancellationDoesNotThrow") { t, a -> + expectUnreached() + t + } + /** A Mono that doesn't emit a value and instead waits indefinitely. */ + val mono = mono { expect(3); delay(Long.MAX_VALUE) } + .doOnSubscribe { expect(2) } + .doOnNext { expectUnreached() } + .doOnSuccess { expectUnreached() } + .doOnError { expectUnreached() } + .doOnCancel { expect(4) } + expect(1) + mono.awaitCancelAndJoin() + finish(5) + Hooks.resetOnOperatorError("testDownstreamCancellationDoesNotThrow") + } + + /** Test that, when [Mono] is cancelled by the downstream and throws during handling the cancellation, the resulting + * error is propagated to [Hooks.onOperatorError]. */ + @Test + fun testRethrowingDownstreamCancellation() = runTest { + /** Attach a hook that handles exceptions from publishers that are known to be disposed of. We expect it + * to be fired in this case. */ + Hooks.onOperatorError("testDownstreamCancellationDoesNotThrow") { t, a -> + expect(5) + t + } + /** A Mono that doesn't emit a value and instead waits indefinitely, and, when cancelled, throws. */ + val mono = mono { + expect(3); + try { + delay(Long.MAX_VALUE) + } catch (e: CancellationException) { + throw TestException() + } + } + .doOnSubscribe { expect(2) } + .doOnNext { expectUnreached() } + .doOnSuccess { expectUnreached() } + .doOnError { expectUnreached() } + .doOnCancel { expect(4) } + expect(1) + mono.awaitCancelAndJoin() + finish(6) /** if this line fails, see the comment for [awaitCancelAndJoin] */ + Hooks.resetOnOperatorError("testDownstreamCancellationDoesNotThrow") + } + + /** Run the given [Mono], cancel it, wait for the cancellation handler to finish, and *return only then*. + * + * There are no guarantees about the execution context in which the cancellation handler will run, but we have + * to wait for it to finish to check its behavior. The contraption below seems to ensure that everything works out. + * If it stops giving that guarantee, then [testRethrowingDownstreamCancellation] should fail more or less + * consistently because the hook won't have enough time to fire before a call to [finish]. + */ + private suspend fun Mono.awaitCancelAndJoin() = coroutineScope { + val job = async(start = CoroutineStart.UNDISPATCHED) { + awaitFirstOrNull() + } + newSingleThreadContext("monoCancellationCleanup").use { pool -> + launch(pool) { + job.cancelAndJoin() + } + }.join() + } }