Skip to content

Commit

Permalink
~ Make sure that channel operations allocate memory at the same rate …
Browse files Browse the repository at this point in the history
…as before
  • Loading branch information
elizarov committed Oct 7, 2020
1 parent 3df38e9 commit 164d3e5
Showing 1 changed file with 37 additions and 18 deletions.
55 changes: 37 additions & 18 deletions kotlinx-coroutines-core/common/src/channels/AbstractChannel.kt
Expand Up @@ -178,7 +178,9 @@ internal abstract class AbstractSendChannel<E>(
private suspend fun sendSuspend(element: E): Unit = suspendCancellableCoroutineReusable sc@ { cont ->
loop@ while (true) {
if (isFullImpl) {
val send = SendElement(element, onUndeliveredElement, cont)
val send = if (onUndeliveredElement == null)
SendElement(element, cont) else
SendElementWithUndeliveredHandler(element, cont, onUndeliveredElement)
val enqueueResult = enqueueSend(send)
when {
enqueueResult == null -> { // enqueued successfully
Expand Down Expand Up @@ -574,7 +576,9 @@ internal abstract class AbstractChannel<E>(

@Suppress("UNCHECKED_CAST")
private suspend fun <R> receiveSuspend(receiveMode: Int): R = suspendCancellableCoroutineReusable sc@ { cont ->
val receive = ReceiveElement(onUndeliveredElement, cont as CancellableContinuation<Any?>, receiveMode)
val receive = if (onUndeliveredElement == null)
ReceiveElement(cont as CancellableContinuation<Any?>, receiveMode) else
ReceiveElementWithUndeliveredHandler(cont as CancellableContinuation<Any?>, receiveMode, onUndeliveredElement)
while (true) {
if (enqueueReceive(receive)) {
removeReceiveOnCancel(cont, receive)
Expand Down Expand Up @@ -846,7 +850,7 @@ internal abstract class AbstractChannel<E>(
}

private suspend fun hasNextSuspend(): Boolean = suspendCancellableCoroutineReusable sc@ { cont ->
val receive = ReceiveHasNext(channel.onUndeliveredElement, this, cont)
val receive = ReceiveHasNext(this, cont)
while (true) {
if (channel.enqueueReceive(receive)) {
channel.removeReceiveOnCancel(cont, receive)
Expand Down Expand Up @@ -883,8 +887,7 @@ internal abstract class AbstractChannel<E>(
}
}

private class ReceiveElement<in E>(
@JvmField val onUndeliveredElement: OnUndeliveredElement<E>?,
private open class ReceiveElement<in E>(
@JvmField val cont: CancellableContinuation<Any?>,
@JvmField val receiveMode: Int
) : Receive<E>() {
Expand All @@ -893,10 +896,6 @@ internal abstract class AbstractChannel<E>(
else -> value
}

fun resumeOnCancellationFun(value: E): ((Throwable) -> Unit)? =
onUndeliveredElement?.bindCancellationFun(value, cont.context)

@Suppress("IMPLICIT_CAST_TO_ANY")
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
val token = cont.tryResume(resumeValue(value), otherOp?.desc, resumeOnCancellationFun(value)) ?: return null
assert { token === RESUME_TOKEN } // the only other possible result
Expand All @@ -917,13 +916,21 @@ internal abstract class AbstractChannel<E>(
override fun toString(): String = "ReceiveElement@$hexAddress[receiveMode=$receiveMode]"
}

private class ReceiveHasNext<E>(
@JvmField val onUndeliveredElement: OnUndeliveredElement<E>?,
private class ReceiveElementWithUndeliveredHandler<in E>(
cont: CancellableContinuation<Any?>,
receiveMode: Int,
@JvmField val onUndeliveredElement: OnUndeliveredElement<E>
) : ReceiveElement<E>(cont, receiveMode) {
override fun resumeOnCancellationFun(value: E): ((Throwable) -> Unit)? =
onUndeliveredElement.bindCancellationFun(value, cont.context)
}

private open class ReceiveHasNext<E>(
@JvmField val iterator: Itr<E>,
@JvmField val cont: CancellableContinuation<Boolean>
) : Receive<E>() {
override fun tryResumeReceive(value: E, otherOp: PrepareOp?): Symbol? {
val token = cont.tryResume(true, otherOp?.desc, onUndeliveredElement?.bindCancellationFun(value, cont.context))
val token = cont.tryResume(true, otherOp?.desc, resumeOnCancellationFun(value))
?: return null
assert { token === RESUME_TOKEN } // the only other possible result
// We can call finishPrepare only after successful tryResume, so that only good affected node is saved
Expand Down Expand Up @@ -951,6 +958,10 @@ internal abstract class AbstractChannel<E>(
cont.completeResume(token)
}
}

override fun resumeOnCancellationFun(value: E): ((Throwable) -> Unit)? =
iterator.channel.onUndeliveredElement?.bindCancellationFun(value, cont.context)

This comment has been minimized.

Copy link
@qwwdfsad

qwwdfsad Oct 12, 2020

Member

channel property can also be private (or @JvmField)


override fun toString(): String = "ReceiveHasNext@$hexAddress"
}

Expand All @@ -968,7 +979,7 @@ internal abstract class AbstractChannel<E>(
block.startCoroutineCancellable(
if (receiveMode == RECEIVE_RESULT) ValueOrClosed.value(value) else value,
select.completion,
channel.onUndeliveredElement?.bindCancellationFun(value, select.completion.context)
resumeOnCancellationFun(value)
)
}

Expand All @@ -990,6 +1001,9 @@ internal abstract class AbstractChannel<E>(
channel.onReceiveDequeued() // notify cancellation of receive
}

override fun resumeOnCancellationFun(value: E): ((Throwable) -> Unit)? =
channel.onUndeliveredElement?.bindCancellationFun(value, select.completion.context)

override fun toString(): String = "ReceiveSelect@$hexAddress[$select,receiveMode=$receiveMode]"
}
}
Expand Down Expand Up @@ -1056,10 +1070,8 @@ internal interface ReceiveOrClosed<in E> {
/**
* Represents sender for a specific element.
*/
@Suppress("UNCHECKED_CAST")
internal class SendElement<E>(
internal open class SendElement<E>(
override val pollResult: E,
@JvmField val onUndeliveredElement: OnUndeliveredElement<E>?,
@JvmField val cont: CancellableContinuation<Unit>
) : Send() {
override fun tryResumeSend(otherOp: PrepareOp?): Symbol? {
Expand All @@ -1072,8 +1084,14 @@ internal class SendElement<E>(

override fun completeResumeSend() = cont.completeResume(RESUME_TOKEN)
override fun resumeSendClosed(closed: Closed<*>) = cont.resumeWithException(closed.sendException)
override fun toString(): String = "SendElement@$hexAddress($pollResult)"
override fun toString(): String = "$classSimpleName@$hexAddress($pollResult)"
}

internal class SendElementWithUndeliveredHandler<E>(
pollResult: E,
cont: CancellableContinuation<Unit>,
@JvmField val onUndeliveredElement: OnUndeliveredElement<E>
) : SendElement<E>(pollResult, cont) {
override fun remove(): Boolean {
if (!super.remove()) return false
// if the node was successfully removed (meaning it was added but was not received) then we have undelivered element
Expand All @@ -1082,7 +1100,7 @@ internal class SendElement<E>(
}

override fun undeliveredElement() {
onUndeliveredElement?.callUndeliveredElement(pollResult, cont.context)
onUndeliveredElement.callUndeliveredElement(pollResult, cont.context)
}
}

Expand All @@ -1108,6 +1126,7 @@ internal class Closed<in E>(
internal abstract class Receive<in E> : LockFreeLinkedListNode(), ReceiveOrClosed<E> {
override val offerResult get() = OFFER_SUCCESS
abstract fun resumeReceiveClosed(closed: Closed<*>)
open fun resumeOnCancellationFun(value: E): ((Throwable) -> Unit)? = null
}

@Suppress("NOTHING_TO_INLINE", "UNCHECKED_CAST")
Expand Down

0 comments on commit 164d3e5

Please sign in to comment.