Skip to content

Commit

Permalink
ReceiveChannel.broadcast shall start lazy coroutine when closed
Browse files Browse the repository at this point in the history
* Documentation on broadcast operators is added that explains that the resulting BroadcastChannel shall be cancelled if it is not needed anymore.
* More tests added for various broadcast cancel/close cases.
* The only functional change is that closing a broadcast channel for lazy coroutine shall start the corresponding coroutine to give it a chance to promptly fail.
* Mark broadcast operators as obsolete. To be replaced with sharing operators on flows (see #1716).

Fixes #1713
  • Loading branch information
elizarov committed Feb 17, 2020
1 parent bf9509d commit f361535
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 7 deletions.
56 changes: 51 additions & 5 deletions kotlinx-coroutines-core/common/src/channels/Broadcast.kt
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.channels
Expand All @@ -10,27 +10,49 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.intrinsics.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.native.concurrent.*

/**
* Broadcasts all elements of the channel.
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
*
* The kind of the resulting channel depends on the specified [capacity] parameter:
* when `capacity` is positive (1 by default), but less than [UNLIMITED] -- uses `ArrayBroadcastChannel` with a buffer of given capacity,
* when `capacity` is [CONFLATED] -- uses [ConflatedBroadcastChannel] that conflates back-to-back sends;
* Note that resulting channel behaves like [ConflatedBroadcastChannel] but is not an instance of [ConflatedBroadcastChannel].
* otherwise -- throws [IllegalArgumentException].
*
* ### Cancelling broadcast
*
* **To stop broadcasting from the underlying channel call [cancel][BroadcastChannel.cancel] on the result.**
*
* Do not use [close][BroadcastChannel.close] on the resulting channel.
* It causes eventual failure of the broadcast coroutine and cancellation of the underlying channel, too,
* but it is not as prompt.
*
* ### Obsolete
*
* This function has an inappropriate result type of [BroadcastChannel] which provides
* [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with
* the broadcasting coroutine in hard-to-specify ways. It will be replaced with
* sharing operators on [Flow][kotlinx.coroutines.flow.Flow].
*
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
*/
@ObsoleteCoroutinesApi // since version 1.4.0
fun <E> ReceiveChannel<E>.broadcast(
capacity: Int = 1,
start: CoroutineStart = CoroutineStart.LAZY
): BroadcastChannel<E> =
GlobalScope.broadcast(Dispatchers.Unconfined, capacity = capacity, start = start, onCompletion = consumes()) {
): BroadcastChannel<E> {
val scope = GlobalScope + Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> }
// We can run this coroutine in the context that ignores all exceptions, because of `onCompletion = consume()`
// which passes all exceptions upstream to the source ReceiveChannel
return scope.broadcast(capacity = capacity, start = start, onCompletion = consumes()) {
for (e in this@broadcast) {
send(e)
}
}
}

