Skip to content

Commit

Permalink
Deprecate receiveOrNull and onReceiveOrNull and introduce correspondi…
Browse files Browse the repository at this point in the history
…ng extensions with the same semantic and generic ': Any' bound. Rename ReceiveResult to ValueOrClosed for consistency.
  • Loading branch information
qwwdfsad committed Oct 25, 2018
1 parent f8a6880 commit 27615aa
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,9 @@ public final class kotlinx/coroutines/channels/ChannelsKt {
public static final fun minWith (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/Comparator;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun none (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun none (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun onReceiveOrNull (Lkotlinx/coroutines/channels/Channel;)Lkotlinx/coroutines/selects/SelectClause1;
public static final fun partition (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun receiveOrNull (Lkotlinx/coroutines/channels/Channel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun reduce (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun reduceIndexed (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun requireNoNulls (Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlinx/coroutines/channels/ReceiveChannel;
Expand Down Expand Up @@ -659,12 +661,14 @@ public abstract interface class kotlinx/coroutines/channels/ReceiveChannel {
public abstract synthetic fun cancel ()Z
public abstract fun cancel (Ljava/lang/Throwable;)Z
public abstract fun getOnReceive ()Lkotlinx/coroutines/selects/SelectClause1;
public abstract fun getOnReceiveOrClosed ()Lkotlinx/coroutines/selects/SelectClause1;
public abstract fun getOnReceiveOrNull ()Lkotlinx/coroutines/selects/SelectClause1;
public abstract fun isClosedForReceive ()Z
public abstract fun isEmpty ()Z
public abstract fun iterator ()Lkotlinx/coroutines/channels/ChannelIterator;
public abstract fun poll ()Ljava/lang/Object;
public abstract fun receive (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun receiveOrClosed (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun receiveOrNull (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

Expand Down Expand Up @@ -699,6 +703,25 @@ public final class kotlinx/coroutines/channels/TickerMode : java/lang/Enum {
public static fun values ()[Lkotlinx/coroutines/channels/TickerMode;
}

public final class kotlinx/coroutines/channels/ValueOrClosed {
public static final field Companion Lkotlinx/coroutines/channels/ValueOrClosed$Companion;
public static final synthetic fun box-impl (Ljava/lang/Object;)Lkotlinx/coroutines/channels/ValueOrClosed;
public fun equals (Ljava/lang/Object;)Z
public static fun equals-impl (Ljava/lang/Object;Ljava/lang/Object;)Z
public static final fun equals-impl0 (Ljava/lang/Object;Ljava/lang/Object;)Z
public static final fun getCloseCause-impl (Ljava/lang/Object;)Ljava/lang/Throwable;
public static final fun getValue-impl (Ljava/lang/Object;)Ljava/lang/Object;
public static final fun getValueOrNull-impl (Ljava/lang/Object;)Ljava/lang/Object;
public static final fun getValueOrThrow-impl (Ljava/lang/Object;)Ljava/lang/Object;
public fun hashCode ()I
public static fun hashCode-impl (Ljava/lang/Object;)I
public static final fun isClosed-impl (Ljava/lang/Object;)Z
public static final fun isValue-impl (Ljava/lang/Object;)Z
public fun toString ()Ljava/lang/String;
public static fun toString-impl (Ljava/lang/Object;)Ljava/lang/String;
public final synthetic fun unbox-impl ()Ljava/lang/Object;
}

public abstract interface class kotlinx/coroutines/selects/SelectBuilder {
public abstract fun invoke (Lkotlinx/coroutines/selects/SelectClause0;Lkotlin/jvm/functions/Function1;)V
public abstract fun invoke (Lkotlinx/coroutines/selects/SelectClause1;Lkotlin/jvm/functions/Function2;)V
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
}

@Suppress("UNCHECKED_CAST")
public final override suspend fun receiveOrClosed(): ReceiveResult<E> {
public final override suspend fun receiveOrClosed(): ValueOrClosed<E> {
// fast path -- try poll non-blocking
val result = pollInternal()
if (result !== POLL_FAILED) {
Expand All @@ -662,7 +662,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
}

@Suppress("UNCHECKED_CAST")
private suspend fun receiveOrClosedSuspend(): ReceiveResult<E> = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@{ cont ->
private suspend fun receiveOrClosedSuspend(): ValueOrClosed<E> = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@{ cont ->
val receive = ReceiveElement<E>(cont as CancellableContinuation<Any?>, ResumeMode.RECEIVE_RESULT)
while (true) {
if (enqueueReceive(receive)) {
Expand Down Expand Up @@ -827,15 +827,15 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
}
}

override val onReceiveOrClosed: SelectClause1<ReceiveResult<E>>
get() = object : SelectClause1<ReceiveResult<E>> {
override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (ReceiveResult<E>) -> R) {
override val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>
get() = object : SelectClause1<ValueOrClosed<E>> {
override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (ValueOrClosed<E>) -> R) {
registerSelectReceiveOrClosed(select, block)
}
}

@Suppress("UNCHECKED_CAST")
private fun <R> registerSelectReceiveOrClosed(select: SelectInstance<R>, block: suspend (ReceiveResult<E>) -> R) {
private fun <R> registerSelectReceiveOrClosed(select: SelectInstance<R>, block: suspend (ValueOrClosed<E>) -> R) {
while (true) {
if (select.isSelected) return
if (isEmpty) {
Expand All @@ -846,11 +846,11 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
pollResult === ALREADY_SELECTED -> return
pollResult === POLL_FAILED -> {} // retry
pollResult is Closed<*> -> {
block.startCoroutineUnintercepted(ReceiveResult.closed(pollResult.receiveException), select.completion)
block.startCoroutineUnintercepted(ValueOrClosed.closed(pollResult.receiveException), select.completion)
}
else -> {
// selected successfully
block.startCoroutineUnintercepted(ReceiveResult.value(pollResult as E), select.completion)
block.startCoroutineUnintercepted(ValueOrClosed.value(pollResult as E), select.completion)
return
}
}
Expand Down Expand Up @@ -970,7 +970,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
@Suppress("IMPLICIT_CAST_TO_ANY")
override fun tryResumeReceive(value: E, idempotent: Any?): Any? {
val resumeValue = when (resumeMode) {
ResumeMode.RECEIVE_RESULT -> ReceiveResult.value(value)
ResumeMode.RECEIVE_RESULT -> ValueOrClosed.value(value)
else -> value
}
return cont.tryResume(resumeValue, idempotent)
Expand Down Expand Up @@ -1041,15 +1041,15 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
@Suppress("UNCHECKED_CAST")
override fun completeResumeReceive(token: Any) {
val value: E = (if (token === NULL_VALUE) null else token) as E
block.startCoroutine(if (mode == ResumeMode.RECEIVE_RESULT) ReceiveResult.value(value) else value, select.completion)
block.startCoroutine(if (mode == ResumeMode.RECEIVE_RESULT) ValueOrClosed.value(value) else value, select.completion)
}

override fun resumeReceiveClosed(closed: Closed<*>) {
if (!select.trySelect(null)) return

when (mode) {
ResumeMode.THROW_ON_CLOSE -> select.resumeSelectCancellableWithException(closed.receiveException)
ResumeMode.RECEIVE_RESULT -> block.startCoroutine(ReceiveResult.closed<R>(closed.receiveException), select.completion)
ResumeMode.RECEIVE_RESULT -> block.startCoroutine(ValueOrClosed.closed<R>(closed.receiveException), select.completion)
ResumeMode.NULL_ON_CLOSE -> if (closed.closeCause == null) {
block.startCoroutine(null, select.completion)
} else {
Expand Down Expand Up @@ -1161,11 +1161,11 @@ private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed
}

@Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST")
private inline fun <E> Any?.toResult(): ReceiveResult<E> =
if (this is Closed<*>) ReceiveResult.closed(receiveException) else ReceiveResult.value(this as E)
private inline fun <E> Any?.toResult(): ValueOrClosed<E> =
if (this is Closed<*>) ValueOrClosed.closed(receiveException) else ValueOrClosed.value(this as E)

@Suppress("NOTHING_TO_INLINE")
private inline fun <E> Closed<*>.toResult(): ReceiveResult<E> = ReceiveResult.closed(receiveException)
private inline fun <E> Closed<*>.toResult(): ValueOrClosed<E> = ValueOrClosed.closed(receiveException)

// Marker for receive, receiveOrNull and receiveOrClosed
private enum class ResumeMode {
Expand Down
40 changes: 21 additions & 19 deletions common/kotlinx-coroutines-core-common/src/channels/Channel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import kotlinx.coroutines.channels.Channel.Factory.RENDEZVOUS
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.selects.*
import kotlin.jvm.*
import kotlin.internal.*

/**
* Sender's interface to [Channel].
Expand Down Expand Up @@ -188,10 +189,6 @@ public interface ReceiveChannel<out E> {
public val onReceive: SelectClause1<E>

/**
* Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]
* or returns `null` if the channel is [closed][isClosedForReceive] without cause
* or throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
*
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
* function is suspended, this function immediately resumes with [CancellationException].
*
Expand All @@ -207,12 +204,14 @@ public interface ReceiveChannel<out E> {
* This function can be used in [select] invocation with [onReceiveOrNull] clause.
* Use [poll] to try receiving from this channel without waiting.
*
* **Note: This is an obsolete api.**
* This function will be replaced with `receiveOrClosed: ReceiveResult<E>` and
* extension `suspend fun <E: Any> ReceiveChannel<E>.receiveOrNull(): E?`
* It is obsolete because it does not distinguish closed channel and null elements.
* This function is deprecated because it does not distinguish closed channel and null elements.
* Use [receiveOrClosed] or [receiveOrNull] extension method.
*/
@ObsoleteCoroutinesApi
@Suppress("INVISIBLE_REFERENCE", "INVISIBLE_MEMBER")
@LowPriorityInOverloadResolution
@Deprecated(message = "Deprecated in favor of receiveOrClosed and receiveOrNull extension",
level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("receiveOrClosed"))
public suspend fun receiveOrNull(): E?

/**
Expand All @@ -224,12 +223,15 @@ public interface ReceiveChannel<out E> {
* **Note: This is an experimental api.** This function may be replaced with a better on in the future.
*/
@ExperimentalCoroutinesApi
@ObsoleteCoroutinesApi
@Deprecated(message = "Deprecated in favor of onReceiveOrClosed and onReceiveOrNull extension",
level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("onReceiveOrClosed"))
public val onReceiveOrNull: SelectClause1<E?>

/**
* Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty].
* This method returns [ReceiveResult] with a value if element was successfully retrieved from the channel
* or [ReceiveResult] with close cause if channel was closed.
* This method returns [ValueOrClosed] with a value if element was successfully retrieved from the channel
* or [ValueOrClosed] with close cause if channel was closed.
*
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
* function is suspended, this function immediately resumes with [CancellationException].
Expand All @@ -247,15 +249,15 @@ public interface ReceiveChannel<out E> {
* Use [poll] to try receiving from this channel without waiting.
*/
@ExperimentalCoroutinesApi
public suspend fun receiveOrClosed(): ReceiveResult<E>
public suspend fun receiveOrClosed(): ValueOrClosed<E>

/**
* Clause for [select] expression of [receiveOrClosed] suspending function that selects with the [ReceiveResult] with a value
* that is received from the channel or selects with [ReceiveResult] with a close cause if the channel
* Clause for [select] expression of [receiveOrClosed] suspending function that selects with the [ValueOrClosed] with a value
* that is received from the channel or selects with [ValueOrClosed] with a close cause if the channel
* [isClosedForReceive].
*/
@ExperimentalCoroutinesApi
public val onReceiveOrClosed: SelectClause1<ReceiveResult<E>>
public val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>

/**
* Retrieves and removes the element from this channel, or returns `null` if this channel [isEmpty]
Expand Down Expand Up @@ -304,7 +306,7 @@ public interface ReceiveChannel<out E> {
* that encapsulates either successfully received element of type [T] from the channel or a close cause.
*/
@Suppress("NON_PUBLIC_PRIMARY_CONSTRUCTOR_OF_INLINE_CLASS")
public inline class ReceiveResult<out T>
public inline class ValueOrClosed<out T>
internal constructor(private val holder: Any?) {
/**
* Returns `true` if this instance represents received element.
Expand Down Expand Up @@ -359,12 +361,12 @@ internal constructor(private val holder: Any?) {

internal companion object {
@Suppress("NOTHING_TO_INLINE")
internal inline fun <E> value(value: E): ReceiveResult<E> =
ReceiveResult(value)
internal inline fun <E> value(value: E): ValueOrClosed<E> =
ValueOrClosed(value)

@Suppress("NOTHING_TO_INLINE")
internal inline fun <E> closed(cause: Throwable): ReceiveResult<E> =
ReceiveResult(Closed(cause))
internal inline fun <E> closed(cause: Throwable): ValueOrClosed<E> =
ValueOrClosed(Closed(cause))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package kotlinx.coroutines.channels

import kotlinx.coroutines.*
import kotlinx.coroutines.selects.*
import kotlin.coroutines.*
import kotlin.jvm.*

Expand All @@ -32,6 +33,41 @@ public inline fun <E, R> BroadcastChannel<E>.consume(block: ReceiveChannel<E>.()
}
}

/**
* Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty]
* or returns `null` if the channel is [closed][Channel.isClosedForReceive].
*
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
* function is suspended, this function immediately resumes with [CancellationException].
*
* *Cancellation of suspended receive is atomic* -- when this function
* throws [CancellationException] it means that the element was not retrieved from this channel.
* As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
* continue to execute even after it was cancelled from the same thread in the case when this receive operation
* was already resumed and the continuation was posted for execution to the thread's queue.
*
* Note, that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*/
@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
@ExperimentalCoroutinesApi
public suspend fun <E: Any> Channel<E>.receiveOrNull(): E? {
@Suppress("DEPRECATION", "UNCHECKED_CAST")
return (this as Channel<E?>).receiveOrNull()
}

/**
* Clause for [select] expression of [receiveOrNull] suspending function that selects with the element that
* is received from the channel or selects with `null` if the channel
* [isClosedForReceive][ReceiveChannel.isClosedForReceive] without cause. The [select] invocation fails with
* the original [close][SendChannel.close] cause exception if the channel has _failed_.
**/
@ExperimentalCoroutinesApi
public fun <E: Any> Channel<E>.onReceiveOrNull(): SelectClause1<E?> {
@Suppress("DEPRECATION", "UNCHECKED_CAST")
return (this as Channel<E?>).onReceiveOrNull
}

/**
* Subscribes to this [BroadcastChannel] and performs the specified action for each received element.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class BasicOperationsTest : TestBase() {
val result = channel.receiveOrClosed()
assertEquals(1, result.value)
assertEquals(1, result.valueOrNull)
assertTrue(ReceiveResult.value(1) == result)
assertTrue(ValueOrClosed.value(1) == result)

expect(3)
launch {
Expand All @@ -139,7 +139,7 @@ class BasicOperationsTest : TestBase() {
assertTrue(closed.isClosed)
assertTrue(closed.closeCause is ClosedReceiveChannelException)
assertFailsWith<ClosedReceiveChannelException> { closed.valueOrThrow }
assertTrue(ReceiveResult.closed<Int>(closed.closeCause) == closed)
assertTrue(ValueOrClosed.closed<Int>(closed.closeCause) == closed)
finish(6)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,15 @@ class ChannelReceiveOrClosedTest : TestBase() {
assertEquals(1, element.value)
assertEquals(1, element.valueOrNull)
assertEquals("Value(1)", element.toString())
assertTrue(ReceiveResult.value(1) == element) // Don't box
assertTrue(ValueOrClosed.value(1) == element) // Don't box

expect(4)
val nullElement = channel.receiveOrClosed()
assertTrue(nullElement.isValue)
assertNull(nullElement.value)
assertNull(nullElement.valueOrNull)
assertEquals("Value(null)", nullElement.toString())
assertTrue(ReceiveResult.value(null) == nullElement) // Don't box
assertTrue(ValueOrClosed.value(null) == nullElement) // Don't box

expect(5)
val closed = channel.receiveOrClosed()
Expand Down Expand Up @@ -107,10 +107,10 @@ class ChannelReceiveOrClosedTest : TestBase() {
@Test
@ExperimentalUnsignedTypes
fun testReceiveResultChannel() = runTest {
val channel = Channel<ReceiveResult<UInt>>()
val channel = Channel<ValueOrClosed<UInt>>()
launch {
channel.send(ReceiveResult.value(1u))
channel.send(ReceiveResult.closed(TestException1()))
channel.send(ValueOrClosed.value(1u))
channel.send(ValueOrClosed.closed(TestException1()))
channel.close(TestException2())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private class ChannelViaBroadcast<E>(

override suspend fun receive(): E = sub.receive()
override suspend fun receiveOrNull(): E? = sub.receiveOrNull()
override suspend fun receiveOrClosed(): ReceiveResult<E> = sub.receiveOrClosed()
override suspend fun receiveOrClosed(): ValueOrClosed<E> = sub.receiveOrClosed()
override fun poll(): E? = sub.poll()
override fun iterator(): ChannelIterator<E> = sub.iterator()
override fun cancel(): Unit = sub.cancel()
Expand All @@ -66,6 +66,6 @@ private class ChannelViaBroadcast<E>(
get() = sub.onReceive
override val onReceiveOrNull: SelectClause1<E?>
get() = sub.onReceiveOrNull
override val onReceiveOrClosed: SelectClause1<ReceiveResult<E>>
override val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>
get() = sub.onReceiveOrClosed
}

0 comments on commit 27615aa

Please sign in to comment.