Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement receiveOrClosed #762

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,9 @@ public final class kotlinx/coroutines/channels/ChannelsKt {
public static final fun minWith (Lkotlinx/coroutines/channels/ReceiveChannel;Ljava/util/Comparator;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun none (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun none (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun onReceiveOrNull (Lkotlinx/coroutines/channels/Channel;)Lkotlinx/coroutines/selects/SelectClause1;
public static final fun partition (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun receiveOrNull (Lkotlinx/coroutines/channels/Channel;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun reduce (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun reduceIndexed (Lkotlinx/coroutines/channels/ReceiveChannel;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun requireNoNulls (Lkotlinx/coroutines/channels/ReceiveChannel;)Lkotlinx/coroutines/channels/ReceiveChannel;
Expand Down Expand Up @@ -659,12 +661,14 @@ public abstract interface class kotlinx/coroutines/channels/ReceiveChannel {
public abstract synthetic fun cancel ()Z
public abstract fun cancel (Ljava/lang/Throwable;)Z
public abstract fun getOnReceive ()Lkotlinx/coroutines/selects/SelectClause1;
public abstract fun getOnReceiveOrClosed ()Lkotlinx/coroutines/selects/SelectClause1;
public abstract fun getOnReceiveOrNull ()Lkotlinx/coroutines/selects/SelectClause1;
public abstract fun isClosedForReceive ()Z
public abstract fun isEmpty ()Z
public abstract fun iterator ()Lkotlinx/coroutines/channels/ChannelIterator;
public abstract fun poll ()Ljava/lang/Object;
public abstract fun receive (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun receiveOrClosed (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun receiveOrNull (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

Expand Down Expand Up @@ -699,6 +703,25 @@ public final class kotlinx/coroutines/channels/TickerMode : java/lang/Enum {
public static fun values ()[Lkotlinx/coroutines/channels/TickerMode;
}

public final class kotlinx/coroutines/channels/ValueOrClosed {
public static final field Companion Lkotlinx/coroutines/channels/ValueOrClosed$Companion;
public static final synthetic fun box-impl (Ljava/lang/Object;)Lkotlinx/coroutines/channels/ValueOrClosed;
public fun equals (Ljava/lang/Object;)Z
public static fun equals-impl (Ljava/lang/Object;Ljava/lang/Object;)Z
public static final fun equals-impl0 (Ljava/lang/Object;Ljava/lang/Object;)Z
public static final fun getCloseCause-impl (Ljava/lang/Object;)Ljava/lang/Throwable;
public static final fun getValue-impl (Ljava/lang/Object;)Ljava/lang/Object;
public static final fun getValueOrNull-impl (Ljava/lang/Object;)Ljava/lang/Object;
public static final fun getValueOrThrow-impl (Ljava/lang/Object;)Ljava/lang/Object;
public fun hashCode ()I
public static fun hashCode-impl (Ljava/lang/Object;)I
public static final fun isClosed-impl (Ljava/lang/Object;)Z
public static final fun isValue-impl (Ljava/lang/Object;)Z
public fun toString ()Ljava/lang/String;
public static fun toString-impl (Ljava/lang/Object;)Ljava/lang/String;
public final synthetic fun unbox-impl ()Ljava/lang/Object;
}

public abstract interface class kotlinx/coroutines/selects/SelectBuilder {
public abstract fun invoke (Lkotlinx/coroutines/selects/SelectClause0;Lkotlin/jvm/functions/Function1;)V
public abstract fun invoke (Lkotlinx/coroutines/selects/SelectClause1;Lkotlin/jvm/functions/Function2;)V
Expand Down
3 changes: 2 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ configure(subprojects.findAll { !unpublished.contains(it.name) }) {
"-Xuse-experimental=kotlin.experimental.ExperimentalTypeInference",
"-Xuse-experimental=kotlinx.coroutines.ExperimentalCoroutinesApi",
"-Xuse-experimental=kotlinx.coroutines.ObsoleteCoroutinesApi",
"-Xuse-experimental=kotlinx.coroutines.InternalCoroutinesApi"]
"-Xuse-experimental=kotlinx.coroutines.InternalCoroutinesApi",
"-XXLanguage:+InlineClasses"]

}
}
Expand Down
168 changes: 129 additions & 39 deletions common/kotlinx-coroutines-core-common/src/channels/AbstractChannel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import kotlin.jvm.*

/**
* Abstract send channel. It is a base class for all send channel implementations.
*
*/
internal abstract class AbstractSendChannel<E> : SendChannel<E> {
/** @suppress **This is unstable API and it is subject to change.** */
Expand Down Expand Up @@ -580,7 +579,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E

@Suppress("UNCHECKED_CAST")
private suspend fun receiveSuspend(): E = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
val receive = ReceiveElement(cont as CancellableContinuation<E?>, nullOnClose = false)
val receive = ReceiveElement<E>(cont as CancellableContinuation<Any?>, ResumeMode.THROW_ON_CLOSE)
while (true) {
if (enqueueReceive(receive)) {
cont.initCancellability() // make it properly cancellable
Expand Down Expand Up @@ -628,7 +627,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E

@Suppress("UNCHECKED_CAST")
private suspend fun receiveOrNullSuspend(): E? = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
val receive = ReceiveElement(cont, nullOnClose = true)
val receive = ReceiveElement<E>(cont as CancellableContinuation<Any?>, ResumeMode.NULL_ON_CLOSE)
while (true) {
if (enqueueReceive(receive)) {
cont.initCancellability() // make it properly cancellable
Expand All @@ -651,13 +650,42 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
}
}

@Suppress("UNCHECKED_CAST")
public final override suspend fun receiveOrClosed(): ValueOrClosed<E> {
// fast path -- try poll non-blocking
val result = pollInternal()
if (result !== POLL_FAILED) {
return result.toResult()
}
// slow-path does suspend
return receiveOrClosedSuspend()
}

@Suppress("UNCHECKED_CAST")
private suspend fun receiveOrClosedSuspend(): ValueOrClosed<E> = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@{ cont ->
val receive = ReceiveElement<E>(cont as CancellableContinuation<Any?>, ResumeMode.RECEIVE_RESULT)
while (true) {
if (enqueueReceive(receive)) {
cont.initCancellability()
removeReceiveOnCancel(cont, receive)
return@sc
}

val result = pollInternal()
if (result is Closed<*> || result !== POLL_FAILED) {
cont.resume(result.toResult<E>())
return@sc
}
}
}

@Suppress("UNCHECKED_CAST")
public final override fun poll(): E? {
val result = pollInternal()
return if (result === POLL_FAILED) null else receiveOrNullResult(result)
}

override fun cancel(): Unit {
override fun cancel() {
cancel(null)
}

Expand Down Expand Up @@ -712,9 +740,9 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E

private inner class TryEnqueueReceiveDesc<E, R>(
select: SelectInstance<R>,
block: suspend (E?) -> R,
nullOnClose: Boolean
) : AddLastDesc<ReceiveSelect<R, E>>(queue, ReceiveSelect(select, block, nullOnClose)) {
block: suspend (Any?) -> R,
mode: ResumeMode
) : AddLastDesc<ReceiveSelect<R, E>>(queue, ReceiveSelect(select, block, mode)) {
override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
if (affected is Send) return ENQUEUE_FAILED
return null
Expand Down Expand Up @@ -746,13 +774,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
while (true) {
if (select.isSelected) return
if (isEmpty) {
val enqueueOp = TryEnqueueReceiveDesc(select, block as (suspend (E?) -> R), nullOnClose = false)
val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
when {
enqueueResult === ALREADY_SELECTED -> return
enqueueResult === ENQUEUE_FAILED -> {} // retry
else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult")
}
if (registerEnqueueDesc(select, block, ResumeMode.THROW_ON_CLOSE)) return
} else {
val pollResult = pollSelectInternal(select)
when {
Expand Down Expand Up @@ -780,13 +802,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
while (true) {
if (select.isSelected) return
if (isEmpty) {
val enqueueOp = TryEnqueueReceiveDesc(select, block, nullOnClose = true)
val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
when {
enqueueResult === ALREADY_SELECTED -> return
enqueueResult === ENQUEUE_FAILED -> {} // retry
else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult")
}
if (registerEnqueueDesc(select, block, ResumeMode.NULL_ON_CLOSE)) return
} else {
val pollResult = pollSelectInternal(select)
when {
Expand All @@ -797,8 +813,9 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
if (select.trySelect(null))
block.startCoroutineUnintercepted(null, select.completion)
return
} else
} else {
throw pollResult.closeCause
}
}
else -> {
// selected successfully
Expand All @@ -810,6 +827,51 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
}
}

override val onReceiveOrClosed: SelectClause1<ValueOrClosed<E>>
get() = object : SelectClause1<ValueOrClosed<E>> {
override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (ValueOrClosed<E>) -> R) {
registerSelectReceiveOrClosed(select, block)
}
}

@Suppress("UNCHECKED_CAST")
private fun <R> registerSelectReceiveOrClosed(select: SelectInstance<R>, block: suspend (ValueOrClosed<E>) -> R) {
while (true) {
if (select.isSelected) return
if (isEmpty) {
if (registerEnqueueDesc(select, block, ResumeMode.RECEIVE_RESULT)) return
} else {
val pollResult = pollSelectInternal(select)
when {
pollResult === ALREADY_SELECTED -> return
pollResult === POLL_FAILED -> {} // retry
pollResult is Closed<*> -> {
block.startCoroutineUnintercepted(ValueOrClosed.closed(pollResult.receiveException), select.completion)
}
else -> {
// selected successfully
block.startCoroutineUnintercepted(ValueOrClosed.value(pollResult as E), select.completion)
return
}
}
}
}
}

private fun <R, E> registerEnqueueDesc(select: SelectInstance<R>, block: suspend (E) -> R,
mode: ResumeMode): Boolean {
@Suppress("UNCHECKED_CAST")
val enqueueOp = TryEnqueueReceiveDesc<E, R>(select, block as suspend (Any?) -> R, mode)
val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return true
return when {
enqueueResult === ALREADY_SELECTED -> true
enqueueResult === ENQUEUE_FAILED -> {
false // retry
}
else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult")
}
}

// ------ protected ------

override fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
Expand Down Expand Up @@ -902,18 +964,31 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
}

private class ReceiveElement<in E>(
@JvmField val cont: CancellableContinuation<E?>,
@JvmField val nullOnClose: Boolean
@JvmField val cont: CancellableContinuation<Any?>,
@JvmField val resumeMode: ResumeMode
) : Receive<E>() {
override fun tryResumeReceive(value: E, idempotent: Any?): Any? = cont.tryResume(value, idempotent)
@Suppress("IMPLICIT_CAST_TO_ANY")
override fun tryResumeReceive(value: E, idempotent: Any?): Any? {
val resumeValue = when (resumeMode) {
ResumeMode.RECEIVE_RESULT -> ValueOrClosed.value(value)
else -> value
}
return cont.tryResume(resumeValue, idempotent)
}

override fun completeResumeReceive(token: Any) = cont.completeResume(token)
override fun resumeReceiveClosed(closed: Closed<*>) {
if (closed.closeCause == null && nullOnClose)
cont.resume(null)
else
cont.resumeWithException(closed.receiveException)
when (resumeMode) {
ResumeMode.THROW_ON_CLOSE -> cont.resumeWithException(closed.receiveException)
ResumeMode.RECEIVE_RESULT -> cont.resume(closed.toResult<Any>())
ResumeMode.NULL_ON_CLOSE -> if (closed.closeCause == null) {
cont.resume(null)
} else {
cont.resumeWithException(closed.receiveException)
}
}
}
override fun toString(): String = "ReceiveElement[$cont,nullOnClose=$nullOnClose]"
override fun toString(): String = "ReceiveElement[$cont,mode=$resumeMode]"
}

private class ReceiveHasNext<E>(
Expand Down Expand Up @@ -957,26 +1032,27 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E

private inner class ReceiveSelect<R, in E>(
@JvmField val select: SelectInstance<R>,
@JvmField val block: suspend (E?) -> R,
@JvmField val nullOnClose: Boolean
@JvmField val block: suspend (Any?) -> R,
@JvmField val mode: ResumeMode
) : Receive<E>(), DisposableHandle {
override fun tryResumeReceive(value: E, idempotent: Any?): Any? =
if (select.trySelect(idempotent)) (value ?: NULL_VALUE) else null

@Suppress("UNCHECKED_CAST")
override fun completeResumeReceive(token: Any) {
val value: E = (if (token === NULL_VALUE) null else token) as E
block.startCoroutine(value, select.completion)
block.startCoroutine(if (mode == ResumeMode.RECEIVE_RESULT) ValueOrClosed.value(value) else value, select.completion)
}

override fun resumeReceiveClosed(closed: Closed<*>) {
if (select.trySelect(null)) {
if (closed.closeCause == null && nullOnClose) {
if (!select.trySelect(null)) return

when (mode) {
ResumeMode.THROW_ON_CLOSE -> select.resumeSelectCancellableWithException(closed.receiveException)
ResumeMode.RECEIVE_RESULT -> block.startCoroutine(ValueOrClosed.closed<R>(closed.receiveException), select.completion)
ResumeMode.NULL_ON_CLOSE -> if (closed.closeCause == null) {
block.startCoroutine(null, select.completion)
} else {
// even though we are dispatching coroutine to process channel close on receive,
// which is an atomically cancellable suspending function,
// close is a final state, so we can use a cancellable resume mode
select.resumeSelectCancellableWithException(closed.receiveException)
}
}
Expand All @@ -991,7 +1067,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
onReceiveDequeued() // notify cancellation of receive
}

override fun toString(): String = "ReceiveSelect[$select,nullOnClose=$nullOnClose]"
override fun toString(): String = "ReceiveSelect[$select,mode=$mode]"
}

private class IdempotentTokenValue<out E>(
Expand Down Expand Up @@ -1083,3 +1159,17 @@ private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed
override val offerResult get() = OFFER_SUCCESS
abstract fun resumeReceiveClosed(closed: Closed<*>)
}

@Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST")
private inline fun <E> Any?.toResult(): ValueOrClosed<E> =
if (this is Closed<*>) ValueOrClosed.closed(receiveException) else ValueOrClosed.value(this as E)

@Suppress("NOTHING_TO_INLINE")
private inline fun <E> Closed<*>.toResult(): ValueOrClosed<E> = ValueOrClosed.closed(receiveException)

// Marker for receive, receiveOrNull and receiveOrClosed
private enum class ResumeMode {
THROW_ON_CLOSE,
NULL_ON_CLOSE,
RECEIVE_RESULT
}