From c1098c7e99e727c5ef911db0effeeeca418447c2 Mon Sep 17 00:00:00 2001 From: dkhalanskyjb <52952525+dkhalanskyjb@users.noreply.github.com> Date: Thu, 22 Apr 2021 14:41:48 +0300 Subject: [PATCH] Fixes for the reactive integrations (#2617) Reworked the comments, added new tests. Fixed a bug where `Maybe.collect()` would hang on success. --- .../kotlinx-coroutines-jdk9/src/Publish.kt | 29 ++-- .../src/ReactiveFlow.kt | 36 +++-- .../test/PublisherCollectTest.kt | 146 ++++++++++++++++++ .../src/Channel.kt | 14 +- .../src/Publish.kt | 21 +-- .../src/ReactiveFlow.kt | 20 +-- .../test/PublisherCollectTest.kt | 144 +++++++++++++++++ .../kotlinx-coroutines-rx2/src/RxChannel.kt | 12 +- .../kotlinx-coroutines-rx2/test/MaybeTest.kt | 50 ++++++ .../test/ObservableCollectTest.kt | 69 +++++++++ .../kotlinx-coroutines-rx3/src/RxChannel.kt | 12 +- .../kotlinx-coroutines-rx3/test/MaybeTest.kt | 52 ++++++- .../test/ObservableCollectTest.kt | 68 ++++++++ 13 files changed, 610 insertions(+), 63 deletions(-) create mode 100644 reactive/kotlinx-coroutines-jdk9/test/PublisherCollectTest.kt create mode 100644 reactive/kotlinx-coroutines-reactive/test/PublisherCollectTest.kt create mode 100644 reactive/kotlinx-coroutines-rx2/test/ObservableCollectTest.kt create mode 100644 reactive/kotlinx-coroutines-rx3/test/ObservableCollectTest.kt diff --git a/reactive/kotlinx-coroutines-jdk9/src/Publish.kt b/reactive/kotlinx-coroutines-jdk9/src/Publish.kt index 3db0d5da4c..cfd18d2b7a 100644 --- a/reactive/kotlinx-coroutines-jdk9/src/Publish.kt +++ b/reactive/kotlinx-coroutines-jdk9/src/Publish.kt @@ -6,33 +6,34 @@ package kotlinx.coroutines.jdk9 import kotlinx.coroutines.* import kotlinx.coroutines.channels.* +import kotlinx.coroutines.reactive.* import java.util.concurrent.* import kotlin.coroutines.* import org.reactivestreams.FlowAdapters /** - * Creates cold reactive [Flow.Publisher] that runs a given [block] in a coroutine. + * Creates a cold reactive [Flow.Publisher] that runs a given [block] in a coroutine. + * * Every time the returned flux is subscribed, it starts a new coroutine in the specified [context]. - * Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete]) - * when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError]) - * if coroutine throws an exception or closes channel with a cause. - * Unsubscribing cancels running coroutine. + * The coroutine emits (via [Flow.Subscriber.onNext]) values with [send][ProducerScope.send], + * completes (via [Flow.Subscriber.onComplete]) when the coroutine completes or channel is explicitly closed, and emits + * errors (via [Flow.Subscriber.onError]) if the coroutine throws an exception or closes channel with a cause. + * Unsubscribing cancels the running coroutine. * - * Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that - * `onNext` is not invoked concurrently. + * Invocations of [send][ProducerScope.send] are suspended appropriately when subscribers apply back-pressure and to + * ensure that [onNext][Flow.Subscriber.onNext] is not invoked concurrently. * * Coroutine context can be specified with [context] argument. - * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. - * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. + * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is + * used. * * **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect * to cancellation and error handling may change in the future. + * + * @throws IllegalArgumentException if the provided [context] contains a [Job] instance. */ -@ExperimentalCoroutinesApi // Since 1.3.x +@ExperimentalCoroutinesApi public fun flowPublish( context: CoroutineContext = EmptyCoroutineContext, @BuilderInference block: suspend ProducerScope.() -> Unit -): Flow.Publisher { - val reactivePublisher : org.reactivestreams.Publisher = kotlinx.coroutines.reactive.publish(context, block) - return FlowAdapters.toFlowPublisher(reactivePublisher) -} +): Flow.Publisher = FlowAdapters.toFlowPublisher(publish(context, block)) diff --git a/reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt index d3942a1629..6031e0a8ad 100644 --- a/reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt +++ b/reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt @@ -7,41 +7,43 @@ package kotlinx.coroutines.jdk9 import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlinx.coroutines.reactive.asFlow -import kotlinx.coroutines.reactive.asPublisher +import kotlinx.coroutines.reactive.asPublisher as asReactivePublisher import kotlinx.coroutines.reactive.collect +import kotlinx.coroutines.channels.* import org.reactivestreams.* import kotlin.coroutines.* import java.util.concurrent.Flow as JFlow /** - * Transforms the given reactive [Publisher] into [Flow]. - * Use [buffer] operator on the resulting flow to specify the size of the backpressure. - * More precisely, it specifies the value of the subscription's [request][Subscription.request]. - * [buffer] default capacity is used by default. + * Transforms the given reactive [Flow Publisher][JFlow.Publisher] into [Flow]. + * Use the [buffer] operator on the resulting flow to specify the size of the back-pressure. + * In effect, it specifies the value of the subscription's [request][JFlow.Subscription.request]. + * The [default buffer capacity][Channel.BUFFERED] for a suspending channel is used by default. * - * If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flight elements - * are discarded. + * If any of the resulting flow transformations fails, the subscription is immediately cancelled and all the in-flight + * elements are discarded. */ public fun JFlow.Publisher.asFlow(): Flow = - FlowAdapters.toPublisher(this).asFlow() + FlowAdapters.toPublisher(this).asFlow() /** - * Transforms the given flow to a reactive specification compliant [Publisher]. + * Transforms the given flow into a reactive specification compliant [Flow Publisher][JFlow.Publisher]. * - * An optional [context] can be specified to control the execution context of calls to [Subscriber] methods. - * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to + * An optional [context] can be specified to control the execution context of calls to the [Flow Subscriber][Subscriber] + * methods. + * A [CoroutineDispatcher] can be set to confine them to a specific thread; various [ThreadContextElement] can be set to * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher * is used, so calls are performed from an arbitrary thread. */ @JvmOverloads // binary compatibility -public fun Flow.asPublisher(context: CoroutineContext = EmptyCoroutineContext): JFlow.Publisher { - val reactivePublisher : org.reactivestreams.Publisher = this.asPublisher(context) - return FlowAdapters.toFlowPublisher(reactivePublisher) -} +public fun Flow.asPublisher(context: CoroutineContext = EmptyCoroutineContext): JFlow.Publisher = + FlowAdapters.toFlowPublisher(asReactivePublisher(context)) /** - * Subscribes to this [Publisher] and performs the specified action for each received element. - * Cancels subscription if any exception happens during collect. + * Subscribes to this [Flow Publisher][JFlow.Publisher] and performs the specified action for each received element. + * + * If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from + * [collect]. Also, if the publisher signals an error, that error is rethrown from [collect]. */ public suspend inline fun JFlow.Publisher.collect(action: (T) -> Unit): Unit = FlowAdapters.toPublisher(this).collect(action) diff --git a/reactive/kotlinx-coroutines-jdk9/test/PublisherCollectTest.kt b/reactive/kotlinx-coroutines-jdk9/test/PublisherCollectTest.kt new file mode 100644 index 0000000000..c2e88483a7 --- /dev/null +++ b/reactive/kotlinx-coroutines-jdk9/test/PublisherCollectTest.kt @@ -0,0 +1,146 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.jdk9 + +import kotlinx.coroutines.* +import kotlinx.coroutines.reactive.* +import org.junit.Test +import org.reactivestreams.* +import kotlin.test.* +import java.util.concurrent.Flow as JFlow + +class PublisherCollectTest: TestBase() { + + /** Tests the simple scenario where the publisher outputs a bounded stream of values to collect. */ + @Test + fun testCollect() = runTest { + val x = 100 + val xSum = x * (x + 1) / 2 + val publisher = JFlow.Publisher { subscriber -> + var requested = 0L + var lastOutput = 0 + subscriber.onSubscribe(object: JFlow.Subscription { + + override fun request(n: Long) { + requested += n + if (n <= 0) { + subscriber.onError(IllegalArgumentException()) + return + } + while (lastOutput < x && lastOutput < requested) { + lastOutput += 1 + subscriber.onNext(lastOutput) + } + if (lastOutput == x) + subscriber.onComplete() + } + + override fun cancel() { + /** According to rule 3.5 of the + * [reactive spec](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#3.5), + * this method can be called by the subscriber at any point, so it's not an error if it's called + * in this scenario. */ + } + + }) + } + var sum = 0 + publisher.collect { + sum += it + } + assertEquals(xSum, sum) + } + + /** Tests the behavior of [collect] when the publisher raises an error. */ + @Test + fun testCollectThrowingPublisher() = runTest { + val errorString = "Too many elements requested" + val x = 100 + val xSum = x * (x + 1) / 2 + val publisher = Publisher { subscriber -> + var requested = 0L + var lastOutput = 0 + subscriber.onSubscribe(object: Subscription { + + override fun request(n: Long) { + requested += n + if (n <= 0) { + subscriber.onError(IllegalArgumentException()) + return + } + while (lastOutput < x && lastOutput < requested) { + lastOutput += 1 + subscriber.onNext(lastOutput) + } + if (lastOutput == x) + subscriber.onError(IllegalArgumentException(errorString)) + } + + override fun cancel() { + /** See the comment for the corresponding part of [testCollect]. */ + } + + }) + } + var sum = 0 + try { + publisher.collect { + sum += it + } + } catch (e: IllegalArgumentException) { + assertEquals(errorString, e.message) + } + assertEquals(xSum, sum) + } + + /** Tests the behavior of [collect] when the action throws. */ + @Test + fun testCollectThrowingAction() = runTest { + val errorString = "Too many elements produced" + val x = 100 + val xSum = x * (x + 1) / 2 + val publisher = Publisher { subscriber -> + var requested = 0L + var lastOutput = 0 + subscriber.onSubscribe(object: Subscription { + + override fun request(n: Long) { + requested += n + if (n <= 0) { + subscriber.onError(IllegalArgumentException()) + return + } + while (lastOutput < x && lastOutput < requested) { + lastOutput += 1 + subscriber.onNext(lastOutput) + } + } + + override fun cancel() { + assertEquals(x, lastOutput) + expect(x + 2) + } + + }) + } + var sum = 0 + try { + expect(1) + var i = 1 + publisher.collect { + sum += it + i += 1 + expect(i) + if (sum >= xSum) { + throw IllegalArgumentException(errorString) + } + } + } catch (e: IllegalArgumentException) { + expect(x + 3) + assertEquals(errorString, e.message) + } + finish(x + 4) + } +} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactive/src/Channel.kt b/reactive/kotlinx-coroutines-reactive/src/Channel.kt index d9c9b91a2c..b7fbf134c5 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Channel.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Channel.kt @@ -11,10 +11,10 @@ import kotlinx.coroutines.internal.* import org.reactivestreams.* /** - * Subscribes to this [Publisher] and returns a channel to receive elements emitted by it. - * The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this publisher. + * Subscribes to this [Publisher] and returns a channel to receive the elements emitted by it. + * The resulting channel needs to be [cancelled][ReceiveChannel.cancel] in order to unsubscribe from this publisher. - * @param request how many items to request from publisher in advance (optional, one by default). + * @param request how many items to request from the publisher in advance (optional, a single element by default). * * This method is deprecated in the favor of [Flow]. * Instead of iterating over the resulting channel please use [collect][Flow.collect]: @@ -35,7 +35,9 @@ public fun Publisher.openSubscription(request: Int = 1): ReceiveChannel Publisher.collect(action: (T) -> Unit): Unit = toChannel().consumeEach(action) @@ -61,7 +63,7 @@ private class SubscriptionChannel( // can be negative if we have receivers, but no subscription yet private val _requested = atomic(0) - // AbstractChannel overrides + // --------------------- AbstractChannel overrides ------------------------------- @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER") override fun onReceiveEnqueued() { _requested.loop { wasRequested -> @@ -89,7 +91,7 @@ private class SubscriptionChannel( _subscription.getAndSet(null)?.cancel() // cancel exactly once } - // Subscriber overrides + // --------------------- Subscriber overrides ------------------------------- override fun onSubscribe(s: Subscription) { _subscription.value = s while (true) { // lock-free loop on _requested diff --git a/reactive/kotlinx-coroutines-reactive/src/Publish.kt b/reactive/kotlinx-coroutines-reactive/src/Publish.kt index 37113849ab..8b94b95986 100644 --- a/reactive/kotlinx-coroutines-reactive/src/Publish.kt +++ b/reactive/kotlinx-coroutines-reactive/src/Publish.kt @@ -12,22 +12,25 @@ import org.reactivestreams.* import kotlin.coroutines.* /** - * Creates cold reactive [Publisher] that runs a given [block] in a coroutine. + * Creates a cold reactive [Publisher] that runs a given [block] in a coroutine. + * * Every time the returned flux is subscribed, it starts a new coroutine in the specified [context]. - * Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete]) - * when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError]) - * if coroutine throws an exception or closes channel with a cause. - * Unsubscribing cancels running coroutine. + * The coroutine emits (via [Subscriber.onNext]) values with [send][ProducerScope.send], + * completes (via [Subscriber.onComplete]) when the coroutine completes or channel is explicitly closed, and emits + * errors (via [Subscriber.onError]) if the coroutine throws an exception or closes channel with a cause. + * Unsubscribing cancels the running coroutine. * - * Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that - * `onNext` is not invoked concurrently. + * Invocations of [send][ProducerScope.send] are suspended appropriately when subscribers apply back-pressure and to + * ensure that [onNext][Subscriber.onNext] is not invoked concurrently. * * Coroutine context can be specified with [context] argument. - * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used. - * Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance. + * If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is + * used. * * **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect * to cancellation and error handling may change in the future. + * + * @throws IllegalArgumentException if the provided [context] contains a [Job] instance. */ @ExperimentalCoroutinesApi public fun publish( diff --git a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt index 1f197f94ed..1a527a3c2b 100644 --- a/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt +++ b/reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt @@ -17,12 +17,12 @@ import kotlinx.coroutines.internal.* /** * Transforms the given reactive [Publisher] into [Flow]. - * Use [buffer] operator on the resulting flow to specify the size of the backpressure. - * More precisely, it specifies the value of the subscription's [request][Subscription.request]. - * [buffer] default capacity is used by default. + * Use the [buffer] operator on the resulting flow to specify the size of the back-pressure. + * In effect, it specifies the value of the subscription's [request][Subscription.request]. + * The [default buffer capacity][Channel.BUFFERED] for a suspending channel is used by default. * - * If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flight elements - * are discarded. + * If any of the resulting flow transformations fails, the subscription is immediately cancelled and all the in-flight + * elements are discarded. * * This function is integrated with `ReactorContext` from `kotlinx-coroutines-reactor` module, * see its documentation for additional details. @@ -31,13 +31,13 @@ public fun Publisher.asFlow(): Flow = PublisherAsFlow(this) /** - * Transforms the given flow to a reactive specification compliant [Publisher]. + * Transforms the given flow into a reactive specification compliant [Publisher]. * * This function is integrated with `ReactorContext` from `kotlinx-coroutines-reactor` module, * see its documentation for additional details. * - * An optional [context] can be specified to control the execution context of calls to [Subscriber] methods. - * You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to + * An optional [context] can be specified to control the execution context of calls to the [Subscriber] methods. + * A [CoroutineDispatcher] can be set to confine them to a specific thread; various [ThreadContextElement] can be set to * inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher * is used, so calls are performed from an arbitrary thread. */ @@ -55,8 +55,8 @@ private class PublisherAsFlow( PublisherAsFlow(publisher, context, capacity, onBufferOverflow) /* - * Suppress for Channel.CHANNEL_DEFAULT_CAPACITY. - * It's too counter-intuitive to be public and moving it to Flow companion + * The @Suppress is for Channel.CHANNEL_DEFAULT_CAPACITY. + * It's too counter-intuitive to be public, and moving it to Flow companion * will also create undesired effect. */ @Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") diff --git a/reactive/kotlinx-coroutines-reactive/test/PublisherCollectTest.kt b/reactive/kotlinx-coroutines-reactive/test/PublisherCollectTest.kt new file mode 100644 index 0000000000..e4753f0467 --- /dev/null +++ b/reactive/kotlinx-coroutines-reactive/test/PublisherCollectTest.kt @@ -0,0 +1,144 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.reactive + +import kotlinx.coroutines.* +import org.junit.Test +import org.reactivestreams.* +import kotlin.test.* + +class PublisherCollectTest: TestBase() { + + /** Tests the simple scenario where the publisher outputs a bounded stream of values to collect. */ + @Test + fun testCollect() = runTest { + val x = 100 + val xSum = x * (x + 1) / 2 + val publisher = Publisher { subscriber -> + var requested = 0L + var lastOutput = 0 + subscriber.onSubscribe(object: Subscription { + + override fun request(n: Long) { + requested += n + if (n <= 0) { + subscriber.onError(IllegalArgumentException()) + return + } + while (lastOutput < x && lastOutput < requested) { + lastOutput += 1 + subscriber.onNext(lastOutput) + } + if (lastOutput == x) + subscriber.onComplete() + } + + override fun cancel() { + /** According to rule 3.5 of the + * [reactive spec](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#3.5), + * this method can be called by the subscriber at any point, so it's not an error if it's called + * in this scenario. */ + } + + }) + } + var sum = 0 + publisher.collect { + sum += it + } + assertEquals(xSum, sum) + } + + /** Tests the behavior of [collect] when the publisher raises an error. */ + @Test + fun testCollectThrowingPublisher() = runTest { + val errorString = "Too many elements requested" + val x = 100 + val xSum = x * (x + 1) / 2 + val publisher = Publisher { subscriber -> + var requested = 0L + var lastOutput = 0 + subscriber.onSubscribe(object: Subscription { + + override fun request(n: Long) { + requested += n + if (n <= 0) { + subscriber.onError(IllegalArgumentException()) + return + } + while (lastOutput < x && lastOutput < requested) { + lastOutput += 1 + subscriber.onNext(lastOutput) + } + if (lastOutput == x) + subscriber.onError(IllegalArgumentException(errorString)) + } + + override fun cancel() { + /** See the comment for the corresponding part of [testCollect]. */ + } + + }) + } + var sum = 0 + try { + publisher.collect { + sum += it + } + } catch (e: IllegalArgumentException) { + assertEquals(errorString, e.message) + } + assertEquals(xSum, sum) + } + + /** Tests the behavior of [collect] when the action throws. */ + @Test + fun testCollectThrowingAction() = runTest { + val errorString = "Too many elements produced" + val x = 100 + val xSum = x * (x + 1) / 2 + val publisher = Publisher { subscriber -> + var requested = 0L + var lastOutput = 0 + subscriber.onSubscribe(object: Subscription { + + override fun request(n: Long) { + requested += n + if (n <= 0) { + subscriber.onError(IllegalArgumentException()) + return + } + while (lastOutput < x && lastOutput < requested) { + lastOutput += 1 + subscriber.onNext(lastOutput) + } + } + + override fun cancel() { + assertEquals(x, lastOutput) + expect(x + 2) + } + + }) + } + var sum = 0 + try { + expect(1) + var i = 1 + publisher.collect { + sum += it + i += 1 + expect(i) + if (sum >= xSum) { + throw IllegalArgumentException(errorString) + } + } + } catch (e: IllegalArgumentException) { + expect(x + 3) + assertEquals(errorString, e.message) + } + finish(x + 4) + } +} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt index 3bc50c8d57..51a5d54e1c 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxChannel.kt @@ -10,6 +10,7 @@ import kotlinx.atomicfu.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.internal.* import kotlinx.coroutines.flow.* +import kotlinx.coroutines.reactive.* /** * Subscribes to this [MaybeSource] and returns a channel to receive elements emitted by it. @@ -41,14 +42,18 @@ public fun ObservableSource.openSubscription(): ReceiveChannel { /** * Subscribes to this [MaybeSource] and performs the specified action for each received element. - * Cancels subscription if any exception happens during collect. + * + * If [action] throws an exception at some point or if the [MaybeSource] raises an error, the exception is rethrown from + * [collect]. */ public suspend inline fun MaybeSource.collect(action: (T) -> Unit): Unit = toChannel().consumeEach(action) /** * Subscribes to this [ObservableSource] and performs the specified action for each received element. - * Cancels subscription if any exception happens during collect. + * + * If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from + * [collect]. Also, if the [ObservableSource] signals an error, that error is rethrown from [collect]. */ public suspend inline fun ObservableSource.collect(action: (T) -> Unit): Unit = toChannel().consumeEach(action) @@ -84,7 +89,8 @@ private class SubscriptionChannel : } override fun onSuccess(t: T) { - trySend(t) // Safe to ignore return value here, expectedly racing with cancellation + trySend(t) + close(cause = null) } override fun onNext(t: T) { diff --git a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt index 292f6187fa..f5d128d311 100644 --- a/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/MaybeTest.kt @@ -254,6 +254,56 @@ class MaybeTest : TestBase() { finish(7) } + /** Tests the simple scenario where the Maybe doesn't output a value. */ + @Test + fun testMaybeCollectEmpty() = runTest { + expect(1) + Maybe.empty().collect { + expectUnreached() + } + finish(2) + } + + /** Tests the simple scenario where the Maybe doesn't output a value. */ + @Test + fun testMaybeCollectSingle() = runTest { + expect(1) + Maybe.just("OK").collect { + assertEquals("OK", it) + expect(2) + } + finish(3) + } + + /** Tests the behavior of [collect] when the Maybe raises an error. */ + @Test + fun testMaybeCollectThrowingMaybe() = runTest { + expect(1) + try { + Maybe.error(TestException()).collect { + expectUnreached() + } + } catch (e: TestException) { + expect(2) + } + finish(3) + } + + /** Tests the behavior of [collect] when the action throws. */ + @Test + fun testMaybeCollectThrowingAction() = runTest { + expect(1) + try { + Maybe.just("OK").collect { + expect(2) + throw TestException() + } + } catch (e: TestException) { + expect(3) + } + finish(4) + } + @Test fun testSuppressedException() = runTest { val maybe = rxMaybe(currentDispatcher()) { diff --git a/reactive/kotlinx-coroutines-rx2/test/ObservableCollectTest.kt b/reactive/kotlinx-coroutines-rx2/test/ObservableCollectTest.kt new file mode 100644 index 0000000000..508f594ac3 --- /dev/null +++ b/reactive/kotlinx-coroutines-rx2/test/ObservableCollectTest.kt @@ -0,0 +1,69 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.rx2 + +import io.reactivex.* +import io.reactivex.disposables.* +import kotlinx.coroutines.* +import org.junit.Test +import kotlin.test.* + +class ObservableCollectTest: TestBase() { + + /** Tests the behavior of [collect] when the publisher raises an error. */ + @Test + fun testObservableCollectThrowingObservable() = runTest { + expect(1) + var sum = 0 + try { + rxObservable { + for (i in 0..100) { + send(i) + } + throw TestException() + }.collect { + sum += it + } + } catch (e: TestException) { + assertTrue(sum > 0) + finish(2) + } + } + + /** Tests the behavior of [collect] when the action throws. */ + @Test + fun testObservableCollectThrowingAction() = runTest { + expect(1) + var sum = 0 + val expectedSum = 5 + try { + var disposed = false + ObservableSource { observer -> + launch(Dispatchers.Default) { + observer.onSubscribe(object : Disposable { + override fun dispose() { + disposed = true + expect(expectedSum + 2) + } + + override fun isDisposed(): Boolean = disposed + }) + while (!disposed) { + observer.onNext(1) + } + } + }.collect { + expect(sum + 2) + sum += it + if (sum == expectedSum) { + throw TestException() + } + } + } catch (e: TestException) { + assertEquals(expectedSum, sum) + finish(expectedSum + 3) + } + } +} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt b/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt index ad780f7504..21238d2491 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxChannel.kt @@ -9,6 +9,7 @@ import io.reactivex.rxjava3.disposables.* import kotlinx.atomicfu.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.internal.* +import kotlinx.coroutines.flow.* /** * Subscribes to this [MaybeSource] and returns a channel to receive elements emitted by it. @@ -40,14 +41,18 @@ internal fun ObservableSource.openSubscription(): ReceiveChannel { /** * Subscribes to this [MaybeSource] and performs the specified action for each received element. - * Cancels subscription if any exception happens during collect. + * + * If [action] throws an exception at some point or if the [MaybeSource] raises an error, the exception is rethrown from + * [collect]. */ public suspend inline fun MaybeSource.collect(action: (T) -> Unit): Unit = openSubscription().consumeEach(action) /** * Subscribes to this [ObservableSource] and performs the specified action for each received element. - * Cancels subscription if any exception happens during collect. + * + * If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from + * [collect]. Also, if the [ObservableSource] signals an error, that error is rethrown from [collect]. */ public suspend inline fun ObservableSource.collect(action: (T) -> Unit): Unit = openSubscription().consumeEach(action) @@ -69,7 +74,8 @@ private class SubscriptionChannel : } override fun onSuccess(t: T) { - trySend(t) // Safe to ignore return value here, expectedly racing with cancellation + trySend(t) + close(cause = null) } override fun onNext(t: T) { diff --git a/reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt b/reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt index bdb5481d80..bea939efde 100644 --- a/reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/MaybeTest.kt @@ -84,7 +84,7 @@ class MaybeTest : TestBase() { expectUnreached() } expect(2) - // nothing is called on a disposed rx3 maybe + // nothing is called on a disposed rx2 maybe val sub = maybe.subscribe({ expectUnreached() }, { @@ -254,6 +254,56 @@ class MaybeTest : TestBase() { finish(7) } + /** Tests the simple scenario where the Maybe doesn't output a value. */ + @Test + fun testMaybeCollectEmpty() = runTest { + expect(1) + Maybe.empty().collect { + expectUnreached() + } + finish(2) + } + + /** Tests the simple scenario where the Maybe doesn't output a value. */ + @Test + fun testMaybeCollectSingle() = runTest { + expect(1) + Maybe.just("OK").collect { + assertEquals("OK", it) + expect(2) + } + finish(3) + } + + /** Tests the behavior of [collect] when the Maybe raises an error. */ + @Test + fun testMaybeCollectThrowingMaybe() = runTest { + expect(1) + try { + Maybe.error(TestException()).collect { + expectUnreached() + } + } catch (e: TestException) { + expect(2) + } + finish(3) + } + + /** Tests the behavior of [collect] when the action throws. */ + @Test + fun testMaybeCollectThrowingAction() = runTest { + expect(1) + try { + Maybe.just("OK").collect { + expect(2) + throw TestException() + } + } catch (e: TestException) { + expect(3) + } + finish(4) + } + @Test fun testSuppressedException() = runTest { val maybe = rxMaybe(currentDispatcher()) { diff --git a/reactive/kotlinx-coroutines-rx3/test/ObservableCollectTest.kt b/reactive/kotlinx-coroutines-rx3/test/ObservableCollectTest.kt new file mode 100644 index 0000000000..680786f9b3 --- /dev/null +++ b/reactive/kotlinx-coroutines-rx3/test/ObservableCollectTest.kt @@ -0,0 +1,68 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.rx3 + +import io.reactivex.rxjava3.core.ObservableSource +import io.reactivex.rxjava3.disposables.* +import kotlinx.coroutines.* +import org.junit.Test +import kotlin.test.* + +class ObservableCollectTest: TestBase() { + + /** Tests the behavior of [collect] when the publisher raises an error. */ + @Test + fun testObservableCollectThrowingObservable() = runTest { + expect(1) + var sum = 0 + try { + rxObservable { + for (i in 0..100) { + send(i) + } + throw TestException() + }.collect { + sum += it + } + } catch (e: TestException) { + assertTrue(sum > 0) + finish(2) + } + } + + @Test + fun testObservableCollectThrowingAction() = runTest { + expect(1) + var sum = 0 + val expectedSum = 5 + try { + var disposed = false + ObservableSource { observer -> + launch(Dispatchers.Default) { + observer.onSubscribe(object : Disposable { + override fun dispose() { + disposed = true + expect(expectedSum + 2) + } + + override fun isDisposed(): Boolean = disposed + }) + while (!disposed) { + observer.onNext(1) + } + } + }.collect { + expect(sum + 2) + sum += it + if (sum == expectedSum) { + throw TestException() + } + } + } catch (e: TestException) { + assertEquals(expectedSum, sum) + finish(expectedSum + 3) + } + } +} \ No newline at end of file