Skip to content

Commit

Permalink
Introduce ChannelResult and receiveCatching/onReceiveCatching
Browse files Browse the repository at this point in the history
    * Introduce three-state ChannelResult, a domain-specific Result class counterpart
    * Introduce receiveCatching/onReceiveCatching, make it public
    * Get rid of receiveOrClosed/onReceiveOrClosed without migrations, it was @InternalCoroutinesApi anyway

Fixes #330
  • Loading branch information
qwwdfsad committed Mar 22, 2021
1 parent f2940d5 commit d80db63
Show file tree
Hide file tree
Showing 12 changed files with 172 additions and 203 deletions.
41 changes: 22 additions & 19 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -625,6 +625,26 @@ public final class kotlinx/coroutines/channels/ChannelKt {
public static synthetic fun Channel$default (ILkotlinx/coroutines/channels/BufferOverflow;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel;
}

public final class kotlinx/coroutines/channels/ChannelResult {
public static final field Companion Lkotlinx/coroutines/channels/ChannelResult$Companion;
public static final synthetic fun box-impl (Ljava/lang/Object;)Lkotlinx/coroutines/channels/ChannelResult;
public static synthetic fun constructor-impl (Ljava/lang/Object;Lkotlin/jvm/internal/DefaultConstructorMarker;)Ljava/lang/Object;
public fun equals (Ljava/lang/Object;)Z
public static fun equals-impl (Ljava/lang/Object;Ljava/lang/Object;)Z
public static final fun equals-impl0 (Ljava/lang/Object;Ljava/lang/Object;)Z
public static final fun exceptionOrNull-impl (Ljava/lang/Object;)Ljava/lang/Throwable;
public static final fun getOrNull-impl (Ljava/lang/Object;)Ljava/lang/Object;
public static final fun getOrThrow-impl (Ljava/lang/Object;)Ljava/lang/Object;
public fun hashCode ()I
public static fun hashCode-impl (Ljava/lang/Object;)I
public static final fun isClosed-impl (Ljava/lang/Object;)Z
public static final fun isFailure-impl (Ljava/lang/Object;)Z
public static final fun isSuccess-impl (Ljava/lang/Object;)Z
public fun toString ()Ljava/lang/String;
public static fun toString-impl (Ljava/lang/Object;)Ljava/lang/String;
public final synthetic fun unbox-impl ()Ljava/lang/Object;
}

public final class kotlinx/coroutines/channels/ChannelsKt {
public static final fun all (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun any (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down Expand Up @@ -789,14 +809,14 @@ public abstract interface class kotlinx/coroutines/channels/ReceiveChannel {
public abstract synthetic fun cancel (Ljava/lang/Throwable;)Z
public abstract fun cancel (Ljava/util/concurrent/CancellationException;)V
public abstract fun getOnReceive ()Lkotlinx/coroutines/selects/SelectClause1;
public abstract fun getOnReceiveOrClosed ()Lkotlinx/coroutines/selects/SelectClause1;
public abstract fun getOnReceiveCatching ()Lkotlinx/coroutines/selects/SelectClause1;
public abstract fun getOnReceiveOrNull ()Lkotlinx/coroutines/selects/SelectClause1;
public abstract fun isClosedForReceive ()Z
public abstract fun isEmpty ()Z
public abstract fun iterator ()Lkotlinx/coroutines/channels/ChannelIterator;
public abstract fun poll ()Ljava/lang/Object;
public abstract fun receive (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun receiveOrClosed-WVj179g (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun receiveCatching-JP2dKIU (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun receiveOrNull (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

Expand Down Expand Up @@ -832,23 +852,6 @@ public final class kotlinx/coroutines/channels/TickerMode : java/lang/Enum {
public static fun values ()[Lkotlinx/coroutines/channels/TickerMode;
}

public final class kotlinx/coroutines/channels/ValueOrClosed {
public static final field Companion Lkotlinx/coroutines/channels/ValueOrClosed$Companion;
public static final synthetic fun box-impl (Ljava/lang/Object;)Lkotlinx/coroutines/channels/ValueOrClosed;
public fun equals (Ljava/lang/Object;)Z
public static fun equals-impl (Ljava/lang/Object;Ljava/lang/Object;)Z
public static final fun equals-impl0 (Ljava/lang/Object;Ljava/lang/Object;)Z
public static final fun getCloseCause-impl (Ljava/lang/Object;)Ljava/lang/Throwable;
public static final fun getValue-impl (Ljava/lang/Object;)Ljava/lang/Object;
public static final fun getValueOrNull-impl (Ljava/lang/Object;)Ljava/lang/Object;
public fun hashCode ()I
public static fun hashCode-impl (Ljava/lang/Object;)I
public static final fun isClosed-impl (Ljava/lang/Object;)Z
public fun toString ()Ljava/lang/String;
public static fun toString-impl (Ljava/lang/Object;)Ljava/lang/String;
public final synthetic fun unbox-impl ()Ljava/lang/Object;
}

public final class kotlinx/coroutines/debug/internal/DebugCoroutineInfo {
public fun <init> (Lkotlinx/coroutines/debug/internal/DebugCoroutineInfoImpl;Lkotlin/coroutines/CoroutineContext;)V
public final fun getContext ()Lkotlin/coroutines/CoroutineContext;
Expand Down
26 changes: 13 additions & 13 deletions kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
Expand Up @@ -623,7 +623,7 @@ internal abstract class AbstractChannel<E>(
}

@Suppress("UNCHECKED_CAST")
public final override suspend fun receiveOrClosed(): ValueOrClosed<E> {
public final override suspend fun receiveCatching(): ChannelResult<E> {
// fast path -- try poll non-blocking
val result = pollInternal()
if (result !== POLL_FAILED) return result.toResult()
Expand Down Expand Up @@ -742,10 +742,10 @@ internal abstract class AbstractChannel<E>(
}
}

final override val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>
get() = object : SelectClause1<ValueOrClosed<E>> {
final override val onReceiveCatching: SelectClause1<ChannelResult<E>>
get() = object : SelectClause1<ChannelResult<E>> {
@Suppress("UNCHECKED_CAST")
override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (ValueOrClosed<E>) -> R) {
override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (ChannelResult<E>) -> R) {
registerSelectReceiveMode(select, RECEIVE_RESULT, block as suspend (Any?) -> R)
}
}
Expand Down Expand Up @@ -776,7 +776,7 @@ internal abstract class AbstractChannel<E>(
}
RECEIVE_RESULT -> {
if (!select.trySelect()) return
startCoroutineUnintercepted(ValueOrClosed.closed<Any>(value.closeCause), select.completion)
startCoroutineUnintercepted(ChannelResult.closed<Any>(value.closeCause), select.completion)
}
RECEIVE_NULL_ON_CLOSE -> {
if (value.closeCause == null) {
Expand Down Expand Up @@ -905,7 +905,7 @@ internal abstract class AbstractChannel<E>(
@JvmField val receiveMode: Int
) : Receive<E>() {
fun resumeValue(value: E): Any? = when (receiveMode) {
RECEIVE_RESULT -> ValueOrClosed.value(value)
RECEIVE_RESULT -> ChannelResult.value(value)
else -> value
}

Expand Down Expand Up @@ -990,7 +990,7 @@ internal abstract class AbstractChannel<E>(
@Suppress("UNCHECKED_CAST")
override fun completeResumeReceive(value: E) {
block.startCoroutineCancellable(
if (receiveMode == RECEIVE_RESULT) ValueOrClosed.value(value) else value,
if (receiveMode == RECEIVE_RESULT) ChannelResult.value(value) else value,
select.completion,
resumeOnCancellationFun(value)
)
Expand All @@ -1000,7 +1000,7 @@ internal abstract class AbstractChannel<E>(
if (!select.trySelect()) return
when (receiveMode) {
RECEIVE_THROWS_ON_CLOSE -> select.resumeSelectWithException(closed.receiveException)
RECEIVE_RESULT -> block.startCoroutineCancellable(ValueOrClosed.closed<R>(closed.closeCause), select.completion)
RECEIVE_RESULT -> block.startCoroutineCancellable(ChannelResult.closed<R>(closed.closeCause), select.completion)
RECEIVE_NULL_ON_CLOSE -> if (closed.closeCause == null) {
block.startCoroutineCancellable(null, select.completion)
} else {
Expand Down Expand Up @@ -1128,9 +1128,9 @@ internal class Closed<in E>(

override val offerResult get() = this
override val pollResult get() = this
override fun tryResumeSend(otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() }
override fun tryResumeSend(otherOp: PrepareOp?): Symbol = RESUME_TOKEN.also { otherOp?.finishPrepare() }
override fun completeResumeSend() {}
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? = RESUME_TOKEN.also { otherOp?.finishPrepare() }
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol = RESUME_TOKEN.also { otherOp?.finishPrepare() }
override fun completeResumeReceive(value: E) {}
override fun resumeSendClosed(closed: Closed<*>) = assert { false } // "Should be never invoked"
override fun toString(): String = "Closed@$hexAddress[$closeCause]"
Expand All @@ -1143,8 +1143,8 @@ internal abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClose
}

@Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST")
private inline fun <E> Any?.toResult(): ValueOrClosed<E> =
if (this is Closed<*>) ValueOrClosed.closed(closeCause) else ValueOrClosed.value(this as E)
private inline fun <E> Any?.toResult(): ChannelResult<E> =
if (this is Closed<*>) ChannelResult.closed(closeCause) else ChannelResult.value(this as E)

@Suppress("NOTHING_TO_INLINE")
private inline fun <E> Closed<*>.toResult(): ValueOrClosed<E> = ValueOrClosed.closed(closeCause)
private inline fun <E> Closed<*>.toResult(): ChannelResult<E> = ChannelResult.closed(closeCause)
126 changes: 46 additions & 80 deletions kotlinx-coroutines-core/common/src/channels/Channel.kt
Expand Up @@ -244,8 +244,8 @@ public interface ReceiveChannel<out E> {

/**
* Retrieves and removes an element from this channel if it's not empty, or suspends the caller while this channel is empty.
* This method returns [ValueOrClosed] with the value of an element successfully retrieved from the channel
* or the close cause if the channel was closed.
* This method returns [ChannelResult] with the value of an element successfully retrieved from the channel
* or the close cause if the channel was closed. Closed cause may be `null` if the channel was closed normally.
*
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
* function is suspended, this function immediately resumes with a [CancellationException].
Expand All @@ -257,25 +257,17 @@ public interface ReceiveChannel<out E> {
* Note that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*
* This function can be used in [select] invocations with the [onReceiveOrClosed] clause.
* This function can be used in [select] invocations with the [onReceiveCatching] clause.
* Use [poll] to try receiving from this channel without waiting.
*
* @suppress *This is an internal API, do not use*: Inline classes ABI is not stable yet and
* [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed.
*/
@InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed
public suspend fun receiveOrClosed(): ValueOrClosed<E>
public suspend fun receiveCatching(): ChannelResult<E>

/**
* Clause for the [select] expression of the [receiveOrClosed] suspending function that selects with the [ValueOrClosed] with a value
* Clause for the [select] expression of the [onReceiveCatching] suspending function that selects with the [ChannelResult] with a value
* that is received from the channel or with a close cause if the channel
* [is closed for `receive`][isClosedForReceive].
*
* @suppress *This is an internal API, do not use*: Inline classes ABI is not stable yet and
* [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed.
*/
@InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed
public val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>
public val onReceiveCatching: SelectClause1<ChannelResult<E>>

/**
* Retrieves and removes an element from this channel if its not empty, or returns `null` if the channel is empty
Expand Down Expand Up @@ -321,104 +313,78 @@ public interface ReceiveChannel<out E> {
}

/**
* A discriminated union of [ReceiveChannel.receiveOrClosed] result
* that encapsulates either an element of type [T] successfully received from the channel or a close cause.
*
* :todo: Do not make it public before resolving todos in the code of this class.
* A discriminated union of channel operation result.
* It encapsulates either a successful result of the channel operation or a closed cause of a channel.
*
* @suppress *This is an internal API, do not use*: Inline classes ABI is not stable yet and
* [KT-27524](https://youtrack.jetbrains.com/issue/KT-27524) needs to be fixed.
* Successful result may represent a successfully received value of type [T], for example as a result of [Channel.receiveCatching]
* operation or a successfully sent element as a result of [Channel.trySend].
*/
@Suppress("NON_PUBLIC_PRIMARY_CONSTRUCTOR_OF_INLINE_CLASS", "EXPERIMENTAL_FEATURE_WARNING")
@InternalCoroutinesApi // until https://youtrack.jetbrains.com/issue/KT-27524 is fixed
public inline class ValueOrClosed<out T>
internal constructor(private val holder: Any?) {
/**
* Returns `true` if this instance represents a received element.
* In this case [isClosed] returns `false`.
* todo: it is commented for now, because it is not used
*/
//public val isValue: Boolean get() = holder !is Closed

@Suppress("UNCHECKED_CAST")
public inline class ChannelResult<out T>
private constructor(private val holder: Any?) {
/**
* Returns `true` if this instance represents a close cause.
* In this case [isValue] returns `false`.
* Returns `true` if this instance represents a successful
* operation outcome.
*
* In this case [isFailure] and [isClosed] return false.
*/
public val isClosed: Boolean get() = holder is Closed
public val isSuccess: Boolean get() = holder !is Closed

/**
* Returns the received value if this instance represents a received value, or throws an [IllegalStateException] otherwise.
*
* :todo: Decide, if it is needed, how it shall be named with relation to [valueOrThrow]:
* Returns true if this instance represents a failed outcome.
*
* So we have the following methods on `ValueOrClosed`: `value`, `valueOrNull`, `valueOrThrow`.
* On the other hand, the channel has the following `receive` variants:
* * `receive` which corresponds to `receiveOrClosed().valueOrThrow`... huh?
* * `receiveOrNull` which corresponds to `receiveOrClosed().valueOrNull`
* * `receiveOrClosed`
* For the sake of simplicity consider dropping this version of `value` and rename [valueOrThrow] to simply `value`.
* In this case [isSuccess] returns false and [isClosed] returns true,
* but it does not imply that [exceptionOrNull] will return non-null value.
*/
@Suppress("UNCHECKED_CAST")
public val value: T
get() = if (holder is Closed) error(DEFAULT_CLOSE_MESSAGE) else holder as T
public val isFailure: Boolean get() = holder is Closed && holder.cause != null

/**
* Returns the received value if this element represents a received value, or `null` otherwise.
* :todo: Decide if it shall be made into extension that is available only for non-null T.
* Note: it might become inconsistent with kotlin.Result
* Returns `true` if this instance represents unsuccessful operation
* to closed channel.
*
* In this case [isSuccess] returns false, but it doesn't imply
* that [isFailure] returns true or that [exceptionOrNull] returns non-null value.
* It can happen if the channel was [closed][Channel.close] normally without an exception.
*/
@Suppress("UNCHECKED_CAST")
public val valueOrNull: T?
get() = if (holder is Closed) null else holder as T
public val isClosed: Boolean get() = holder is Closed

/**
* :todo: Decide, if it is needed, how it shall be named with relation to [value].
* Note that `valueOrThrow` rethrows the cause adding no meaningful information about the call site,
* so if one is sure that `ValueOrClosed` always holds a value, this very property should be used.
* Otherwise, it could be very hard to locate the source of the exception.
* todo: it is commented for now, because it is not used
* Returns the encapsulated value if this instance represents success or `null` if it is closed or failed.
*/
//@Suppress("UNCHECKED_CAST")
//public val valueOrThrow: T
// get() = if (holder is Closed) throw holder.exception else holder as T
public fun getOrNull(): T? = if (holder !is Closed) holder as T else null

/**
* Returns the close cause of the channel if this instance represents a close cause, or throws
* an [IllegalStateException] otherwise.
* Returns the encapsulated value if this instance represents success or throws an exception if it is closed or failed.
*/
@Suppress("UNCHECKED_CAST")
public val closeCause: Throwable? get() =
if (holder is Closed) holder.cause else error("Channel was not closed")
public fun getOrThrow(): T = if (holder !is Closed) holder as T else error("Trying to call 'getOrThrow' on a closed channel result")

/**
* @suppress
* Returns the encapsulated exception if this instance represents failure or null if it is success
* or unsuccessful operation to closed channel.
*/
public override fun toString(): String =
when (holder) {
is Closed -> holder.toString()
else -> "Value($holder)"
}
public fun exceptionOrNull(): Throwable? = (holder as? Closed)?.cause

internal class Closed(@JvmField val cause: Throwable?) {
// todo: it is commented for now, because it is not used
//val exception: Throwable get() = cause ?: ClosedReceiveChannelException(DEFAULT_CLOSE_MESSAGE)
override fun equals(other: Any?): Boolean = other is Closed && cause == other.cause
override fun hashCode(): Int = cause.hashCode()
override fun toString(): String = "Closed($cause)"
}

/**
* todo: consider making value/closed constructors public in the future.
*/
internal companion object {
@Suppress("NOTHING_TO_INLINE")
internal inline fun <E> value(value: E): ValueOrClosed<E> =
ValueOrClosed(value)
internal inline fun <E> value(value: E): ChannelResult<E> =
ChannelResult(value)

@Suppress("NOTHING_TO_INLINE")
internal inline fun <E> closed(cause: Throwable?): ValueOrClosed<E> =
ValueOrClosed(Closed(cause))
internal inline fun <E> closed(cause: Throwable?): ChannelResult<E> =
ChannelResult(Closed(cause))
}

public override fun toString(): String =
when (holder) {
is Closed -> holder.toString()
else -> "Value($holder)"
}
}

/**
Expand Down

0 comments on commit d80db63

Please sign in to comment.