Skip to content

Commit

Permalink
Fixes for the reactive integrations (Kotlin#2617)
Browse files Browse the repository at this point in the history
Reworked the comments, added new tests.
Fixed a bug where `Maybe.collect()` would hang on success.
  • Loading branch information
dkhalanskyjb authored and pablobaxter committed Sep 14, 2022
1 parent 1cdc50d commit c1098c7
Show file tree
Hide file tree
Showing 13 changed files with 610 additions and 63 deletions.
29 changes: 15 additions & 14 deletions reactive/kotlinx-coroutines-jdk9/src/Publish.kt
Expand Up @@ -6,33 +6,34 @@ package kotlinx.coroutines.jdk9

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.reactive.*
import java.util.concurrent.*
import kotlin.coroutines.*
import org.reactivestreams.FlowAdapters

/**
* Creates cold reactive [Flow.Publisher] that runs a given [block] in a coroutine.
* Creates a cold reactive [Flow.Publisher] that runs a given [block] in a coroutine.
*
* Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
* Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete])
* when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError])
* if coroutine throws an exception or closes channel with a cause.
* Unsubscribing cancels running coroutine.
* The coroutine emits (via [Flow.Subscriber.onNext]) values with [send][ProducerScope.send],
* completes (via [Flow.Subscriber.onComplete]) when the coroutine completes or channel is explicitly closed, and emits
* errors (via [Flow.Subscriber.onError]) if the coroutine throws an exception or closes channel with a cause.
* Unsubscribing cancels the running coroutine.
*
* Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
* `onNext` is not invoked concurrently.
* Invocations of [send][ProducerScope.send] are suspended appropriately when subscribers apply back-pressure and to
* ensure that [onNext][Flow.Subscriber.onNext] is not invoked concurrently.
*
* Coroutine context can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is
* used.
*
* **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
* to cancellation and error handling may change in the future.
*
* @throws IllegalArgumentException if the provided [context] contains a [Job] instance.
*/
@ExperimentalCoroutinesApi // Since 1.3.x
@ExperimentalCoroutinesApi
public fun <T> flowPublish(
context: CoroutineContext = EmptyCoroutineContext,
@BuilderInference block: suspend ProducerScope<T>.() -> Unit
): Flow.Publisher<T> {
val reactivePublisher : org.reactivestreams.Publisher<T> = kotlinx.coroutines.reactive.publish<T>(context, block)
return FlowAdapters.toFlowPublisher(reactivePublisher)
}
): Flow.Publisher<T> = FlowAdapters.toFlowPublisher(publish(context, block))
36 changes: 19 additions & 17 deletions reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt
Expand Up @@ -7,41 +7,43 @@ package kotlinx.coroutines.jdk9
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.asPublisher
import kotlinx.coroutines.reactive.asPublisher as asReactivePublisher
import kotlinx.coroutines.reactive.collect
import kotlinx.coroutines.channels.*
import org.reactivestreams.*
import kotlin.coroutines.*
import java.util.concurrent.Flow as JFlow

/**
* Transforms the given reactive [Publisher] into [Flow].
* Use [buffer] operator on the resulting flow to specify the size of the backpressure.
* More precisely, it specifies the value of the subscription's [request][Subscription.request].
* [buffer] default capacity is used by default.
* Transforms the given reactive [Flow Publisher][JFlow.Publisher] into [Flow].
* Use the [buffer] operator on the resulting flow to specify the size of the back-pressure.
* In effect, it specifies the value of the subscription's [request][JFlow.Subscription.request].
* The [default buffer capacity][Channel.BUFFERED] for a suspending channel is used by default.
*
* If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flight elements
* are discarded.
* If any of the resulting flow transformations fails, the subscription is immediately cancelled and all the in-flight
* elements are discarded.
*/
public fun <T : Any> JFlow.Publisher<T>.asFlow(): Flow<T> =
FlowAdapters.toPublisher(this).asFlow()
FlowAdapters.toPublisher(this).asFlow()

/**
* Transforms the given flow to a reactive specification compliant [Publisher].
* Transforms the given flow into a reactive specification compliant [Flow Publisher][JFlow.Publisher].
*
* An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
* You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
* An optional [context] can be specified to control the execution context of calls to the [Flow Subscriber][Subscriber]
* methods.
* A [CoroutineDispatcher] can be set to confine them to a specific thread; various [ThreadContextElement] can be set to
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
@JvmOverloads // binary compatibility
public fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): JFlow.Publisher<T> {
val reactivePublisher : org.reactivestreams.Publisher<T> = this.asPublisher<T>(context)
return FlowAdapters.toFlowPublisher(reactivePublisher)
}
public fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): JFlow.Publisher<T> =
FlowAdapters.toFlowPublisher(asReactivePublisher(context))

/**
* Subscribes to this [Publisher] and performs the specified action for each received element.
* Cancels subscription if any exception happens during collect.
* Subscribes to this [Flow Publisher][JFlow.Publisher] and performs the specified action for each received element.
*
* If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from
* [collect]. Also, if the publisher signals an error, that error is rethrown from [collect].
*/
public suspend inline fun <T> JFlow.Publisher<T>.collect(action: (T) -> Unit): Unit =
FlowAdapters.toPublisher(this).collect(action)
146 changes: 146 additions & 0 deletions reactive/kotlinx-coroutines-jdk9/test/PublisherCollectTest.kt
@@ -0,0 +1,146 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.jdk9