/**
* Launches new coroutine to produce a stream of values by sending them to a broadcast channel
Expand Down Expand Up @@ -63,12 +85,28 @@ fun <E> ReceiveChannel<E>.broadcast(
*
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
*
* ### Cancelling broadcast
*
* **To stop broadcasting from the underlying channel call [cancel][BroadcastChannel.cancel] on the result.**
*
* Do not use [close][BroadcastChannel.close] on the resulting channel.
* It causes failure of the `send` operation in broadcast coroutine and would not cancel it if the
* coroutine is doing something else.
*
* ### Obsolete
*
* This function has an inappropriate result type of [BroadcastChannel] which provides
* [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with
* the broadcasting coroutine in hard-to-specify ways. It will be replaced with
* sharing operators on [Flow][kotlinx.coroutines.flow.Flow].
*
* @param context additional to [CoroutineScope.coroutineContext] context of the coroutine.
* @param capacity capacity of the channel's buffer (1 by default).
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
* @param onCompletion optional completion handler for the producer coroutine (see [Job.invokeOnCompletion]).
* @param block the coroutine code.
*/
@ObsoleteCoroutinesApi // since version 1.4.0
public fun <E> CoroutineScope.broadcast(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 1,
Expand Down Expand Up @@ -107,8 +145,9 @@ private open class BroadcastCoroutine<E>(
}

override fun cancelInternal(cause: Throwable) {
_channel.cancel(cause.toCancellationException()) // cancel the channel
cancelCoroutine(cause) // cancel the job
val exception = cause.toCancellationException()
_channel.cancel(exception) // cancel the channel
cancelCoroutine(exception) // cancel the job
}

override fun onCompleted(value: Unit) {
Expand All @@ -119,6 +158,13 @@ private open class BroadcastCoroutine<E>(
val processed = _channel.close(cause)
if (!processed && !handled) handleCoroutineException(context, cause)
}

// The BroadcastChannel could be also closed
override fun close(cause: Throwable?): Boolean {
val result = _channel.close(cause)
start() // start coroutine if it was not started yet
return result
}
}

private class LazyBroadcastCoroutine<E>(
Expand Down
107 changes: 105 additions & 2 deletions kotlinx-coroutines-core/common/test/channels/BroadcastTest.kt
@@ -1,11 +1,12 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED")

package kotlinx.coroutines.channels

import kotlinx.coroutines.*
import kotlin.coroutines.*
import kotlin.test.*

class BroadcastTest : TestBase() {
Expand Down Expand Up @@ -34,4 +35,106 @@ class BroadcastTest : TestBase() {
yield() // to broadcast
finish(11)
}

/**
* See https://github.com/Kotlin/kotlinx.coroutines/issues/1713
*/
@Test
fun testChannelBroadcastLazyCancel() = runTest {
expect(1)
val a = produce {
expect(3)
assertFailsWith<CancellationException> { send("MSG") }
expect(5)
}
expect(2)
yield() // to produce
val b = a.broadcast()
b.cancel()
expect(4)
yield() // to abort produce
assertTrue(a.isClosedForReceive) // the source channel was consumed
finish(6)
}

@Test
fun testChannelBroadcastLazyClose() = runTest {
expect(1)
val a = produce {
expect(3)
send("MSG")
expect(5)
}
expect(2)
yield() // to produce
val b = a.broadcast()
b.close()
expect(4)
yield() // to abort produce
assertTrue(a.isClosedForReceive) // the source channel was consumed
finish(6)
}

@Test
fun testChannelBroadcastEagerCancel() = runTest {
expect(1)
val a = produce<Unit> {
expect(3)
yield() // back to main
expectUnreached() // will be cancelled
}
expect(2)
val b = a.broadcast(start = CoroutineStart.DEFAULT)
yield() // to produce
expect(4)
b.cancel()
yield() // to produce (cancelled)
assertTrue(a.isClosedForReceive) // the source channel was consumed
finish(5)
}

@Test
fun testChannelBroadcastEagerClose() = runTest {
expect(1)
val a = produce<Unit> {
expect(3)
yield() // back to main
// shall eventually get cancelled
assertFailsWith<CancellationException> {
while (true) { send(Unit) }
}
}
expect(2)
val b = a.broadcast(start = CoroutineStart.DEFAULT)
yield() // to produce
expect(4)
b.close()
yield() // to produce (closed)
assertTrue(a.isClosedForReceive) // the source channel was consumed
finish(5)
}

@Test
fun testBroadcastCloseWithException() = runTest {
expect(1)
val b = broadcast(NonCancellable, capacity = 1) {
expect(2)
send(1)
expect(3)
send(2) // suspends
expect(5)
// additional attempts to send fail
assertFailsWith<TestException> { send(3) }
}
val sub = b.openSubscription()
yield() // into broadcast
expect(4)
b.close(TestException()) // close broadcast channel with exception
assertTrue(b.isClosedForSend) // sub was also closed
assertEquals(1, sub.receive()) // 1st element received
assertEquals(2, sub.receive()) // 2nd element received
assertFailsWith<TestException> { sub.receive() } // then closed with exception
yield() // to cancel broadcast
finish(6)
}
}

0 comments on commit f361535

Please sign in to comment.