Skip to content

Commit

Permalink
ConflatedChannel with lock to protect the one-element buffer, double-…
Browse files Browse the repository at this point in the history
…linked list used for suspending receivers only.
  • Loading branch information
mvicsokolova authored and elizarov committed Feb 12, 2020
1 parent 90a9faf commit 0126dba
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 74 deletions.
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/build.gradle
Expand Up @@ -107,7 +107,7 @@ task jdk16Test(type: Test, dependsOn: [compileTestKotlinJvm, checkJdk16]) {
testClassesDirs = files { jvmTest.testClassesDirs }
executable = "$System.env.JDK_16/bin/java"
exclude '**/*LFStressTest.*' // lock-freedom tests use LockFreedomTestEnvironment which needs JDK8
exclude '**/*LCStressTest.*' // lic-check tests use LinChecker which needs JDK8
exclude '**/*LCStressTest.*' // lin-check tests use LinChecker which needs JDK8
exclude '**/exceptions/**' // exceptions tests check suppressed exception which needs JDK8
exclude '**/ExceptionsGuideTest.*'
}
Expand Down
164 changes: 103 additions & 61 deletions kotlinx-coroutines-core/common/src/channels/ConflatedChannel.kt
@@ -1,11 +1,9 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.channels

import kotlinx.coroutines.selects.*
import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.selects.*
import kotlin.native.concurrent.SharedImmutable

