diff --git a/kotlinx-coroutines-core/common/src/channels/Broadcast.kt b/kotlinx-coroutines-core/common/src/channels/Broadcast.kt index e487a53431..de65f3adeb 100644 --- a/kotlinx-coroutines-core/common/src/channels/Broadcast.kt +++ b/kotlinx-coroutines-core/common/src/channels/Broadcast.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -13,6 +13,7 @@ import kotlin.coroutines.intrinsics.* /** * Broadcasts all elements of the channel. + * This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel]. * * The kind of the resulting channel depends on the specified [capacity] parameter: * when `capacity` is positive (1 by default), but less than [UNLIMITED] -- uses `ArrayBroadcastChannel` with a buffer of given capacity, @@ -28,7 +29,12 @@ fun ReceiveChannel.broadcast( ): BroadcastChannel = GlobalScope.broadcast(Dispatchers.Unconfined, capacity = capacity, start = start, onCompletion = consumes()) { for (e in this@broadcast) { - send(e) + try { + send(e) + } catch (e: ClosedSendChannelException) { + // the resulting BroadcastChannel was closed -> just break the sending loop + break + } } } @@ -119,6 +125,13 @@ private open class BroadcastCoroutine( val processed = _channel.close(cause) if (!processed && !handled) handleCoroutineException(context, cause) } + + // The BroadcastChannel could be also closed + override fun close(cause: Throwable?): Boolean { + val result = _channel.close(cause) + cancelCoroutine(cause) + return result + } } private class LazyBroadcastCoroutine( diff --git a/kotlinx-coroutines-core/common/test/channels/BroadcastTest.kt b/kotlinx-coroutines-core/common/test/channels/BroadcastTest.kt index 878437bbae..f631d6e5fd 100644 --- a/kotlinx-coroutines-core/common/test/channels/BroadcastTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/BroadcastTest.kt @@ -1,11 +1,10 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels import kotlinx.coroutines.* -import kotlin.coroutines.* import kotlin.test.* class BroadcastTest : TestBase() { @@ -34,4 +33,47 @@ class BroadcastTest : TestBase() { yield() // to broadcast finish(11) } + + /** + * See https://github.com/Kotlin/kotlinx.coroutines/issues/1713 + */ + @Test + fun testChannelBroadcastLazyClose() = runTest { + expect(1) + val a = produce { + expect(3) + try { + send("MSG") + } finally { + expect(5) + } + expectUnreached() + } + expect(2) + yield() // to produce + val b = a.broadcast() + b.close() + expect(4) + yield() // to abort produce + assertTrue(a.isClosedForReceive) // the source channel was consumed + finish(6) + } + + @Test + fun testChannelBroadcastEagerClose() = runTest { + expect(1) + val a = produce { + expect(3) + yield() // back to main + expectUnreached() // will be cancelled + } + expect(2) + val b = a.broadcast(start = CoroutineStart.DEFAULT) + yield() // to produce + expect(4) + b.close() + yield() // to produce (cancelled) + assertTrue(a.isClosedForReceive) // the source channel was consumed + finish(5) + } } \ No newline at end of file