Skip to content

Commit

Permalink
Introduce ReceiveChannel.receiveOrClosed and ReceiveChannel.onReceive…
Browse files Browse the repository at this point in the history
…OrClosed

Fixes #330
  • Loading branch information
qwwdfsad committed Oct 23, 2018
1 parent 8a22c54 commit 3a86f65
Show file tree
Hide file tree
Showing 9 changed files with 608 additions and 30 deletions.
3 changes: 2 additions & 1 deletion build.gradle
Expand Up @@ -185,7 +185,8 @@ configure(subprojects.findAll { !unpublished.contains(it.name) }) {
"-Xuse-experimental=kotlin.experimental.ExperimentalTypeInference",
"-Xuse-experimental=kotlinx.coroutines.ExperimentalCoroutinesApi",
"-Xuse-experimental=kotlinx.coroutines.ObsoleteCoroutinesApi",
"-Xuse-experimental=kotlinx.coroutines.InternalCoroutinesApi"]
"-Xuse-experimental=kotlinx.coroutines.InternalCoroutinesApi",
"-XXLanguage:+InlineClasses"]

}
}
Expand Down
145 changes: 119 additions & 26 deletions common/kotlinx-coroutines-core-common/src/channels/AbstractChannel.kt
Expand Up @@ -14,7 +14,6 @@ import kotlin.jvm.*

