Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ReceiveChannel.broadcast shall start lazy coroutine when closed #1717

Merged
merged 4 commits into from Mar 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
52 changes: 48 additions & 4 deletions kotlinx-coroutines-core/common/src/channels/Broadcast.kt
Expand Up @@ -10,27 +10,48 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.intrinsics.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.native.concurrent.*

/**
* Broadcasts all elements of the channel.
* This function [consumes][ReceiveChannel.consume] all elements of the original [ReceiveChannel].
*
* The kind of the resulting channel depends on the specified [capacity] parameter:
* when `capacity` is positive (1 by default), but less than [UNLIMITED] -- uses `ArrayBroadcastChannel` with a buffer of given capacity,
* when `capacity` is [CONFLATED] -- uses [ConflatedBroadcastChannel] that conflates back-to-back sends;
* Note that resulting channel behaves like [ConflatedBroadcastChannel] but is not an instance of [ConflatedBroadcastChannel].
* otherwise -- throws [IllegalArgumentException].
*
* ### Cancelling broadcast
*
* **To stop broadcasting from the underlying channel call [cancel][BroadcastChannel.cancel] on the result.**
*
* Do not use [close][BroadcastChannel.close] on the resulting channel.
* It causes eventual failure of the broadcast coroutine and cancellation of the underlying channel, too,
* but it is not as prompt.
*
* ### Future replacement
*
* This function has an inappropriate result type of [BroadcastChannel] which provides
* [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with
* the broadcasting coroutine in hard-to-specify ways. It will be replaced with
* sharing operators on [Flow][kotlinx.coroutines.flow.Flow] in the future.
*
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
*/
fun <E> ReceiveChannel<E>.broadcast(
capacity: Int = 1,
start: CoroutineStart = CoroutineStart.LAZY
): BroadcastChannel<E> =
GlobalScope.broadcast(Dispatchers.Unconfined, capacity = capacity, start = start, onCompletion = consumes()) {
): BroadcastChannel<E> {
val scope = GlobalScope + Dispatchers.Unconfined + CoroutineExceptionHandler { _, _ -> }
// We can run this coroutine in the context that ignores all exceptions, because of `onCompletion = consume()`
// which passes all exceptions upstream to the source ReceiveChannel
return scope.broadcast(capacity = capacity, start = start, onCompletion = consumes()) {
for (e in this@broadcast) {
send(e)
}
}
}

/**
* Launches new coroutine to produce a stream of values by sending them to a broadcast channel
Expand Down Expand Up @@ -63,6 +84,21 @@ fun <E> ReceiveChannel<E>.broadcast(
*
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
*
* ### Cancelling broadcast
*
* **To stop broadcasting from the underlying channel call [cancel][BroadcastChannel.cancel] on the result.**
*
* Do not use [close][BroadcastChannel.close] on the resulting channel.
* It causes failure of the `send` operation in broadcast coroutine and would not cancel it if the
* coroutine is doing something else.
*
* ### Future replacement
*
* This function has an inappropriate result type of [BroadcastChannel] which provides
* [send][BroadcastChannel.send] and [close][BroadcastChannel.close] operations that interfere with
* the broadcasting coroutine in hard-to-specify ways. It will be replaced with
* sharing operators on [Flow][kotlinx.coroutines.flow.Flow] in the future.
*
* @param context additional to [CoroutineScope.coroutineContext] context of the coroutine.
* @param capacity capacity of the channel's buffer (1 by default).
* @param start coroutine start option. The default value is [CoroutineStart.LAZY].
Expand Down Expand Up @@ -107,8 +143,9 @@ private open class BroadcastCoroutine<E>(
}

override fun cancelInternal(cause: Throwable) {
_channel.cancel(cause.toCancellationException()) // cancel the channel
cancelCoroutine(cause) // cancel the job
val exception = cause.toCancellationException()
_channel.cancel(exception) // cancel the channel
cancelCoroutine(exception) // cancel the job
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
}

override fun onCompleted(value: Unit) {
Expand All @@ -119,6 +156,13 @@ private open class BroadcastCoroutine<E>(
val processed = _channel.close(cause)
if (!processed && !handled) handleCoroutineException(context, cause)
}

// The BroadcastChannel could be also closed
override fun close(cause: Throwable?): Boolean {
elizarov marked this conversation as resolved.
Show resolved Hide resolved
val result = _channel.close(cause)
start() // start coroutine if it was not started yet
elizarov marked this conversation as resolved.
Show resolved Hide resolved
return result
}
}

private class LazyBroadcastCoroutine<E>(
Expand Down
107 changes: 105 additions & 2 deletions kotlinx-coroutines-core/common/test/channels/BroadcastTest.kt
@@ -1,11 +1,12 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED")

package kotlinx.coroutines.channels

import kotlinx.coroutines.*
import kotlin.coroutines.*
import kotlin.test.*

class BroadcastTest : TestBase() {
Expand Down Expand Up @@ -34,4 +35,106 @@ class BroadcastTest : TestBase() {
yield() // to broadcast
finish(11)
}

/**
* See https://github.com/Kotlin/kotlinx.coroutines/issues/1713
*/
@Test
fun testChannelBroadcastLazyCancel() = runTest {
expect(1)
val a = produce {
expect(3)
assertFailsWith<CancellationException> { send("MSG") }
expect(5)
}
expect(2)
yield() // to produce
val b = a.broadcast()
b.cancel()
expect(4)
yield() // to abort produce
assertTrue(a.isClosedForReceive) // the source channel was consumed
finish(6)
}

