From e80cc5ef123256dfba95d82ffec4035b701a619a Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Tue, 24 Dec 2019 11:19:20 +0300 Subject: [PATCH] ReceiveChannel.receiveAsFlow extension * Experimental ReceiveChannel.receiveAsFlow extension convert channel to flow in fan-out fashion allowing for multi-use. * Also, ReceiveChannel.consumeAsFlow is promoted to experimental from preview Fixes #1490 --- .../kotlinx-coroutines-core.txt | 1 + .../common/src/flow/Channels.kt | 58 ++++++++++++++----- .../flow/channels/ChannelBuildersFlowTest.kt | 48 +++++++++++++++ 3 files changed, 93 insertions(+), 14 deletions(-) diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index d8d4528eb4..6df47e179f 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -943,6 +943,7 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun onStart (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun produceIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel; public static final fun publishOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow; + public static final fun receiveAsFlow (Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlinx/coroutines/flow/Flow; public static final fun reduce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final synthetic fun retry (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow; public static final fun retry (Lkotlinx/coroutines/flow/Flow;JLkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; diff --git a/kotlinx-coroutines-core/common/src/flow/Channels.kt b/kotlinx-coroutines-core/common/src/flow/Channels.kt index f81bc037e3..59459558b3 100644 --- a/kotlinx-coroutines-core/common/src/flow/Channels.kt +++ b/kotlinx-coroutines-core/common/src/flow/Channels.kt @@ -23,8 +23,11 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow * This function provides a more efficient shorthand for `channel.consumeEach { value -> emit(value) }`. * See [consumeEach][ReceiveChannel.consumeEach]. */ -@ExperimentalCoroutinesApi -public suspend fun FlowCollector.emitAll(channel: ReceiveChannel) { +@ExperimentalCoroutinesApi // since version 1.3.0 +public suspend fun FlowCollector.emitAll(channel: ReceiveChannel) = + emitAllImpl(channel, consume = true) + +private suspend fun FlowCollector.emitAllImpl(channel: ReceiveChannel, consume: Boolean) { // Manually inlined "consumeEach" implementation that does not use iterator but works via "receiveOrClosed". // It has smaller and more efficient spilled state which also allows to implement a manual kludge to // fix retention of the last emitted value. @@ -59,20 +62,43 @@ public suspend fun FlowCollector.emitAll(channel: ReceiveChannel) { cause = e throw e } finally { - channel.cancelConsumed(cause) + if (consume) channel.cancelConsumed(cause) } } +/** + * Represents the given receive channel as a hot flow and [receives][ReceiveChannel.receive] from the channel + * in fan-out fashion every time this flow is collected. One element will be emitted to one collector only. + * + * See also [consumeAsFlow] which ensures that the resulting flow is collected just once. + * + * ### Cancellation semantics + * + * 1) Flow collectors are cancelled when the original channel is cancelled. + * 2) Flow collectors complete normally when the original channel completes (~is closed) normally. + * 3) Failure or cancellation of the flow collector does not affect the channel. + * + * ### Operator fusion + * + * Adjacent applications of [flowOn], [buffer], [conflate], and [produceIn] to the result of `receiveAsFlow` are fused. + * In particular, [produceIn] returns the original channel. + * Calls to [flowOn] have generally no effect, unless [buffer] is used to explicitly request buffering. + */ +@ExperimentalCoroutinesApi // since version 1.4.0 +public fun ReceiveChannel.receiveAsFlow(): Flow = ChannelAsFlow(this, consume = false) + /** * Represents the given receive channel as a hot flow and [consumes][ReceiveChannel.consume] the channel * on the first collection from this flow. The resulting flow can be collected just once and throws * [IllegalStateException] when trying to collect it more than once. * + * See also [receiveAsFlow] which supports multiple collectors of the resulting flow. + * * ### Cancellation semantics * - * 1) Flow consumer is cancelled when the original channel is cancelled. - * 2) Flow consumer completes normally when the original channel was closed normally and then fully consumed. - * 3) If the flow consumer fails with an exception, channel is cancelled. + * 1) Flow collector is cancelled when the original channel is cancelled. + * 2) Flow collector completes normally when the original channel was closed normally and then fully consumed. + * 3) If the flow collector fails with an exception, channel is cancelled. * * ### Operator fusion * @@ -80,8 +106,8 @@ public suspend fun FlowCollector.emitAll(channel: ReceiveChannel) { * In particular, [produceIn] returns the original channel (but throws [IllegalStateException] on repeated calls). * Calls to [flowOn] have generally no effect, unless [buffer] is used to explicitly request buffering. */ -@FlowPreview -public fun ReceiveChannel.consumeAsFlow(): Flow = ConsumeAsFlow(this) +@ExperimentalCoroutinesApi // since version 1.3.0 +public fun ReceiveChannel.consumeAsFlow(): Flow = ChannelAsFlow(this, consume = true) /** * Represents an existing [channel] as [ChannelFlow] implementation. @@ -89,21 +115,25 @@ public fun ReceiveChannel.consumeAsFlow(): Flow = ConsumeAsFlow(this) * However, additional [buffer] calls cause a separate buffering channel to be created and that is where * the context might play a role, because it is used by the producing coroutine. */ -private class ConsumeAsFlow( +private class ChannelAsFlow( private val channel: ReceiveChannel, + private val consume: Boolean, context: CoroutineContext = EmptyCoroutineContext, capacity: Int = Channel.OPTIONAL_CHANNEL ) : ChannelFlow(context, capacity) { private val consumed = atomic(false) - private fun markConsumed() = - check(!consumed.getAndSet(true)) { "ReceiveChannel.consumeAsFlow can be collected just once" } + private fun markConsumed() { + if (consume) { + check(!consumed.getAndSet(true)) { "ReceiveChannel.consumeAsFlow can be collected just once" } + } + } override fun create(context: CoroutineContext, capacity: Int): ChannelFlow = - ConsumeAsFlow(channel, context, capacity) + ChannelAsFlow(channel, consume, context, capacity) override suspend fun collectTo(scope: ProducerScope) = - SendingCollector(scope).emitAll(channel) // use efficient channel receiving code from emitAll + SendingCollector(scope).emitAllImpl(channel, consume) // use efficient channel receiving code from emitAll override fun broadcastImpl(scope: CoroutineScope, start: CoroutineStart): BroadcastChannel { markConsumed() // fail fast on repeated attempt to collect it @@ -121,7 +151,7 @@ private class ConsumeAsFlow( override suspend fun collect(collector: FlowCollector) { if (capacity == Channel.OPTIONAL_CHANNEL) { markConsumed() - collector.emitAll(channel) // direct + collector.emitAllImpl(channel, consume) // direct } else { super.collect(collector) // extra buffering channel, produceImpl will mark it as consumed } diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt index de5c220f31..8dd6e3c8a7 100644 --- a/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt @@ -21,6 +21,18 @@ class ChannelBuildersFlowTest : TestBase() { assertFailsWith { flow.collect() } } + @Test + fun testChannelReceiveAsFlow() = runTest { + val channel = produce { + repeat(10) { + send(it + 1) + } + } + val flow = channel.receiveAsFlow() + assertEquals(55, flow.sum()) + assertEquals(emptyList(), flow.toList()) + } + @Test fun testConsumeAsFlowCancellation() = runTest { val channel = produce(NonCancellable) { // otherwise failure will cancel scope as well @@ -36,6 +48,20 @@ class ChannelBuildersFlowTest : TestBase() { assertFailsWith { flow.collect() } } + @Test + fun testReceiveAsFlowCancellation() = runTest { + val channel = produce(NonCancellable) { // otherwise failure will cancel scope as well + repeat(10) { + send(it + 1) + } + throw TestException() + } + val flow = channel.receiveAsFlow() + assertEquals(15, flow.take(5).sum()) // sum of first 5 + assertEquals(40, flow.take(5).sum()) // sum the rest 5 + assertFailsWith { flow.sum() } // exception in the rest + } + @Test fun testConsumeAsFlowException() = runTest { val channel = produce(NonCancellable) { // otherwise failure will cancel scope as well @@ -49,6 +75,19 @@ class ChannelBuildersFlowTest : TestBase() { assertFailsWith { flow.collect() } } + @Test + fun testReceiveAsFlowException() = runTest { + val channel = produce(NonCancellable) { // otherwise failure will cancel scope as well + repeat(10) { + send(it + 1) + } + throw TestException() + } + val flow = channel.receiveAsFlow() + assertFailsWith { flow.sum() } + assertFailsWith { flow.collect() } // repeated collection -- same exception + } + @Test fun testConsumeAsFlowProduceFusing() = runTest { val channel = produce { send("OK") } @@ -58,6 +97,15 @@ class ChannelBuildersFlowTest : TestBase() { channel.cancel() } + @Test + fun testReceiveAsFlowProduceFusing() = runTest { + val channel = produce { send("OK") } + val flow = channel.receiveAsFlow() + assertSame(channel, flow.produceIn(this)) + assertSame(channel, flow.produceIn(this)) // can use produce multiple times + channel.cancel() + } + @Test fun testConsumeAsFlowProduceBuffered() = runTest { expect(1)