import kotlinx.coroutines.*
import kotlinx.coroutines.reactive.*
import org.junit.Test
import org.reactivestreams.*
import kotlin.test.*
import java.util.concurrent.Flow as JFlow

class PublisherCollectTest: TestBase() {

/** Tests the simple scenario where the publisher outputs a bounded stream of values to collect. */
@Test
fun testCollect() = runTest {
val x = 100
val xSum = x * (x + 1) / 2
val publisher = JFlow.Publisher<Int> { subscriber ->
var requested = 0L
var lastOutput = 0
subscriber.onSubscribe(object: JFlow.Subscription {

override fun request(n: Long) {
requested += n
if (n <= 0) {
subscriber.onError(IllegalArgumentException())
return
}
while (lastOutput < x && lastOutput < requested) {
lastOutput += 1
subscriber.onNext(lastOutput)
}
if (lastOutput == x)
subscriber.onComplete()
}

override fun cancel() {
/** According to rule 3.5 of the
* [reactive spec](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#3.5),
* this method can be called by the subscriber at any point, so it's not an error if it's called
* in this scenario. */
}

})
}
var sum = 0
publisher.collect {
sum += it
}
assertEquals(xSum, sum)
}

/** Tests the behavior of [collect] when the publisher raises an error. */
@Test
fun testCollectThrowingPublisher() = runTest {
val errorString = "Too many elements requested"
val x = 100
val xSum = x * (x + 1) / 2
val publisher = Publisher<Int> { subscriber ->
var requested = 0L
var lastOutput = 0
subscriber.onSubscribe(object: Subscription {

override fun request(n: Long) {
requested += n
if (n <= 0) {
subscriber.onError(IllegalArgumentException())
return
}
while (lastOutput < x && lastOutput < requested) {
lastOutput += 1
subscriber.onNext(lastOutput)
}
if (lastOutput == x)
subscriber.onError(IllegalArgumentException(errorString))
}

override fun cancel() {
/** See the comment for the corresponding part of [testCollect]. */
}

})
}
var sum = 0
try {
publisher.collect {
sum += it
}
} catch (e: IllegalArgumentException) {
assertEquals(errorString, e.message)
}
assertEquals(xSum, sum)
}

/** Tests the behavior of [collect] when the action throws. */
@Test
fun testCollectThrowingAction() = runTest {
val errorString = "Too many elements produced"
val x = 100
val xSum = x * (x + 1) / 2
val publisher = Publisher<Int> { subscriber ->
var requested = 0L
var lastOutput = 0
subscriber.onSubscribe(object: Subscription {

override fun request(n: Long) {
requested += n
if (n <= 0) {
subscriber.onError(IllegalArgumentException())
return
}
while (lastOutput < x && lastOutput < requested) {
lastOutput += 1
subscriber.onNext(lastOutput)
}
}

override fun cancel() {
assertEquals(x, lastOutput)
expect(x + 2)
}

})
}
var sum = 0
try {
expect(1)
var i = 1
publisher.collect {
sum += it
i += 1
expect(i)
if (sum >= xSum) {
throw IllegalArgumentException(errorString)
}
}
} catch (e: IllegalArgumentException) {
expect(x + 3)
assertEquals(errorString, e.message)
}
finish(x + 4)
}
}
14 changes: 8 additions & 6 deletions reactive/kotlinx-coroutines-reactive/src/Channel.kt
Expand Up @@ -11,10 +11,10 @@ import kotlinx.coroutines.internal.*
import org.reactivestreams.*

