diff --git a/kotlinx-coroutines-core/common/src/channels/Broadcast.kt b/kotlinx-coroutines-core/common/src/channels/Broadcast.kt index e487a53431..ef8b980ab5 100644 --- a/kotlinx-coroutines-core/common/src/channels/Broadcast.kt +++ b/kotlinx-coroutines-core/common/src/channels/Broadcast.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -10,9 +10,11 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED import kotlinx.coroutines.intrinsics.* import kotlin.coroutines.* import kotlin.coroutines.intrinsics.* +import kotlin.native.concurrent.* /** * Broadcasts all elements of the channel. + * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. * * The kind of the resulting channel depends on the specified [capacity] parameter: * when `capacity` is positive (1 by default), but less than [UNLIMITED] -- uses `ArrayBroadcastChannel` with a buffer of given capacity, @@ -20,17 +22,37 @@ import kotlin.coroutines.intrinsics.* * Note that resulting channel behaves like [ConflatedBroadcastChannel] but is not an instance of [ConflatedBroadcastChannel]. * otherwise -- throws [IllegalArgumentException]. * + * ### Cancelling broadcast + * + * **To stop broadcasting from the underlying channel call [cancel][BroadcastChannel.cancel] on the result.** + * + * Do not use [close][BroadcastChannel.close] on the resulting channel. + * It causes eventual failure of the broadcast coroutine and cancellation of the underlying channel, too, + * but it is not as prompt. + * + * ### Obsolete + * + * This function has an inappropriate result type of [BroadcastChannel] which provides + * [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with + * the broadcasting coroutine in hard-to-specify ways. It will be replaced with + * sharing operators on [Flow][kotlinx.coroutines.flow.Flow]. + * * @param start coroutine start option. The default value is [CoroutineStart.LAZY]. */ +@ObsoleteCoroutinesApi // since version 1.4.0 fun ReceiveChannel.broadcast( capacity: Int = 1, start: CoroutineStart = CoroutineStart.LAZY -): BroadcastChannel = - GlobalScope.broadcast(Dispatchers.Unconfined, capacity = capacity, start = start, onCompletion = consumes()) { +): BroadcastChannel { + val scope = GlobalScope + Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> } + // We can run this coroutine in the context that ignores all exceptions, because of `onCompletion = consume()` + // which passes all exceptions upstream to the source ReceiveChannel + return scope.broadcast(capacity = capacity, start = start, onCompletion = consumes()) { for (e in this@broadcast) { send(e) } } +} /** * Launches new coroutine to produce a stream of values by sending them to a broadcast channel @@ -63,12 +85,28 @@ fun ReceiveChannel.broadcast( * * See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine. * + * ### Cancelling broadcast + * + * **To stop broadcasting from the underlying channel call [cancel][BroadcastChannel.cancel] on the result.** + * + * Do not use [close][BroadcastChannel.close] on the resulting channel. + * It causes failure of the `send` operation in broadcast coroutine and would not cancel it if the + * coroutine is doing something else. + * + * ### Obsolete + * + * This function has an inappropriate result type of [BroadcastChannel] which provides + * [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with + * the broadcasting coroutine in hard-to-specify ways. It will be replaced with + * sharing operators on [Flow][kotlinx.coroutines.flow.Flow]. + * * @param context additional to [CoroutineScope.coroutineContext] context of the coroutine. * @param capacity capacity of the channel's buffer (1 by default). * @param start coroutine start option. The default value is [CoroutineStart.LAZY]. * @param onCompletion optional completion handler for the producer coroutine (see [Job.invokeOnCompletion]). * @param block the coroutine code. */ +@ObsoleteCoroutinesApi // since version 1.4.0 public fun CoroutineScope.broadcast( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 1, @@ -107,8 +145,9 @@ private open class BroadcastCoroutine( } override fun cancelInternal(cause: Throwable) { - _channel.cancel(cause.toCancellationException()) // cancel the channel - cancelCoroutine(cause) // cancel the job + val exception = cause.toCancellationException() + _channel.cancel(exception) // cancel the channel + cancelCoroutine(exception) // cancel the job } override fun onCompleted(value: Unit) { @@ -119,6 +158,13 @@ private open class BroadcastCoroutine( val processed = _channel.close(cause) if (!processed && !handled) handleCoroutineException(context, cause) } + + // The BroadcastChannel could be also closed + override fun close(cause: Throwable?): Boolean { + val result = _channel.close(cause) + start() // start coroutine if it was not started yet + return result + } } private class LazyBroadcastCoroutine( diff --git a/kotlinx-coroutines-core/common/test/channels/BroadcastTest.kt b/kotlinx-coroutines-core/common/test/channels/BroadcastTest.kt index 878437bbae..bb3142e54c 100644 --- a/kotlinx-coroutines-core/common/test/channels/BroadcastTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/BroadcastTest.kt @@ -1,11 +1,12 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ +@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") + package kotlinx.coroutines.channels import kotlinx.coroutines.* -import kotlin.coroutines.* import kotlin.test.* class BroadcastTest : TestBase() { @@ -34,4 +35,106 @@ class BroadcastTest : TestBase() { yield() // to broadcast finish(11) } + + /** + * See https://github.com/Kotlin/kotlinx.coroutines/issues/1713 + */ + @Test + fun testChannelBroadcastLazyCancel() = runTest { + expect(1) + val a = produce { + expect(3) + assertFailsWith { send("MSG") } + expect(5) + } + expect(2) + yield() // to produce + val b = a.broadcast() + b.cancel() + expect(4) + yield() // to abort produce + assertTrue(a.isClosedForReceive) // the source channel was consumed + finish(6) + } + + @Test + fun testChannelBroadcastLazyClose() = runTest { + expect(1) + val a = produce { + expect(3) + send("MSG") + expect(5) + } + expect(2) + yield() // to produce + val b = a.broadcast() + b.close() + expect(4) + yield() // to abort produce + assertTrue(a.isClosedForReceive) // the source channel was consumed + finish(6) + } + + @Test + fun testChannelBroadcastEagerCancel() = runTest { + expect(1) + val a = produce { + expect(3) + yield() // back to main + expectUnreached() // will be cancelled + } + expect(2) + val b = a.broadcast(start = CoroutineStart.DEFAULT) + yield() // to produce + expect(4) + b.cancel() + yield() // to produce (cancelled) + assertTrue(a.isClosedForReceive) // the source channel was consumed + finish(5) + } + + @Test + fun testChannelBroadcastEagerClose() = runTest { + expect(1) + val a = produce { + expect(3) + yield() // back to main + // shall eventually get cancelled + assertFailsWith { + while (true) { send(Unit) } + } + } + expect(2) + val b = a.broadcast(start = CoroutineStart.DEFAULT) + yield() // to produce + expect(4) + b.close() + yield() // to produce (closed) + assertTrue(a.isClosedForReceive) // the source channel was consumed + finish(5) + } + + @Test + fun testBroadcastCloseWithException() = runTest { + expect(1) + val b = broadcast(NonCancellable, capacity = 1) { + expect(2) + send(1) + expect(3) + send(2) // suspends + expect(5) + // additional attempts to send fail + assertFailsWith { send(3) } + } + val sub = b.openSubscription() + yield() // into broadcast + expect(4) + b.close(TestException()) // close broadcast channel with exception + assertTrue(b.isClosedForSend) // sub was also closed + assertEquals(1, sub.receive()) // 1st element received + assertEquals(2, sub.receive()) // 2nd element received + assertFailsWith { sub.receive() } // then closed with exception + yield() // to cancel broadcast + finish(6) + } } \ No newline at end of file