diff --git a/reactive/kotlinx-coroutines-jdk9/src/Await.kt b/reactive/kotlinx-coroutines-jdk9/src/Await.kt index 4febf4079c..dfe6ec52f2 100644 --- a/reactive/kotlinx-coroutines-jdk9/src/Await.kt +++ b/reactive/kotlinx-coroutines-jdk9/src/Await.kt @@ -4,78 +4,82 @@ package kotlinx.coroutines.jdk9 +import kotlinx.coroutines.* import java.util.concurrent.* import org.reactivestreams.FlowAdapters import kotlinx.coroutines.reactive.* /** - * Awaits for the first value from the given publisher without blocking a thread and - * returns the resulting value or throws the corresponding exception if this publisher had produced error. + * Awaits the first value from the given publisher without blocking the thread and returns the resulting value, or, if + * the publisher has produced an error, throws the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Flow.Subscription] and resumes with [CancellationException]. * - * @throws NoSuchElementException if publisher does not emit any value + * @throws NoSuchElementException if the publisher does not emit any value */ -public suspend fun Flow.Publisher.awaitFirst(): T = FlowAdapters.toPublisher(this).awaitFirst() +public suspend fun Flow.Publisher.awaitFirst(): T = + FlowAdapters.toPublisher(this).awaitFirst() /** - * Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a - * thread and returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given publisher, or returns the [default] value if none is emitted, without blocking + * the thread, and returns the resulting value, or, if this publisher has produced an error, throws the corresponding + * exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Flow.Subscription] and resumes with [CancellationException]. */ public suspend fun Flow.Publisher.awaitFirstOrDefault(default: T): T = - FlowAdapters.toPublisher(this).awaitFirstOrDefault(default) + FlowAdapters.toPublisher(this).awaitFirstOrDefault(default) /** - * Awaits for the first value from the given observable or `null` value if none is emitted without blocking a - * thread and returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given publisher, or returns `null` if none is emitted, without blocking the thread, + * and returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Flow.Subscription] and resumes with [CancellationException]. */ public suspend fun Flow.Publisher.awaitFirstOrNull(): T? = - FlowAdapters.toPublisher(this).awaitFirstOrNull() + FlowAdapters.toPublisher(this).awaitFirstOrNull() /** - * Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a - * thread and returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given publisher, or calls [defaultValue] to get a value if none is emitted, without + * blocking the thread, and returns the resulting value, or, if this publisher has produced an error, throws the + * corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Flow.Subscription] and resumes with [CancellationException]. */ public suspend fun Flow.Publisher.awaitFirstOrElse(defaultValue: () -> T): T = - FlowAdapters.toPublisher(this).awaitFirstOrElse(defaultValue) + FlowAdapters.toPublisher(this).awaitFirstOrElse(defaultValue) /** - * Awaits for the last value from the given publisher without blocking a thread and - * returns the resulting value or throws the corresponding exception if this publisher had produced error. + * Awaits the last value from the given publisher without blocking the thread and + * returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Flow.Subscription] and resumes with [CancellationException]. * - * @throws NoSuchElementException if publisher does not emit any value + * @throws NoSuchElementException if the publisher does not emit any value */ public suspend fun Flow.Publisher.awaitLast(): T = - FlowAdapters.toPublisher(this).awaitLast() + FlowAdapters.toPublisher(this).awaitLast() /** - * Awaits for the single value from the given publisher without blocking a thread and - * returns the resulting value or throws the corresponding exception if this publisher had produced error. + * Awaits the single value from the given publisher without blocking the thread and returns the resulting value, or, + * if this publisher has produced an error, throws the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Flow.Subscription] and resumes with [CancellationException]. * - * @throws NoSuchElementException if publisher does not emit any value - * @throws IllegalArgumentException if publisher emits more than one value + * @throws NoSuchElementException if the publisher does not emit any value + * @throws IllegalArgumentException if the publisher emits more than one value */ public suspend fun Flow.Publisher.awaitSingle(): T = - FlowAdapters.toPublisher(this).awaitSingle() + FlowAdapters.toPublisher(this).awaitSingle() diff --git a/reactive/kotlinx-coroutines-jdk9/test/AwaitTest.kt b/reactive/kotlinx-coroutines-jdk9/test/AwaitTest.kt new file mode 100644 index 0000000000..5a95d098fd --- /dev/null +++ b/reactive/kotlinx-coroutines-jdk9/test/AwaitTest.kt @@ -0,0 +1,43 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.jdk9 + +import kotlinx.coroutines.* +import org.junit.* +import java.util.concurrent.Flow as JFlow + +class AwaitTest: TestBase() { + + /** Tests that calls to [awaitFirst] (and, thus, to the rest of these functions) throw [CancellationException] and + * unsubscribe from the publisher when their [Job] is cancelled. */ + @Test + fun testAwaitCancellation() = runTest { + expect(1) + val publisher = JFlow.Publisher { s -> + s.onSubscribe(object : JFlow.Subscription { + override fun request(n: Long) { + expect(3) + } + + override fun cancel() { + expect(5) + } + }) + } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + try { + expect(2) + publisher.awaitFirst() + } catch (e: CancellationException) { + expect(6) + throw e + } + } + expect(4) + job.cancelAndJoin() + finish(7) + } + +} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactive/src/Await.kt b/reactive/kotlinx-coroutines-reactive/src/Await.kt index 9af134cb94..067f5e8031 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Await.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Await.kt @@ -9,114 +9,161 @@ import org.reactivestreams.Publisher import org.reactivestreams.Subscriber import org.reactivestreams.Subscription import java.lang.IllegalStateException -import java.util.* import kotlin.coroutines.* /** - * Awaits for the first value from the given publisher without blocking a thread and - * returns the resulting value or throws the corresponding exception if this publisher had produced error. + * Awaits the first value from the given publisher without blocking the thread and returns the resulting value, or, if + * the publisher has produced an error, throws the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Subscription] and resumes with [CancellationException]. * - * @throws NoSuchElementException if publisher does not emit any value + * @throws NoSuchElementException if the publisher does not emit any value */ public suspend fun Publisher.awaitFirst(): T = awaitOne(Mode.FIRST) /** - * Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a - * thread and returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given publisher, or returns the [default] value if none is emitted, without blocking + * the thread, and returns the resulting value, or, if this publisher has produced an error, throws the corresponding + * exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Subscription] and resumes with [CancellationException]. */ public suspend fun Publisher.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default) /** - * Awaits for the first value from the given observable or `null` value if none is emitted without blocking a - * thread and returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given publisher, or returns `null` if none is emitted, without blocking the thread, + * and returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Subscription] and resumes with [CancellationException]. */ public suspend fun Publisher.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT) /** - * Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a - * thread and returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given publisher, or calls [defaultValue] to get a value if none is emitted, without + * blocking the thread, and returns the resulting value, or, if this publisher has produced an error, throws the + * corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Subscription] and resumes with [CancellationException]. */ public suspend fun Publisher.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue() /** - * Awaits for the last value from the given publisher without blocking a thread and - * returns the resulting value or throws the corresponding exception if this publisher had produced error. + * Awaits the last value from the given publisher without blocking the thread and + * returns the resulting value, or, if this publisher has produced an error, throws the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Subscription] and resumes with [CancellationException]. * - * @throws NoSuchElementException if publisher does not emit any value + * @throws NoSuchElementException if the publisher does not emit any value */ public suspend fun Publisher.awaitLast(): T = awaitOne(Mode.LAST) /** - * Awaits for the single value from the given publisher without blocking a thread and - * returns the resulting value or throws the corresponding exception if this publisher had produced error. + * Awaits the single value from the given publisher without blocking the thread and returns the resulting value, or, + * if this publisher has produced an error, throws the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Subscription] and resumes with [CancellationException]. * - * @throws NoSuchElementException if publisher does not emit any value - * @throws IllegalArgumentException if publisher emits more than one value + * @throws NoSuchElementException if the publisher does not emit any value + * @throws IllegalArgumentException if the publisher emits more than one value */ public suspend fun Publisher.awaitSingle(): T = awaitOne(Mode.SINGLE) /** - * Awaits for the single value from the given publisher or the [default] value if none is emitted without blocking a thread and - * returns the resulting value or throws the corresponding exception if this publisher had produced error. + * Awaits the single value from the given publisher, or returns the [default] value if none is emitted, without + * blocking the thread, and returns the resulting value, or, if this publisher has produced an error, throws the + * corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Subscription] and resumes with [CancellationException]. * - * @throws NoSuchElementException if publisher does not emit any value - * @throws IllegalArgumentException if publisher emits more than one value + * ### Deprecation + * + * This method is deprecated because the conventions established in Kotlin mandate that an operation with the name + * `awaitSingleOrDefault` returns the default value instead of throwing in case there is an error; however, this would + * also mean that this method would return the default value if there are *too many* values. This could be confusing to + * those who expect this function to validate that there is a single element or none at all emitted, and cases where + * there are no elements are indistinguishable from those where there are too many, though these cases have different + * meaning. + * + * @throws NoSuchElementException if the publisher does not emit any value + * @throws IllegalArgumentException if the publisher emits more than one value */ +@Deprecated( + message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " + + "Please consider using awaitFirstOrDefault().", + level = DeprecationLevel.WARNING +) // Warning since 1.5, error in 1.6, hidden in 1.7 public suspend fun Publisher.awaitSingleOrDefault(default: T): T = awaitOne(Mode.SINGLE_OR_DEFAULT, default) /** - * Awaits for the single value from the given publisher or `null` value if none is emitted without blocking a thread and - * returns the resulting value or throws the corresponding exception if this publisher had produced error. + * Awaits the single value from the given publisher without blocking the thread and returns the resulting value, or, if + * this publisher has produced an error, throws the corresponding exception. If more than one value or none were + * produced by the publisher, `null` is returned. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Subscription] and resumes with [CancellationException]. + * + * ### Deprecation * - * @throws NoSuchElementException if publisher does not emit any value - * @throws IllegalArgumentException if publisher emits more than one value + * This method is deprecated because the conventions established in Kotlin mandate that an operation with the name + * `awaitSingleOrNull` returns `null` instead of throwing in case there is an error; however, this would + * also mean that this method would return `null` if there are *too many* values. This could be confusing to + * those who expect this function to validate that there is a single element or none at all emitted, and cases where + * there are no elements are indistinguishable from those where there are too many, though these cases have different + * meaning. + * + * @throws IllegalArgumentException if the publisher emits more than one value */ -public suspend fun Publisher.awaitSingleOrNull(): T = awaitOne(Mode.SINGLE_OR_DEFAULT) +@Deprecated( + message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " + + "There is a specialized version for Reactor's Mono, please use that where applicable. " + + "Alternatively, please consider using awaitFirstOrNull().", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.awaitSingleOrNull()", "kotlinx.coroutines.reactor") +) // Warning since 1.5, error in 1.6, hidden in 1.7 +public suspend fun Publisher.awaitSingleOrNull(): T? = awaitOne(Mode.SINGLE_OR_DEFAULT) /** - * Awaits for the single value from the given publisher or call [defaultValue] to get a value if none is emitted without blocking a thread and - * returns the resulting value or throws the corresponding exception if this publisher had produced error. + * Awaits the single value from the given publisher, or calls [defaultValue] to get a value if none is emitted, without + * blocking the thread, and returns the resulting value, or, if this publisher has produced an error, throws the + * corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Subscription] and resumes with [CancellationException]. + * + * ### Deprecation + * + * This method is deprecated because the conventions established in Kotlin mandate that an operation with the name + * `awaitSingleOrElse` returns the calculated value instead of throwing in case there is an error; however, this would + * also mean that this method would return the calculated value if there are *too many* values. This could be confusing + * to those who expect this function to validate that there is a single element or none at all emitted, and cases where + * there are no elements are indistinguishable from those where there are too many, though these cases have different + * meaning. * - * @throws NoSuchElementException if publisher does not emit any value - * @throws IllegalArgumentException if publisher emits more than one value + * @throws IllegalArgumentException if the publisher emits more than one value */ -public suspend fun Publisher.awaitSingleOrElse(defaultValue: () -> T): T = awaitOne(Mode.SINGLE_OR_DEFAULT) ?: defaultValue() +@Deprecated( + message = "Deprecated without a replacement due to its name incorrectly conveying the behavior. " + + "Please consider using awaitFirstOrElse().", + level = DeprecationLevel.WARNING +) // Warning since 1.5, error in 1.6, hidden in 1.7 +public suspend fun Publisher.awaitSingleOrElse(defaultValue: () -> T): T = + awaitOne(Mode.SINGLE_OR_DEFAULT) ?: defaultValue() // ------------------------ private ------------------------ diff --git a/reactive/kotlinx-coroutines-reactive/test/AwaitTest.kt b/reactive/kotlinx-coroutines-reactive/test/AwaitTest.kt new file mode 100644 index 0000000000..6749423f80 --- /dev/null +++ b/reactive/kotlinx-coroutines-reactive/test/AwaitTest.kt @@ -0,0 +1,43 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.reactive + +import kotlinx.coroutines.* +import org.junit.* +import org.reactivestreams.* + +class AwaitTest: TestBase() { + + /** Tests that calls to [awaitFirst] (and, thus, to the rest of these functions) throw [CancellationException] and + * unsubscribe from the publisher when their [Job] is cancelled. */ + @Test + fun testAwaitCancellation() = runTest { + expect(1) + val publisher = Publisher { s -> + s.onSubscribe(object: Subscription { + override fun request(n: Long) { + expect(3) + } + + override fun cancel() { + expect(5) + } + }) + } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + try { + expect(2) + publisher.awaitFirst() + } catch (e: CancellationException) { + expect(6) + throw e + } + } + expect(4) + job.cancelAndJoin() + finish(7) + } + +} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api b/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api index 3a1c8b7d31..0a10aa12a9 100644 --- a/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api +++ b/reactive/kotlinx-coroutines-reactor/api/kotlinx-coroutines-reactor.api @@ -17,6 +17,13 @@ public final class kotlinx/coroutines/reactor/FluxKt { } public final class kotlinx/coroutines/reactor/MonoKt { + public static final fun awaitFirst (Lreactor/core/publisher/Mono;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun awaitFirstOrDefault (Lreactor/core/publisher/Mono;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun awaitFirstOrElse (Lreactor/core/publisher/Mono;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun awaitFirstOrNull (Lreactor/core/publisher/Mono;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun awaitLast (Lreactor/core/publisher/Mono;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun awaitSingle (Lreactor/core/publisher/Mono;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun awaitSingleOrNull (Lreactor/core/publisher/Mono;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun mono (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Mono; public static final synthetic fun mono (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;)Lreactor/core/publisher/Mono; public static synthetic fun mono$default (Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lreactor/core/publisher/Mono; diff --git a/reactive/kotlinx-coroutines-reactor/src/Mono.kt b/reactive/kotlinx-coroutines-reactor/src/Mono.kt index 6e7b95ba6e..6a4a38f379 100644 --- a/reactive/kotlinx-coroutines-reactor/src/Mono.kt +++ b/reactive/kotlinx-coroutines-reactor/src/Mono.kt @@ -12,7 +12,6 @@ import org.reactivestreams.* import reactor.core.* import reactor.core.publisher.* import kotlin.coroutines.* -import kotlin.internal.* import kotlinx.coroutines.internal.* /** @@ -35,6 +34,50 @@ public fun mono( return monoInternal(GlobalScope, context, block) } +/** + * Awaits the single value from the given [Mono] without blocking the thread and returns the resulting value, or, if + * this publisher has produced an error, throws the corresponding exception. If the Mono completed without a value, + * `null` is returned. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Subscription] and resumes with [CancellationException]. + */ +public suspend fun Mono.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont -> + injectCoroutineContext(cont.context).subscribe(object : Subscriber { + private var seenValue = false + + override fun onSubscribe(s: Subscription) { + cont.invokeOnCancellation { s.cancel() } + s.request(Long.MAX_VALUE) + } + + override fun onComplete() { + if (!seenValue) cont.resume(null) + } + + override fun onNext(t: T) { + seenValue = true + cont.resume(t) + } + + override fun onError(error: Throwable) { cont.resumeWithException(error) } + }) +} + +/** + * Awaits the single value from the given [Mono] without blocking the thread and returns the resulting value, or, + * if this Mono has produced an error, throws the corresponding exception. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately cancels its [Subscription] and resumes with [CancellationException]. + * + * @throws NoSuchElementException if the Mono does not emit any value + */ +// TODO: consider using https://github.com/Kotlin/kotlinx.coroutines/issues/2607 once that lands +public suspend fun Mono.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException() + private fun monoInternal( scope: CoroutineScope, // support for legacy mono in scope context: CoroutineContext, @@ -92,3 +135,112 @@ public fun CoroutineScope.mono( block: suspend CoroutineScope.() -> T? ): Mono = monoInternal(this, context, block) +/** + * This is a lint function that was added already deprecated in order to guard against confusing usages on [Mono]. + * On [Publisher] instances other than [Mono], this function is not deprecated. + * + * Both [awaitFirst] and [awaitSingle] await the first value, or throw [NoSuchElementException] if there is none, but + * the name [Mono.awaitSingle] better reflects the semantics of [Mono]. + * + * For example, consider this code: + * ``` + * myDbClient.findById(uniqueId).awaitFirst() // findById returns a `Mono` + * ``` + * It looks like more than one value could be returned from `findById` and [awaitFirst] discards the extra elements, + * when in fact, at most a single value can be present. + */ +@Deprecated( + message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " + + "Please use awaitSingle() instead.", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.awaitSingle()") +) // Warning since 1.5, error in 1.6 +public suspend fun Mono.awaitFirst(): T = awaitSingle() + +/** + * This is a lint function that was added already deprecated in order to guard against confusing usages on [Mono]. + * On [Publisher] instances other than [Mono], this function is not deprecated. + * + * Both [awaitFirstOrDefault] and [awaitSingleOrNull] await the first value, or return some special value if there + * is none, but the name [Mono.awaitSingleOrNull] better reflects the semantics of [Mono]. + * + * For example, consider this code: + * ``` + * myDbClient.findById(uniqueId).awaitFirstOrDefault(default) // findById returns a `Mono` + * ``` + * It looks like more than one value could be returned from `findById` and [awaitFirstOrDefault] discards the extra + * elements, when in fact, at most a single value can be present. + */ +@Deprecated( + message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " + + "Please use awaitSingleOrNull() instead.", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default") +) // Warning since 1.5, error in 1.6 +public suspend fun Mono.awaitFirstOrDefault(default: T): T = awaitSingleOrNull() ?: default + +/** + * This is a lint function that was added already deprecated in order to guard against confusing usages on [Mono]. + * On [Publisher] instances other than [Mono], this function is not deprecated. + * + * Both [awaitFirstOrNull] and [awaitSingleOrNull] await the first value, or return some special value if there + * is none, but the name [Mono.awaitSingleOrNull] better reflects the semantics of [Mono]. + * + * For example, consider this code: + * ``` + * myDbClient.findById(uniqueId).awaitFirstOrNull() // findById returns a `Mono` + * ``` + * It looks like more than one value could be returned from `findById` and [awaitFirstOrNull] discards the extra + * elements, when in fact, at most a single value can be present. + */ +@Deprecated( + message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " + + "Please use awaitSingleOrNull() instead.", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.awaitSingleOrNull()") +) // Warning since 1.5, error in 1.6 +public suspend fun Mono.awaitFirstOrNull(): T? = awaitSingleOrNull() + +/** + * This is a lint function that was added already deprecated in order to guard against confusing usages on [Mono]. + * On [Publisher] instances other than [Mono], this function is not deprecated. + * + * Both [awaitFirstOrElse] and [awaitSingleOrNull] await the first value, or return some special value if there + * is none, but the name [Mono.awaitSingleOrNull] better reflects the semantics of [Mono]. + * + * For example, consider this code: + * ``` + * myDbClient.findById(uniqueId).awaitFirstOrElse(defaultValue) // findById returns a `Mono` + * ``` + * It looks like more than one value could be returned from `findById` and [awaitFirstOrElse] discards the extra + * elements, when in fact, at most a single value can be present. + */ +@Deprecated( + message = "Mono produces at most one value, so the semantics of dropping the remaining elements are not useful. " + + "Please use awaitSingleOrNull() instead.", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: defaultValue()") +) // Warning since 1.5, error in 1.6 +public suspend fun Mono.awaitFirstOrElse(defaultValue: () -> T): T = awaitSingleOrNull() ?: defaultValue() + +/** + * This is a lint function that was added already deprecated in order to guard against confusing usages on [Mono]. + * On [Publisher] instances other than [Mono], this function is not deprecated. + * + * Both [awaitLast] and [awaitSingle] await the single value, or throw [NoSuchElementException] if there is none, but + * the name [Mono.awaitSingle] better reflects the semantics of [Mono]. + * + * For example, consider this code: + * ``` + * myDbClient.findById(uniqueId).awaitLast() // findById returns a `Mono` + * ``` + * It looks like more than one value could be returned from `findById` and [awaitLast] discards the initial elements, + * when in fact, at most a single value can be present. + */ +@Deprecated( + message = "Mono produces at most one value, so the last element is the same as the first. " + + "Please use awaitSingle() instead.", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.awaitSingle()") +) // Warning since 1.5, error in 1.6 +public suspend fun Mono.awaitLast(): T = awaitSingle() \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt index be4b2c7d45..333f056d97 100644 --- a/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt +++ b/reactive/kotlinx-coroutines-reactor/src/ReactorContext.kt @@ -16,8 +16,8 @@ import kotlinx.coroutines.reactive.* * * This context element is implicitly propagated through subscriber's context by all Reactive integrations, such as [mono], [flux], * [Publisher.asFlow][asFlow], [Flow.asPublisher][asPublisher] and [Flow.asFlux][asFlux]. - * Functions that subscribe to the reactive stream (e.g. [Publisher.awaitFirst][awaitFirst]) also propagate [ReactorContext] to the - * subscriber's [Context]. + * Functions that subscribe to the reactive stream (e.g. [Publisher.awaitFirst][kotlinx.coroutines.reactive.awaitFirst]) + * also propagate the [ReactorContext] to the subscriber's [Context]. ** * ### Examples of Reactive context integration. * diff --git a/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt b/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt index dbe97b17d8..d8807385f0 100644 --- a/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt @@ -14,8 +14,8 @@ import kotlin.test.* class FlowAsFluxTest : TestBase() { @Test fun testFlowAsFluxContextPropagation() { - val flux = flow { - (1..4).forEach { i -> emit(createMono(i).awaitFirst()) } + val flux = flow { + (1..4).forEach { i -> emit(createMono(i).awaitSingle()) } } .asFlux() .contextWrite(Context.of(1, "1")) diff --git a/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt b/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt index a98c514f19..421295d115 100644 --- a/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/MonoTest.kt @@ -13,7 +13,6 @@ import org.junit.Test import org.reactivestreams.* import reactor.core.publisher.* import reactor.util.context.* -import java.time.* import java.time.Duration.* import java.util.function.* import kotlin.test.* @@ -115,6 +114,52 @@ class MonoTest : TestBase() { @Test fun testMonoAwait() = runBlocking { assertEquals("OK", Mono.just("O").awaitSingle() + "K") + assertEquals("OK", Mono.just("O").awaitSingleOrNull() + "K") + assertFailsWith{ Mono.empty().awaitSingle() } + assertNull(Mono.empty().awaitSingleOrNull()) + } + + /** Tests that the versions of the await methods specialized for Mono for deprecation behave correctly and we don't + * break any code by introducing them. */ + @Test + @Suppress("DEPRECATION") + fun testDeprecatedAwaitMethods() = runBlocking { + val filledMono = mono { "OK" } + assertEquals("OK", filledMono.awaitFirst()) + assertEquals("OK", filledMono.awaitFirstOrDefault("!")) + assertEquals("OK", filledMono.awaitFirstOrNull()) + assertEquals("OK", filledMono.awaitFirstOrElse { "ELSE" }) + assertEquals("OK", filledMono.awaitLast()) + assertEquals("OK", filledMono.awaitSingleOrDefault("!")) + assertEquals("OK", filledMono.awaitSingleOrElse { "ELSE" }) + val emptyMono = mono { null } + assertFailsWith { emptyMono.awaitFirst() } + assertEquals("OK", emptyMono.awaitFirstOrDefault("OK")) + assertNull(emptyMono.awaitFirstOrNull()) + assertEquals("ELSE", emptyMono.awaitFirstOrElse { "ELSE" }) + assertFailsWith { emptyMono.awaitLast() } + assertEquals("OK", emptyMono.awaitSingleOrDefault("OK")) + assertEquals("ELSE", emptyMono.awaitSingleOrElse { "ELSE" }) + } + + /** Tests that calls to [awaitSingleOrNull] (and, thus, to the rest of such functions) throw [CancellationException] + * and unsubscribe from the publisher when their [Job] is cancelled. */ + @Test + fun testAwaitCancellation() = runTest { + expect(1) + val mono = mono { delay(Long.MAX_VALUE) }.doOnSubscribe { expect(3) }.doOnCancel { expect(5) } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + try { + expect(2) + mono.awaitSingleOrNull() + } catch (e: CancellationException) { + expect(6) + throw e + } + } + expect(4) + job.cancelAndJoin() + finish(7) } @Test @@ -264,7 +309,7 @@ class MonoTest : TestBase() { .interval(ofMillis(1)) .switchMap { mono(coroutineContext) { - timeBomb().awaitFirst() + timeBomb().awaitSingle() } } .onErrorReturn({ @@ -275,14 +320,14 @@ class MonoTest : TestBase() { finish(2) } - private fun timeBomb() = Mono.delay(Duration.ofMillis(1)).doOnSuccess { throw Exception("something went wrong") } + private fun timeBomb() = Mono.delay(ofMillis(1)).doOnSuccess { throw Exception("something went wrong") } @Test fun testLeakedException() = runBlocking { // Test exception is not reported to global handler val flow = mono { throw TestException() }.toFlux().asFlow() repeat(10000) { - combine(flow, flow) { _, _ -> Unit } + combine(flow, flow) { _, _ -> } .catch {} .collect { } } @@ -373,13 +418,13 @@ class MonoTest : TestBase() { Hooks.resetOnOperatorError("testDownstreamCancellationDoesNotThrow") } - /** Run the given [Publisher], cancel it, wait for the cancellation handler to finish, and return only then. + /** Run the given [Mono], cancel it, wait for the cancellation handler to finish, and return only then. * * Will not work in the general case, but here, when the publisher uses [Dispatchers.Unconfined], this seems to * ensure that the cancellation handler will have nowhere to execute but serially with the cancellation. */ - private suspend fun Publisher.awaitCancelAndJoin() = coroutineScope { + private suspend fun Mono.awaitCancelAndJoin() = coroutineScope { async(start = CoroutineStart.UNDISPATCHED) { - awaitFirstOrNull() + awaitSingleOrNull() }.cancelAndJoin() } } diff --git a/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt b/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt index aff29241c9..577238be1d 100644 --- a/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt +++ b/reactive/kotlinx-coroutines-reactor/test/ReactorContextTest.kt @@ -20,7 +20,7 @@ class ReactorContextTest : TestBase() { } } .contextWrite(Context.of(2, "2", 3, "3", 4, "4", 5, "5")) .contextWrite { ctx -> ctx.put(6, "6") } - assertEquals(mono.awaitFirst(), "1234567") + assertEquals(mono.awaitSingle(), "1234567") } @Test @@ -43,22 +43,18 @@ class ReactorContextTest : TestBase() { (1..3).forEach { append(ctx.getOrDefault(it, "noValue")) } } } .contextWrite(Context.of(2, "2")) - .awaitFirst() + .awaitSingle() assertEquals(result, "123") } @Test fun testMonoAwaitContextPropagation() = runBlocking(Context.of(7, "7").asCoroutineContext()) { - assertEquals(createMono().awaitFirst(), "7") - assertEquals(createMono().awaitFirstOrDefault("noValue"), "7") - assertEquals(createMono().awaitFirstOrNull(), "7") - assertEquals(createMono().awaitFirstOrElse { "noValue" }, "7") - assertEquals(createMono().awaitLast(), "7") assertEquals(createMono().awaitSingle(), "7") + assertEquals(createMono().awaitSingleOrNull(), "7") } @Test - fun testFluxAwaitContextPropagation() = runBlocking( + fun testFluxAwaitContextPropagation() = runBlocking( Context.of(1, "1", 2, "2", 3, "3").asCoroutineContext() ) { assertEquals(createFlux().awaitFirst(), "1") diff --git a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api index eb5e2fb4ad..c27ef4d796 100644 --- a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api +++ b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api @@ -8,7 +8,9 @@ public final class kotlinx/coroutines/rx2/RxAwaitKt { public static final fun awaitFirstOrNull (Lio/reactivex/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun awaitLast (Lio/reactivex/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun awaitOrDefault (Lio/reactivex/MaybeSource;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun awaitSingle (Lio/reactivex/MaybeSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun awaitSingle (Lio/reactivex/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun awaitSingleOrNull (Lio/reactivex/MaybeSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } public final class kotlinx/coroutines/rx2/RxChannelKt { diff --git a/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt b/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt index 6e162c9a4d..d7b8ee26f6 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxAwait.kt @@ -15,11 +15,12 @@ import kotlin.coroutines.* // ------------------------ CompletableSource ------------------------ /** - * Awaits for completion of this completable without blocking a thread. - * Returns `Unit` or throws the corresponding exception if this completable had produced error. + * Awaits for completion of this completable without blocking the thread. + * Returns `Unit`, or throws the corresponding exception if this completable produces an error. * * This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled or completed while this - * suspending function is suspended, this function immediately resumes with [CancellationException]. + * suspending function is suspended, this function immediately resumes with [CancellationException] and disposes of its + * subscription. */ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont -> subscribe(object : CompletableObserver { @@ -31,6 +32,37 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine // ------------------------ MaybeSource ------------------------ +/** + * Awaits for completion of the [MaybeSource] without blocking the thread. + * Returns the resulting value, or `null` if no value is produced, or throws the corresponding exception if this + * [MaybeSource] produces an error. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this + * function immediately resumes with [CancellationException] and disposes of its subscription. + */ +@Suppress("UNCHECKED_CAST") +public suspend fun MaybeSource.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont -> + subscribe(object : MaybeObserver { + override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } + override fun onComplete() { cont.resume(null) } + override fun onSuccess(t: T) { cont.resume(t) } + override fun onError(error: Throwable) { cont.resumeWithException(error) } + }) +} + +/** + * Awaits for completion of the [MaybeSource] without blocking the thread. + * Returns the resulting value, or throws if either no value is produced or this [MaybeSource] produces an error. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this + * function immediately resumes with [CancellationException] and disposes of its subscription. + * + * @throws NoSuchElementException if no elements were produced by this [MaybeSource]. + */ +public suspend fun MaybeSource.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException() + /** * Awaits for completion of the maybe without blocking a thread. * Returns the resulting value, null if no value was produced or throws the corresponding exception if this @@ -39,9 +71,18 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function * immediately resumes with [CancellationException]. + * + * ### Deprecation + * + * Deprecated in favor of [awaitSingleOrNull] in order to reflect that `null` can be returned to denote the absence of + * a value, as opposed to throwing in such case. */ -@Suppress("UNCHECKED_CAST") -public suspend fun MaybeSource.await(): T? = (this as MaybeSource).awaitOrDefault(null) +@Deprecated( + message = "Deprecated in favor of awaitSingleOrNull()", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.awaitSingleOrNull()") +) // Warning since 1.5, error in 1.6, hidden in 1.7 +public suspend fun MaybeSource.await(): T? = awaitSingleOrNull() /** * Awaits for completion of the maybe without blocking a thread. @@ -51,25 +92,28 @@ public suspend fun MaybeSource.await(): T? = (this as MaybeSource).aw * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function * immediately resumes with [CancellationException]. + * + * ### Deprecation + * + * Deprecated in favor of [awaitSingleOrNull] for naming consistency (see the deprecation of [MaybeSource.await] for + * details). */ -public suspend fun MaybeSource.awaitOrDefault(default: T): T = suspendCancellableCoroutine { cont -> - subscribe(object : MaybeObserver { - override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } - override fun onComplete() { cont.resume(default) } - override fun onSuccess(t: T) { cont.resume(t) } - override fun onError(error: Throwable) { cont.resumeWithException(error) } - }) -} +@Deprecated( + message = "Deprecated in favor of awaitSingleOrNull()", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default") +) // Warning since 1.5, error in 1.6, hidden in 1.7 +public suspend fun MaybeSource.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default // ------------------------ SingleSource ------------------------ /** - * Awaits for completion of the single value without blocking a thread. - * Returns the resulting value or throws the corresponding exception if this single had produced error. + * Awaits for completion of the single value response without blocking the thread. + * Returns the resulting value, or throws the corresponding exception if this response produces an error. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. */ public suspend fun SingleSource.await(): T = suspendCancellableCoroutine { cont -> subscribe(object : SingleObserver { @@ -82,69 +126,73 @@ public suspend fun SingleSource.await(): T = suspendCancellableCoroutine // ------------------------ ObservableSource ------------------------ /** - * Awaits for the first value from the given observable without blocking a thread. - * Returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given [Observable] without blocking the thread and returns the resulting value, or, + * if the observable has produced an error, throws the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. * - * @throws NoSuchElementException if observable does not emit any value + * @throws NoSuchElementException if the observable does not emit any value */ public suspend fun ObservableSource.awaitFirst(): T = awaitOne(Mode.FIRST) /** - * Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a - * thread and returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given [Observable], or returns the [default] value if none is emitted, without + * blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws the + * corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. */ public suspend fun ObservableSource.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default) /** - * Awaits for the first value from the given observable or `null` value if none is emitted without blocking a - * thread and returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given [Observable], or returns `null` if none is emitted, without blocking the + * thread, and returns the resulting value, or, if this observable has produced an error, throws the corresponding + * exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. */ public suspend fun ObservableSource.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT) /** - * Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a - * thread and returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given [Observable], or calls [defaultValue] to get a value if none is emitted, + * without blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws + * the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. */ -public suspend fun ObservableSource.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue() +public suspend fun ObservableSource.awaitFirstOrElse(defaultValue: () -> T): T = + awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue() /** - * Awaits for the last value from the given observable without blocking a thread. - * Returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the last value from the given [Observable] without blocking the thread and + * returns the resulting value, or, if this observable has produced an error, throws the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. * - * @throws NoSuchElementException if observable does not emit any value + * @throws NoSuchElementException if the observable does not emit any value */ public suspend fun ObservableSource.awaitLast(): T = awaitOne(Mode.LAST) /** - * Awaits for the single value from the given observable without blocking a thread. - * Returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the single value from the given observable without blocking the thread and returns the resulting value, or, + * if this observable has produced an error, throws the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. * - * @throws NoSuchElementException if observable does not emit any value - * @throws IllegalArgumentException if observable emits more than one value + * @throws NoSuchElementException if the observable does not emit any value + * @throws IllegalArgumentException if the observable emits more than one value */ public suspend fun ObservableSource.awaitSingle(): T = awaitOne(Mode.SINGLE) diff --git a/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt b/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt index 298b32bf32..16f0005ba1 100644 --- a/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/CompletableTest.kt @@ -98,6 +98,31 @@ class CompletableTest : TestBase() { } } + /** Tests that calls to [await] throw [CancellationException] and dispose of the subscription when their [Job] is + * cancelled. */ + @Test + fun testAwaitCancellation() = runTest { + expect(1) + val completable = CompletableSource { s -> + s.onSubscribe(object: Disposable { + override fun dispose() { expect(4) } + override fun isDisposed(): Boolean { expectUnreached(); return false } + }) + } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + try { + expect(2) + completable.await() + } catch (e: CancellationException) { + expect(5) + throw e + } + } + expect(3) + job.cancelAndJoin() + finish(6) + } + @Test fun testSuppressedException() = runTest { val completable = rxCompletable(currentDispatcher()) { @@ -119,7 +144,7 @@ class CompletableTest : TestBase() { } @Test - fun testUnhandledException() = runTest() { + fun testUnhandledException() = runTest { expect(1) var disposable: Disposable? = null val handler = { e: Throwable -> @@ -165,8 +190,7 @@ class CompletableTest : TestBase() { withExceptionHandler(handler) { rxCompletable(Dispatchers.Unconfined) { expect(1) - 42 - }.subscribe({ throw LinkageError() }) + }.subscribe { throw LinkageError() } finish(3) } } diff --git a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt index 08427dcf3d..292f6187fa 100644 --- a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt @@ -7,13 +7,12 @@ package kotlinx.coroutines.rx2 import io.reactivex.* import io.reactivex.disposables.* import io.reactivex.exceptions.* -import io.reactivex.functions.* import io.reactivex.internal.functions.Functions.* import kotlinx.coroutines.* +import kotlinx.coroutines.CancellationException import org.junit.* import org.junit.Test import java.util.concurrent.* -import java.util.concurrent.CancellationException import kotlin.test.* class MaybeTest : TestBase() { @@ -47,7 +46,7 @@ class MaybeTest : TestBase() { null } expect(2) - maybe.subscribe (emptyConsumer(), ON_ERROR_MISSING, Action { + maybe.subscribe (emptyConsumer(), ON_ERROR_MISSING, { expect(5) }) expect(3) @@ -112,18 +111,45 @@ class MaybeTest : TestBase() { @Test fun testMaybeAwait() = runBlocking { - assertEquals("OK", Maybe.just("O").await() + "K") + assertEquals("OK", Maybe.just("O").awaitSingleOrNull() + "K") + assertEquals("OK", Maybe.just("O").awaitSingle() + "K") } @Test - fun testMaybeAwaitForNull() = runBlocking { - assertNull(Maybe.empty().await()) + fun testMaybeAwaitForNull(): Unit = runBlocking { + assertNull(Maybe.empty().awaitSingleOrNull()) + assertFailsWith { Maybe.empty().awaitSingle() } + } + + /** Tests that calls to [awaitSingleOrNull] throw [CancellationException] and dispose of the subscription when their + * [Job] is cancelled. */ + @Test + fun testMaybeAwaitCancellation() = runTest { + expect(1) + val maybe = MaybeSource { s -> + s.onSubscribe(object: Disposable { + override fun dispose() { expect(4) } + override fun isDisposed(): Boolean { expectUnreached(); return false } + }) + } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + try { + expect(2) + maybe.awaitSingleOrNull() + } catch (e: CancellationException) { + expect(5) + throw e + } + } + expect(3) + job.cancelAndJoin() + finish(6) } @Test fun testMaybeEmitAndAwait() { val maybe = rxMaybe { - Maybe.just("O").await() + "K" + Maybe.just("O").awaitSingleOrNull() + "K" } checkMaybeValue(maybe) { @@ -205,7 +231,7 @@ class MaybeTest : TestBase() { @Test fun testCancelledConsumer() = runTest { expect(1) - val maybe = rxMaybe(currentDispatcher()) { + val maybe = rxMaybe(currentDispatcher()) { expect(4) try { delay(Long.MAX_VALUE) @@ -241,7 +267,7 @@ class MaybeTest : TestBase() { } } try { - maybe.await() + maybe.awaitSingleOrNull() expectUnreached() } catch (e: TestException) { assertTrue(e.suppressed[0] is TestException2) @@ -301,7 +327,7 @@ class MaybeTest : TestBase() { rxMaybe(Dispatchers.Unconfined) { expect(1) 42 - }.subscribe({ throw LinkageError() }) + }.subscribe { throw LinkageError() } finish(3) } } diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt index 4454190f8f..e246407a17 100644 --- a/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableSingleTest.kt @@ -5,7 +5,9 @@ package kotlinx.coroutines.rx2 import io.reactivex.* +import io.reactivex.disposables.* import kotlinx.coroutines.* +import kotlinx.coroutines.CancellationException import org.junit.* import org.junit.Test import java.util.concurrent.* @@ -101,7 +103,7 @@ class ObservableSingleTest : TestBase() { @Test fun testAwaitFirstOrNull() { - val observable = rxObservable { + val observable = rxObservable { send(Observable.empty().awaitFirstOrNull() ?: "OK") } @@ -154,6 +156,32 @@ class ObservableSingleTest : TestBase() { } } + /** Tests that calls to [awaitFirst] (and, thus, the other methods) throw [CancellationException] and dispose of + * the subscription when their [Job] is cancelled. */ + @Test + fun testAwaitCancellation() = runTest { + expect(1) + val observable = ObservableSource { s -> + s.onSubscribe(object: Disposable { + override fun dispose() { expect(4) } + override fun isDisposed(): Boolean { expectUnreached(); return false } + }) + } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + try { + expect(2) + observable.awaitFirst() + } catch (e: CancellationException) { + expect(5) + throw e + } + } + expect(3) + job.cancelAndJoin() + finish(6) + } + + @Test fun testExceptionFromObservable() { val observable = rxObservable { diff --git a/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt b/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt index c66188a183..b359d963b8 100644 --- a/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/SingleTest.kt @@ -9,6 +9,7 @@ import io.reactivex.disposables.* import io.reactivex.exceptions.* import io.reactivex.functions.* import kotlinx.coroutines.* +import kotlinx.coroutines.CancellationException import org.junit.* import org.junit.Test import java.util.concurrent.* @@ -98,6 +99,31 @@ class SingleTest : TestBase() { assertEquals("OK", Single.just("O").await() + "K") } + /** Tests that calls to [await] throw [CancellationException] and dispose of the subscription when their + * [Job] is cancelled. */ + @Test + fun testSingleAwaitCancellation() = runTest { + expect(1) + val single = SingleSource { s -> + s.onSubscribe(object: Disposable { + override fun dispose() { expect(4) } + override fun isDisposed(): Boolean { expectUnreached(); return false } + }) + } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + try { + expect(2) + single.await() + } catch (e: CancellationException) { + expect(5) + throw e + } + } + expect(3) + job.cancelAndJoin() + finish(6) + } + @Test fun testSingleEmitAndAwait() { val single = rxSingle { @@ -221,7 +247,7 @@ class SingleTest : TestBase() { fun testFatalExceptionInSingle() = runTest { rxSingle(Dispatchers.Unconfined) { throw LinkageError() - }.subscribe({ _, e -> assertTrue(e is LinkageError); expect(1) }) + }.subscribe { _, e -> assertTrue(e is LinkageError); expect(1) } finish(2) } diff --git a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api index 6d2dd63d2c..f6f3f1d06d 100644 --- a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api +++ b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api @@ -8,7 +8,9 @@ public final class kotlinx/coroutines/rx3/RxAwaitKt { public static final fun awaitFirstOrNull (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun awaitLast (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun awaitOrDefault (Lio/reactivex/rxjava3/core/MaybeSource;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun awaitSingle (Lio/reactivex/rxjava3/core/MaybeSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun awaitSingle (Lio/reactivex/rxjava3/core/ObservableSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun awaitSingleOrNull (Lio/reactivex/rxjava3/core/MaybeSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } public final class kotlinx/coroutines/rx3/RxChannelKt { diff --git a/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt b/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt index 8ac0a10ce7..a4435b88e2 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxAwait.kt @@ -15,11 +15,12 @@ import kotlin.coroutines.* // ------------------------ CompletableSource ------------------------ /** - * Awaits for completion of this completable without blocking a thread. - * Returns `Unit` or throws the corresponding exception if this completable had produced error. + * Awaits for completion of this completable without blocking the thread. + * Returns `Unit`, or throws the corresponding exception if this completable produces an error. * * This suspending function is cancellable. If the [Job] of the invoking coroutine is cancelled or completed while this - * suspending function is suspended, this function immediately resumes with [CancellationException]. + * suspending function is suspended, this function immediately resumes with [CancellationException] and disposes of its + * subscription. */ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine { cont -> subscribe(object : CompletableObserver { @@ -31,6 +32,37 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine // ------------------------ MaybeSource ------------------------ +/** + * Awaits for completion of the [MaybeSource] without blocking the thread. + * Returns the resulting value, or `null` if no value is produced, or throws the corresponding exception if this + * [MaybeSource] produces an error. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this + * function immediately resumes with [CancellationException] and disposes of its subscription. + */ +@Suppress("UNCHECKED_CAST") +public suspend fun MaybeSource.awaitSingleOrNull(): T? = suspendCancellableCoroutine { cont -> + subscribe(object : MaybeObserver { + override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } + override fun onComplete() { cont.resume(null) } + override fun onSuccess(t: T) { cont.resume(t) } + override fun onError(error: Throwable) { cont.resumeWithException(error) } + }) +} + +/** + * Awaits for completion of the [MaybeSource] without blocking the thread. + * Returns the resulting value, or throws if either no value is produced or this [MaybeSource] produces an error. + * + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this + * function immediately resumes with [CancellationException] and disposes of its subscription. + * + * @throws NoSuchElementException if no elements were produced by this [MaybeSource]. + */ +public suspend fun MaybeSource.awaitSingle(): T = awaitSingleOrNull() ?: throw NoSuchElementException() + /** * Awaits for completion of the maybe without blocking a thread. * Returns the resulting value, null if no value was produced or throws the corresponding exception if this @@ -39,9 +71,18 @@ public suspend fun CompletableSource.await(): Unit = suspendCancellableCoroutine * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function * immediately resumes with [CancellationException]. + * + * ### Deprecation + * + * Deprecated in favor of [awaitSingleOrNull] in order to reflect that `null` can be returned to denote the absence of + * a value, as opposed to throwing in such case. */ -@Suppress("UNCHECKED_CAST") -public suspend fun MaybeSource.await(): T? = (this as MaybeSource).awaitOrDefault(null) +@Deprecated( + message = "Deprecated in favor of awaitSingleOrNull()", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.awaitSingleOrNull()") +) // Warning since 1.5, error in 1.6, hidden in 1.7 +public suspend fun MaybeSource.await(): T? = awaitSingleOrNull() /** * Awaits for completion of the maybe without blocking a thread. @@ -51,25 +92,28 @@ public suspend fun MaybeSource.await(): T? = (this as MaybeSource).aw * This suspending function is cancellable. * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function * immediately resumes with [CancellationException]. + * + * ### Deprecation + * + * Deprecated in favor of [awaitSingleOrNull] for naming consistency (see the deprecation of [MaybeSource.await] for + * details). */ -public suspend fun MaybeSource.awaitOrDefault(default: T): T = suspendCancellableCoroutine { cont -> - subscribe(object : MaybeObserver { - override fun onSubscribe(d: Disposable) { cont.disposeOnCancellation(d) } - override fun onComplete() { cont.resume(default) } - override fun onSuccess(t: T) { cont.resume(t) } - override fun onError(error: Throwable) { cont.resumeWithException(error) } - }) -} +@Deprecated( + message = "Deprecated in favor of awaitSingleOrNull()", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("this.awaitSingleOrNull() ?: default") +) // Warning since 1.5, error in 1.6, hidden in 1.7 +public suspend fun MaybeSource.awaitOrDefault(default: T): T = awaitSingleOrNull() ?: default // ------------------------ SingleSource ------------------------ /** - * Awaits for completion of the single value without blocking a thread. - * Returns the resulting value or throws the corresponding exception if this single had produced error. + * Awaits for completion of the single value response without blocking the thread. + * Returns the resulting value, or throws the corresponding exception if this response produces an error. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. */ public suspend fun SingleSource.await(): T = suspendCancellableCoroutine { cont -> subscribe(object : SingleObserver { @@ -82,69 +126,73 @@ public suspend fun SingleSource.await(): T = suspendCancellableCoroutine // ------------------------ ObservableSource ------------------------ /** - * Awaits for the first value from the given observable without blocking a thread. - * Returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given [Observable] without blocking the thread and returns the resulting value, or, + * if the observable has produced an error, throws the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. * - * @throws NoSuchElementException if observable does not emit any value + * @throws NoSuchElementException if the observable does not emit any value */ public suspend fun ObservableSource.awaitFirst(): T = awaitOne(Mode.FIRST) /** - * Awaits for the first value from the given observable or the [default] value if none is emitted without blocking a - * thread and returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given [Observable], or returns the [default] value if none is emitted, without + * blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws the + * corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. */ public suspend fun ObservableSource.awaitFirstOrDefault(default: T): T = awaitOne(Mode.FIRST_OR_DEFAULT, default) /** - * Awaits for the first value from the given observable or `null` value if none is emitted without blocking a - * thread and returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given [Observable], or returns `null` if none is emitted, without blocking the + * thread, and returns the resulting value, or, if this observable has produced an error, throws the corresponding + * exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. */ public suspend fun ObservableSource.awaitFirstOrNull(): T? = awaitOne(Mode.FIRST_OR_DEFAULT) /** - * Awaits for the first value from the given observable or call [defaultValue] to get a value if none is emitted without blocking a - * thread and returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the first value from the given [Observable], or calls [defaultValue] to get a value if none is emitted, + * without blocking the thread, and returns the resulting value, or, if this observable has produced an error, throws + * the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. */ -public suspend fun ObservableSource.awaitFirstOrElse(defaultValue: () -> T): T = awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue() +public suspend fun ObservableSource.awaitFirstOrElse(defaultValue: () -> T): T = + awaitOne(Mode.FIRST_OR_DEFAULT) ?: defaultValue() /** - * Awaits for the last value from the given observable without blocking a thread. - * Returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the last value from the given [Observable] without blocking the thread and + * returns the resulting value, or, if this observable has produced an error, throws the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. * - * @throws NoSuchElementException if observable does not emit any value + * @throws NoSuchElementException if the observable does not emit any value */ public suspend fun ObservableSource.awaitLast(): T = awaitOne(Mode.LAST) /** - * Awaits for the single value from the given observable without blocking a thread. - * Returns the resulting value or throws the corresponding exception if this observable had produced error. + * Awaits the single value from the given observable without blocking the thread and returns the resulting value, or, + * if this observable has produced an error, throws the corresponding exception. * * This suspending function is cancellable. - * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function - * immediately resumes with [CancellationException]. + * If the [Job] of the current coroutine is cancelled or completed while the suspending function is waiting, this + * function immediately disposes of its subscription and resumes with [CancellationException]. * - * @throws NoSuchElementException if observable does not emit any value - * @throws IllegalArgumentException if observable emits more than one value + * @throws NoSuchElementException if the observable does not emit any value + * @throws IllegalArgumentException if the observable emits more than one value */ public suspend fun ObservableSource.awaitSingle(): T = awaitOne(Mode.SINGLE) @@ -218,3 +266,4 @@ private suspend fun ObservableSource.awaitOne( } }) } + diff --git a/reactive/kotlinx-coroutines-rx3/test/CompletableTest.kt b/reactive/kotlinx-coroutines-rx3/test/CompletableTest.kt index e5399d16d1..cfdb6d4170 100644 --- a/reactive/kotlinx-coroutines-rx3/test/CompletableTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/CompletableTest.kt @@ -98,6 +98,31 @@ class CompletableTest : TestBase() { } } + /** Tests that calls to [await] throw [CancellationException] and dispose of the subscription when their [Job] is + * cancelled. */ + @Test + fun testAwaitCancellation() = runTest { + expect(1) + val completable = CompletableSource { s -> + s.onSubscribe(object: Disposable { + override fun dispose() { expect(4) } + override fun isDisposed(): Boolean { expectUnreached(); return false } + }) + } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + try { + expect(2) + completable.await() + } catch (e: CancellationException) { + expect(5) + throw e + } + } + expect(3) + job.cancelAndJoin() + finish(6) + } + @Test fun testSuppressedException() = runTest { val completable = rxCompletable(currentDispatcher()) { @@ -119,7 +144,7 @@ class CompletableTest : TestBase() { } @Test - fun testUnhandledException() = runTest() { + fun testUnhandledException() = runTest { expect(1) var disposable: Disposable? = null val handler = { e: Throwable -> @@ -165,8 +190,7 @@ class CompletableTest : TestBase() { withExceptionHandler(handler) { rxCompletable(Dispatchers.Unconfined) { expect(1) - 42 - }.subscribe({ throw LinkageError() }) + }.subscribe { throw LinkageError() } finish(3) } } diff --git a/reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt b/reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt index e0cec748f8..bdb5481d80 100644 --- a/reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt @@ -7,13 +7,12 @@ package kotlinx.coroutines.rx3 import io.reactivex.rxjava3.core.* import io.reactivex.rxjava3.disposables.* import io.reactivex.rxjava3.exceptions.* -import io.reactivex.rxjava3.functions.* import io.reactivex.rxjava3.internal.functions.Functions.* import kotlinx.coroutines.* +import kotlinx.coroutines.CancellationException import org.junit.* import org.junit.Test import java.util.concurrent.* -import java.util.concurrent.CancellationException import kotlin.test.* class MaybeTest : TestBase() { @@ -47,7 +46,7 @@ class MaybeTest : TestBase() { null } expect(2) - maybe.subscribe (emptyConsumer(), ON_ERROR_MISSING, Action { + maybe.subscribe (emptyConsumer(), ON_ERROR_MISSING, { expect(5) }) expect(3) @@ -112,18 +111,45 @@ class MaybeTest : TestBase() { @Test fun testMaybeAwait() = runBlocking { - assertEquals("OK", Maybe.just("O").await() + "K") + assertEquals("OK", Maybe.just("O").awaitSingleOrNull() + "K") + assertEquals("OK", Maybe.just("O").awaitSingle() + "K") } @Test - fun testMaybeAwaitForNull() = runBlocking { - assertNull(Maybe.empty().await()) + fun testMaybeAwaitForNull(): Unit = runBlocking { + assertNull(Maybe.empty().awaitSingleOrNull()) + assertFailsWith { Maybe.empty().awaitSingle() } + } + + /** Tests that calls to [awaitSingleOrNull] throw [CancellationException] and dispose of the subscription when their + * [Job] is cancelled. */ + @Test + fun testMaybeAwaitCancellation() = runTest { + expect(1) + val maybe = MaybeSource { s -> + s.onSubscribe(object: Disposable { + override fun dispose() { expect(4) } + override fun isDisposed(): Boolean { expectUnreached(); return false } + }) + } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + try { + expect(2) + maybe.awaitSingleOrNull() + } catch (e: CancellationException) { + expect(5) + throw e + } + } + expect(3) + job.cancelAndJoin() + finish(6) } @Test fun testMaybeEmitAndAwait() { val maybe = rxMaybe { - Maybe.just("O").await() + "K" + Maybe.just("O").awaitSingleOrNull() + "K" } checkMaybeValue(maybe) { @@ -205,7 +231,7 @@ class MaybeTest : TestBase() { @Test fun testCancelledConsumer() = runTest { expect(1) - val maybe = rxMaybe(currentDispatcher()) { + val maybe = rxMaybe(currentDispatcher()) { expect(4) try { delay(Long.MAX_VALUE) @@ -241,7 +267,7 @@ class MaybeTest : TestBase() { } } try { - maybe.await() + maybe.awaitSingleOrNull() expectUnreached() } catch (e: TestException) { assertTrue(e.suppressed[0] is TestException2) @@ -301,7 +327,7 @@ class MaybeTest : TestBase() { rxMaybe(Dispatchers.Unconfined) { expect(1) 42 - }.subscribe({ throw LinkageError() }) + }.subscribe { throw LinkageError() } finish(3) } } diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt index 2a3ce04638..692f014418 100644 --- a/reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/ObservableSingleTest.kt @@ -5,7 +5,9 @@ package kotlinx.coroutines.rx3 import io.reactivex.rxjava3.core.* +import io.reactivex.rxjava3.disposables.* import kotlinx.coroutines.* +import kotlinx.coroutines.CancellationException import org.junit.* import org.junit.Test import java.util.concurrent.* @@ -101,7 +103,7 @@ class ObservableSingleTest : TestBase() { @Test fun testAwaitFirstOrNull() { - val observable = rxObservable { + val observable = rxObservable { send(Observable.empty().awaitFirstOrNull() ?: "OK") } @@ -154,6 +156,32 @@ class ObservableSingleTest : TestBase() { } } + /** Tests that calls to [awaitFirst] (and, thus, the other methods) throw [CancellationException] and dispose of + * the subscription when their [Job] is cancelled. */ + @Test + fun testAwaitCancellation() = runTest { + expect(1) + val observable = ObservableSource { s -> + s.onSubscribe(object: Disposable { + override fun dispose() { expect(4) } + override fun isDisposed(): Boolean { expectUnreached(); return false } + }) + } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + try { + expect(2) + observable.awaitFirst() + } catch (e: CancellationException) { + expect(5) + throw e + } + } + expect(3) + job.cancelAndJoin() + finish(6) + } + + @Test fun testExceptionFromObservable() { val observable = rxObservable { diff --git a/reactive/kotlinx-coroutines-rx3/test/SingleTest.kt b/reactive/kotlinx-coroutines-rx3/test/SingleTest.kt index 46bcaf84dc..3e763aa58c 100644 --- a/reactive/kotlinx-coroutines-rx3/test/SingleTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/SingleTest.kt @@ -9,6 +9,7 @@ import io.reactivex.rxjava3.disposables.* import io.reactivex.rxjava3.exceptions.* import io.reactivex.rxjava3.functions.* import kotlinx.coroutines.* +import kotlinx.coroutines.CancellationException import org.junit.* import org.junit.Test import java.util.concurrent.* @@ -98,6 +99,31 @@ class SingleTest : TestBase() { assertEquals("OK", Single.just("O").await() + "K") } + /** Tests that calls to [await] throw [CancellationException] and dispose of the subscription when their + * [Job] is cancelled. */ + @Test + fun testSingleAwaitCancellation() = runTest { + expect(1) + val single = SingleSource { s -> + s.onSubscribe(object: Disposable { + override fun dispose() { expect(4) } + override fun isDisposed(): Boolean { expectUnreached(); return false } + }) + } + val job = launch(start = CoroutineStart.UNDISPATCHED) { + try { + expect(2) + single.await() + } catch (e: CancellationException) { + expect(5) + throw e + } + } + expect(3) + job.cancelAndJoin() + finish(6) + } + @Test fun testSingleEmitAndAwait() { val single = rxSingle { @@ -221,7 +247,7 @@ class SingleTest : TestBase() { fun testFatalExceptionInSingle() = runTest { rxSingle(Dispatchers.Unconfined) { throw LinkageError() - }.subscribe({ _, e -> assertTrue(e is LinkageError); expect(1) }) + }.subscribe { _, e -> assertTrue(e is LinkageError); expect(1) } finish(2) }