/**
* Channel that buffers at most one element and conflates all subsequent `send` and `offer` invocations,
Expand All @@ -15,80 +13,124 @@ import kotlinx.coroutines.internal.*
* Sender to this channel never suspends and [offer] always returns `true`.
*
* This channel is created by `Channel(Channel.CONFLATED)` factory function invocation.
*
* This implementation is fully lock-free.
*/
internal open class ConflatedChannel<E> : AbstractChannel<E>() {
protected final override val isBufferAlwaysEmpty: Boolean get() = true
protected final override val isBufferEmpty: Boolean get() = true
protected final override val isBufferAlwaysEmpty: Boolean get() = false
protected final override val isBufferEmpty: Boolean get() = value === EMPTY
protected final override val isBufferAlwaysFull: Boolean get() = false
protected final override val isBufferFull: Boolean get() = false

override fun onClosedIdempotent(closed: LockFreeLinkedListNode) {
@Suppress("UNCHECKED_CAST")
(closed.prevNode as? SendBuffered<E>)?.let { lastBuffered ->
conflatePreviousSendBuffered(lastBuffered)
}
}
override val isEmpty: Boolean get() = lock.withLock { isEmptyImpl }

/**
* Queues conflated element, returns null on success or
* returns node reference if it was already closed or is waiting for receive.
*/
private fun sendConflated(element: E): ReceiveOrClosed<*>? {
val node = SendBuffered(element)
queue.addLastIfPrev(node) { prev ->
if (prev is ReceiveOrClosed<*>) return@sendConflated prev
true
}
conflatePreviousSendBuffered(node)
return null
private val lock = ReentrantLock()

private var value: Any? = EMPTY

private companion object {
@SharedImmutable
private val EMPTY = Symbol("EMPTY")
}

private fun conflatePreviousSendBuffered(node: SendBuffered<E>) {
// Conflate all previous SendBuffered, helping other sends to conflate
var prev = node.prevNode
while (prev is SendBuffered<*>) {
if (!prev.remove()) {
prev.helpRemove()
// result is `OFFER_SUCCESS | Closed`
protected override fun offerInternal(element: E): Any {
var receive: ReceiveOrClosed<E>? = null
lock.withLock {
closedForSend?.let { return it }
// if there is no element written in buffer
if (value === EMPTY) {
// check for receivers that were waiting on the empty buffer
loop@ while(true) {
receive = takeFirstReceiveOrPeekClosed() ?: break@loop // break when no receivers queued
if (receive is Closed) {
return receive!!
}
val token = receive!!.tryResumeReceive(element, null)
if (token != null) {
assert { token === RESUME_TOKEN }
return@withLock
}
}
}
prev = prev.prevNode
value = element
return OFFER_SUCCESS
}
// breaks here if offer meets receiver
receive!!.completeResumeReceive(element)
return receive!!.offerResult
}

// result is always `OFFER_SUCCESS | Closed`
protected override fun offerInternal(element: E): Any {
while (true) {
val result = super.offerInternal(element)
when {
result === OFFER_SUCCESS -> return OFFER_SUCCESS
result === OFFER_FAILED -> { // try to buffer
when (val sendResult = sendConflated(element)) {
null -> return OFFER_SUCCESS
is Closed<*> -> return sendResult
// result is `ALREADY_SELECTED | OFFER_SUCCESS | Closed`
protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
var receive: ReceiveOrClosed<E>? = null
lock.withLock {
closedForSend?.let { return it }
if (value === EMPTY) {
loop@ while(true) {
val offerOp = describeTryOffer(element)
val failure = select.performAtomicTrySelect(offerOp)
when {
failure == null -> { // offered successfully
receive = offerOp.result
return@withLock
}
failure === OFFER_FAILED -> break@loop // cannot offer -> Ok to queue to buffer
failure === RETRY_ATOMIC -> {} // retry
failure === ALREADY_SELECTED || failure is Closed<*> -> return failure
else -> error("performAtomicTrySelect(describeTryOffer) returned $failure")
}
// otherwise there was receiver in queue, retry super.offerInternal
}
result is Closed<*> -> return result
else -> error("Invalid offerInternal result $result")
}
// try to select sending this element to buffer
if (!select.trySelect()) {
return ALREADY_SELECTED
}
value = element
return OFFER_SUCCESS
}
// breaks here if offer meets receiver
receive!!.completeResumeReceive(element)
return receive!!.offerResult
}

// result is always `ALREADY_SELECTED | OFFER_SUCCESS | Closed`.
protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
while (true) {
val result = if (hasReceiveOrClosed)
super.offerSelectInternal(element, select) else
(select.performAtomicTrySelect(describeSendConflated(element)) ?: OFFER_SUCCESS)
when {
result === ALREADY_SELECTED -> return ALREADY_SELECTED
result === OFFER_SUCCESS -> return OFFER_SUCCESS
result === OFFER_FAILED -> {} // retry
result === RETRY_ATOMIC -> {} // retry
result is Closed<*> -> return result
else -> error("Invalid result $result")
// result is `E | POLL_FAILED | Closed`
protected override fun pollInternal(): Any? {
var result: Any? = null
lock.withLock {
if (value === EMPTY) return closedForSend ?: POLL_FAILED
result = value
value = EMPTY
}
return result
}

// result is `E | POLL_FAILED | Closed`
protected override fun pollSelectInternal(select: SelectInstance<*>): Any? {
var result: Any? = null
lock.withLock {
if (value === EMPTY) return closedForSend ?: POLL_FAILED
if (!select.trySelect())
return ALREADY_SELECTED
result = value
value = EMPTY
}
return result
}

protected override fun onCancelIdempotent(wasClosed: Boolean) {
if (wasClosed) {
lock.withLock {
value = EMPTY
}
}
super.onCancelIdempotent(wasClosed)
}
}

override fun enqueueReceiveInternal(receive: Receive<E>): Boolean = lock.withLock {
super.enqueueReceiveInternal(receive)
}

// ------ debug ------

override val bufferDebugString: String
get() = "(value=$value)"
}
Expand Up @@ -41,14 +41,6 @@ class ChannelLFStressTest : TestBase() {
checkAllReceived()
}

@Test
fun testConflatedLockFreedom() {
// This test does not really verify that all sent elements were received
// and checks only LF property
channel = Channel(Channel.CONFLATED)
performLockFreedomTest()
}

private fun performLockFreedomTest() {
env.onCompletion {
// We must cancel the channel to abort both senders & receivers
Expand Down
Expand Up @@ -17,7 +17,6 @@ import org.jetbrains.kotlinx.lincheck.paramgen.*
import org.jetbrains.kotlinx.lincheck.verifier.*
import org.junit.*


class RendezvousChannelLCStressTest : ChannelLCStressTestBase(
c = Channel(RENDEZVOUS),
sequentialSpecification = SequentialRendezvousChannel::class.java
Expand Down Expand Up @@ -48,7 +47,6 @@ class ConflatedChannelLCStressTest : ChannelLCStressTestBase(
)
class SequentialConflatedChannel : SequentialIntChannelBase(CONFLATED)


@Param.Params(
Param(name = "value", gen = IntGen::class, conf = "1:5"),
Param(name = "closeToken", gen = IntGen::class, conf = "1:3")
Expand Down Expand Up @@ -105,10 +103,10 @@ abstract class ChannelLCStressTestBase(private val c: Channel<Int>, private val
// @Operation
fun cancel(@Param(name = "closeToken") token: Int) = c.cancel(NumberedCancellationException(token))

// @Operation
// @Operation
fun isClosedForReceive() = c.isClosedForReceive

// @Operation
// @Operation
fun isClosedForSend() = c.isClosedForSend

// TODO: this operation should be (and can be!) linearizable, but is not
Expand Down

0 comments on commit 0126dba

Please sign in to comment.