From f9e14dd9ed74b3c2a2f8323f28a75320db713cb8 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 17 Jul 2019 16:46:30 -0700 Subject: [PATCH] Channel.receiveOrNull becomes extension, internal receiveOrClosed added * The corresponding ReceiveChannel methods are deprecated. * Introduced corresponding extensions with the same semantic and generic Any bound. * Introduce internal ReceiveChannel.[on]receiveOrClosed * Using internal inline class ValueOrClosed. * To be stabilized and made public in the future when inline classes ABI stabilizes. * It is related to #330 but does not resolve it yet. * Includes todos for future public ValueOrClose design. * Simplify AbstractChannel select implementations. * AbstractChannel implementation is optimized to avoid code duplication in suspension of different receive methods: receive, receiveOrNull, receiveOrClosed. --- .../kotlinx-coroutines-core.txt | 21 +++ build.gradle | 2 +- .../channels/ChannelReceiveOrClosedTest.kt | 141 +++++++++++++++ docs/select-expression.md | 12 +- kotlinx-coroutines-core/README.md | 6 +- kotlinx-coroutines-core/common/README.md | 6 +- .../common/src/channels/AbstractChannel.kt | 163 ++++++++++------- .../common/src/channels/Channel.kt | 167 +++++++++++++++++- .../common/src/channels/Channels.common.kt | 44 +++++ .../test/channels/BasicOperationsTest.kt | 33 ++++ .../common/test/channels/TestChannelKind.kt | 3 + .../flow/channels/ChannelBuildersFlowTest.kt | 40 +++++ .../test/selects/SelectArrayChannelTest.kt | 98 ++++++++++ .../selects/SelectRendezvousChannelTest.kt | 98 ++++++++++ .../test/channels/TickerChannelCommonTest.kt | 21 +-- 15 files changed, 760 insertions(+), 95 deletions(-) create mode 100644 common/kotlinx-coroutines-core-common/test/channels/ChannelReceiveOrClosedTest.kt diff --git a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt index 94df09a6ee..89bed95c68 100644 --- a/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt +++ b/binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt @@ -670,7 +670,9 @@ public final class kotlinx/coroutines/channels/ChannelsKt { public static final fun minWith (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/Comparator;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun none (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun none (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun onReceiveOrNull (Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlinx/coroutines/selects/SelectClause1; public static final fun partition (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun receiveOrNull (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun reduce (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun reduceIndexed (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun requireNoNulls (Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlinx/coroutines/channels/ReceiveChannel; @@ -743,12 +745,14 @@ public abstract interface class kotlinx/coroutines/channels/ReceiveChannel { public abstract synthetic fun cancel (Ljava/lang/Throwable;)Z public abstract fun cancel (Ljava/util/concurrent/CancellationException;)V public abstract fun getOnReceive ()Lkotlinx/coroutines/selects/SelectClause1; + public abstract fun getOnReceiveOrClosed ()Lkotlinx/coroutines/selects/SelectClause1; public abstract fun getOnReceiveOrNull ()Lkotlinx/coroutines/selects/SelectClause1; public abstract fun isClosedForReceive ()Z public abstract fun isEmpty ()Z public abstract fun iterator ()Lkotlinx/coroutines/channels/ChannelIterator; public abstract fun poll ()Ljava/lang/Object; public abstract fun receive (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun receiveOrClosed (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun receiveOrNull (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } @@ -784,6 +788,23 @@ public final class kotlinx/coroutines/channels/TickerMode : java/lang/Enum { public static fun values ()[Lkotlinx/coroutines/channels/TickerMode; } +public final class kotlinx/coroutines/channels/ValueOrClosed { + public static final field Companion Lkotlinx/coroutines/channels/ValueOrClosed$Companion; + public static final synthetic fun box-impl (Ljava/lang/Object;)Lkotlinx/coroutines/channels/ValueOrClosed; + public fun equals (Ljava/lang/Object;)Z + public static fun equals-impl (Ljava/lang/Object;Ljava/lang/Object;)Z + public static final fun equals-impl0 (Ljava/lang/Object;Ljava/lang/Object;)Z + public static final fun getCloseCause-impl (Ljava/lang/Object;)Ljava/lang/Throwable; + public static final fun getValue-impl (Ljava/lang/Object;)Ljava/lang/Object; + public static final fun getValueOrNull-impl (Ljava/lang/Object;)Ljava/lang/Object; + public fun hashCode ()I + public static fun hashCode-impl (Ljava/lang/Object;)I + public static final fun isClosed-impl (Ljava/lang/Object;)Z + public fun toString ()Ljava/lang/String; + public static fun toString-impl (Ljava/lang/Object;)Ljava/lang/String; + public final synthetic fun unbox-impl ()Ljava/lang/Object; +} + public abstract class kotlinx/coroutines/flow/AbstractFlow : kotlinx/coroutines/flow/Flow { public fun ()V public final fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; diff --git a/build.gradle b/build.gradle index a1813d18a6..a1a797f8f4 100644 --- a/build.gradle +++ b/build.gradle @@ -123,7 +123,6 @@ allprojects { def platform = platformOf(it) apply from: rootProject.file("gradle/compile-${platform}.gradle") - dependencies { // See comment below for rationale, it will be replaced with "project" dependency compile "org.jetbrains.kotlinx:kotlinx-coroutines-core:$version" @@ -135,6 +134,7 @@ allprojects { tasks.withType(org.jetbrains.kotlin.gradle.tasks.AbstractKotlinCompile).all { kotlinOptions.freeCompilerArgs += experimentalAnnotations.collect { "-Xuse-experimental=" + it } kotlinOptions.freeCompilerArgs += "-progressive" + kotlinOptions.freeCompilerArgs += "-XXLanguage:+InlineClasses" // Binary compatibility support kotlinOptions.freeCompilerArgs += ["-Xdump-declarations-to=${buildDir}/visibilities.json"] } diff --git a/common/kotlinx-coroutines-core-common/test/channels/ChannelReceiveOrClosedTest.kt b/common/kotlinx-coroutines-core-common/test/channels/ChannelReceiveOrClosedTest.kt new file mode 100644 index 0000000000..303e6d15e2 --- /dev/null +++ b/common/kotlinx-coroutines-core-common/test/channels/ChannelReceiveOrClosedTest.kt @@ -0,0 +1,141 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.channels + +import kotlinx.coroutines.* +import kotlin.test.* + +class ChannelReceiveOrClosedTest : TestBase() { + @Test + fun testChannelOfThrowables() = runTest { + val channel = Channel() + launch { + channel.send(TestException1()) + channel.close(TestException2()) + } + + val element = channel.receiveOrClosed() + assertTrue(element.isValue) + assertTrue(element.value is TestException1) + assertTrue(element.valueOrNull is TestException1) + + val closed = channel.receiveOrClosed() + assertTrue(closed.isClosed) + assertTrue(closed.closeCause is TestException2) + } + + @Test + @Suppress("ReplaceAssertBooleanWithAssertEquality") // inline classes test + fun testNullableIntChanel() = runTest { + val channel = Channel() + launch { + expect(2) + channel.send(1) + expect(3) + channel.send(null) + + expect(6) + channel.close() + } + + expect(1) + val element = channel.receiveOrClosed() + assertTrue(element.isValue) + assertEquals(1, element.value) + assertEquals(1, element.valueOrNull) + assertEquals("Value(1)", element.toString()) + assertTrue(ValueOrClosed.value(1) == element) // Don't box + + expect(4) + val nullElement = channel.receiveOrClosed() + assertTrue(nullElement.isValue) + assertNull(nullElement.value) + assertNull(nullElement.valueOrNull) + assertEquals("Value(null)", nullElement.toString()) + assertTrue(ValueOrClosed.value(null) == nullElement) // Don't box + + expect(5) + val closed = channel.receiveOrClosed() + assertTrue(closed.isClosed) + + val closed2 = channel.receiveOrClosed() + assertTrue(closed2.isClosed) + assertTrue(closed2.closeCause is ClosedReceiveChannelException) + finish(7) + } + + @Test + @ExperimentalUnsignedTypes + fun testUIntChannel() = runTest { + val channel = Channel() + launch { + expect(2) + channel.send(1u) + yield() + expect(4) + channel.send((Long.MAX_VALUE - 1).toUInt()) + expect(5) + } + + expect(1) + val element = channel.receiveOrClosed() + assertEquals(1u, element.value) + + expect(3) + val element2 = channel.receiveOrClosed() + assertEquals((Long.MAX_VALUE - 1).toUInt(), element2.value) + finish(6) + } + + @Test + fun testCancelChannel() = runTest { + val channel = Channel() + launch { + expect(2) + channel.cancel() + } + + expect(1) + val closed = channel.receiveOrClosed() + assertTrue(closed.isClosed) + assertTrue(closed.closeCause is ClosedReceiveChannelException) + finish(3) + } + + @Test + @ExperimentalUnsignedTypes + fun testReceiveResultChannel() = runTest { + val channel = Channel>() + launch { + channel.send(ValueOrClosed.value(1u)) + channel.send(ValueOrClosed.closed(TestException1())) + channel.close(TestException2()) + } + + val intResult = channel.receiveOrClosed() + assertTrue(intResult.isValue) + assertEquals(1u, intResult.value.value) + + val closeCauseResult = channel.receiveOrClosed() + assertTrue(closeCauseResult.isValue) + assertTrue(closeCauseResult.value.closeCause is TestException1) + + val closeCause = channel.receiveOrClosed() + assertTrue(closeCause.isClosed) + assertTrue(closeCause.closeCause is TestException2) + assertFailsWith { closeCause.valueOrThrow } + } + + @Test + fun testToString() = runTest { + val channel = Channel(1) + channel.send("message") + channel.close(TestException1()) + assertEquals("Value(message)", channel.receiveOrClosed().toString()) + // toString implementation for exception differs on every platform + val str = channel.receiveOrClosed().toString() + assertTrue(str.matches("Closed\\(.*TestException1\\)".toRegex())) + } +} diff --git a/docs/select-expression.md b/docs/select-expression.md index a9bf7f2f54..35480abf36 100644 --- a/docs/select-expression.md +++ b/docs/select-expression.md @@ -163,7 +163,7 @@ buzz -> 'Buzz!' ### Selecting on close The [onReceive][ReceiveChannel.onReceive] clause in `select` fails when the channel is closed causing the corresponding -`select` to throw an exception. We can use [onReceiveOrNull][ReceiveChannel.onReceiveOrNull] clause to perform a +`select` to throw an exception. We can use [onReceiveOrNull][onReceiveOrNull] clause to perform a specific action when the channel is closed. The following example also shows that `select` is an expression that returns the result of its selected clause: @@ -189,6 +189,10 @@ suspend fun selectAorB(a: ReceiveChannel, b: ReceiveChannel): St +Note that [onReceiveOrNull][onReceiveOrNull] is an extension function defined only +for channels with non-nullable elements so that there is no accidental confusion between a closed channel +and a null value. + Let's use it with channel `a` that produces "Hello" string four times and channel `b` that produces "World" four times: @@ -259,7 +263,7 @@ the first one among them gets selected. Here, both channels are constantly produ being the first clause in select, wins. However, because we are using unbuffered channel, the `a` gets suspended from time to time on its [send][SendChannel.send] invocation and gives a chance for `b` to send, too. -The second observation, is that [onReceiveOrNull][ReceiveChannel.onReceiveOrNull] gets immediately selected when the +The second observation, is that [onReceiveOrNull][onReceiveOrNull] gets immediately selected when the channel is already closed. ### Selecting to send @@ -433,7 +437,7 @@ Deferred 4 produced answer 'Waited for 128 ms' Let us write a channel producer function that consumes a channel of deferred string values, waits for each received deferred value, but only until the next deferred value comes over or the channel is closed. This example puts together -[onReceiveOrNull][ReceiveChannel.onReceiveOrNull] and [onAwait][Deferred.onAwait] clauses in the same `select`: +[onReceiveOrNull][onReceiveOrNull] and [onAwait][Deferred.onAwait] clauses in the same `select`:
@@ -556,7 +560,7 @@ Channel was closed [ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html [ReceiveChannel.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive.html -[ReceiveChannel.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive-or-null.html +[onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/on-receive-or-null.html [SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html [SendChannel.onSend]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/on-send.html diff --git a/kotlinx-coroutines-core/README.md b/kotlinx-coroutines-core/README.md index 2dc751ea2e..5fe3298173 100644 --- a/kotlinx-coroutines-core/README.md +++ b/kotlinx-coroutines-core/README.md @@ -56,7 +56,7 @@ helper function. [NonCancellable] job object is provided to suppress cancellatio | [Deferred] | [await][Deferred.await] | [onAwait][Deferred.onAwait] | [isCompleted][Job.isCompleted] | [SendChannel][kotlinx.coroutines.channels.SendChannel] | [send][kotlinx.coroutines.channels.SendChannel.send] | [onSend][kotlinx.coroutines.channels.SendChannel.onSend] | [offer][kotlinx.coroutines.channels.SendChannel.offer] | [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receive][kotlinx.coroutines.channels.ReceiveChannel.receive] | [onReceive][kotlinx.coroutines.channels.ReceiveChannel.onReceive] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll] -| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveOrNull][kotlinx.coroutines.channels.ReceiveChannel.receiveOrNull] | [onReceiveOrNull][kotlinx.coroutines.channels.ReceiveChannel.onReceiveOrNull] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll] +| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveOrNull][kotlinx.coroutines.channels.receiveOrNull] | [onReceiveOrNull][kotlinx.coroutines.channels.onReceiveOrNull] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll] | [Mutex][kotlinx.coroutines.sync.Mutex] | [lock][kotlinx.coroutines.sync.Mutex.lock] | [onLock][kotlinx.coroutines.sync.Mutex.onLock] | [tryLock][kotlinx.coroutines.sync.Mutex.tryLock] | none | [delay] | [onTimeout][kotlinx.coroutines.selects.SelectBuilder.onTimeout] | none @@ -131,8 +131,8 @@ Obsolete and deprecated module to test coroutines. Replaced with `kotlinx-corout [kotlinx.coroutines.channels.SendChannel.offer]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/offer.html [kotlinx.coroutines.channels.ReceiveChannel.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive.html [kotlinx.coroutines.channels.ReceiveChannel.poll]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/poll.html -[kotlinx.coroutines.channels.ReceiveChannel.receiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive-or-null.html -[kotlinx.coroutines.channels.ReceiveChannel.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive-or-null.html +[kotlinx.coroutines.channels.receiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/receive-or-null.html +[kotlinx.coroutines.channels.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/on-receive-or-null.html [kotlinx.coroutines.selects.select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html [kotlinx.coroutines.selects.SelectBuilder.onTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/-select-builder/on-timeout.html diff --git a/kotlinx-coroutines-core/common/README.md b/kotlinx-coroutines-core/common/README.md index b84cedf4d4..a0cc809127 100644 --- a/kotlinx-coroutines-core/common/README.md +++ b/kotlinx-coroutines-core/common/README.md @@ -59,7 +59,7 @@ helper function. [NonCancellable] job object is provided to suppress cancellatio | [Deferred] | [await][Deferred.await] | [onAwait][Deferred.onAwait] | [isCompleted][Job.isCompleted] | [SendChannel][kotlinx.coroutines.channels.SendChannel] | [send][kotlinx.coroutines.channels.SendChannel.send] | [onSend][kotlinx.coroutines.channels.SendChannel.onSend] | [offer][kotlinx.coroutines.channels.SendChannel.offer] | [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receive][kotlinx.coroutines.channels.ReceiveChannel.receive] | [onReceive][kotlinx.coroutines.channels.ReceiveChannel.onReceive] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll] -| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveOrNull][kotlinx.coroutines.channels.ReceiveChannel.receiveOrNull] | [onReceiveOrNull][kotlinx.coroutines.channels.ReceiveChannel.onReceiveOrNull] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll] +| [ReceiveChannel][kotlinx.coroutines.channels.ReceiveChannel] | [receiveOrNull][kotlinx.coroutines.channels.receiveOrNull] | [onReceiveOrNull][kotlinx.coroutines.channels.onReceiveOrNull] | [poll][kotlinx.coroutines.channels.ReceiveChannel.poll] | [Mutex][kotlinx.coroutines.sync.Mutex] | [lock][kotlinx.coroutines.sync.Mutex.lock] | [onLock][kotlinx.coroutines.sync.Mutex.onLock] | [tryLock][kotlinx.coroutines.sync.Mutex.tryLock] | none | [delay] | [onTimeout][kotlinx.coroutines.selects.SelectBuilder.onTimeout] | none @@ -143,8 +143,8 @@ Low-level primitives for finer-grained control of coroutines. [kotlinx.coroutines.channels.SendChannel.offer]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/offer.html [kotlinx.coroutines.channels.ReceiveChannel.onReceive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive.html [kotlinx.coroutines.channels.ReceiveChannel.poll]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/poll.html -[kotlinx.coroutines.channels.ReceiveChannel.receiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive-or-null.html -[kotlinx.coroutines.channels.ReceiveChannel.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/on-receive-or-null.html +[kotlinx.coroutines.channels.receiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/receive-or-null.html +[kotlinx.coroutines.channels.onReceiveOrNull]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/on-receive-or-null.html [kotlinx.coroutines.selects.select]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/select.html [kotlinx.coroutines.selects.SelectBuilder.onTimeout]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.selects/-select-builder/on-timeout.html diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt index 6419e2087e..bed497909d 100644 --- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt @@ -14,7 +14,6 @@ import kotlin.jvm.* /** * Abstract send channel. It is a base class for all send channel implementations. - * */ internal abstract class AbstractSendChannel : SendChannel { /** @suppress **This is unstable API and it is subject to change.** */ @@ -542,13 +541,12 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel : AbstractSendChannel(), Channel - val receive = ReceiveElement(cont as CancellableContinuation, nullOnClose = false) + private suspend fun receiveSuspend(onClose: Int): R = suspendAtomicCancellableCoroutine sc@ { cont -> + val receive = ReceiveElement(cont as CancellableContinuation, onClose) while (true) { if (enqueueReceive(receive)) { removeReceiveOnCancel(cont, receive) @@ -568,11 +566,11 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel) { - cont.resumeWithException(result.receiveException) + receive.resumeReceiveClosed(result) return@sc } if (result !== POLL_FAILED) { - cont.resume(result as E) + cont.resume(receive.resumeValue(result as E)) return@sc } } @@ -586,13 +584,12 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel : AbstractSendChannel(), Channel - val receive = ReceiveElement(cont, nullOnClose = true) - while (true) { - if (enqueueReceive(receive)) { - removeReceiveOnCancel(cont, receive) - return@sc - } - // hm... something is not right. try to poll - val result = pollInternal() - if (result is Closed<*>) { - if (result.closeCause == null) - cont.resume(null) - else - cont.resumeWithException(result.closeCause) - return@sc - } - if (result !== POLL_FAILED) { - cont.resume(result as E) - return@sc - } - } + public final override suspend fun receiveOrClosed(): ValueOrClosed { + // fast path -- try poll non-blocking + val result = pollInternal() + if (result !== POLL_FAILED) return result.toResult() + // slow-path does suspend + return receiveSuspend(RECEIVE_RESULT) } @Suppress("UNCHECKED_CAST") @@ -694,9 +676,9 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel( select: SelectInstance, - block: suspend (E?) -> R, - nullOnClose: Boolean - ) : AddLastDesc>(queue, ReceiveSelect(select, block, nullOnClose)) { + block: suspend (Any?) -> R, + receiveMode: Int + ) : AddLastDesc>(queue, ReceiveSelect(select, block, receiveMode)) { override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? { if (affected is Send) return ENQUEUE_FAILED return null @@ -728,13 +710,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel R), nullOnClose = false) - val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return - when { - enqueueResult === ALREADY_SELECTED -> return - enqueueResult === ENQUEUE_FAILED -> {} // retry - else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult") - } + if (registerEnqueueDesc(select, block, RECEIVE_THROWS_ON_CLOSE)) return } else { val pollResult = pollSelectInternal(select) when { @@ -762,13 +738,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel return - enqueueResult === ENQUEUE_FAILED -> {} // retry - else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult") - } + if (registerEnqueueDesc(select, block, RECEIVE_NULL_ON_CLOSE)) return } else { val pollResult = pollSelectInternal(select) when { @@ -793,6 +763,51 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel> + get() = object : SelectClause1> { + override fun registerSelectClause1(select: SelectInstance, block: suspend (ValueOrClosed) -> R) { + registerSelectReceiveOrClosed(select, block) + } + } + + @Suppress("UNCHECKED_CAST") + private fun registerSelectReceiveOrClosed(select: SelectInstance, block: suspend (ValueOrClosed) -> R) { + while (true) { + if (select.isSelected) return + if (isEmpty) { + if (registerEnqueueDesc(select, block, RECEIVE_RESULT)) return + } else { + val pollResult = pollSelectInternal(select) + when { + pollResult === ALREADY_SELECTED -> return + pollResult === POLL_FAILED -> {} // retry + pollResult is Closed<*> -> { + block.startCoroutineUnintercepted(ValueOrClosed.closed(pollResult.closeCause), select.completion) + } + else -> { + // selected successfully + block.startCoroutineUnintercepted(ValueOrClosed.value(pollResult as E), select.completion) + return + } + } + } + } + } + + private fun registerEnqueueDesc( + select: SelectInstance, block: suspend (E) -> R, + receiveMode: Int + ): Boolean { + @Suppress("UNCHECKED_CAST") + val enqueueOp = TryEnqueueReceiveDesc(select, block as suspend (Any?) -> R, receiveMode) + val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return true + return when { + enqueueResult === ALREADY_SELECTED -> true + enqueueResult === ENQUEUE_FAILED -> false // retry + else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult") + } + } + // ------ protected ------ override fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed? = @@ -884,18 +899,25 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel( - @JvmField val cont: CancellableContinuation, - @JvmField val nullOnClose: Boolean + @JvmField val cont: CancellableContinuation, + @JvmField val receiveMode: Int ) : Receive() { - override fun tryResumeReceive(value: E, idempotent: Any?): Any? = cont.tryResume(value, idempotent) + fun resumeValue(value: E): Any? = when (receiveMode) { + RECEIVE_RESULT -> ValueOrClosed.value(value) + else -> value + } + + @Suppress("IMPLICIT_CAST_TO_ANY") + override fun tryResumeReceive(value: E, idempotent: Any?): Any? = cont.tryResume(resumeValue(value), idempotent) override fun completeResumeReceive(token: Any) = cont.completeResume(token) override fun resumeReceiveClosed(closed: Closed<*>) { - if (closed.closeCause == null && nullOnClose) - cont.resume(null) - else - cont.resumeWithException(closed.receiveException) + when { + receiveMode == RECEIVE_NULL_ON_CLOSE && closed.closeCause == null -> cont.resume(null) + receiveMode == RECEIVE_RESULT -> cont.resume(closed.toResult()) + else -> cont.resumeWithException(closed.receiveException) + } } - override fun toString(): String = "ReceiveElement[$cont,nullOnClose=$nullOnClose]" + override fun toString(): String = "ReceiveElement[$cont,receiveMode=$receiveMode]" } private class ReceiveHasNext( @@ -940,25 +962,26 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel( @JvmField val select: SelectInstance, - @JvmField val block: suspend (E?) -> R, - @JvmField val nullOnClose: Boolean + @JvmField val block: suspend (Any?) -> R, + @JvmField val receiveMode: Int ) : Receive(), DisposableHandle { override fun tryResumeReceive(value: E, idempotent: Any?): Any? = if (select.trySelect(idempotent)) (value ?: NULL_VALUE) else null @Suppress("UNCHECKED_CAST") override fun completeResumeReceive(token: Any) { - block.startCoroutine(NULL_VALUE.unbox(token), select.completion) + val value: E = NULL_VALUE.unbox(token) + block.startCoroutine(if (receiveMode == RECEIVE_RESULT) ValueOrClosed.value(value) else value, select.completion) } override fun resumeReceiveClosed(closed: Closed<*>) { - if (select.trySelect(null)) { - if (closed.closeCause == null && nullOnClose) { + if (!select.trySelect(null)) return + when (receiveMode) { + RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectCancellableWithException(closed.receiveException) + RECEIVE_RESULT -> block.startCoroutine(ValueOrClosed.closed(closed.closeCause), select.completion) + RECEIVE_NULL_ON_CLOSE -> if (closed.closeCause == null) { block.startCoroutine(null, select.completion) } else { - // even though we are dispatching coroutine to process channel close on receive, - // which is an atomically cancellable suspending function, - // close is a final state, so we can use a cancellable resume mode select.resumeSelectCancellableWithException(closed.receiveException) } } @@ -973,7 +996,7 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel( @@ -982,6 +1005,11 @@ internal abstract class AbstractChannel : AbstractSendChannel(), Channel : LockFreeLinkedListNode(), ReceiveOrClosed override val offerResult get() = OFFER_SUCCESS abstract fun resumeReceiveClosed(closed: Closed<*>) } + +@Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST") +private inline fun Any?.toResult(): ValueOrClosed = + if (this is Closed<*>) ValueOrClosed.closed(closeCause) else ValueOrClosed.value(this as E) + +@Suppress("NOTHING_TO_INLINE") +private inline fun Closed<*>.toResult(): ValueOrClosed = ValueOrClosed.closed(closeCause) diff --git a/kotlinx-coroutines-core/common/src/channels/Channel.kt b/kotlinx-coroutines-core/common/src/channels/Channel.kt index 9fa341830d..a2a6da2e0c 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channel.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channel.kt @@ -15,6 +15,7 @@ import kotlinx.coroutines.channels.Channel.Factory.CHANNEL_DEFAULT_CAPACITY import kotlinx.coroutines.internal.systemProp import kotlinx.coroutines.selects.* import kotlin.jvm.* +import kotlin.internal.* /** * Sender's interface to [Channel]. @@ -91,7 +92,8 @@ public interface SendChannel { * on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements * are received. * - * A channel that was closed without a [cause] throws [ClosedSendChannelException] on attempts to send or receive. + * A channel that was closed without a [cause] throws [ClosedSendChannelException] on attempts to send + * and [ClosedReceiveChannelException] on attempts to receive. * A channel that was closed with non-null [cause] is called a _failed_ channel. Attempts to send or * receive on a failed channel throw the specified [cause] exception. */ @@ -209,25 +211,73 @@ public interface ReceiveChannel { * This function can be used in [select] invocation with [onReceiveOrNull] clause. * Use [poll] to try receiving from this channel without waiting. * - * **Note: This is an obsolete api.** - * This function will be replaced with `receiveOrClosed: ReceiveResult` and - * extension `suspend fun ReceiveChannel.receiveOrNull(): E?` - * It is obsolete because it does not distinguish closed channel and null elements. + * @suppress **Deprecated**: in favor of receiveOrClosed and receiveOrNull extension. */ @ObsoleteCoroutinesApi + @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") + @LowPriorityInOverloadResolution + @Deprecated( + message = "Deprecated in favor of receiveOrClosed and receiveOrNull extension", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("receiveOrNull", "kotlinx.coroutines.channels.receiveOrNull") + ) public suspend fun receiveOrNull(): E? /** * Clause for [select] expression of [receiveOrNull] suspending function that selects with the element that - * is received from the channel or selects with `null` if if the channel + * is received from the channel or selects with `null` if the channel * [isClosedForReceive] without cause. The [select] invocation fails with * the original [close][SendChannel.close] cause exception if the channel has _failed_. * - * **Note: This is an experimental api.** This function may be replaced with a better one in the future. + * @suppress **Deprecated**: in favor of onReceiveOrClosed and onReceiveOrNull extension. */ - @ExperimentalCoroutinesApi + @ObsoleteCoroutinesApi + @Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER") + @LowPriorityInOverloadResolution + @Deprecated( + message = "Deprecated in favor of onReceiveOrClosed and onReceiveOrNull extension", + level = DeprecationLevel.WARNING, + replaceWith = ReplaceWith("onReceiveOrNull", "kotlinx.coroutines.channels.onReceiveOrNull") + ) public val onReceiveOrNull: SelectClause1 + /** + * Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]. + * This method returns [ValueOrClosed] with a value if element was successfully retrieved from the channel + * or [ValueOrClosed] with close cause if channel was closed. + * + * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this + * function is suspended, this function immediately resumes with [CancellationException]. + * + * *Cancellation of suspended receive is atomic* -- when this function + * throws [CancellationException] it means that the element was not retrieved from this channel. + * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may + * continue to execute even after it was cancelled from the same thread in the case when this receive operation + * was already resumed and the continuation was posted for execution to the thread's queue. + * + * Note, that this function does not check for cancellation when it is not suspended. + * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. + * + * This function can be used in [select] invocation with [onReceiveOrClosed] clause. + * Use [poll] to try receiving from this channel without waiting. + * + * @suppress *This is an internal API, do not use*: Inline classes ABI is not stable yet and + * [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed. + */ + @InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed + public suspend fun receiveOrClosed(): ValueOrClosed + + /** + * Clause for [select] expression of [receiveOrClosed] suspending function that selects with the [ValueOrClosed] with a value + * that is received from the channel or selects with [ValueOrClosed] with a close cause if the channel + * [isClosedForReceive]. + * + * @suppress *This is an internal API, do not use*: Inline classes ABI is not stable yet and + * [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed. + */ + @InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed + public val onReceiveOrClosed: SelectClause1> + /** * Retrieves and removes the element from this channel, or returns `null` if this channel is empty * or is [isClosedForReceive] without cause. @@ -271,6 +321,107 @@ public interface ReceiveChannel { public fun cancel(cause: Throwable? = null): Boolean } +/** + * A discriminated union of [ReceiveChannel.receiveOrClosed] result, + * that encapsulates either successfully received element of type [T] from the channel or a close cause. + * + * :todo: Do not make it public before resolving todos in the code of this class. + * + * @suppress *This is an internal API, do not use*: Inline classes ABI is not stable yet and + * [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed. + */ +@Suppress("NON_PUBLIC_PRIMARY_CONSTRUCTOR_OF_INLINE_CLASS") +@InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed +public inline class ValueOrClosed +internal constructor(private val holder: Any?) { + /** + * Returns `true` if this instance represents received element. + * In this case [isClosed] returns `false`. + * todo: it is commented for now, because it is not used + */ + //public val isValue: Boolean get() = holder !is Closed + + /** + * Returns `true` if this instance represents close cause. + * In this case [isValue] returns `false`. + */ + public val isClosed: Boolean get() = holder is Closed + + /** + * Returns received value if this instance represents received value or throws [IllegalStateException] otherwise. + * + * :todo: Decide if it is needed how it shall be named with relation to [valueOrThrow]: + * + * So we have the following methods on ValueOrClosed: `value`, `valueOrNull`, `valueOrThrow`. + * On the other hand, the channel has the following receive variants: + * * `receive` which corresponds to `receiveOrClosed().valueOrThrow`... huh? + * * `receiveOrNull` which corresponds to `receiveOrClosed().valueOrNull` + * * `receiveOrClosed` + * For the sake of consider dropping this version of `value` and rename [valueOrThrow] to simply `value`. + */ + @Suppress("UNCHECKED_CAST") + public val value: T + get() = if (holder is Closed) error(DEFAULT_CLOSE_MESSAGE) else holder as T + + /** + * Returns received value if this element represents received value or `null` otherwise. + * :todo: Decide if it shall be made into extension that is available only for non-null T. + * Note: it might become inconsistent with kotlin.Result + */ + @Suppress("UNCHECKED_CAST") + public val valueOrNull: T? + get() = if (holder is Closed) null else holder as T + + /** + * :todo: Decide if it is needed how it shall be named with relation to [value]. + * Note, that valueOrThrow rethrows the cause adding no meaningful information about the callsite, + * so if one is sure that ValueOrClosed is always value, this very property should be used. + * Otherwise, it could be very hard to locate the source of the exception. + * todo: it is commented for now, because it is not used + */ + //@Suppress("UNCHECKED_CAST") + //public val valueOrThrow: T + // get() = if (holder is Closed) throw holder.exception else holder as T + + /** + * Returns close cause of the channel if this instance represents close cause or throws + * [IllegalStateException] otherwise. + */ + @Suppress("UNCHECKED_CAST") + public val closeCause: Throwable? get() = + if (holder is Closed) holder.cause else error("Channel was not closed") + + /** + * @suppress + */ + public override fun toString(): String = + when (holder) { + is Closed -> holder.toString() + else -> "Value($holder)" + } + + internal class Closed(@JvmField val cause: Throwable?) { + // todo: it is commented for now, because it is not used + //val exception: Throwable get() = cause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE) + override fun equals(other: Any?): Boolean = other is Closed && cause == other.cause + override fun hashCode(): Int = cause.hashCode() + override fun toString(): String = "Closed($cause)" + } + + /** + * todo: consider making value/closed constructors public in the future. + */ + internal companion object { + @Suppress("NOTHING_TO_INLINE") + internal inline fun value(value: E): ValueOrClosed = + ValueOrClosed(value) + + @Suppress("NOTHING_TO_INLINE") + internal inline fun closed(cause: Throwable?): ValueOrClosed = + ValueOrClosed(Closed(cause)) + } +} + /** * Iterator for [ReceiveChannel]. Instances of this interface are *not thread-safe* and shall not be used * from concurrent coroutines. diff --git a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt index 352c8c1aa1..cd37bfbc2e 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channels.common.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channels.common.kt @@ -8,6 +8,7 @@ package kotlinx.coroutines.channels import kotlinx.coroutines.* +import kotlinx.coroutines.selects.* import kotlin.coroutines.* import kotlin.jvm.* @@ -33,6 +34,49 @@ public inline fun BroadcastChannel.consume(block: ReceiveChannel.() } } +/** + * Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty] + * or returns `null` if the channel is [closed][Channel.isClosedForReceive]. + * + * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this + * function is suspended, this function immediately resumes with [CancellationException]. + * + * *Cancellation of suspended receive is atomic* -- when this function + * throws [CancellationException] it means that the element was not retrieved from this channel. + * As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may + * continue to execute even after it was cancelled from the same thread in the case when this receive operation + * was already resumed and the continuation was posted for execution to the thread's queue. + * + * Note, that this function does not check for cancellation when it is not suspended. + * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. + * + * This extension is defined only for channels on non-null types, so that generic functions defined using + * these extensions do not accidentally confuse `null` value and a normally closed channel, leading to hard + * to find bugs. + */ +@Suppress("EXTENSION_SHADOWED_BY_MEMBER") +@ExperimentalCoroutinesApi // since 1.3.0, tentatively stable in 1.4.0 +public suspend fun ReceiveChannel.receiveOrNull(): E? { + @Suppress("DEPRECATION", "UNCHECKED_CAST") + return (this as ReceiveChannel).receiveOrNull() +} + +/** + * Clause for [select] expression of [receiveOrNull] suspending function that selects with the element that + * is received from the channel or selects with `null` if the channel + * [isClosedForReceive][ReceiveChannel.isClosedForReceive] without cause. The [select] invocation fails with + * the original [close][SendChannel.close] cause exception if the channel has _failed_. + * + * This extension is defined only for channels on non-null types, so that generic functions defined using + * these extensions do not accidentally confuse `null` value and a normally closed channel, leading to hard + * to find bugs. + **/ +@ExperimentalCoroutinesApi // since 1.3.0, tentatively stable in 1.4.0 +public fun ReceiveChannel.onReceiveOrNull(): SelectClause1 { + @Suppress("DEPRECATION", "UNCHECKED_CAST") + return (this as ReceiveChannel).onReceiveOrNull +} + /** * Subscribes to this [BroadcastChannel] and performs the specified action for each received element. * diff --git a/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt b/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt index 167edba53e..a6ddd81185 100644 --- a/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt @@ -34,6 +34,11 @@ class BasicOperationsTest : TestBase() { TestChannelKind.values().forEach { kind -> testReceiveOrNullException(kind) } } + @Test + fun testReceiveOrClosed() = runTest { + TestChannelKind.values().forEach { kind -> testReceiveOrClosed(kind) } + } + @Test fun testInvokeOnClose() = TestChannelKind.values().forEach { kind -> reset() @@ -124,6 +129,34 @@ class BasicOperationsTest : TestBase() { assertTrue(d.getCancellationException().cause is TestException) } + @Suppress("ReplaceAssertBooleanWithAssertEquality") + private suspend fun testReceiveOrClosed(kind: TestChannelKind) = coroutineScope { + reset() + val channel = kind.create() + launch { + expect(2) + channel.send(1) + } + + expect(1) + val result = channel.receiveOrClosed() + assertEquals(1, result.value) + assertEquals(1, result.valueOrNull) + assertTrue(ValueOrClosed.value(1) == result) + + expect(3) + launch { + expect(4) + channel.close() + } + val closed = channel.receiveOrClosed() + expect(5) + assertNull(closed.valueOrNull) + assertTrue(closed.isClosed) + assertNull(closed.closeCause) + assertTrue(ValueOrClosed.closed(closed.closeCause) == closed) + finish(6) + } private suspend fun testOffer(kind: TestChannelKind) = coroutineScope { val channel = kind.create() diff --git a/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt b/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt index 465699e27c..27c58165c1 100644 --- a/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt +++ b/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt @@ -58,6 +58,7 @@ private class ChannelViaBroadcast( override suspend fun receive(): E = sub.receive() override suspend fun receiveOrNull(): E? = sub.receiveOrNull() + override suspend fun receiveOrClosed(): ValueOrClosed = sub.receiveOrClosed() override fun poll(): E? = sub.poll() override fun iterator(): ChannelIterator = sub.iterator() @@ -71,4 +72,6 @@ private class ChannelViaBroadcast( get() = sub.onReceive override val onReceiveOrNull: SelectClause1 get() = sub.onReceiveOrNull + override val onReceiveOrClosed: SelectClause1> + get() = sub.onReceiveOrClosed } diff --git a/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt index 3c74b0fb41..306c241376 100644 --- a/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/channels/ChannelBuildersFlowTest.kt @@ -9,6 +9,46 @@ import kotlinx.coroutines.channels.* import kotlin.test.* class ChannelBuildersFlowTest : TestBase() { + @Test + fun testChannelConsumeAsFlow() = runTest { + val channel = produce { + repeat(10) { + send(it + 1) + } + } + val flow = channel.consumeAsFlow() + assertEquals(55, flow.sum()) + assertFailsWith { flow.collect() } + } + + @Test + fun testConsumeAsFlowCancellation() = runTest { + val channel = produce(NonCancellable) { // otherwise failure will cancel scope as well + repeat(10) { + send(it + 1) + } + throw TestException() + } + val flow = channel.consumeAsFlow() + assertEquals(15, flow.take(5).sum()) + // the channel should have been canceled, even though took only 5 elements + assertTrue(channel.isClosedForReceive) + assertFailsWith { flow.collect() } + } + + @Test + fun testConsumeAsFlowException() = runTest { + val channel = produce(NonCancellable) { // otherwise failure will cancel scope as well + repeat(10) { + send(it + 1) + } + throw TestException() + } + val flow = channel.consumeAsFlow() + assertFailsWith { flow.sum() } + assertFailsWith { flow.collect() } + } + @Test fun testBroadcastChannelAsFlow() = runTest { val channel = broadcast { diff --git a/kotlinx-coroutines-core/common/test/selects/SelectArrayChannelTest.kt b/kotlinx-coroutines-core/common/test/selects/SelectArrayChannelTest.kt index f8c3439b9e..ece95db19a 100644 --- a/kotlinx-coroutines-core/common/test/selects/SelectArrayChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/selects/SelectArrayChannelTest.kt @@ -284,6 +284,104 @@ class SelectArrayChannelTest : TestBase() { finish(10) } + @Test + fun testSelectReceiveOrClosedWaitClosed() = runTest { + expect(1) + val channel = Channel(1) + launch { + expect(3) + channel.close() + expect(4) + } + expect(2) + select { + channel.onReceiveOrClosed { + expect(5) + assertTrue(it.isClosed) + assertNull(it.closeCause) + } + } + + finish(6) + } + + @Test + fun testSelectReceiveOrClosedWaitClosedWithCause() = runTest { + expect(1) + val channel = Channel(1) + launch { + expect(3) + channel.close(TestException()) + expect(4) + } + expect(2) + select { + channel.onReceiveOrClosed { + expect(5) + assertTrue(it.isClosed) + assertTrue(it.closeCause is TestException) + } + } + + finish(6) + } + + @Test + fun testSelectReceiveOrClosed() = runTest { + val c = Channel(1) + val iterations = 10 + expect(1) + val job = launch { + repeat(iterations) { + select { + c.onReceiveOrClosed { v -> + expect(4 + it * 2) + assertEquals(it, v.value) + } + } + } + } + + expect(2) + repeat(iterations) { + expect(3 + it * 2) + c.send(it) + yield() + } + + job.join() + finish(3 + iterations * 2) + } + + @Test + fun testSelectReceiveOrClosedDispatch() = runTest { + val c = Channel(1) + expect(1) + launch { + expect(3) + val res = select { + c.onReceiveOrClosed { v -> + expect(6) + assertEquals(42, v.value) + yield() // back to main + expect(8) + "OK" + } + } + expect(9) + assertEquals("OK", res) + } + expect(2) + yield() // to launch + expect(4) + c.send(42) // do not suspend + expect(5) + yield() // to receive + expect(7) + yield() // again + finish(10) + } + // only for debugging internal fun SelectBuilder.default(block: suspend () -> R) { this as SelectBuilderImpl // type assertion diff --git a/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt b/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt index 0c1f9f6bc7..ed8b8d3691 100644 --- a/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt @@ -305,6 +305,104 @@ class SelectRendezvousChannelTest : TestBase() { finish(10) } + @Test + fun testSelectReceiveOrClosedWaitClosed() = runTest { + expect(1) + val channel = Channel(Channel.RENDEZVOUS) + launch { + expect(3) + channel.close() + expect(4) + } + expect(2) + select { + channel.onReceiveOrClosed { + expect(5) + assertTrue(it.isClosed) + assertNull(it.closeCause) + } + } + + finish(6) + } + + @Test + fun testSelectReceiveOrClosedWaitClosedWithCause() = runTest { + expect(1) + val channel = Channel(Channel.RENDEZVOUS) + launch { + expect(3) + channel.close(TestException()) + expect(4) + } + expect(2) + select { + channel.onReceiveOrClosed { + expect(5) + assertTrue(it.isClosed) + assertTrue(it.closeCause is TestException) + } + } + + finish(6) + } + + @Test + fun testSelectReceiveOrClosed() = runTest { + val channel = Channel(Channel.RENDEZVOUS) + val iterations = 10 + expect(1) + val job = launch { + repeat(iterations) { + select { + channel.onReceiveOrClosed { v -> + expect(4 + it * 2) + assertEquals(it, v.value) + } + } + } + } + + expect(2) + repeat(iterations) { + expect(3 + it * 2) + channel.send(it) + yield() + } + + job.join() + finish(3 + iterations * 2) + } + + @Test + fun testSelectReceiveOrClosedDispatch() = runTest { + val c = Channel(Channel.RENDEZVOUS) + expect(1) + launch { + expect(3) + val res = select { + c.onReceiveOrClosed { v -> + expect(6) + assertEquals(42, v.value) + yield() // back to main + expect(8) + "OK" + } + } + expect(9) + assertEquals("OK", res) + } + expect(2) + yield() // to launch + expect(4) + c.send(42) // do not suspend + expect(5) + yield() // to receive + expect(7) + yield() // again + finish(10) + } + // only for debugging internal fun SelectBuilder.default(block: suspend () -> R) { this as SelectBuilderImpl // type assertion diff --git a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt index cffe6c0e20..5178907875 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt @@ -112,18 +112,15 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() var sum = 0 var n = 0 whileSelect { - this@averageInTimeWindow.onReceiveOrNull { - when (it) { - null -> { - // Send leftovers and bail out - if (n != 0) send(sum / n.toDouble()) - false - } - else -> { - sum += it - ++n - true - } + this@averageInTimeWindow.onReceiveOrClosed { + if (it.isClosed) { + // Send leftovers and bail out + if (n != 0) send(sum / n.toDouble()) + false + } else { + sum += it.value + ++n + true } }