/**
* Subscribes to this [Publisher] and returns a channel to receive elements emitted by it.
* The resulting channel shall be [cancelled][ReceiveChannel.cancel] to unsubscribe from this publisher.
* Subscribes to this [Publisher] and returns a channel to receive the elements emitted by it.
* The resulting channel needs to be [cancelled][ReceiveChannel.cancel] in order to unsubscribe from this publisher.
* @param request how many items to request from publisher in advance (optional, one by default).
* @param request how many items to request from the publisher in advance (optional, a single element by default).
*
* This method is deprecated in the favor of [Flow].
* Instead of iterating over the resulting channel please use [collect][Flow.collect]:
Expand All @@ -35,7 +35,9 @@ public fun <T> Publisher<T>.openSubscription(request: Int = 1): ReceiveChannel<T

/**
* Subscribes to this [Publisher] and performs the specified action for each received element.
* Cancels subscription if any exception happens during collect.
*
* If [action] throws an exception at some point, the subscription is cancelled, and the exception is rethrown from
* [collect]. Also, if the publisher signals an error, that error is rethrown from [collect].
*/
public suspend inline fun <T> Publisher<T>.collect(action: (T) -> Unit): Unit =
toChannel().consumeEach(action)
Expand All @@ -61,7 +63,7 @@ private class SubscriptionChannel<T>(
// can be negative if we have receivers, but no subscription yet
private val _requested = atomic(0)

// AbstractChannel overrides
// --------------------- AbstractChannel overrides -------------------------------
@Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER")
override fun onReceiveEnqueued() {
_requested.loop { wasRequested ->
Expand Down Expand Up @@ -89,7 +91,7 @@ private class SubscriptionChannel<T>(
_subscription.getAndSet(null)?.cancel() // cancel exactly once
}

// Subscriber overrides
// --------------------- Subscriber overrides -------------------------------
override fun onSubscribe(s: Subscription) {
_subscription.value = s
while (true) { // lock-free loop on _requested
Expand Down
21 changes: 12 additions & 9 deletions reactive/kotlinx-coroutines-reactive/src/Publish.kt
Expand Up @@ -12,22 +12,25 @@ import org.reactivestreams.*
import kotlin.coroutines.*

/**
* Creates cold reactive [Publisher] that runs a given [block] in a coroutine.
* Creates a cold reactive [Publisher] that runs a given [block] in a coroutine.
*
* Every time the returned flux is subscribed, it starts a new coroutine in the specified [context].
* Coroutine emits ([Subscriber.onNext]) values with `send`, completes ([Subscriber.onComplete])
* when the coroutine completes or channel is explicitly closed and emits error ([Subscriber.onError])
* if coroutine throws an exception or closes channel with a cause.
* Unsubscribing cancels running coroutine.
* The coroutine emits (via [Subscriber.onNext]) values with [send][ProducerScope.send],
* completes (via [Subscriber.onComplete]) when the coroutine completes or channel is explicitly closed, and emits
* errors (via [Subscriber.onError]) if the coroutine throws an exception or closes channel with a cause.
* Unsubscribing cancels the running coroutine.
*
* Invocations of `send` are suspended appropriately when subscribers apply back-pressure and to ensure that
* `onNext` is not invoked concurrently.
* Invocations of [send][ProducerScope.send] are suspended appropriately when subscribers apply back-pressure and to
* ensure that [onNext][Subscriber.onNext] is not invoked concurrently.
*
* Coroutine context can be specified with [context] argument.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is used.
* Method throws [IllegalArgumentException] if provided [context] contains a [Job] instance.
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [Dispatchers.Default] is
* used.
*
* **Note: This is an experimental api.** Behaviour of publishers that work as children in a parent scope with respect
* to cancellation and error handling may change in the future.
*
* @throws IllegalArgumentException if the provided [context] contains a [Job] instance.
*/
@ExperimentalCoroutinesApi
public fun <T> publish(
Expand Down
20 changes: 10 additions & 10 deletions reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt
Expand Up @@ -17,12 +17,12 @@ import kotlinx.coroutines.internal.*

/**
* Transforms the given reactive [Publisher] into [Flow].
* Use [buffer] operator on the resulting flow to specify the size of the backpressure.
* More precisely, it specifies the value of the subscription's [request][Subscription.request].
* [buffer] default capacity is used by default.
* Use the [buffer] operator on the resulting flow to specify the size of the back-pressure.
* In effect, it specifies the value of the subscription's [request][Subscription.request].
* The [default buffer capacity][Channel.BUFFERED] for a suspending channel is used by default.
*
* If any of the resulting flow transformations fails, subscription is immediately cancelled and all in-flight elements
* are discarded.
* If any of the resulting flow transformations fails, the subscription is immediately cancelled and all the in-flight
* elements are discarded.
*
* This function is integrated with `ReactorContext` from `kotlinx-coroutines-reactor` module,
* see its documentation for additional details.
Expand All @@ -31,13 +31,13 @@ public fun <T : Any> Publisher<T>.asFlow(): Flow<T> =
PublisherAsFlow(this)

/**
* Transforms the given flow to a reactive specification compliant [Publisher].
* Transforms the given flow into a reactive specification compliant [Publisher].
*
* This function is integrated with `ReactorContext` from `kotlinx-coroutines-reactor` module,
* see its documentation for additional details.
*
* An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
* You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
* An optional [context] can be specified to control the execution context of calls to the [Subscriber] methods.
* A [CoroutineDispatcher] can be set to confine them to a specific thread; various [ThreadContextElement] can be set to
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
Expand All @@ -55,8 +55,8 @@ private class PublisherAsFlow<T : Any>(
PublisherAsFlow(publisher, context, capacity, onBufferOverflow)

/*
* Suppress for Channel.CHANNEL_DEFAULT_CAPACITY.
* It's too counter-intuitive to be public and moving it to Flow companion
* The @Suppress is for Channel.CHANNEL_DEFAULT_CAPACITY.
* It's too counter-intuitive to be public, and moving it to Flow companion
* will also create undesired effect.
*/
@Suppress("INVISIBLE_MEMBER", "INVISIBLE_REFERENCE")
Expand Down

0 comments on commit c1098c7

Please sign in to comment.