Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ReceiveChannel.receiveAsFlow extension #1731

Merged
merged 2 commits into from Dec 26, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -943,6 +943,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun onStart (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun produceIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static final fun publishOn (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow;
public static final fun receiveAsFlow (Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlinx/coroutines/flow/Flow;
public static final fun reduce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final synthetic fun retry (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static final fun retry (Lkotlinx/coroutines/flow/Flow;JLkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
Expand Down
58 changes: 44 additions & 14 deletions kotlinx-coroutines-core/common/src/flow/Channels.kt
Expand Up @@ -23,8 +23,11 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
* This function provides a more efficient shorthand for `channel.consumeEach { value -> emit(value) }`.
* See [consumeEach][ReceiveChannel.consumeEach].
*/
@ExperimentalCoroutinesApi
public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>) {
@ExperimentalCoroutinesApi // since version 1.3.0
public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>) =
emitAllImpl(channel, consume = true)

private suspend fun <T> FlowCollector<T>.emitAllImpl(channel: ReceiveChannel<T>, consume: Boolean) {
// Manually inlined "consumeEach" implementation that does not use iterator but works via "receiveOrClosed".
// It has smaller and more efficient spilled state which also allows to implement a manual kludge to
// fix retention of the last emitted value.
Expand Down Expand Up @@ -59,51 +62,78 @@ public suspend fun <T> FlowCollector<T>.emitAll(channel: ReceiveChannel<T>) {
cause = e
throw e
} finally {
channel.cancelConsumed(cause)
if (consume) channel.cancelConsumed(cause)
}
}

/**
* Represents the given receive channel as a hot flow and [receives][ReceiveChannel.receive] from the channel
* in fan-out fashion every time this flow is collected. One element will be emitted to one collector only.
*
* See also [consumeAsFlow] which ensures that the resulting flow is collected just once.
*
* ### Cancellation semantics
*
* 1) Flow collectors are cancelled when the original channel is cancelled.
* 2) Flow collectors complete normally when the original channel completes (~is closed) normally.
elizarov marked this conversation as resolved.
Show resolved Hide resolved
* 3) Failure or cancellation of the flow collector does not affect the channel.
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
*
* ### Operator fusion
*
* Adjacent applications of [flowOn], [buffer], [conflate], and [produceIn] to the result of `receiveAsFlow` are fused.
* In particular, [produceIn] returns the original channel.
* Calls to [flowOn] have generally no effect, unless [buffer] is used to explicitly request buffering.
*/
@ExperimentalCoroutinesApi // since version 1.4.0
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
public fun <T> ReceiveChannel<T>.receiveAsFlow(): Flow<T> = ChannelAsFlow(this, consume = false)

/**
* Represents the given receive channel as a hot flow and [consumes][ReceiveChannel.consume] the channel
* on the first collection from this flow. The resulting flow can be collected just once and throws
* [IllegalStateException] when trying to collect it more than once.
*
* See also [receiveAsFlow] which supports multiple collectors of the resulting flow.
*
* ### Cancellation semantics
*
* 1) Flow consumer is cancelled when the original channel is cancelled.
* 2) Flow consumer completes normally when the original channel was closed normally and then fully consumed.
* 3) If the flow consumer fails with an exception, channel is cancelled.
* 1) Flow collector is cancelled when the original channel is cancelled.
* 2) Flow collector completes normally when the original channel was closed normally and then fully consumed.
* 3) If the flow collector fails with an exception, channel is cancelled.
*
* ### Operator fusion
*
* Adjacent applications of [flowOn], [buffer], [conflate], and [produceIn] to the result of `consumeAsFlow` are fused.
* In particular, [produceIn] returns the original channel (but throws [IllegalStateException] on repeated calls).
* Calls to [flowOn] have generally no effect, unless [buffer] is used to explicitly request buffering.
*/
@FlowPreview
public fun <T> ReceiveChannel<T>.consumeAsFlow(): Flow<T> = ConsumeAsFlow(this)
@ExperimentalCoroutinesApi // since version 1.3.0
public fun <T> ReceiveChannel<T>.consumeAsFlow(): Flow<T> = ChannelAsFlow(this, consume = true)

/**
* Represents an existing [channel] as [ChannelFlow] implementation.
* It fuses with subsequent [flowOn] operators, but for the most part ignores the specified context.
* However, additional [buffer] calls cause a separate buffering channel to be created and that is where
* the context might play a role, because it is used by the producing coroutine.
*/
private class ConsumeAsFlow<T>(
private class ChannelAsFlow<T>(
private val channel: ReceiveChannel<T>,
private val consume: Boolean,
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = Channel.OPTIONAL_CHANNEL
) : ChannelFlow<T>(context, capacity) {
private val consumed = atomic(false)

private fun markConsumed() =
check(!consumed.getAndSet(true)) { "ReceiveChannel.consumeAsFlow can be collected just once" }
private fun markConsumed() {
if (consume) {
check(!consumed.getAndSet(true)) { "ReceiveChannel.consumeAsFlow can be collected just once" }
}
}

override fun create(context: CoroutineContext, capacity: Int): ChannelFlow<T> =
ConsumeAsFlow(channel, context, capacity)
ChannelAsFlow(channel, consume, context, capacity)

override suspend fun collectTo(scope: ProducerScope<T>) =
SendingCollector(scope).emitAll(channel) // use efficient channel receiving code from emitAll
SendingCollector(scope).emitAllImpl(channel, consume) // use efficient channel receiving code from emitAll

override fun broadcastImpl(scope: CoroutineScope, start: CoroutineStart): BroadcastChannel<T> {
markConsumed() // fail fast on repeated attempt to collect it
Expand All @@ -121,7 +151,7 @@ private class ConsumeAsFlow<T>(
override suspend fun collect(collector: FlowCollector<T>) {
if (capacity == Channel.OPTIONAL_CHANNEL) {
markConsumed()
collector.emitAll(channel) // direct
collector.emitAllImpl(channel, consume) // direct
} else {
super.collect(collector) // extra buffering channel, produceImpl will mark it as consumed
}
Expand Down
Expand Up @@ -21,6 +21,18 @@ class ChannelBuildersFlowTest : TestBase() {
assertFailsWith<IllegalStateException> { flow.collect() }
}

@Test
fun testChannelReceiveAsFlow() = runTest {
val channel = produce {
repeat(10) {
send(it + 1)
}
}
val flow = channel.receiveAsFlow()
assertEquals(55, flow.sum())
assertEquals(emptyList(), flow.toList())
}

@Test
fun testConsumeAsFlowCancellation() = runTest {
val channel = produce(NonCancellable) { // otherwise failure will cancel scope as well
Expand All @@ -36,6 +48,20 @@ class ChannelBuildersFlowTest : TestBase() {
assertFailsWith<IllegalStateException> { flow.collect() }
}

@Test
fun testReceiveAsFlowCancellation() = runTest {
val channel = produce(NonCancellable) { // otherwise failure will cancel scope as well
repeat(10) {
send(it + 1)
}
throw TestException()
}
val flow = channel.receiveAsFlow()
assertEquals(15, flow.take(5).sum()) // sum of first 5
assertEquals(40, flow.take(5).sum()) // sum the rest 5
assertFailsWith<TestException> { flow.sum() } // exception in the rest
}

@Test
fun testConsumeAsFlowException() = runTest {
val channel = produce(NonCancellable) { // otherwise failure will cancel scope as well
Expand All @@ -49,6 +75,19 @@ class ChannelBuildersFlowTest : TestBase() {
assertFailsWith<IllegalStateException> { flow.collect() }
}

@Test
fun testReceiveAsFlowException() = runTest {
val channel = produce(NonCancellable) { // otherwise failure will cancel scope as well
repeat(10) {
send(it + 1)
}
throw TestException()
}
val flow = channel.receiveAsFlow()
assertFailsWith<TestException> { flow.sum() }
assertFailsWith<TestException> { flow.collect() } // repeated collection -- same exception
}

@Test
fun testConsumeAsFlowProduceFusing() = runTest {
val channel = produce { send("OK") }
Expand All @@ -58,6 +97,15 @@ class ChannelBuildersFlowTest : TestBase() {
channel.cancel()
}

@Test
fun testReceiveAsFlowProduceFusing() = runTest {
val channel = produce { send("OK") }
val flow = channel.receiveAsFlow()
assertSame(channel, flow.produceIn(this))
assertSame(channel, flow.produceIn(this)) // can use produce multiple times
channel.cancel()
}

@Test
fun testConsumeAsFlowProduceBuffered() = runTest {
expect(1)
Expand Down