@Test
fun testChannelBroadcastLazyClose() = runTest {
expect(1)
val a = produce {
expect(3)
send("MSG")
expect(5)
}
expect(2)
yield() // to produce
val b = a.broadcast()
b.close()
expect(4)
yield() // to abort produce
assertTrue(a.isClosedForReceive) // the source channel was consumed
finish(6)
}

@Test
fun testChannelBroadcastEagerCancel() = runTest {
expect(1)
val a = produce<Unit> {
expect(3)
yield() // back to main
expectUnreached() // will be cancelled
}
expect(2)
val b = a.broadcast(start = CoroutineStart.DEFAULT)
yield() // to produce
expect(4)
b.cancel()
yield() // to produce (cancelled)
assertTrue(a.isClosedForReceive) // the source channel was consumed
finish(5)
}

@Test
fun testChannelBroadcastEagerClose() = runTest {
expect(1)
val a = produce<Unit> {
expect(3)
yield() // back to main
// shall eventually get cancelled
assertFailsWith<CancellationException> {
while (true) { send(Unit) }
}
}
expect(2)
val b = a.broadcast(start = CoroutineStart.DEFAULT)
yield() // to produce
expect(4)
b.close()
yield() // to produce (closed)
assertTrue(a.isClosedForReceive) // the source channel was consumed
finish(5)
}

@Test
fun testBroadcastCloseWithException() = runTest {
expect(1)
val b = broadcast(NonCancellable, capacity = 1) {
expect(2)
send(1)
expect(3)
send(2) // suspends
expect(5)
// additional attempts to send fail
assertFailsWith<TestException> { send(3) }
}
val sub = b.openSubscription()
yield() // into broadcast
expect(4)
b.close(TestException()) // close broadcast channel with exception
assertTrue(b.isClosedForSend) // sub was also closed
assertEquals(1, sub.receive()) // 1st element received
assertEquals(2, sub.receive()) // 2nd element received
assertFailsWith<TestException> { sub.receive() } // then closed with exception
yield() // to cancel broadcast
finish(6)
}
}
5 changes: 4 additions & 1 deletion kotlinx-coroutines-core/jvm/src/channels/Actor.kt
Expand Up @@ -165,8 +165,11 @@ private class LazyActorCoroutine<E>(
}

override fun close(cause: Throwable?): Boolean {
// close the channel _first_
val closed = super.close(cause)
// then start the coroutine (it will promptly fail if it was not started yet)
start()
return super.close(cause)
return closed
}

override val onSend: SelectClause2<E, SendChannel<E>>
Expand Down