Skip to content

Commit

Permalink
ReceiveChannel.receiveAsFlow extension
Browse files Browse the repository at this point in the history
* Experimental ReceiveChannel.receiveAsFlow extension convert channel to flow in fan-out fashion allowing for multi-use.
* Also, ReceiveChannel.consumeAsFlow is promoted to experimental from preview

Fixes #1490
  • Loading branch information
elizarov committed Dec 24, 2019
1 parent 12a0318 commit e80cc5e
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 14 deletions.
Expand Up @@ -943,6 +943,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun onStart (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun produceIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static final fun publishOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
public static final fun receiveAsFlow (Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlinx/coroutines/flow/Flow;
public static final fun reduce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final synthetic fun retry (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static final fun retry (Lkotlinx/coroutines/flow/Flow;JLkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
Expand Down
58 changes: 44 additions & 14 deletions kotlinx-coroutines-core/common/src/flow/Channels.kt
Expand Up @@ -23,8 +23,11 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
* This function provides a more efficient shorthand for `channel.consumeEach { value -> emit(value) }`.
* See [consumeEach][ReceiveChannel.consumeEach].
*/
@ExperimentalCoroutinesApi
public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>) {
@ExperimentalCoroutinesApi // since version 1.3.0
public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>) =
emitAllImpl(channel, consume = true)

private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {
// Manually inlined "consumeEach" implementation that does not use iterator but works via "receiveOrClosed".
// It has smaller and more efficient spilled state which also allows to implement a manual kludge to
// fix retention of the last emitted value.
Expand Down Expand Up @@ -59,51 +62,78 @@ public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>) {
cause = e
throw e
} finally {
channel.cancelConsumed(cause)
if (consume) channel.cancelConsumed(cause)
}
}

/**
* Represents the given receive channel as a hot flow and [receives][ReceiveChannel.receive] from the channel
* in fan-out fashion every time this flow is collected. One element will be emitted to one collector only.
*
* See also [consumeAsFlow] which ensures that the resulting flow is collected just once.
*
* ### Cancellation semantics
*
* 1) Flow collectors are cancelled when the original channel is cancelled.
* 2) Flow collectors complete normally when the original channel completes (~is closed) normally.
* 3) Failure or cancellation of the flow collector does not affect the channel.
*
* ### Operator fusion
*
* Adjacent applications of [flowOn], [buffer], [conflate], and [produceIn] to the result of `receiveAsFlow` are fused.
* In particular, [produceIn] returns the original channel.
* Calls to [flowOn] have generally no effect, unless [buffer] is used to explicitly request buffering.
*/
@ExperimentalCoroutinesApi // since version 1.4.0
public fun <T> ReceiveChannel<T>.receiveAsFlow(): Flow<T> = ChannelAsFlow(this, consume = false)

/**
* Represents the given receive channel as a hot flow and [consumes][ReceiveChannel.consume] the channel
* on the first collection from this flow. The resulting flow can be collected just once and throws
* [IllegalStateException] when trying to collect it more than once.
*
* See also [receiveAsFlow] which supports multiple collectors of the resulting flow.
*
* ### Cancellation semantics
*
* 1) Flow consumer is cancelled when the original channel is cancelled.
* 2) Flow consumer completes normally when the original channel was closed normally and then fully consumed.
* 3) If the flow consumer fails with an exception, channel is cancelled.
* 1) Flow collector is cancelled when the original channel is cancelled.
* 2) Flow collector completes normally when the original channel was closed normally and then fully consumed.
* 3) If the flow collector fails with an exception, channel is cancelled.
*
* ### Operator fusion
*
* Adjacent applications of [flowOn], [buffer], [conflate], and [produceIn] to the result of `consumeAsFlow` are fused.
* In particular, [produceIn] returns the original channel (but throws [IllegalStateException] on repeated calls).
* Calls to [flowOn] have generally no effect, unless [buffer] is used to explicitly request buffering.
*/
@FlowPreview
public fun <T> ReceiveChannel<T>.consumeAsFlow(): Flow<T> = ConsumeAsFlow(this)
@ExperimentalCoroutinesApi // since version 1.3.0
public fun <T> ReceiveChannel<T>.consumeAsFlow(): Flow<T> = ChannelAsFlow(this, consume = true)

/**
* Represents an existing [channel] as [ChannelFlow] implementation.
* It fuses with subsequent [flowOn] operators, but for the most part ignores the specified context.
* However, additional [buffer] calls cause a separate buffering channel to be created and that is where
* the context might play a role, because it is used by the producing coroutine.
*/
private class ConsumeAsFlow<T>(
private class ChannelAsFlow<T>(
private val channel: ReceiveChannel<T>,
private val consume: Boolean,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = Channel.OPTIONAL_CHANNEL
) : ChannelFlow<T>(context, capacity) {
private val consumed = atomic(false)

private fun markConsumed() =
check(!consumed.getAndSet(true)) { "ReceiveChannel.consumeAsFlow can be collected just once" }
private fun markConsumed() {
if (consume) {
check(!consumed.getAndSet(true)) { "ReceiveChannel.consumeAsFlow can be collected just once" }
}
}

override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
ConsumeAsFlow(channel, context, capacity)
ChannelAsFlow(channel, consume, context, capacity)

override suspend fun collectTo(scope: ProducerScope<T>) =
SendingCollector(scope).emitAll(channel) // use efficient channel receiving code from emitAll
SendingCollector(scope).emitAllImpl(channel, consume) // use efficient channel receiving code from emitAll

override fun broadcastImpl(scope: CoroutineScope, start: CoroutineStart): BroadcastChannel<T> {
markConsumed() // fail fast on repeated attempt to collect it
Expand All @@ -121,7 +151,7 @@ private class ConsumeAsFlow<T>(
override suspend fun collect(collector: FlowCollector<T>) {
if (capacity == Channel.OPTIONAL_CHANNEL) {
markConsumed()
collector.emitAll(channel) // direct
collector.emitAllImpl(channel, consume) // direct
} else {
super.collect(collector) // extra buffering channel, produceImpl will mark it as consumed
}
Expand Down
Expand Up @@ -21,6 +21,18 @@ class ChannelBuildersFlowTest : TestBase() {
assertFailsWith<IllegalStateException> { flow.collect() }
}

@Test
fun testChannelReceiveAsFlow() = runTest {
val channel = produce {
repeat(10) {
send(it + 1)
}
}
val flow = channel.receiveAsFlow()
assertEquals(55, flow.sum())
assertEquals(emptyList(), flow.toList())
}

@Test
fun testConsumeAsFlowCancellation() = runTest {
val channel = produce(NonCancellable) { // otherwise failure will cancel scope as well
Expand All @@ -36,6 +48,20 @@ class ChannelBuildersFlowTest : TestBase() {
assertFailsWith<IllegalStateException> { flow.collect() }
}

@Test
fun testReceiveAsFlowCancellation() = runTest {
val channel = produce(NonCancellable) { // otherwise failure will cancel scope as well
repeat(10) {
send(it + 1)
}
throw TestException()
}
val flow = channel.receiveAsFlow()
assertEquals(15, flow.take(5).sum()) // sum of first 5
assertEquals(40, flow.take(5).sum()) // sum the rest 5
assertFailsWith<TestException> { flow.sum() } // exception in the rest
}

@Test
fun testConsumeAsFlowException() = runTest {
val channel = produce(NonCancellable) { // otherwise failure will cancel scope as well
Expand All @@ -49,6 +75,19 @@ class ChannelBuildersFlowTest : TestBase() {
assertFailsWith<IllegalStateException> { flow.collect() }
}

@Test
fun testReceiveAsFlowException() = runTest {
val channel = produce(NonCancellable) { // otherwise failure will cancel scope as well
repeat(10) {
send(it + 1)
}
throw TestException()
}
val flow = channel.receiveAsFlow()
assertFailsWith<TestException> { flow.sum() }
assertFailsWith<TestException> { flow.collect() } // repeated collection -- same exception
}

@Test
fun testConsumeAsFlowProduceFusing() = runTest {
val channel = produce { send("OK") }
Expand All @@ -58,6 +97,15 @@ class ChannelBuildersFlowTest : TestBase() {
channel.cancel()
}

@Test
fun testReceiveAsFlowProduceFusing() = runTest {
val channel = produce { send("OK") }
val flow = channel.receiveAsFlow()
assertSame(channel, flow.produceIn(this))
assertSame(channel, flow.produceIn(this)) // can use produce multiple times
channel.cancel()
}

@Test
fun testConsumeAsFlowProduceBuffered() = runTest {
expect(1)
Expand Down

0 comments on commit e80cc5e

Please sign in to comment.