Skip to content

Commit

Permalink
ReceiveChannel.broadcast shall start lazy coroutine when closed
Browse files Browse the repository at this point in the history
Documentation on broadcast operators is added that explain the the resulting BroadcastChannel shall be cancelled if it is not needed anymore.

Fixes #1713
  • Loading branch information
elizarov committed Dec 25, 2019
1 parent cc3d8c4 commit 4a1835e
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 6 deletions.
39 changes: 35 additions & 4 deletions kotlinx-coroutines-core/common/src/channels/Broadcast.kt
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.channels
Expand All @@ -10,28 +10,43 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.intrinsics.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.native.concurrent.*

/**
* Broadcasts all elements of the channel.
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
*
* The kind of the resulting channel depends on the specified [capacity] parameter:
* when `capacity` is positive (1 by default), but less than [UNLIMITED] -- uses `ArrayBroadcastChannel` with a buffer of given capacity,
* when `capacity` is [CONFLATED] -- uses [ConflatedBroadcastChannel] that conflates back-to-back sends;
* Note that resulting channel behaves like [ConflatedBroadcastChannel] but is not an instance of [ConflatedBroadcastChannel].
* otherwise -- throws [IllegalArgumentException].
*
* ### Cancelling broadcast
*
* **To stop broadcasting from the underlying channel call [cancel][BroadcastChannel.cancel] on the result.**
*
* Do not use [close][BroadcastChannel.close] on the resulting channel.
* It causes eventual failure of the broadcast coroutine and cancellation of the underlying channel, too,
* but it is not as prompt.
*
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
*/
fun <E> ReceiveChannel<E>.broadcast(
capacity: Int = 1,
start: CoroutineStart = CoroutineStart.LAZY
): BroadcastChannel<E> =
GlobalScope.broadcast(Dispatchers.Unconfined, capacity = capacity, start = start, onCompletion = consumes()) {
OperatorScope.broadcast(capacity = capacity, start = start, onCompletion = consumes()) {
// We can run this coroutine in the context that ignores all exceptions, because of `onCompletion = consume()`
// which passes all exceptions upstream to the source ReceiveChannel
for (e in this@broadcast) {
send(e)
}
}

@SharedImmutable
private val OperatorScope = GlobalScope + Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> }

/**
* Launches new coroutine to produce a stream of values by sending them to a broadcast channel
* and returns a reference to the coroutine as a [BroadcastChannel]. The resulting
Expand Down Expand Up @@ -63,6 +78,14 @@ fun <E> ReceiveChannel<E>.broadcast(
*
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
*
* ### Cancelling broadcast
*
* **To stop broadcasting from the underlying channel call [cancel][BroadcastChannel.cancel] on the result.**
*
* Do not use [close][BroadcastChannel.close] on the resulting channel.
* It causes failure of the `send` operation in broadcast coroutine and would not cancel it if the
* coroutine is doing something else.
*
* @param context additional to [CoroutineScope.coroutineContext] context of the coroutine.
* @param capacity capacity of the channel's buffer (1 by default).
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
Expand Down Expand Up @@ -107,8 +130,9 @@ private open class BroadcastCoroutine<E>(
}

override fun cancelInternal(cause: Throwable) {
_channel.cancel(cause.toCancellationException()) // cancel the channel
cancelCoroutine(cause) // cancel the job
val exception = cause.toCancellationException()
_channel.cancel(exception) // cancel the channel
cancelCoroutine(exception) // cancel the job
}

override fun onCompleted(value: Unit) {
Expand All @@ -119,6 +143,13 @@ private open class BroadcastCoroutine<E>(
val processed = _channel.close(cause)
if (!processed && !handled) handleCoroutineException(context, cause)
}

// The BroadcastChannel could be also closed
override fun close(cause: Throwable?): Boolean {
val result = _channel.close(cause)
start() // start coroutine if it was not started yet
return result
}
}

private class LazyBroadcastCoroutine<E>(
Expand Down
107 changes: 105 additions & 2 deletions kotlinx-coroutines-core/common/test/channels/BroadcastTest.kt
@@ -1,11 +1,12 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED")

package kotlinx.coroutines.channels

import kotlinx.coroutines.*
import kotlin.coroutines.*
import kotlin.test.*

class BroadcastTest : TestBase() {
Expand Down Expand Up @@ -34,4 +35,106 @@ class BroadcastTest : TestBase() {
yield() // to broadcast
finish(11)
}

/**
* See https://github.com/Kotlin/kotlinx.coroutines/issues/1713
*/
@Test
fun testChannelBroadcastLazyCancel() = runTest {
expect(1)
val a = produce {
expect(3)
assertFailsWith<CancellationException> { send("MSG") }
expect(5)
}
expect(2)
yield() // to produce
val b = a.broadcast()
b.cancel()
expect(4)
yield() // to abort produce
assertTrue(a.isClosedForReceive) // the source channel was consumed
finish(6)
}

@Test
fun testChannelBroadcastLazyClose() = runTest {
expect(1)
val a = produce {
expect(3)
send("MSG")
expect(5)
}
expect(2)
yield() // to produce
val b = a.broadcast()
b.close()
expect(4)
yield() // to abort produce
assertTrue(a.isClosedForReceive) // the source channel was consumed
finish(6)
}

@Test
fun testChannelBroadcastEagerCancel() = runTest {
expect(1)
val a = produce<Unit> {
expect(3)
yield() // back to main
expectUnreached() // will be cancelled
}
expect(2)
val b = a.broadcast(start = CoroutineStart.DEFAULT)
yield() // to produce
expect(4)
b.cancel()
yield() // to produce (cancelled)
assertTrue(a.isClosedForReceive) // the source channel was consumed
finish(5)
}

@Test
fun testChannelBroadcastEagerClose() = runTest {
expect(1)
val a = produce<Unit> {
expect(3)
yield() // back to main
// shall eventually get cancelled
assertFailsWith<CancellationException> {
while (true) { send(Unit) }
}
}
expect(2)
val b = a.broadcast(start = CoroutineStart.DEFAULT)
yield() // to produce
expect(4)
b.close()
yield() // to produce (closed)
assertTrue(a.isClosedForReceive) // the source channel was consumed
finish(5)
}

@Test
fun testBroadcastCloseWithException() = runTest {
expect(1)
val b = broadcast(NonCancellable, capacity = 1) {
expect(2)
send(1)
expect(3)
send(2) // suspends
expect(5)
// additional attempts to send fail
assertFailsWith<TestException> { send(3) }
}
val sub = b.openSubscription()
yield() // into broadcast
expect(4)
b.close(TestException()) // close broadcast channel with exception
assertTrue(b.isClosedForSend) // sub was also closed
assertEquals(1, sub.receive()) // 1st element received
assertEquals(2, sub.receive()) // 2nd element received
assertFailsWith<TestException> { sub.receive() } // then closed with exception
yield() // to cancel broadcast
finish(6)
}
}

0 comments on commit 4a1835e

Please sign in to comment.