/**
* Abstract send channel. It is a base class for all send channel implementations.
*
*/
internal abstract class AbstractSendChannel<E> : SendChannel<E> {
/** @suppress **This is unstable API and it is subject to change.** */
Expand Down Expand Up @@ -580,7 +579,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E

@Suppress("UNCHECKED_CAST")
private suspend fun receiveSuspend(): E = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
val receive = ReceiveElement(cont as CancellableContinuation<E?>, nullOnClose = false)
val receive = ReceiveElement<E>(cont as CancellableContinuation<Any?>, ResumeMode.THROW_ON_CLOSE)
while (true) {
if (enqueueReceive(receive)) {
cont.initCancellability() // make it properly cancellable
Expand Down Expand Up @@ -628,7 +627,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E

@Suppress("UNCHECKED_CAST")
private suspend fun receiveOrNullSuspend(): E? = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@ { cont ->
val receive = ReceiveElement(cont, nullOnClose = true)
val receive = ReceiveElement<E>(cont as CancellableContinuation<Any?>, ResumeMode.NULL_ON_CLOSE)
while (true) {
if (enqueueReceive(receive)) {
cont.initCancellability() // make it properly cancellable
Expand All @@ -651,13 +650,42 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
}
}

@Suppress("UNCHECKED_CAST")
public final override suspend fun receiveOrClosed(): ReceiveResult<E> {
// fast path -- try poll non-blocking
val result = pollInternal()
if (result !== POLL_FAILED) {
return result.toResult()
}
// slow-path does suspend
return receiveOrClosedSuspend()
}

@Suppress("UNCHECKED_CAST")
private suspend fun receiveOrClosedSuspend(): ReceiveResult<E> = suspendAtomicCancellableCoroutine(holdCancellability = true) sc@{ cont ->
val receive = ReceiveElement<E>(cont as CancellableContinuation<Any?>, ResumeMode.RECEIVE_RESULT)
while (true) {
if (enqueueReceive(receive)) {
cont.initCancellability()
removeReceiveOnCancel(cont, receive)
return@sc
}

val result = pollInternal()
if (result is Closed<*> || result !== POLL_FAILED) {
cont.resume(result.toResult<E>())
return@sc
}
}
}

@Suppress("UNCHECKED_CAST")
public final override fun poll(): E? {
val result = pollInternal()
return if (result === POLL_FAILED) null else receiveOrNullResult(result)
}

override fun cancel(): Unit {
override fun cancel() {
cancel(null)
}

Expand Down Expand Up @@ -712,9 +740,9 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E

private inner class TryEnqueueReceiveDesc<E, R>(
select: SelectInstance<R>,
block: suspend (E?) -> R,
nullOnClose: Boolean
) : AddLastDesc<ReceiveSelect<R, E>>(queue, ReceiveSelect(select, block, nullOnClose)) {
block: suspend (Any?) -> R,
mode: ResumeMode
) : AddLastDesc<ReceiveSelect<R, E>>(queue, ReceiveSelect(select, block, mode)) {
override fun failure(affected: LockFreeLinkedListNode, next: Any): Any? {
if (affected is Send) return ENQUEUE_FAILED
return null
Expand Down Expand Up @@ -746,7 +774,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
while (true) {
if (select.isSelected) return
if (isEmpty) {
val enqueueOp = TryEnqueueReceiveDesc(select, block as (suspend (E?) -> R), nullOnClose = false)
val enqueueOp = TryEnqueueReceiveDesc<E, R>(select, block as (suspend (Any?) -> R), ResumeMode.THROW_ON_CLOSE)
val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
when {
enqueueResult === ALREADY_SELECTED -> return
Expand Down Expand Up @@ -780,7 +808,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
while (true) {
if (select.isSelected) return
if (isEmpty) {
val enqueueOp = TryEnqueueReceiveDesc(select, block, nullOnClose = true)
val enqueueOp = TryEnqueueReceiveDesc<E, R>(select, block as suspend (Any?) -> R, ResumeMode.NULL_ON_CLOSE)
val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
when {
enqueueResult === ALREADY_SELECTED -> return
Expand Down Expand Up @@ -810,6 +838,43 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
}
}

override val onReceiveOrClosed: SelectClause1<ReceiveResult<E>>
get() = object : SelectClause1<ReceiveResult<E>> {
override fun <R> registerSelectClause1(select: SelectInstance<R>, block: suspend (ReceiveResult<E>) -> R) {
registerSelectReceiveOrClosed(select, block)
}
}

@Suppress("UNCHECKED_CAST")
private fun <R> registerSelectReceiveOrClosed(select: SelectInstance<R>, block: suspend (ReceiveResult<E>) -> R) {
while (true) {
if (select.isSelected) return
if (isEmpty) {
val enqueueOp = TryEnqueueReceiveDesc<E, R>(select, block as suspend (Any?) -> R, ResumeMode.RECEIVE_RESULT)
val enqueueResult = select.performAtomicIfNotSelected(enqueueOp) ?: return
when {
enqueueResult === ALREADY_SELECTED -> return
enqueueResult === ENQUEUE_FAILED -> {} // retry
else -> error("performAtomicIfNotSelected(TryEnqueueReceiveDesc) returned $enqueueResult")
}
} else {
val pollResult = pollSelectInternal(select)
when {
pollResult === ALREADY_SELECTED -> return
pollResult === POLL_FAILED -> {} // retry
pollResult is Closed<*> -> {
block.startCoroutineUnintercepted(ReceiveResult.closed(pollResult.receiveException), select.completion)
}
else -> {
// selected successfully
block.startCoroutineUnintercepted(ReceiveResult.value(pollResult as E), select.completion)
return
}
}
}
}
}

// ------ protected ------

override fun takeFirstReceiveOrPeekClosed(): ReceiveOrClosed<E>? =
Expand Down Expand Up @@ -902,18 +967,31 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
}

private class ReceiveElement<in E>(
@JvmField val cont: CancellableContinuation<E?>,
@JvmField val nullOnClose: Boolean
@JvmField val cont: CancellableContinuation<Any?>,
@JvmField val resumeMode: ResumeMode
) : Receive<E>() {
override fun tryResumeReceive(value: E, idempotent: Any?): Any? = cont.tryResume(value, idempotent)
@Suppress("IMPLICIT_CAST_TO_ANY")
override fun tryResumeReceive(value: E, idempotent: Any?): Any? {
val resumeValue = when (resumeMode) {
ResumeMode.RECEIVE_RESULT -> ReceiveResult.value(value)
else -> value
}
return cont.tryResume(resumeValue, idempotent)
}

override fun completeResumeReceive(token: Any) = cont.completeResume(token)
override fun resumeReceiveClosed(closed: Closed<*>) {
if (closed.closeCause == null && nullOnClose)
cont.resume(null)
else
cont.resumeWithException(closed.receiveException)
when (resumeMode) {
ResumeMode.THROW_ON_CLOSE -> cont.resumeWithException(closed.receiveException)
ResumeMode.RECEIVE_RESULT -> cont.resume(closed.toResult<Any>())
ResumeMode.NULL_ON_CLOSE -> if (closed.closeCause == null) {
cont.resume(null)
} else {
cont.resumeWithException(closed.receiveException)
}
}
}
override fun toString(): String = "ReceiveElement[$cont,nullOnClose=$nullOnClose]"
override fun toString(): String = "ReceiveElement[$cont,mode=$resumeMode]"
}

private class ReceiveHasNext<E>(
Expand Down Expand Up @@ -957,26 +1035,27 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E

private inner class ReceiveSelect<R, in E>(
@JvmField val select: SelectInstance<R>,
@JvmField val block: suspend (E?) -> R,
@JvmField val nullOnClose: Boolean
@JvmField val block: suspend (Any?) -> R,
@JvmField val mode: ResumeMode
) : Receive<E>(), DisposableHandle {
override fun tryResumeReceive(value: E, idempotent: Any?): Any? =
if (select.trySelect(idempotent)) (value ?: NULL_VALUE) else null

@Suppress("UNCHECKED_CAST")
override fun completeResumeReceive(token: Any) {
val value: E = (if (token === NULL_VALUE) null else token) as E
block.startCoroutine(value, select.completion)
block.startCoroutine(if (mode == ResumeMode.RECEIVE_RESULT) ReceiveResult.value(value) else value, select.completion)
}

override fun resumeReceiveClosed(closed: Closed<*>) {
if (select.trySelect(null)) {
if (closed.closeCause == null && nullOnClose) {
if (!select.trySelect(null)) return

when (mode) {
ResumeMode.THROW_ON_CLOSE -> select.resumeSelectCancellableWithException(closed.receiveException)
ResumeMode.RECEIVE_RESULT -> block.startCoroutine(ReceiveResult.closed<R>(closed.receiveException), select.completion)
ResumeMode.NULL_ON_CLOSE -> if (closed.closeCause == null) {
block.startCoroutine(null, select.completion)
} else {
// even though we are dispatching coroutine to process channel close on receive,
// which is an atomically cancellable suspending function,
// close is a final state, so we can use a cancellable resume mode
select.resumeSelectCancellableWithException(closed.receiveException)
}
}
Expand All @@ -991,7 +1070,7 @@ internal abstract class AbstractChannel<E> : AbstractSendChannel<E>(), Channel<E
onReceiveDequeued() // notify cancellation of receive
}

override fun toString(): String = "ReceiveSelect[$select,nullOnClose=$nullOnClose]"
override fun toString(): String = "ReceiveSelect[$select,mode=$mode]"
}

private class IdempotentTokenValue<out E>(
Expand Down Expand Up @@ -1083,3 +1162,17 @@ private abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed
override val offerResult get() = OFFER_SUCCESS
abstract fun resumeReceiveClosed(closed: Closed<*>)
}

@Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST")
private inline fun <E> Any?.toResult(): ReceiveResult<E> =
if (this is Closed<*>) ReceiveResult.closed(receiveException) else ReceiveResult.value(this as E)

@Suppress("NOTHING_TO_INLINE")
private inline fun <E> Closed<*>.toResult(): ReceiveResult<E> = ReceiveResult.closed(receiveException)

// Marker for receive, receiveOrNull and receiveOrClosed
private enum class ResumeMode {
THROW_ON_CLOSE,
NULL_ON_CLOSE,
RECEIVE_RESULT
}
104 changes: 102 additions & 2 deletions common/kotlinx-coroutines-core-common/src/channels/Channel.kt
Expand Up @@ -218,7 +218,7 @@ public interface ReceiveChannel<out E> {

/**
* Clause for [select] expression of [receiveOrNull] suspending function that selects with the element that
* is received from the channel or selects with `null` if if the channel
* is received from the channel or selects with `null` if the channel
* [isClosedForReceive] without cause. The [select] invocation fails with
* the original [close][SendChannel.close] cause exception if the channel has _failed_.
*
Expand All @@ -227,6 +227,37 @@ public interface ReceiveChannel<out E> {
@ExperimentalCoroutinesApi
public val onReceiveOrNull: SelectClause1<E?>

/**
* Retrieves and removes the element from this channel suspending the caller while this channel [isEmpty].
* This method returns [ReceiveResult] with a value if element was successfully retrieved from the channel
* or [ReceiveResult] with close cause if channel was closed.
*
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
* function is suspended, this function immediately resumes with [CancellationException].
*
* *Cancellation of suspended receive is atomic* -- when this function
* throws [CancellationException] it means that the element was not retrieved from this channel.
* As a side-effect of atomic cancellation, a thread-bound coroutine (to some UI thread, for example) may
* continue to execute even after it was cancelled from the same thread in the case when this receive operation
* was already resumed and the continuation was posted for execution to the thread's queue.
*
* Note, that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*
* This function can be used in [select] invocation with [onReceiveOrClosed] clause.
* Use [poll] to try receiving from this channel without waiting.
*/
@ExperimentalCoroutinesApi
public suspend fun receiveOrClosed(): ReceiveResult<E>

/**
* Clause for [select] expression of [receiveOrClosed] suspending function that selects with the [ReceiveResult] with a value
* that is received from the channel or selects with [ReceiveResult] with a close cause if the channel
* [isClosedForReceive].
*/
@ExperimentalCoroutinesApi
public val onReceiveOrClosed: SelectClause1<ReceiveResult<E>>

/**
* Retrieves and removes the element from this channel, or returns `null` if this channel [isEmpty]
* or is [isClosedForReceive] without cause.
Expand All @@ -251,7 +282,7 @@ public interface ReceiveChannel<out E> {
* afterwards will throw [ClosedSendChannelException], while attempts to receive will throw
* [ClosedReceiveChannelException].
*/
public fun cancel(): Unit
public fun cancel()

/**
* @suppress
Expand All @@ -269,6 +300,75 @@ public interface ReceiveChannel<out E> {
public fun cancel(cause: Throwable? = null): Boolean
}

/**
* A discriminated union of [ReceiveChannel.receiveOrClosed] result,
* that encapsulates either successfully received element of type [T] from the channel or a close cause.
*/
@Suppress("NON_PUBLIC_PRIMARY_CONSTRUCTOR_OF_INLINE_CLASS")
public inline class ReceiveResult<out T>
internal constructor(private val holder: Any?) {
/**
* Returns `true` if this instance represents received element.
* In this case [isClosed] returns `false`.
*/
public val isValue: Boolean get() = holder !is Closed

/**
* Returns `true` if this instance represents close cause.
* In this case [isValue] returns `false`.
*/
public val isClosed: Boolean get() = holder is Closed

/**
* Returns received value if this instance represents received value or throws [IllegalStateException] otherwise.
*/
@Suppress("UNCHECKED_CAST")
public val value: T
get() = if (isClosed) throw IllegalStateException() else holder as T

/**
* Returns received value if this element represents received value or `null` otherwise.
*/
@Suppress("UNCHECKED_CAST")
public val valueOrNull: T?
get() = if (isClosed) null else holder as T

@Suppress("UNCHECKED_CAST")
public val valueOrThrow: T
get() = if (isClosed) throw closeCause else holder as T

/**
* Returns close cause of the channel if this instance represents close cause or throws [IllegalStateException] otherwise.
*/
@Suppress("UNCHECKED_CAST")
public val closeCause: Throwable get() = if (isClosed) (holder as Closed).exception else error("ReceiveResult is not closed")

/**
* @suppress
*/
public override fun toString(): String =
when (holder) {
is Closed -> holder.toString()
else -> "Value($holder)"
}

internal class Closed(@JvmField val exception: Throwable) {
override fun equals(other: Any?): Boolean = other is Closed && exception == other.exception
override fun hashCode(): Int = exception.hashCode()
override fun toString(): String = "Closed($exception)"
}

internal companion object {
@Suppress("NOTHING_TO_INLINE")
internal inline fun <E> value(value: E): ReceiveResult<E> =
ReceiveResult(value)

@Suppress("NOTHING_TO_INLINE")
internal inline fun <E> closed(cause: Throwable): ReceiveResult<E> =
ReceiveResult(Closed(cause))
}
}

/**
* Iterator for [ReceiveChannel]. Instances of this interface are *not thread-safe* and shall not be used
* from concurrent coroutines.
Expand Down

0 comments on commit 3a86f65

Please sign in to comment.