From d80db631548d2514838faa1b0aa5689f2cb9f606 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Thu, 18 Mar 2021 16:58:55 +0300 Subject: [PATCH] Introduce ChannelResult and receiveCatching/onReceiveCatching * Introduce three-state ChannelResult, a domain-specific Result class counterpart * Introduce receiveCatching/onReceiveCatching, make it public * Get rid of receiveOrClosed/onReceiveOrClosed without migrations, it was @InternalCoroutinesApi anyway Fixes #330 --- .../api/kotlinx-coroutines-core.api | 41 +++--- .../common/src/channels/AbstractChannel.kt | 26 ++-- .../common/src/channels/Channel.kt | 126 +++++++----------- .../common/src/flow/Channels.kt | 8 +- .../test/channels/BasicOperationsTest.kt | 20 +-- ...dTest.kt => ChannelReceiveCatchingTest.kt} | 68 +++++----- .../ChannelUndeliveredElementFailureTest.kt | 8 +- .../common/test/channels/TestChannelKind.kt | 8 +- .../test/selects/SelectArrayChannelTest.kt | 28 ++-- .../selects/SelectRendezvousChannelTest.kt | 32 ++--- .../ChannelUndeliveredElementStressTest.kt | 4 +- .../test/channels/TickerChannelCommonTest.kt | 6 +- 12 files changed, 172 insertions(+), 203 deletions(-) rename kotlinx-coroutines-core/common/test/channels/{ChannelReceiveOrClosedTest.kt => ChannelReceiveCatchingTest.kt} (54%) diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 1d16d31086..985f17a1da 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -625,6 +625,26 @@ public final class kotlinx/coroutines/channels/ChannelKt { public static synthetic fun Channel$default (ILkotlinx/coroutines/channels/BufferOverflow;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel; } +public final class kotlinx/coroutines/channels/ChannelResult { + public static final field Companion Lkotlinx/coroutines/channels/ChannelResult$Companion; + public static final synthetic fun box-impl (Ljava/lang/Object;)Lkotlinx/coroutines/channels/ChannelResult; + public static synthetic fun constructor-impl (Ljava/lang/Object;Lkotlin/jvm/internal/DefaultConstructorMarker;)Ljava/lang/Object; + public fun equals (Ljava/lang/Object;)Z + public static fun equals-impl (Ljava/lang/Object;Ljava/lang/Object;)Z + public static final fun equals-impl0 (Ljava/lang/Object;Ljava/lang/Object;)Z + public static final fun exceptionOrNull-impl (Ljava/lang/Object;)Ljava/lang/Throwable; + public static final fun getOrNull-impl (Ljava/lang/Object;)Ljava/lang/Object; + public static final fun getOrThrow-impl (Ljava/lang/Object;)Ljava/lang/Object; + public fun hashCode ()I + public static fun hashCode-impl (Ljava/lang/Object;)I + public static final fun isClosed-impl (Ljava/lang/Object;)Z + public static final fun isFailure-impl (Ljava/lang/Object;)Z + public static final fun isSuccess-impl (Ljava/lang/Object;)Z + public fun toString ()Ljava/lang/String; + public static fun toString-impl (Ljava/lang/Object;)Ljava/lang/String; + public final synthetic fun unbox-impl ()Ljava/lang/Object; +} + public final class kotlinx/coroutines/channels/ChannelsKt { public static final fun all (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun any (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; @@ -789,14 +809,14 @@ public abstract interface class kotlinx/coroutines/channels/ReceiveChannel { public abstract synthetic fun cancel (Ljava/lang/Throwable;)Z public abstract fun cancel (Ljava/util/concurrent/CancellationException;)V public abstract fun getOnReceive ()Lkotlinx/coroutines/selects/SelectClause1; - public abstract fun getOnReceiveOrClosed ()Lkotlinx/coroutines/selects/SelectClause1; + public abstract fun getOnReceiveCatching ()Lkotlinx/coroutines/selects/SelectClause1; public abstract fun getOnReceiveOrNull ()Lkotlinx/coroutines/selects/SelectClause1; public abstract fun isClosedForReceive ()Z public abstract fun isEmpty ()Z public abstract fun iterator ()Lkotlinx/coroutines/channels/ChannelIterator; public abstract fun poll ()Ljava/lang/Object; public abstract fun receive (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public abstract fun receiveOrClosed-WVj179g (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun receiveCatching-JP2dKIU (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun receiveOrNull (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } @@ -832,23 +852,6 @@ public final class kotlinx/coroutines/channels/TickerMode : java/lang/Enum { public static fun values ()[Lkotlinx/coroutines/channels/TickerMode; } -public final class kotlinx/coroutines/channels/ValueOrClosed { - public static final field Companion Lkotlinx/coroutines/channels/ValueOrClosed$Companion; - public static final synthetic fun box-impl (Ljava/lang/Object;)Lkotlinx/coroutines/channels/ValueOrClosed; - public fun equals (Ljava/lang/Object;)Z - public static fun equals-impl (Ljava/lang/Object;Ljava/lang/Object;)Z - public static final fun equals-impl0 (Ljava/lang/Object;Ljava/lang/Object;)Z - public static final fun getCloseCause-impl (Ljava/lang/Object;)Ljava/lang/Throwable; - public static final fun getValue-impl (Ljava/lang/Object;)Ljava/lang/Object; - public static final fun getValueOrNull-impl (Ljava/lang/Object;)Ljava/lang/Object; - public fun hashCode ()I - public static fun hashCode-impl (Ljava/lang/Object;)I - public static final fun isClosed-impl (Ljava/lang/Object;)Z - public fun toString ()Ljava/lang/String; - public static fun toString-impl (Ljava/lang/Object;)Ljava/lang/String; - public final synthetic fun unbox-impl ()Ljava/lang/Object; -} - public final class kotlinx/coroutines/debug/internal/DebugCoroutineInfo { public fun (Lkotlinx/coroutines/debug/internal/DebugCoroutineInfoImpl;Lkotlin/coroutines/CoroutineContext;)V public final fun getContext ()Lkotlin/coroutines/CoroutineContext; diff --git a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt index 9721583e83..180598c94f 100644 --- a/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt @@ -623,7 +623,7 @@ internal abstract class AbstractChannel( } @Suppress("UNCHECKED_CAST") - public final override suspend fun receiveOrClosed(): ValueOrClosed { + public final override suspend fun receiveCatching(): ChannelResult { // fast path -- try poll non-blocking val result = pollInternal() if (result !== POLL_FAILED) return result.toResult() @@ -742,10 +742,10 @@ internal abstract class AbstractChannel( } } - final override val onReceiveOrClosed: SelectClause1> - get() = object : SelectClause1> { + final override val onReceiveCatching: SelectClause1> + get() = object : SelectClause1> { @Suppress("UNCHECKED_CAST") - override fun registerSelectClause1(select: SelectInstance, block: suspend (ValueOrClosed) -> R) { + override fun registerSelectClause1(select: SelectInstance, block: suspend (ChannelResult) -> R) { registerSelectReceiveMode(select, RECEIVE_RESULT, block as suspend (Any?) -> R) } } @@ -776,7 +776,7 @@ internal abstract class AbstractChannel( } RECEIVE_RESULT -> { if (!select.trySelect()) return - startCoroutineUnintercepted(ValueOrClosed.closed(value.closeCause), select.completion) + startCoroutineUnintercepted(ChannelResult.closed(value.closeCause), select.completion) } RECEIVE_NULL_ON_CLOSE -> { if (value.closeCause == null) { @@ -905,7 +905,7 @@ internal abstract class AbstractChannel( @JvmField val receiveMode: Int ) : Receive() { fun resumeValue(value: E): Any? = when (receiveMode) { - RECEIVE_RESULT -> ValueOrClosed.value(value) + RECEIVE_RESULT -> ChannelResult.value(value) else -> value } @@ -990,7 +990,7 @@ internal abstract class AbstractChannel( @Suppress("UNCHECKED_CAST") override fun completeResumeReceive(value: E) { block.startCoroutineCancellable( - if (receiveMode == RECEIVE_RESULT) ValueOrClosed.value(value) else value, + if (receiveMode == RECEIVE_RESULT) ChannelResult.value(value) else value, select.completion, resumeOnCancellationFun(value) ) @@ -1000,7 +1000,7 @@ internal abstract class AbstractChannel( if (!select.trySelect()) return when (receiveMode) { RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectWithException(closed.receiveException) - RECEIVE_RESULT -> block.startCoroutineCancellable(ValueOrClosed.closed(closed.closeCause), select.completion) + RECEIVE_RESULT -> block.startCoroutineCancellable(ChannelResult.closed(closed.closeCause), select.completion) RECEIVE_NULL_ON_CLOSE -> if (closed.closeCause == null) { block.startCoroutineCancellable(null, select.completion) } else { @@ -1128,9 +1128,9 @@ internal class Closed( override val offerResult get() = this override val pollResult get() = this - override fun tryResumeSend(otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() } + override fun tryResumeSend(otherOp: PrepareOp?): Symbol = RESUME_TOKEN.also { otherOp?.finishPrepare() } override fun completeResumeSend() {} - override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() } + override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol = RESUME_TOKEN.also { otherOp?.finishPrepare() } override fun completeResumeReceive(value: E) {} override fun resumeSendClosed(closed: Closed<*>) = assert { false } // "Should be never invoked" override fun toString(): String = "Closed@$hexAddress[$closeCause]" @@ -1143,8 +1143,8 @@ internal abstract class Receive : LockFreeLinkedListNode(), ReceiveOrClose } @Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST") -private inline fun Any?.toResult(): ValueOrClosed = - if (this is Closed<*>) ValueOrClosed.closed(closeCause) else ValueOrClosed.value(this as E) +private inline fun Any?.toResult(): ChannelResult = + if (this is Closed<*>) ChannelResult.closed(closeCause) else ChannelResult.value(this as E) @Suppress("NOTHING_TO_INLINE") -private inline fun Closed<*>.toResult(): ValueOrClosed = ValueOrClosed.closed(closeCause) +private inline fun Closed<*>.toResult(): ChannelResult = ChannelResult.closed(closeCause) diff --git a/kotlinx-coroutines-core/common/src/channels/Channel.kt b/kotlinx-coroutines-core/common/src/channels/Channel.kt index b8b81aac53..6a2517adaf 100644 --- a/kotlinx-coroutines-core/common/src/channels/Channel.kt +++ b/kotlinx-coroutines-core/common/src/channels/Channel.kt @@ -244,8 +244,8 @@ public interface ReceiveChannel { /** * Retrieves and removes an element from this channel if it's not empty, or suspends the caller while this channel is empty. - * This method returns [ValueOrClosed] with the value of an element successfully retrieved from the channel - * or the close cause if the channel was closed. + * This method returns [ChannelResult] with the value of an element successfully retrieved from the channel + * or the close cause if the channel was closed. Closed cause may be `null` if the channel was closed normally. * * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this * function is suspended, this function immediately resumes with a [CancellationException]. @@ -257,25 +257,17 @@ public interface ReceiveChannel { * Note that this function does not check for cancellation when it is not suspended. * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed. * - * This function can be used in [select] invocations with the [onReceiveOrClosed] clause. + * This function can be used in [select] invocations with the [onReceiveCatching] clause. * Use [poll] to try receiving from this channel without waiting. - * - * @suppress *This is an internal API, do not use*: Inline classes ABI is not stable yet and - * [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed. */ - @InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed - public suspend fun receiveOrClosed(): ValueOrClosed + public suspend fun receiveCatching(): ChannelResult /** - * Clause for the [select] expression of the [receiveOrClosed] suspending function that selects with the [ValueOrClosed] with a value + * Clause for the [select] expression of the [onReceiveCatching] suspending function that selects with the [ChannelResult] with a value * that is received from the channel or with a close cause if the channel * [is closed for `receive`][isClosedForReceive]. - * - * @suppress *This is an internal API, do not use*: Inline classes ABI is not stable yet and - * [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed. */ - @InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed - public val onReceiveOrClosed: SelectClause1> + public val onReceiveCatching: SelectClause1> /** * Retrieves and removes an element from this channel if its not empty, or returns `null` if the channel is empty @@ -321,104 +313,78 @@ public interface ReceiveChannel { } /** - * A discriminated union of [ReceiveChannel.receiveOrClosed] result - * that encapsulates either an element of type [T] successfully received from the channel or a close cause. - * - * :todo: Do not make it public before resolving todos in the code of this class. + * A discriminated union of channel operation result. + * It encapsulates either a successful result of the channel operation or a closed cause of a channel. * - * @suppress *This is an internal API, do not use*: Inline classes ABI is not stable yet and - * [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed. + * Successful result may represent a successfully received value of type [T], for example as a result of [Channel.receiveCatching] + * operation or a successfully sent element as a result of [Channel.trySend]. */ -@Suppress("NON_PUBLIC_PRIMARY_CONSTRUCTOR_OF_INLINE_CLASS", "EXPERIMENTAL_FEATURE_WARNING") -@InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed -public inline class ValueOrClosed -internal constructor(private val holder: Any?) { - /** - * Returns `true` if this instance represents a received element. - * In this case [isClosed] returns `false`. - * todo: it is commented for now, because it is not used - */ - //public val isValue: Boolean get() = holder !is Closed - +@Suppress("UNCHECKED_CAST") +public inline class ChannelResult +private constructor(private val holder: Any?) { /** - * Returns `true` if this instance represents a close cause. - * In this case [isValue] returns `false`. + * Returns `true` if this instance represents a successful + * operation outcome. + * + * In this case [isFailure] and [isClosed] return false. */ - public val isClosed: Boolean get() = holder is Closed + public val isSuccess: Boolean get() = holder !is Closed /** - * Returns the received value if this instance represents a received value, or throws an [IllegalStateException] otherwise. - * - * :todo: Decide, if it is needed, how it shall be named with relation to [valueOrThrow]: + * Returns true if this instance represents a failed outcome. * - * So we have the following methods on `ValueOrClosed`: `value`, `valueOrNull`, `valueOrThrow`. - * On the other hand, the channel has the following `receive` variants: - * * `receive` which corresponds to `receiveOrClosed().valueOrThrow`... huh? - * * `receiveOrNull` which corresponds to `receiveOrClosed().valueOrNull` - * * `receiveOrClosed` - * For the sake of simplicity consider dropping this version of `value` and rename [valueOrThrow] to simply `value`. + * In this case [isSuccess] returns false and [isClosed] returns true, + * but it does not imply that [exceptionOrNull] will return non-null value. */ - @Suppress("UNCHECKED_CAST") - public val value: T - get() = if (holder is Closed) error(DEFAULT_CLOSE_MESSAGE) else holder as T + public val isFailure: Boolean get() = holder is Closed && holder.cause != null /** - * Returns the received value if this element represents a received value, or `null` otherwise. - * :todo: Decide if it shall be made into extension that is available only for non-null T. - * Note: it might become inconsistent with kotlin.Result + * Returns `true` if this instance represents unsuccessful operation + * to closed channel. + * + * In this case [isSuccess] returns false, but it doesn't imply + * that [isFailure] returns true or that [exceptionOrNull] returns non-null value. + * It can happen if the channel was [closed][Channel.close] normally without an exception. */ - @Suppress("UNCHECKED_CAST") - public val valueOrNull: T? - get() = if (holder is Closed) null else holder as T + public val isClosed: Boolean get() = holder is Closed /** - * :todo: Decide, if it is needed, how it shall be named with relation to [value]. - * Note that `valueOrThrow` rethrows the cause adding no meaningful information about the call site, - * so if one is sure that `ValueOrClosed` always holds a value, this very property should be used. - * Otherwise, it could be very hard to locate the source of the exception. - * todo: it is commented for now, because it is not used + * Returns the encapsulated value if this instance represents success or `null` if it is closed or failed. */ - //@Suppress("UNCHECKED_CAST") - //public val valueOrThrow: T - // get() = if (holder is Closed) throw holder.exception else holder as T + public fun getOrNull(): T? = if (holder !is Closed) holder as T else null /** - * Returns the close cause of the channel if this instance represents a close cause, or throws - * an [IllegalStateException] otherwise. + * Returns the encapsulated value if this instance represents success or throws an exception if it is closed or failed. */ - @Suppress("UNCHECKED_CAST") - public val closeCause: Throwable? get() = - if (holder is Closed) holder.cause else error("Channel was not closed") + public fun getOrThrow(): T = if (holder !is Closed) holder as T else error("Trying to call 'getOrThrow' on a closed channel result") /** - * @suppress + * Returns the encapsulated exception if this instance represents failure or null if it is success + * or unsuccessful operation to closed channel. */ - public override fun toString(): String = - when (holder) { - is Closed -> holder.toString() - else -> "Value($holder)" - } + public fun exceptionOrNull(): Throwable? = (holder as? Closed)?.cause internal class Closed(@JvmField val cause: Throwable?) { - // todo: it is commented for now, because it is not used - //val exception: Throwable get() = cause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE) override fun equals(other: Any?): Boolean = other is Closed && cause == other.cause override fun hashCode(): Int = cause.hashCode() override fun toString(): String = "Closed($cause)" } - /** - * todo: consider making value/closed constructors public in the future. - */ internal companion object { @Suppress("NOTHING_TO_INLINE") - internal inline fun value(value: E): ValueOrClosed = - ValueOrClosed(value) + internal inline fun value(value: E): ChannelResult = + ChannelResult(value) @Suppress("NOTHING_TO_INLINE") - internal inline fun closed(cause: Throwable?): ValueOrClosed = - ValueOrClosed(Closed(cause)) + internal inline fun closed(cause: Throwable?): ChannelResult = + ChannelResult(Closed(cause)) } + + public override fun toString(): String = + when (holder) { + is Closed -> holder.toString() + else -> "Value($holder)" + } } /** diff --git a/kotlinx-coroutines-core/common/src/flow/Channels.kt b/kotlinx-coroutines-core/common/src/flow/Channels.kt index e883c3b468..5b126f0ed7 100644 --- a/kotlinx-coroutines-core/common/src/flow/Channels.kt +++ b/kotlinx-coroutines-core/common/src/flow/Channels.kt @@ -30,7 +30,7 @@ public suspend fun FlowCollector.emitAll(channel: ReceiveChannel): Uni emitAllImpl(channel, consume = true) private suspend fun FlowCollector.emitAllImpl(channel: ReceiveChannel, consume: Boolean) { - // Manually inlined "consumeEach" implementation that does not use iterator but works via "receiveOrClosed". + // Manually inlined "consumeEach" implementation that does not use iterator but works via "receiveCatching". // It has smaller and more efficient spilled state which also allows to implement a manual kludge to // fix retention of the last emitted value. // See https://youtrack.jetbrains.com/issue/KT-16222 @@ -47,9 +47,9 @@ private suspend fun FlowCollector.emitAllImpl(channel: ReceiveChannel, // L$1 <- channel // L$2 <- cause // L$3 <- this$run (actually equal to this) - val result = run { channel.receiveOrClosed() } + val result = run { channel.receiveCatching() } if (result.isClosed) { - result.closeCause?.let { throw it } + result.exceptionOrNull()?.let { throw it } break // returns normally when result.closeCause == null } // result is spilled here to the coroutine state and retained after the call, even though @@ -58,7 +58,7 @@ private suspend fun FlowCollector.emitAllImpl(channel: ReceiveChannel, // L$1 <- channel // L$2 <- cause // L$3 <- result - emit(result.value) + emit(result.getOrThrow()) } } catch (e: Throwable) { cause = e diff --git a/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt b/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt index 91d941b32c..e178f124a7 100644 --- a/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/BasicOperationsTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -35,7 +35,7 @@ class BasicOperationsTest : TestBase() { } @Test - fun testReceiveOrClosed() = runTest { + fun testReceiveCatching() = runTest { TestChannelKind.values().forEach { kind -> testReceiveOrClosed(kind) } } @@ -139,22 +139,22 @@ class BasicOperationsTest : TestBase() { } expect(1) - val result = channel.receiveOrClosed() - assertEquals(1, result.value) - assertEquals(1, result.valueOrNull) - assertTrue(ValueOrClosed.value(1) == result) + val result = channel.receiveCatching() + assertEquals(1, result.getOrThrow()) + assertEquals(1, result.getOrNull()) + assertTrue(ChannelResult.value(1) == result) expect(3) launch { expect(4) channel.close() } - val closed = channel.receiveOrClosed() + val closed = channel.receiveCatching() expect(5) - assertNull(closed.valueOrNull) + assertNull(closed.getOrNull()) assertTrue(closed.isClosed) - assertNull(closed.closeCause) - assertTrue(ValueOrClosed.closed(closed.closeCause) == closed) + assertNull(closed.exceptionOrNull()) + assertTrue(ChannelResult.closed(closed.exceptionOrNull()) == closed) finish(6) } diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelReceiveOrClosedTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelReceiveCatchingTest.kt similarity index 54% rename from kotlinx-coroutines-core/common/test/channels/ChannelReceiveOrClosedTest.kt rename to kotlinx-coroutines-core/common/test/channels/ChannelReceiveCatchingTest.kt index e58b0deed5..b869b41928 100644 --- a/kotlinx-coroutines-core/common/test/channels/ChannelReceiveOrClosedTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ChannelReceiveCatchingTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -7,7 +7,7 @@ package kotlinx.coroutines.channels import kotlinx.coroutines.* import kotlin.test.* -class ChannelReceiveOrClosedTest : TestBase() { +class ChannelReceiveCatchingTest : TestBase() { @Test fun testChannelOfThrowables() = runTest { val channel = Channel() @@ -16,13 +16,13 @@ class ChannelReceiveOrClosedTest : TestBase() { channel.close(TestException2()) } - val element = channel.receiveOrClosed() - assertTrue(element.value is TestException1) - assertTrue(element.valueOrNull is TestException1) + val element = channel.receiveCatching() + assertTrue(element.getOrThrow() is TestException1) + assertTrue(element.getOrNull() is TestException1) - val closed = channel.receiveOrClosed() + val closed = channel.receiveCatching() assertTrue(closed.isClosed) - assertTrue(closed.closeCause is TestException2) + assertTrue(closed.exceptionOrNull() is TestException2) } @Test @@ -40,26 +40,26 @@ class ChannelReceiveOrClosedTest : TestBase() { } expect(1) - val element = channel.receiveOrClosed() - assertEquals(1, element.value) - assertEquals(1, element.valueOrNull) + val element = channel.receiveCatching() + assertEquals(1, element.getOrThrow()) + assertEquals(1, element.getOrNull()) assertEquals("Value(1)", element.toString()) - assertTrue(ValueOrClosed.value(1) == element) // Don't box + assertTrue(ChannelResult.value(1) == element) // Don't box expect(4) - val nullElement = channel.receiveOrClosed() - assertNull(nullElement.value) - assertNull(nullElement.valueOrNull) + val nullElement = channel.receiveCatching() + assertNull(nullElement.getOrThrow()) + assertNull(nullElement.getOrNull()) assertEquals("Value(null)", nullElement.toString()) - assertTrue(ValueOrClosed.value(null) == nullElement) // Don't box + assertTrue(ChannelResult.value(null) == nullElement) // Don't box expect(5) - val closed = channel.receiveOrClosed() + val closed = channel.receiveCatching() assertTrue(closed.isClosed) - val closed2 = channel.receiveOrClosed() + val closed2 = channel.receiveCatching() assertTrue(closed2.isClosed) - assertNull(closed2.closeCause) + assertNull(closed2.exceptionOrNull()) finish(7) } @@ -77,12 +77,12 @@ class ChannelReceiveOrClosedTest : TestBase() { } expect(1) - val element = channel.receiveOrClosed() - assertEquals(1u, element.value) + val element = channel.receiveCatching() + assertEquals(1u, element.getOrThrow()) expect(3) - val element2 = channel.receiveOrClosed() - assertEquals((Long.MAX_VALUE - 1).toUInt(), element2.value) + val element2 = channel.receiveCatching() + assertEquals((Long.MAX_VALUE - 1).toUInt(), element2.getOrThrow()) finish(6) } @@ -95,7 +95,7 @@ class ChannelReceiveOrClosedTest : TestBase() { } expect(1) - val closed = channel.receiveOrClosed() + val closed = channel.receiveCatching() assertTrue(closed.isClosed) finish(3) } @@ -103,22 +103,22 @@ class ChannelReceiveOrClosedTest : TestBase() { @Test @ExperimentalUnsignedTypes fun testReceiveResultChannel() = runTest { - val channel = Channel>() + val channel = Channel>() launch { - channel.send(ValueOrClosed.value(1u)) - channel.send(ValueOrClosed.closed(TestException1())) + channel.send(ChannelResult.value(1u)) + channel.send(ChannelResult.closed(TestException1())) channel.close(TestException2()) } - val intResult = channel.receiveOrClosed() - assertEquals(1u, intResult.value.value) + val intResult = channel.receiveCatching() + assertEquals(1u, intResult.getOrThrow().getOrThrow()) - val closeCauseResult = channel.receiveOrClosed() - assertTrue(closeCauseResult.value.closeCause is TestException1) + val closeCauseResult = channel.receiveCatching() + assertTrue(closeCauseResult.getOrThrow().exceptionOrNull() is TestException1) - val closeCause = channel.receiveOrClosed() + val closeCause = channel.receiveCatching() assertTrue(closeCause.isClosed) - assertTrue(closeCause.closeCause is TestException2) + assertTrue(closeCause.exceptionOrNull() is TestException2) } @Test @@ -126,9 +126,9 @@ class ChannelReceiveOrClosedTest : TestBase() { val channel = Channel(1) channel.send("message") channel.close(TestException1("OK")) - assertEquals("Value(message)", channel.receiveOrClosed().toString()) + assertEquals("Value(message)", channel.receiveCatching().toString()) // toString implementation for exception differs on every platform - val str = channel.receiveOrClosed().toString() + val str = channel.receiveCatching().toString() if (!str.matches("Closed\\(.*TestException1: OK\\)".toRegex())) error("Unexpected string: '$str'") } diff --git a/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementFailureTest.kt b/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementFailureTest.kt index d2ef3d2691..37db7e4526 100644 --- a/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementFailureTest.kt +++ b/kotlinx-coroutines-core/common/test/channels/ChannelUndeliveredElementFailureTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -99,7 +99,7 @@ class ChannelUndeliveredElementFailureTest : TestBase() { fun testReceiveOrClosedCancelledFail() = runTest(unhandled = shouldBeUnhandled) { val channel = Channel(onUndeliveredElement = onCancelFail) val job = launch(start = CoroutineStart.UNDISPATCHED) { - channel.receiveOrClosed() + channel.receiveCatching() expectUnreached() // will be cancelled before it dispatches } channel.send(item) @@ -111,7 +111,7 @@ class ChannelUndeliveredElementFailureTest : TestBase() { val channel = Channel(onUndeliveredElement = onCancelFail) val job = launch(start = CoroutineStart.UNDISPATCHED) { select { - channel.onReceiveOrClosed { + channel.onReceiveCatching { expectUnreached() } } @@ -140,4 +140,4 @@ class ChannelUndeliveredElementFailureTest : TestBase() { expectUnreached() } -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt b/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt index 993be78e17..d28d912d17 100644 --- a/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt +++ b/kotlinx-coroutines-core/common/test/channels/TestChannelKind.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -43,7 +43,7 @@ private class ChannelViaBroadcast( override suspend fun receive(): E = sub.receive() override suspend fun receiveOrNull(): E? = sub.receiveOrNull() - override suspend fun receiveOrClosed(): ValueOrClosed = sub.receiveOrClosed() + override suspend fun receiveCatching(): ChannelResult = sub.receiveCatching() override fun poll(): E? = sub.poll() override fun iterator(): ChannelIterator = sub.iterator() @@ -57,6 +57,6 @@ private class ChannelViaBroadcast( get() = sub.onReceive override val onReceiveOrNull: SelectClause1 get() = sub.onReceiveOrNull - override val onReceiveOrClosed: SelectClause1> - get() = sub.onReceiveOrClosed + override val onReceiveCatching: SelectClause1> + get() = sub.onReceiveCatching } diff --git a/kotlinx-coroutines-core/common/test/selects/SelectArrayChannelTest.kt b/kotlinx-coroutines-core/common/test/selects/SelectArrayChannelTest.kt index a4f8c3ba2a..0158c84307 100644 --- a/kotlinx-coroutines-core/common/test/selects/SelectArrayChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/selects/SelectArrayChannelTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.selects @@ -295,10 +295,10 @@ class SelectArrayChannelTest : TestBase() { } expect(2) select { - channel.onReceiveOrClosed { + channel.onReceiveCatching { expect(5) assertTrue(it.isClosed) - assertNull(it.closeCause) + assertNull(it.exceptionOrNull()) } } @@ -316,10 +316,10 @@ class SelectArrayChannelTest : TestBase() { } expect(2) select { - channel.onReceiveOrClosed { + channel.onReceiveCatching { expect(5) assertTrue(it.isClosed) - assertTrue(it.closeCause is TestException) + assertTrue(it.exceptionOrNull() is TestException) } } @@ -327,16 +327,16 @@ class SelectArrayChannelTest : TestBase() { } @Test - fun testSelectReceiveOrClosed() = runTest { + fun testSelectReceiveCatching() = runTest { val c = Channel(1) val iterations = 10 expect(1) val job = launch { repeat(iterations) { select { - c.onReceiveOrClosed { v -> + c.onReceiveCatching { v -> expect(4 + it * 2) - assertEquals(it, v.value) + assertEquals(it, v.getOrNull()) } } } @@ -360,9 +360,9 @@ class SelectArrayChannelTest : TestBase() { launch { expect(3) val res = select { - c.onReceiveOrClosed { v -> + c.onReceiveCatching { v -> expect(6) - assertEquals(42, v.value) + assertEquals(42, v.getOrNull()) yield() // back to main expect(8) "OK" @@ -396,9 +396,9 @@ class SelectArrayChannelTest : TestBase() { expect(1) select { expect(2) - channel.onReceiveOrClosed { + channel.onReceiveCatching { assertTrue(it.isClosed) - assertNull(it.closeCause) + assertNull(it.exceptionOrNull()) finish(3) } } @@ -412,9 +412,9 @@ class SelectArrayChannelTest : TestBase() { expect(1) select { expect(2) - channel.onReceiveOrClosed { + channel.onReceiveCatching { assertFalse(it.isClosed) - assertEquals(42, it.value) + assertEquals(42, it.getOrNull()) finish(3) } } diff --git a/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt b/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt index 2027630f20..6a1576761a 100644 --- a/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt +++ b/kotlinx-coroutines-core/common/test/selects/SelectRendezvousChannelTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ @file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913 @@ -306,7 +306,7 @@ class SelectRendezvousChannelTest : TestBase() { } @Test - fun testSelectReceiveOrClosedWaitClosed() = runTest { + fun testSelectReceiveCatchingWaitClosed() = runTest { expect(1) val channel = Channel(Channel.RENDEZVOUS) launch { @@ -316,10 +316,10 @@ class SelectRendezvousChannelTest : TestBase() { } expect(2) select { - channel.onReceiveOrClosed { + channel.onReceiveCatching { expect(5) assertTrue(it.isClosed) - assertNull(it.closeCause) + assertNull(it.exceptionOrNull()) } } @@ -327,7 +327,7 @@ class SelectRendezvousChannelTest : TestBase() { } @Test - fun testSelectReceiveOrClosedWaitClosedWithCause() = runTest { + fun testSelectReceiveCatchingWaitClosedWithCause() = runTest { expect(1) val channel = Channel(Channel.RENDEZVOUS) launch { @@ -337,10 +337,10 @@ class SelectRendezvousChannelTest : TestBase() { } expect(2) select { - channel.onReceiveOrClosed { + channel.onReceiveCatching { expect(5) assertTrue(it.isClosed) - assertTrue(it.closeCause is TestException) + assertTrue(it.exceptionOrNull() is TestException) } } @@ -348,31 +348,31 @@ class SelectRendezvousChannelTest : TestBase() { } @Test - fun testSelectReceiveOrClosedForClosedChannel() = runTest { + fun testSelectReceiveCatchingForClosedChannel() = runTest { val channel = Channel() channel.close() expect(1) select { expect(2) - channel.onReceiveOrClosed { + channel.onReceiveCatching { assertTrue(it.isClosed) - assertNull(it.closeCause) + assertNull(it.exceptionOrNull()) finish(3) } } } @Test - fun testSelectReceiveOrClosed() = runTest { + fun testSelectReceiveCatching() = runTest { val channel = Channel(Channel.RENDEZVOUS) val iterations = 10 expect(1) val job = launch { repeat(iterations) { select { - channel.onReceiveOrClosed { v -> + channel.onReceiveCatching { v -> expect(4 + it * 2) - assertEquals(it, v.value) + assertEquals(it, v.getOrThrow()) } } } @@ -390,15 +390,15 @@ class SelectRendezvousChannelTest : TestBase() { } @Test - fun testSelectReceiveOrClosedDispatch() = runTest { + fun testSelectReceiveCatchingDispatch() = runTest { val c = Channel(Channel.RENDEZVOUS) expect(1) launch { expect(3) val res = select { - c.onReceiveOrClosed { v -> + c.onReceiveCatching { v -> expect(6) - assertEquals(42, v.value) + assertEquals(42, v.getOrThrow()) yield() // back to main expect(8) "OK" diff --git a/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt index 1188329a4c..3f502ba9fb 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/ChannelUndeliveredElementStressTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -190,7 +190,7 @@ class ChannelUndeliveredElementStressTest(private val kind: TestChannelKind) : T 2 -> select { channel.onReceive { it } } 3 -> channel.receiveOrNull() ?: error("Should not be closed") 4 -> select { channel.onReceiveOrNull { it ?: error("Should not be closed") } } - 5 -> channel.receiveOrClosed().value + 5 -> channel.receiveCatching().getOrThrow() 6 -> { val iterator = channel.iterator() check(iterator.hasNext()) { "Should not be closed" } diff --git a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt index 5178907875..6ce2f20d5a 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/TickerChannelCommonTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.channels @@ -112,13 +112,13 @@ class TickerChannelCommonTest(private val channelFactory: Channel) : TestBase() var sum = 0 var n = 0 whileSelect { - this@averageInTimeWindow.onReceiveOrClosed { + this@averageInTimeWindow.onReceiveCatching { if (it.isClosed) { // Send leftovers and bail out if (n != 0) send(sum / n.toDouble()) false } else { - sum += it.value + sum += it.getOrThrow() ++n true }