diff --git a/.idea/dictionaries/shared.xml b/.idea/dictionaries/shared.xml
index 3da8e22952..63d2a9e096 100644
--- a/.idea/dictionaries/shared.xml
+++ b/.idea/dictionaries/shared.xml
@@ -3,6 +3,7 @@
kotlinx
lincheck
+ linearizable
redirector
diff --git a/benchmarks/src/jmh/kotlin/benchmarks/SemaphoreBenchmark.kt b/benchmarks/src/jmh/kotlin/benchmarks/SemaphoreBenchmark.kt
index 40ddc8ec36..c7948ff5cf 100644
--- a/benchmarks/src/jmh/kotlin/benchmarks/SemaphoreBenchmark.kt
+++ b/benchmarks/src/jmh/kotlin/benchmarks/SemaphoreBenchmark.kt
@@ -14,8 +14,8 @@ import org.openjdk.jmh.annotations.*
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.TimeUnit
-@Warmup(iterations = 3, time = 500, timeUnit = TimeUnit.MICROSECONDS)
-@Measurement(iterations = 10, time = 500, timeUnit = TimeUnit.MICROSECONDS)
+@Warmup(iterations = 2, time = 500, timeUnit = TimeUnit.MICROSECONDS)
+@Measurement(iterations = 5, time = 500, timeUnit = TimeUnit.MICROSECONDS)
@Fork(value = 1)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@@ -31,7 +31,6 @@ open class SemaphoreBenchmark {
private var _3_maxPermits: Int = 0
@Param("1", "2", "4") // local machine
-// @Param("1", "2", "4", "8", "16", "32", "64", "128", "144") // dasquad
// @Param("1", "2", "4", "8", "16", "32", "64", "96") // Google Cloud
private var _4_parallelism: Int = 0
@@ -51,7 +50,7 @@ open class SemaphoreBenchmark {
val semaphore = Semaphore(_3_maxPermits)
val jobs = ArrayList(coroutines)
repeat(coroutines) {
- jobs += GlobalScope.launch {
+ jobs += GlobalScope.launch(dispatcher) {
repeat(n) {
semaphore.withPermit {
doGeomDistrWork(WORK_INSIDE)
@@ -69,7 +68,7 @@ open class SemaphoreBenchmark {
val semaphore = Channel(_3_maxPermits)
val jobs = ArrayList(coroutines)
repeat(coroutines) {
- jobs += GlobalScope.launch {
+ jobs += GlobalScope.launch(dispatcher) {
repeat(n) {
semaphore.send(Unit) // acquire
doGeomDistrWork(WORK_INSIDE)
@@ -89,4 +88,4 @@ enum class SemaphoreBenchDispatcherCreator(val create: (parallelism: Int) -> Cor
private const val WORK_INSIDE = 80
private const val WORK_OUTSIDE = 40
-private const val BATCH_SIZE = 1000000
+private const val BATCH_SIZE = 1_000_000
diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
index 50bfb60d62..2fca4658ec 100644
--- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
+++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
@@ -1271,6 +1271,18 @@ public final class kotlinx/coroutines/sync/MutexKt {
public static synthetic fun withLock$default (Lkotlinx/coroutines/sync/Mutex;Ljava/lang/Object;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
}
+public abstract interface class kotlinx/coroutines/sync/ReadWriteMutex {
+ public abstract fun getWrite ()Lkotlinx/coroutines/sync/Mutex;
+ public abstract fun readLock (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public abstract fun readUnlock ()V
+}
+
+public final class kotlinx/coroutines/sync/ReadWriteMutexKt {
+ public static final fun ReadWriteMutex ()Lkotlinx/coroutines/sync/ReadWriteMutex;
+ public static final fun read (Lkotlinx/coroutines/sync/ReadWriteMutex;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+ public static final fun write (Lkotlinx/coroutines/sync/ReadWriteMutex;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
+}
+
public abstract interface class kotlinx/coroutines/sync/Semaphore {
public abstract fun acquire (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun getAvailablePermits ()I
diff --git a/kotlinx-coroutines-core/build.gradle b/kotlinx-coroutines-core/build.gradle
index c45ca08cef..4ea7d5c58f 100644
--- a/kotlinx-coroutines-core/build.gradle
+++ b/kotlinx-coroutines-core/build.gradle
@@ -249,8 +249,8 @@ static void configureJvmForLincheck(task) {
task.maxHeapSize = '6g' // we may need more space for building an interleaving tree in the model checking mode
task.jvmArgs = ['--add-opens', 'java.base/jdk.internal.misc=ALL-UNNAMED', // required for transformation
'--add-exports', 'java.base/jdk.internal.util=ALL-UNNAMED'] // in the model checking mode
- task.systemProperty 'kotlinx.coroutines.semaphore.segmentSize', '2'
- task.systemProperty 'kotlinx.coroutines.semaphore.maxSpinCycles', '1' // better for the model checking mode
+ task.systemProperty 'kotlinx.coroutines.sqs.segmentSize', '2'
+ task.systemProperty 'kotlinx.coroutines.sqs.maxSpinCycles', '1' // better for the model checking mode
}
task jdk16Test(type: Test, dependsOn: [compileTestKotlinJvm, checkJdk16]) {
diff --git a/kotlinx-coroutines-core/common/src/Debug.common.kt b/kotlinx-coroutines-core/common/src/Debug.common.kt
index 185ad295d8..94a086f8b4 100644
--- a/kotlinx-coroutines-core/common/src/Debug.common.kt
+++ b/kotlinx-coroutines-core/common/src/Debug.common.kt
@@ -8,6 +8,7 @@ internal expect val DEBUG: Boolean
internal expect val Any.hexAddress: String
internal expect val Any.classSimpleName: String
internal expect fun assert(value: () -> Boolean)
+internal inline fun assertNot(crossinline value: () -> Boolean) = assert { !value() }
/**
* Throwable which can be cloned during stacktrace recovery in a class-specific way.
diff --git a/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt b/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt
index 638ec43200..ff9069c84d 100644
--- a/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt
+++ b/kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt
@@ -150,11 +150,13 @@ internal abstract class ConcurrentLinkedListNode
*/
fun remove() {
assert { removed } // The node should be logically removed at first.
- assert { !isTail } // The physical tail cannot be removed.
+ // The physical tail cannot be removed. Instead, we remove it when
+ // a new segment is added and this segment is not the tail one anymore.
+ if (isTail) return
while (true) {
// Read `next` and `prev` pointers ignoring logically removed nodes.
- val prev = leftmostAliveNode
- val next = rightmostAliveNode
+ val prev = aliveSegmentLeft
+ val next = aliveSegmentRight
// Link `next` and `prev`.
next._prev.value = prev
if (prev !== null) prev._next.value = next
@@ -166,17 +168,17 @@ internal abstract class ConcurrentLinkedListNode
}
}
- private val leftmostAliveNode: N? get() {
+ private val aliveSegmentLeft: N? get() {
var cur = prev
while (cur !== null && cur.removed)
cur = cur._prev.value
return cur
}
- private val rightmostAliveNode: N get() {
+ private val aliveSegmentRight: N get() {
assert { !isTail } // Should not be invoked on the tail node
var cur = next!!
- while (cur.removed)
+ while (cur.removed && !cur.isTail)
cur = cur.next!!
return cur
}
@@ -204,19 +206,20 @@ internal abstract class Segment>(val id: Long, prev: S?, pointers
* There are no pointers to this segment from outside, and
* it is not a physical tail in the linked list of segments.
*/
- override val removed get() = cleanedAndPointers.value == maxSlots && !isTail
+ override val removed get() = cleanedAndPointers.value == maxSlots
// increments the number of pointers if this segment is not logically removed.
- internal fun tryIncPointers() = cleanedAndPointers.addConditionally(1 shl POINTERS_SHIFT) { it != maxSlots || isTail }
+ internal fun tryIncPointers() = cleanedAndPointers.addConditionally(1 shl POINTERS_SHIFT) { it != maxSlots }
// returns `true` if this segment is logically removed after the decrement.
- internal fun decPointers() = cleanedAndPointers.addAndGet(-(1 shl POINTERS_SHIFT)) == maxSlots && !isTail
+ internal fun decPointers() = cleanedAndPointers.addAndGet(-(1 shl POINTERS_SHIFT)) == maxSlots
/**
* Invoked on each slot clean-up; should not be invoked twice for the same slot.
*/
fun onSlotCleaned() {
- if (cleanedAndPointers.incrementAndGet() == maxSlots && !isTail) remove()
+ if (cleanedAndPointers.incrementAndGet() < maxSlots) return
+ if (removed) remove()
}
}
diff --git a/kotlinx-coroutines-core/common/src/internal/SegmentQueueSynchronizer.kt b/kotlinx-coroutines-core/common/src/internal/SegmentQueueSynchronizer.kt
new file mode 100644
index 0000000000..5103eea28b
--- /dev/null
+++ b/kotlinx-coroutines-core/common/src/internal/SegmentQueueSynchronizer.kt
@@ -0,0 +1,680 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.internal
+
+import kotlinx.atomicfu.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.internal.SegmentQueueSynchronizer.*
+import kotlinx.coroutines.internal.SegmentQueueSynchronizer.CancellationMode.*
+import kotlinx.coroutines.internal.SegmentQueueSynchronizer.ResumeMode.*
+import kotlinx.coroutines.sync.*
+import kotlin.coroutines.*
+import kotlin.math.*
+import kotlin.native.concurrent.*
+
+/**
+ * [SegmentQueueSynchronizer] (SQS) is an abstraction for implementing _fair_ synchronization and communication primitives.
+ * Essentially, It maintains a FIFO queue of waiting requests and provides two main functions:
+ * - [suspend] that stores the specified waiter into the queue, and
+ * - [resume] that tries to retrieve and resume the first waiter, passing the specified value to it.
+ * The key advantage of these semantics is that SQS allows to invoke [resume] before [suspend], and synchronization
+ * primitives can leverage such races. For example, our [Semaphore] implementation actively uses this property
+ * for better performance.
+ *
+ * One useful mental image of [SegmentQueueSynchronizer] is an infinite array with two counters: one references the
+ * next cell in which new waiter is enqueued as a part of [suspend] call, and another references the next cell
+ * for [resume]. The intuition is that [suspend] increments its counter and stores the continuation in the corresponding
+ * cell. Likewise, [resume] increments its counter, visits the corresponding cell, and completes the stored
+ * continuation with the specified value. However, [resume] may come to the cell before [suspend] and find the cell
+ * in the empty state. To solve this race, we introduce two [resumption modes][ResumeMode]: synchronous and asynchronous.
+ * In both case, [resume] puts the value into the empty cell, and then either finishes immediately in the [asynchronous][ASYNC]
+ * resumption mode, or waits until the value is taken by a concurrent [suspend] in the [synchronous][SYNC] one.
+ * In the latter case, if the value is not taken within a bounded time, [resume] marks the cell as _broken_.
+ * Thus, both this [resume] and the corresponding [suspend] fail. The intuition is that allowing for broken cells keeps
+ * the balance of pairwise operations, such as [acquire()][Semaphore.acquire] and [release()][Semaphore.release],
+ * so they should simply restart. This way, we can achieve wait-freedom with the [asynchronous][ASYNC] mode, and
+ * obstruction-freedom with the [synchronous][SYNC] mode.
+ *
+ * **CANCELLATION**
+ *
+ * Since [suspend] can store [CancellableContinuation]-s, it is possible for [resume] to fail if the completing continuation
+ * is already cancelled. In this case, most of the algorithms retry the whole operation. In [Semaphore], for example,
+ * when [Semaphore.release] fails on the next waiter resumption, it logically provides the permit to the waiter but takes it back
+ * after if the waiter is canceled. In the latter case, the permit should be released again so the operation restarts.
+ *
+ * However, the cancellation logic above requires to process all canceled cells. Thus, the operation that resumes waiters
+ * (e.g., [release][Semaphore.release] in [Semaphore]) works in linear time in the number of consecutive canceled waiters.
+ * This way, if N coroutines come to a semaphore, suspend, and cancel, the following [release][Semaphore.release] works
+ * in O(N) because it should process all these N cells (and increment the counter of available permits in our implementation).
+ * While the complexity is amortized by cancellations, it would be better to make such operations as [release][Semaphore.release]
+ * in [Semaphore] work predictably fast, independently on the number of cancelled requests.
+ *
+ * The main idea to improve cancellation is skipping these `CANCELLED` cells, so [resume] always succeeds
+ * if no elimination happens (remember that [resume] may fail in the [synchronous][SYNC] resumption mode if
+ * it finds the cell empty). Additionally, users should specify a cancellation handler that modifies the data structure
+ * after the continuation is cancelled (in [Semaphore], this handler increments the available permits counter back).
+ *
+ * As a result, we support two cancellation policies in [SegmentQueueSynchronizer]. In the [simple cancellation mode][SIMPLE],
+ * [resume] fails and returns `false` if it finds the cell in the `CANCELLED` state or if the waiter resumption
+ * (see [CancellableContinuation.tryResume]) fails. As we discussed, these failures are typically handled
+ * by restarting the whole operation from the beginning. With the [smart cancellation][SMART], [resume] skips `CANCELLED`
+ * cells (the cells where waiter resumption failed are also considered as `CANCELLED`). This way, even if a million of
+ * canceled continuations are stored in [SegmentQueueSynchronizer], one [resume] invocation is sufficient to pass
+ * the value to the next alive waiter since it skips all these canceled waiters. However, the smart cancellation mode
+ * provides less intuitive contract and requires users to write more complicated code -- the details are described further.
+ *
+ * The main issue with skipping `CANCELLED` cells in [resume] is that it can become illegal to put the value into
+ * the next cell. Consider the following execution: [suspend] is called, then [resume] starts, but the suspended
+ * waiter becomes canceled. This way, no waiter is waiting in [SegmentQueueSynchronizer] anymore. Thus, if [resume]
+ * skips this canceled cell, puts the value into the next empty cell, and completes, the data structure's state becomes
+ * incorrect. Instead, the value provided by this [resume] should be refused and returned to the outer data structure.
+ * Unfortunately, there is no way for [SegmentQueueSynchronizer] to decide whether the value should be refused or not.
+ * Thus, users should implement a custom cancellation handler by overriding the [onCancellation] function, which returns
+ * `true` if the cancellation completes successfully and `false` if the [resume] that will come to this cell
+ * should be refused. In the latter case, the [resume] that comes to this cell invokes [tryReturnRefusedValue] to return
+ * the value back to the outer data structure. However, it is possible for [tryReturnRefusedValue] to fail, and
+ * [returnValue] is called in this case. Typically, this [returnValue] function coincides with the one that resumes waiters
+ * (e.g., with [release][Semaphore.release] in [Semaphore]). There is also an important difference between [synchronous][SYNC]
+ * and [asynchronous][ASYNC] resumption modes. In the [synchronous][SYNC] mode, the [resume] that comes to a cell with
+ * a canceled waiter (but the cell is not in the `CANCELLED` state yet) waits in a spin-loop until the cancellation handler
+ * is processed and the cell is moved to either `CANCELLED` or `REFUSE` state. In contrast, in the [asynchronous][ASYNC] mode,
+ * [resume] replaces the canceled waiter with the value of this resumption and finishes immediately -- the cancellation handler
+ * completes this [resume] eventually. This way, in the [asynchronous][ASYNC] mode, the value passed to [resume] can be out
+ * of the data structure for a while but is guaranteed to be processed eventually.
+ *
+ * To support prompt cancellation, [SegmentQueueSynchronizer] returns the value back to the data structure by calling
+ * [returnValue] if the continuation is cancelled while dispatching. Typically, [returnValue] delegates to the operation
+ * that calls [resume], such as [release][Semaphore.release] in [Semaphore].
+ *
+ * Here is a state machine for cells. Note that only one [suspend] and at most one [resume] can deal with each cell.
+ *
+ * +-------+ `suspend` succeeds. +--------------+ `resume` is +---------------+ store `RESUMED` to +---------+ ( `cont` HAS BEEN )
+ * | NULL | ----------------------> | cont: active | -------------> | cont: resumed | --------------------> | RESUMED | ( RESUMED AND THIS )
+ * +-------+ +--------------+ successful. +---------------+ avoid memory leaks. +---------+ ( `resume` SUCCEEDS )
+ * | |
+ * | | The continuation
+ * | `resume` comes to | is cancelled.
+ * | the cell before |
+ * | `suspend` and puts V ( THE CORRESPONDING `resume` SHOULD BE )
+ * | the element into +-----------------+ The concurrent `resume` should be refused, +--------+ ( REFUSED AND `tryReturnRefusedValue`, OR )
+ * | the cell, waiting for | cont: cancelled | -----------------------------------------------> | REFUSE | ( `returnValue` IF IT FAILS, IS USED TO )
+ * | `suspend` if the resume +-----------------+ `onCancellation` returned `false`. +--------+ ( RETURN THE VALUE BACK TO THE OUTER )
+ * | mode is `SYNC`. | \ ^ ( SYNCHRONIZATION PRIMITIVE )
+ * | | \ |
+ * | Mark the cell as `CANCELLED` | \ |
+ * | if the cancellation mode is | \ `resume` delegates its completion to | `onCancellation` returned `false,
+ * | `SIMPLE` or `onCancellation` | \ the concurrent cancellation handler if | mark the state accordingly and
+ * | returned `true`. | \ `SMART` cancellation mode is used. | complete the hung `resume`.
+ * | | +------------------------------------------+ |
+ * | | \ |
+ * | ( THE CONTINUATION IS ) V V |
+ * | ( CANCELLED AND `resume` ) +-----------+ +-------+
+ * | ( FAILS IN THE SIMPLE ) | CANCELLED | <-------------------------------------------------- | value |
+ * | ( CANCELLATION MODE OR ) +-----------+ Mark the cell as `CANCELLED` if `onCancellation` +-------+
+ * | ( SKIPS THIS CELL WITH ) returned true, complete the hung `resume` accordingly.
+ * | ( SMART CANCELLATION )
+ * |
+ * |
+ * | `suspend` gets +-------+ ( RENDEZVOUS HAPPENED, )
+ * | +-----------------> | TAKEN | ( BOTH `resume` AND )
+ * V | the element. +-------+ ( `suspend` SUCCEED )
+ * +-------+ |
+ * | value | --<
+ * +-------+ |
+ * | `tryResume` has waited a bounded time, +--------+
+ * +---------------------------------------> | BROKEN | (BOTH `suspend` AND `resume` FAIL)
+ * but `suspend` has not come. +--------+
+ *
+ *
+ * **INFINITE ARRAY IMPLEMENTATION**
+ *
+ * The last open question is how to emulate the infinite array used in the mental image described above. Notice that
+ * all cells are processed in sequential order. Therefore, the algorithm needs to have access only to the cells
+ * between the counters for [resume] and [suspend], and does not need to store an infinite array of cells.
+ *
+ * To make the implementation efficient, we maintain a linked list of [segments][SQSSegment], see the basic [Segment]
+ * class and the corresponding source file. In short, each segment has a unique id, and can be considered as a node in
+ * a Michael-Scott queue. Following this structure, we can maintain the cells that are in the current active
+ * range (between the counters), and access the cells similarly to the plain array. Specifically, we change the current working
+ * segment once every [SEGMENT_SIZE] operations, where [SEGMENT_SIZE] is the number of cells stored in each segment.
+ * It is worth noting that this linked list of segments does not store the ones full of cancelled cells; thus, avoiding
+ * memory leaks and guaranteeing constant time complexity for [resume], even when the [smart cancellation][SMART] is used.
+ */
+internal abstract class SegmentQueueSynchronizer {
+ private val resumeSegment: AtomicRef
+ private val resumeIdx = atomic(0L)
+ private val suspendSegment: AtomicRef
+ private val suspendIdx = atomic(0L)
+
+ init {
+ val s = SQSSegment(0, null, 2)
+ resumeSegment = atomic(s)
+ suspendSegment = atomic(s)
+ }
+
+ /**
+ * Specifies whether [resume] should work in
+ * [synchronous][SYNC] or [asynchronous][ASYNC] mode.
+ */
+ protected open val resumeMode: ResumeMode get() = SYNC
+
+ /**
+ * Specifies whether [resume] should fail on cancelled waiters ([SIMPLE] mode) or
+ * skip them ([SMART] mode). Remember that in case of [smart][SMART] cancellation mode,
+ * [onCancellation] handler should be implemented.
+ */
+ protected open val cancellationMode: CancellationMode get() = SIMPLE
+
+ /**
+ * This function is called when waiter is cancelled and smart
+ * cancellation mode is used (so cancelled cells are skipped by
+ * [resume]). By design, this handler performs the logical cancellation
+ * and returns `true` if the cancellation succeeds and the cell can be
+ * moved to the `CANCELLED` state. In this case, [resume] skips the cell and passes
+ * the value to the next waiter in the waiting queue. However, if the [resume]
+ * that comes to this cell should be refused, [onCancellation] should return false.
+ * In this case, [tryReturnRefusedValue] is invoked with the value of this [resume],
+ * following by [returnValue] if [tryReturnRefusedValue] fails.
+ */
+ protected open fun onCancellation() : Boolean = false
+
+ /**
+ * This function specifies how the value refused by this [SegmentQueueSynchronizer]
+ * (when [onCancellation] returns `false`) should be returned back to the data structure.
+ * It returns `true` on success and `false` when the attempt fails. In the latter case,
+ * [returnValue] is used to complete the returning process.
+ */
+ protected open fun tryReturnRefusedValue(value: T): Boolean = true
+
+ /**
+ * This function specifies how the value from a failed [resume] should be returned back to
+ * the data structure. Typically, this function delegates to the one that invokes [resume]
+ * (e.g., [release()][Semaphore.release] in [Semaphore]).
+ *
+ * This function is invoked when [onCancellation] returns `false` and the following [tryReturnRefusedValue]
+ * fails, or when prompt cancellation occurs and the value should be returned back to the data structure.
+ */
+ protected open fun returnValue(value: T) {}
+
+ /**
+ * This is a shortcut for [tryReturnRefusedValue] and
+ * the following [returnValue] invocation on failure.
+ */
+ private fun returnRefusedValue(value: T) {
+ if (tryReturnRefusedValue(value)) return
+ returnValue(value)
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ internal fun suspend(cont: Continuation): Boolean {
+ // Increment `suspendIdx` and find the segment
+ // with the corresponding id. It is guaranteed
+ // that this segment is not removed since at
+ // least the cell for this `suspend` invocation
+ // is not in the `CANCELLED` state.
+ val curSuspendSegm = this.suspendSegment.value
+ val suspendIdx = suspendIdx.getAndIncrement()
+ val segment = this.suspendSegment.findSegmentAndMoveForward(id = suspendIdx / SEGMENT_SIZE, startFrom = curSuspendSegm,
+ createNewSegment = ::createSegment).segment
+ assert { segment.id == suspendIdx / SEGMENT_SIZE }
+ // Try to install the waiter into the cell - this is the regular path.
+ val i = (suspendIdx % SEGMENT_SIZE).toInt()
+ if (segment.cas(i, null, cont)) {
+ // The continuation is successfully installed, and
+ // `resume` cannot break the cell now, so this
+ // suspension is successful.
+ // Add a cancellation handler if required and finish.
+ if (cont is CancellableContinuation<*>) {
+ cont.invokeOnCancellation(SQSCancellationHandler(segment, i).asHandler)
+ }
+ return true
+ }
+ // The continuation installation has failed. This happened because a concurrent
+ // `resume` came earlier to this cell and put its value into it. Remember that
+ // in the `SYNC` resumption mode this concurrent `resume` can mark the cell as broken.
+ //
+ // Try to grab the value if the cell is not in the `BROKEN` state.
+ val value = segment.get(i)
+ if (value !== BROKEN && segment.cas(i, value, TAKEN)) {
+ // The elimination is performed successfully,
+ // complete with the value stored in the cell.
+ value as T
+ when (cont) {
+ is CancellableContinuation<*> -> {
+ cont as CancellableContinuation
+ cont.resume(value, { returnValue(value) }) // TODO do we really need this?
+ }
+ else -> {
+ cont.resume(value)
+ }
+ }
+ return true
+ }
+ // The cell is broken, this can happen only in the `SYNC` resumption mode.
+ assert { resumeMode == SYNC && segment.get(i) === BROKEN }
+ return false
+ }
+
+ /**
+ * Tries to resume the next waiter and returns `true` if
+ * the resumption succeeds. However, it can fail due to
+ * several reasons. First, if the [synchronous][SYNC] resumption
+ * mode is used, this [resume] invocation may come before [suspend],
+ * find the cell in the empty state, mark it as [broken][BROKEN],
+ * and fail returning `false` as a result. Another reason for [resume]
+ * to fail is waiter cancellation if the [simple cancellation mode][SIMPLE]
+ * is used.
+ *
+ * Note that with the [smart][SMART] cancellation mode [resume] skips
+ * cancelled waiters and can fail only in case of unsuccessful elimination
+ * due to [synchronous][SYNC] resumption.
+ */
+ fun resume(value: T): Boolean {
+ // Should we skip cancelled cells?
+ val skipCancelled = cancellationMode != SIMPLE
+ while (true) {
+ // Try to resume the next waiter, adjust `resumeIdx` if
+ // cancelled cells will be skipped anyway.
+ when (tryResumeImpl(value = value, adjustResumeIdx = skipCancelled)) {
+ TRY_RESUME_SUCCESS -> return true
+ TRY_RESUME_FAIL_CANCELLED -> if (!skipCancelled) return false
+ TRY_RESUME_FAIL_BROKEN -> return false
+ }
+ }
+ }
+
+ /**
+ * Tries to resume the next waiter, and returns [TRY_RESUME_SUCCESS] on
+ * success, [TRY_RESUME_FAIL_CANCELLED] if the next waiter is cancelled,
+ * or [TRY_RESUME_FAIL_BROKEN] if the next cell has been marked as broken
+ * by this [tryResumeImpl] invocation due to a race in the [SYNC] resumption mode.
+ *
+ * In the [smart cancellation mode][SMART], all cells marked as
+ * [cancelled][CANCELLED] should be skipped, so there is no need
+ * to increment [resumeIdx] one-by-one if there is a removed segment
+ * (logically full of [cancelled][CANCELLED] cells). Instead, the algorithm
+ * moves [resumeIdx] to the first possibly non-cancelled cell, i.e.,
+ * to the first segment id multiplied by [SEGMENT_SIZE].
+ */
+ @Suppress("UNCHECKED_CAST")
+ private fun tryResumeImpl(value: T, adjustResumeIdx: Boolean): Int {
+ // Check that `adjustResumeIdx` is `false` in the simple cancellation mode.
+ assertNot { cancellationMode == SIMPLE && adjustResumeIdx }
+ // Increment `resumeIdx` and find the first segment with
+ // the corresponding or higher (if the required segment
+ // is physically removed) id.
+ val curResumeSegm = this.resumeSegment.value
+ val resumeIdx = resumeIdx.getAndIncrement()
+ val id = resumeIdx / SEGMENT_SIZE
+ val segment = this.resumeSegment.findSegmentAndMoveForward(id, startFrom = curResumeSegm,
+ createNewSegment = ::createSegment).segment
+ // The previous segments can be safely collected by GC, clean the pointer to them.
+ segment.cleanPrev()
+ // Is the required segment physically removed?
+ if (segment.id > id) {
+ // Adjust `resumeIdx` to the first non-removed segment if needed.
+ if (adjustResumeIdx) adjustResumeIdx(segment.id * SEGMENT_SIZE)
+ // The cell #resumeIdx is in the `CANCELLED` state, return the corresponding failure.
+ return TRY_RESUME_FAIL_CANCELLED
+ }
+ // Modify the cell according to the state machine,
+ // all the transitions are performed atomically.
+ val i = (resumeIdx % SEGMENT_SIZE).toInt()
+ modify_cell@while (true) {
+ val cellState = segment.get(i)
+ when {
+ // Is the cell empty?
+ cellState === null -> {
+ // Try to perform an elimination by putting the
+ // value to the empty cell and wait until it is
+ // taken by a concurrent `suspend` in case of
+ // using the synchronous resumption mode.
+ if (!segment.cas(i, null, value)) continue@modify_cell
+ // Finish immediately in the asynchronous resumption mode.
+ if (resumeMode == ASYNC) return TRY_RESUME_SUCCESS
+ // Wait for a concurrent `suspend` (which should mark
+ // the cell as taken) for a bounded time in a spin-loop.
+ repeat(MAX_SPIN_CYCLES) {
+ if (segment.get(i) === TAKEN) return TRY_RESUME_SUCCESS
+ }
+ // The value is still not taken, try to atomically mark the cell as broken.
+ // A CAS failure indicates that the value is successfully taken.
+ return if (segment.cas(i, value, BROKEN)) TRY_RESUME_FAIL_BROKEN else TRY_RESUME_SUCCESS
+ }
+ // Is the waiter cancelled?
+ cellState === CANCELLED -> {
+ // Return the corresponding failure.
+ return TRY_RESUME_FAIL_CANCELLED
+ }
+ // Should the current `resume` be refused by this SQS?
+ cellState === REFUSE -> {
+ // This state should not occur
+ // in the simple cancellation mode.
+ assert { cancellationMode != SIMPLE }
+ // Return the refused value back to the
+ // data structure and finish successfully.
+ returnRefusedValue(value)
+ return TRY_RESUME_SUCCESS
+ }
+ // Does the cell store a cancellable continuation?
+ cellState is CancellableContinuation<*> -> {
+ // Change the cell state to `RESUMED`, so
+ // the cancellation handler cannot be invoked
+ // even if the continuation becomes cancelled.
+ if (!segment.cas(i, cellState, RESUMED)) continue@modify_cell
+ // Try to resume the continuation.
+ val token = (cellState as CancellableContinuation).tryResume(value, null, { returnValue(value) })
+ if (token != null) {
+ // Hooray, the continuation is successfully resumed!
+ cellState.completeResume(token)
+ } else {
+ // Unfortunately, the continuation resumption has failed.
+ // Fail the current `resume` if the simple cancellation mode is used.
+ if (cancellationMode === SIMPLE)
+ return TRY_RESUME_FAIL_CANCELLED
+ // In the smart cancellation mode, the cancellation handler should be invoked.
+ val cancelled = onCancellation()
+ if (cancelled) {
+ // We could mark the cell as `CANCELLED` for consistency,
+ // but there is no need for this since the cell cannot
+ // be processed by another operation anymore.
+ //
+ // Try to resume the next waiter. If the resumption fails due to
+ // a race in the synchronous mode, the value should be returned
+ // back to the data structure.
+ if (!resume(value)) returnValue(value)
+ } else {
+ // The value is refused by this SQS, return it back to the data structure.
+ returnRefusedValue(value)
+ }
+ }
+ // Once the state is changed to `RESUMED`, `resume` is considered as successful.
+ return TRY_RESUME_SUCCESS
+ }
+ // Does the cell store a cancelling waiter, which is already logically
+ // cancelled but the cancellation handler has not been completed yet?
+ cellState === CANCELLING -> {
+ // Fail in the simple cancellation mode.
+ if (cancellationMode == SIMPLE) return TRY_RESUME_FAIL_CANCELLED
+ // In the smart cancellation mode, this cell should be either skipped
+ // (when it becomes `CANCELLED`), or the current `resume` should be refused.
+ //
+ // In the synchronous resumption mode, `resume(..)` waits in a an unbounded spin-loop until
+ // the state of this cell is changed to either `CANCELLED` or `REFUSE`. While this part makes
+ // the overall algorithm blocking in theory, this cancellation handler and `resume` overlap occurs
+ // relatively rare in practice and it is guaranteed that one cancellation can block at most one
+ // `resume`, what makes the algorithm almost non-blocking in any real-world high-contended scenario.
+ if (resumeMode == SYNC) continue@modify_cell
+ // In the asynchronous resumption mode, `resume` puts the resumption value into the cell,
+ // so the concurrent cancellation handler completes this `resume` after it decides whether
+ // the cell should be marked as `CANCELLED` or `REFUSE`. Thus, this `resume` is delegated to
+ // the cancellation handler and can be postponed for a while.
+ //
+ // To distinguish continuations related to the `suspend` operation with the continuations passed
+ // as values (this is strange but possible), we wrap the last ones with `WrappedContinuationValue`.
+ val valueToStore: Any = if (value is Continuation<*>) WrappedContinuationValue(value) else value
+ if (segment.cas(i, cellState, valueToStore)) return TRY_RESUME_SUCCESS
+ }
+ // The cell stores a plane non-cancellable continuation, we can simply resume it.
+ cellState is Continuation<*> -> {
+ // Resume the continuation and mark the cell
+ // as `RESUMED` to avoid memory leaks.
+ segment.set(i, RESUMED)
+ (cellState as Continuation).resume(value)
+ return TRY_RESUME_SUCCESS
+ }
+ else -> error("Unexpected cell state: $cellState")
+ }
+ }
+ }
+
+ /**
+ * Updates [resumeIdx] to [newValue] if the current value is lower.
+ */
+ private fun adjustResumeIdx(newValue: Long): Unit = resumeIdx.loop { cur ->
+ if (cur >= newValue) return
+ if (resumeIdx.compareAndSet(cur, newValue)) return
+ }
+
+ /**
+ * These modes define the strategy that [resume] should
+ * use if it comes to the cell before [suspend] and finds it empty.
+ * In the [asynchronous][ASYNC] mode, [resume] puts the value into the cell,
+ * so [suspend] grabs it after that and completes without actual suspension.
+ * In other words, an elimination happens in this case.
+ *
+ * However, such a strategy produces an incorrect behavior when used for some
+ * data structures (e.g., for [tryAcquire][Semaphore.tryAcquire] in [Semaphore]),
+ * so the [synchronous][SYNC] mode has been introduced in addition.
+ * Similarly to the asynchronous one, [resume] puts the value into the cell,
+ * but do not finish immediately. In opposite, it waits in a bounded spin-loop
+ * (see [MAX_SPIN_CYCLES]) until the value is taken and completes only after that.
+ * If the value is not taken after this spin-loop ends, [resume] marks the cell as
+ * [broken][BROKEN] and fails, so the corresponding [suspend] invocation finds the cell
+ * [broken][BROKEN] and fails as well.
+ */
+ internal enum class ResumeMode { SYNC, ASYNC }
+
+ /**
+ * These modes define the strategy that should be used when a waiter becomes cancelled.
+ *
+ * In the [simple cancellation mode][SIMPLE], [resume] fails when the waiter in the working cell is cancelled.
+ * In the [smart cancellation mode][SMART], [resume] skips cancelled cells and passes the value to the first
+ * non-cancelled waiter. However, it is also possible that the cancelled waiter was the last one, so this
+ * [resume] should be refused (in this case, the corresponding [onCancellation] call returns false), and
+ * the value is returned back to the data structure via [returnValue].
+ */
+ internal enum class CancellationMode { SIMPLE, SMART }
+
+ /**
+ * This cancellation handler is invoked when the waiter
+ * located by ```segment[index]``` becomes cancelled.
+ */
+ private inner class SQSCancellationHandler(
+ private val segment: SQSSegment,
+ private val index: Int
+ ) : CancelHandler() {
+ override fun invoke(cause: Throwable?) {
+ // Invoke the cancellation handler
+ // only if the state is not `RESUMED`.
+ //
+ // After the state is changed to `RESUMED`, the
+ // resumption is considered as logically successful,
+ // and the value can be returned back to the data structure
+ // only via a `returnValue(..)` call.
+ if (!segment.tryMarkCancelling(index)) return
+ // Do we use simple or smart cancellation?
+ if (cancellationMode === SIMPLE) {
+ // In the simple cancellation mode the logic
+ // is straightforward -- mark the cell as
+ // cancelled to avoid memory leaks and complete.
+ segment.markCancelled(index)
+ return
+ }
+ // We are in the smart cancellation mode.
+ // Invoke `onCancellation()` and mark the cell as `CANCELLED`
+ // if the call returns `true`, or as `REFUSE` if it
+ // returns `false`. Note that it is possible for a
+ // concurrent `resume(..)` to put its resumption value
+ // into the cell in the asynchronous mode. In this case,
+ // the cancellation handler should complete this `resume(..)`.
+ val cancelled = onCancellation()
+ if (cancelled) {
+ // The cell should be considered as cancelled.
+ // Mark the cell correspondingly and help a concurrent
+ // `resume(..)` to process its value if needed.
+ val value = segment.markCancelled(index) ?: return
+ // Resume the next waiter with the value
+ // provided by a concurrent `resume(..)`.
+ // The value could be put only in the asynchronous mode,
+ // so the `resume(..)` call above must not fail.
+ resume(value as T)
+ } else {
+ // The `resume(..)` that will come to this cell should be refused.
+ // Mark the cell correspondingly and help a concurrent
+ // `resume(..)` to process its value if needed.
+ val value = segment.markRefuse(index) ?: return
+ returnRefusedValue(value as T)
+ }
+ }
+
+ override fun toString() = "SQSCancellationHandler[$segment, $index]"
+ }
+
+ // We use this string representation for traces in Lincheck tests
+ override fun toString(): String {
+ val waiters = ArrayList()
+ var curSegment = resumeSegment.value
+ var curIdx = resumeIdx.value
+ while (curIdx < max(suspendIdx.value, resumeIdx.value)) {
+ val i = (curIdx % SEGMENT_SIZE).toInt()
+ waiters += when {
+ curIdx < curSegment.id * SEGMENT_SIZE -> "CANCELLED"
+ curSegment.get(i) is Continuation<*> -> ""
+ else -> curSegment.get(i).toString()
+ }
+ curIdx++
+ if (curIdx == (curSegment.id + 1) * SEGMENT_SIZE)
+ curSegment = curSegment.next ?: break
+ }
+ return "suspendIdx=${suspendIdx.value},resumeIdx=${resumeIdx.value},waiters=$waiters"
+ }
+}
+
+private fun createSegment(id: Long, prev: SQSSegment?) = SQSSegment(id, prev, 0)
+
+/**
+ * The queue of waiters in [SegmentQueueSynchronizer] is represented as a linked list of [SQSSegment].
+ */
+private class SQSSegment(id: Long, prev: SQSSegment?, pointers: Int) : Segment(id, prev, pointers) {
+ private val waiters = atomicArrayOfNulls(SEGMENT_SIZE)
+ override val maxSlots: Int get() = SEGMENT_SIZE
+
+ @Suppress("NOTHING_TO_INLINE")
+ inline fun get(index: Int): Any? = waiters[index].value
+
+ @Suppress("NOTHING_TO_INLINE")
+ inline fun set(index: Int, value: Any?) {
+ waiters[index].value = value
+ }
+
+ @Suppress("NOTHING_TO_INLINE")
+ inline fun cas(index: Int, expected: Any?, value: Any?): Boolean = waiters[index].compareAndSet(expected, value)
+
+ @Suppress("NOTHING_TO_INLINE")
+ inline fun getAndSet(index: Int, value: Any?): Any? = waiters[index].getAndSet(value)
+
+ /**
+ * In the SQS algorithm, we use different handlers for normal and prompt cancellations.
+ * However, the current [CancellableContinuation] API (which is, hopefully, subject to change)
+ * does not allow to split the handlers -- the one set by [CancellableContinuation.invokeOnCancellation]
+ * is always invoked, even when prompt cancellation occurs. To guarantee that only the proper handler is used
+ * (either the one installed by `invokeOnCancellation { ... }` or the one passed to `tryResume(..)`),
+ * we use a special intermediate `CANCELLING` state for the normal cancellation. Thus, once the waiter becomes
+ * cancelled, it should be atomically replaced with the `CANCELLING` marker. At the same time, if the state
+ * is already `RESUMED`, the continuation is considered as logically resumed, [tryMarkCancelling] returns false,
+ * and any cancellation is considered as prompt one, so the handler passed to `tryResume(..)` is used.
+ * This handler simply returns the value back to the data structure via `returnValue(..)` invocation.
+ */
+ fun tryMarkCancelling(index: Int): Boolean {
+ while (true) {
+ val cellState = get(index)
+ when {
+ cellState === RESUMED -> return false
+ cellState is CancellableContinuation<*> -> {
+ if (cas(index, cellState, CANCELLING)) return true
+ }
+ else -> {
+ if (cellState is Continuation<*>)
+ error("Only cancellable continuations can be cancelled, ${cellState::class.simpleName} has been detected")
+ else
+ error("Unexpected cell state: $cellState")
+ }
+ }
+ }
+ }
+
+ /**
+ * Atomically replaces [CANCELLING] with [CANCELLED] and returns `null` on success.
+ * However, in the asynchronous resumption mode, [resume] may to come to the cell
+ * while it is in the [CANCELLING] state, replace the [CANCELLING] marker with the
+ * resumption value, and finish, delegating the rest of the resumption. In this case,
+ * the function returns this value, and the caller must complete the resumption.
+ *
+ * In addition, this function checks whether the segment becomes full of cancelled
+ * cells, and physically removes the segment from the linked list in this case; thus,
+ * avoiding possible memory leaks caused by cancellation.
+ */
+ fun markCancelled(index: Int): Any? = mark(index, CANCELLED).also {
+ onSlotCleaned()
+ }
+
+ /**
+ * Atomically replaces [CANCELLING] with [REFUSE] and returns `null` on success.
+ * However, in the asynchronous resumption mode, [resume] may to come to the cell
+ * while it is in the [CANCELLING] state, replace the [CANCELLING] marker with the
+ * resumption value, and finish, delegating the rest of the resumption. In this case,
+ * the function returns this value, and the caller must complete the resumption.
+ */
+ fun markRefuse(index: Int): Any? = mark(index, REFUSE)
+
+ /**
+ * Updates the cell state to either [CANCELLED] or [REFUSE] from [CANCELLING], the
+ * corresponding update [marker] is passed as an argument. However, in the asynchronous
+ * resumption mode, it is possible for [resume] to come to the cell while it is in
+ * the [CANCELLING] state. In this case, [resume] replaces the [CANCELLING] marker
+ * with the resumption value and finishes, delegating the rest of the resumption to
+ * the concurrent cancellation handler. Therefore, this [mark] function atomically
+ * checks whether there is a value put into the cell by a concurrent [resume] and
+ * either returns this value if found, or `null` if the cell was in the [CANCELLING] state.
+ */
+ private fun mark(index: Int, marker: Any?): Any? {
+ val old = getAndSet(index, marker)
+ // The cell should be in the `CANCELLING` state or store
+ // an asynchronously put value at the point of this update.
+ assert { old !== RESUMED && old !== CANCELLED && old !== REFUSE && old !== TAKEN && old !== BROKEN }
+ assert { old !is Continuation<*> }
+ // Return `null` if no value has been passed in meantime.
+ if (old === CANCELLING) return null
+ // A concurrent `resume(..)` has put a value into the cell, return it as a result.
+ return if (old is WrappedContinuationValue) old.cont else old
+ }
+
+ override fun toString() = "SQSSegment[id=$id, hashCode=${hashCode()}]"
+}
+
+/**
+ * In the [smart cancellation mode][SegmentQueueSynchronizer.CancellationMode.SMART]
+ * it is possible for [resume] to come to a cell with cancelled continuation and
+ * asynchronously put the resumption value into the cell, so the cancellation handler decides whether
+ * this value should be used for resuming the next waiter or be refused. When this
+ * value is a continuation, it is hard to distinguish it with the one related to the cancelled
+ * waiter. To solve the problem, such values of type [Continuation] are wrapped with
+ * [WrappedContinuationValue]. Note that the wrapper is required only in [SegmentQueueSynchronizer.CancellationMode.SMART]
+ * mode and is used in the asynchronous race resolution logic between cancellation and [resume]
+ * invocation; this way, it is used relatively rare.
+ */
+private class WrappedContinuationValue(val cont: Continuation<*>)
+
+@SharedImmutable
+private val SEGMENT_SIZE = systemProp("kotlinx.coroutines.sqs.segmentSize", 16)
+@SharedImmutable
+private val MAX_SPIN_CYCLES = systemProp("kotlinx.coroutines.sqs.maxSpinCycles", 100)
+@SharedImmutable
+private val TAKEN = Symbol("TAKEN")
+@SharedImmutable
+private val BROKEN = Symbol("BROKEN")
+@SharedImmutable
+private val CANCELLING = Symbol("CANCELLING")
+@SharedImmutable
+private val CANCELLED = Symbol("CANCELLED")
+@SharedImmutable
+private val REFUSE = Symbol("REFUSE")
+@SharedImmutable
+private val RESUMED = Symbol("RESUMED")
+
+private const val TRY_RESUME_SUCCESS = 0
+private const val TRY_RESUME_FAIL_CANCELLED = 1
+private const val TRY_RESUME_FAIL_BROKEN = 2
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/common/src/sync/Mutex.kt b/kotlinx-coroutines-core/common/src/sync/Mutex.kt
index 7d0a343d95..b757d0adbd 100644
--- a/kotlinx-coroutines-core/common/src/sync/Mutex.kt
+++ b/kotlinx-coroutines-core/common/src/sync/Mutex.kt
@@ -28,15 +28,21 @@ import kotlin.native.concurrent.*
*/
public interface Mutex {
/**
- * Returns `true` when this mutex is locked.
+ * Returns `true` if this mutex is locked.
*/
public val isLocked: Boolean
/**
* Tries to lock this mutex, returning `false` if this mutex is already locked.
*
+ * It is recommended to use [withLock] for safety reasons, so that the acquired lock is always
+ * released at the end of your critical section and [unlock] is never invoked before a successful
+ * lock acquisition.
+ *
* @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
* is already locked with the same token (same identity), this function throws [IllegalStateException].
+ *
+ *
*/
public fun tryLock(owner: Any? = null): Boolean
@@ -58,6 +64,10 @@ public interface Mutex {
*
* This function is fair; suspended callers are resumed in first-in-first-out order.
*
+ * It is recommended to use [withLock] for safety reasons, so that the acquired lock is always
+ * released at the end of your critical section and [unlock] is never invoked before a successful
+ * lock acquisition.
+ *
* @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
* is already locked with the same token (same identity), this function throws [IllegalStateException].
*/
@@ -67,13 +77,19 @@ public interface Mutex {
* Clause for [select] expression of [lock] suspending function that selects when the mutex is locked.
* Additional parameter for the clause in the `owner` (see [lock]) and when the clause is selected
* the reference to this mutex is passed into the corresponding block.
+ *
+ * It is recommended to use [withLock] for safety reasons, so that the acquired lock is always
+ * released at the end of your critical section and [unlock] is never invoked before a successful
+ * lock acquisition.
*/
public val onLock: SelectClause2
/**
- * Checks mutex locked by owner
+ * Checks whether this mutex is locked by the specified owner.
*
* @return `true` on mutex lock by owner, `false` if not locker or it is locked by different owner
+ *
+ * TODO let's deprecate this method, as well as owners in general
*/
public fun holdsLock(owner: Any): Boolean
@@ -81,6 +97,10 @@ public interface Mutex {
* Unlocks this mutex. Throws [IllegalStateException] if invoked on a mutex that is not locked or
* was locked with a different owner token (by identity).
*
+ * It is recommended to use [withLock] for safety reasons, so that the acquired lock is always
+ * released at the end of your critical section and [unlock] is never invoked before a successful
+ * lock acquisition.
+ *
* @param owner Optional owner token for debugging. When `owner` is specified (non-null value) and this mutex
* was locked with the different token (by identity), this function throws [IllegalStateException].
*/
diff --git a/kotlinx-coroutines-core/common/src/sync/ReadWriteMutex.kt b/kotlinx-coroutines-core/common/src/sync/ReadWriteMutex.kt
new file mode 100644
index 0000000000..0d33e4c53d
--- /dev/null
+++ b/kotlinx-coroutines-core/common/src/sync/ReadWriteMutex.kt
@@ -0,0 +1,612 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.sync
+
+import kotlinx.atomicfu.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.internal.*
+import kotlinx.coroutines.internal.SegmentQueueSynchronizer.CancellationMode.*
+import kotlinx.coroutines.internal.SegmentQueueSynchronizer.ResumeMode.*
+import kotlinx.coroutines.selects.*
+import kotlinx.coroutines.sync.ReadWriteMutexImpl.WriteUnlockPolicy.*
+import kotlin.contracts.*
+import kotlin.js.*
+
+/**
+ * This readers-writer mutex maintains a logical pair of locks, one for read-only
+ * operations that can be processed concurrently (see [readLock()][readLock] and [readUnlock()][readUnlock]),
+ * and another one for write operations that require an exclusive access (see [write]).
+ * It is guaranteed that write and read operations never interfere.
+ *
+ * The table below shows which locks can be held simultaneously.
+ * +-------------+-------------+-------------+
+ * | | reader lock | writer lock |
+ * +-------------+-------------+-------------+
+ * | reader lock | ALLOWED | FORBIDDEN |
+ * +-------------+-------------+-------------+
+ * | writer lock | FORBIDDEN | FORBIDDEN |
+ * +-------------+-------------+-------------+
+ *
+ * Similar to [Mutex], this readers-writer mutex is **non-reentrant**,
+ * so invoking [readLock()][readLock] or [write.lock()][write] even from the coroutine that
+ * currently holds the corresponding lock suspends the invoker. Likewise, invoking
+ * [readLock()][readLock] from the holder of the writer lock also suspends the invoker.
+ *
+ * Typical usage of [ReadWriteMutex] is wrapping each read invocation with
+ * [read { ... }][read] and each write invocation with [write { ... }][write].
+ * These wrapper functions guarantee that the readers-writer mutex is used correctly
+ * and safely. However, one can use `lock()` and `unlock()` operations directly.
+ *
+ * The advantage of using [ReadWriteMutex] compared to the plain [Mutex] is the ability
+ * to parallelize read operations and, therefore, increase the level of concurrency.
+ * This is extremely useful for the workloads with dominating read operations so they can be
+ * executed in parallel, improving the performance and scalability. However, depending on the
+ * updates frequency, the execution cost of read and write operations, and the contention,
+ * it can be simpler and cheaper to use the plain [Mutex]. Therefore, it is highly recommended
+ * to measure the performance difference to make the right choice.
+ */
+@ExperimentalCoroutinesApi
+public interface ReadWriteMutex {
+ /**
+ * // TODO: how to reference `val write: Mutex` instead of the extension function?
+ * Acquires a reader lock of this mutex if the [writer lock][write] is not held and there is no writer
+ * waiting for it. Suspends the caller otherwise until the writer lock is released and this reader is resumed.
+ * Please note, that in this case the next waiting writer instead of this reader can be resumed after
+ * the currently active writer releases the lock.
+ *
+ * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
+ * function is suspended, this function immediately resumes with [CancellationException].
+ * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
+ * suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
+ * This function releases the lock if it was already acquired by this function before the [CancellationException]
+ * was thrown.
+ *
+ * Note that this function does not check for cancellation when it is not suspended.
+ * Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
+ *
+ * It is recommended to use [read { ... }][read] block for safety reasons, so the acquired reader lock
+ * is always released at the end of the critical section, and [readUnlock()][readUnlock] is never invoked
+ * before a successful [readLock()][readLock].
+ */
+ @ExperimentalCoroutinesApi
+ public suspend fun readLock()
+
+ /**
+ * Releases a reader lock of this mutex and resumes the first waiting writer
+ * if this operation has released the last acquired reader lock.
+ *
+ * It is recommended to use [read { ... }][read] block for safety reasons, so the acquired reader lock
+ * is always released at the end of the critical section, and [readUnlock()][readUnlock] is never invoked
+ * before a successful [readLock()][readLock].
+ */
+ @ExperimentalCoroutinesApi
+ public fun readUnlock()
+
+ /**
+ * Returns a [mutex][Mutex] which manipulates with the writer lock of this [ReadWriteMutex].
+ *
+ * When acquires the writer lock, the operation completes immediately if neither the writer lock nor
+ * a reader lock is held. Otherwise, the acquisition suspends the caller until the exclusive access
+ * is granted by either [readUnlock()][readUnlock] or [write.unlock()][Mutex.unlock]. Please note that
+ * all suspended writers are processed in first-in-first-out (FIFO) order.
+ *
+ * When releasing the writer lock, the operation resumes the first waiting writer or waiting readers.
+ * Note that different fairness policies can be applied by an implementation, such as
+ * prioritizing readers or writers and attempting to always resume them at first,
+ * choosing the prioritization policy by flipping a coin, or providing a truly fair
+ * strategy where all waiters, both readers and writers, form a single FIFO queue.
+ *
+ * This [Mutex] implementation for writers does not support owners in [lock()][Mutex.lock]
+ * and [withLock { ... }][Mutex.withLock] functions as well as the [onLock][Mutex.onLock] select clause.
+ *
+ * It is also recommended to use [write { ... }][write] block for safety reasons, so the acquired writer lock
+ * is always released at the end of the critical section, and [write.unlock()][Mutex.unlock] is never invoked
+ * before a successful [write.lock()][Mutex.lock].
+ */
+ @ExperimentalCoroutinesApi
+ public val write: Mutex
+}
+
+/**
+ * Creates a new [ReadWriteMutex] instance, both reader and writer locks are not acquired.
+ *
+ * Instead of ensuring the strict fairness, when all waiting readers and writers form
+ * a single queue, this implementation provides a slightly relaxed but more efficient guarantee.
+ * In this version, two separate queues for waiting readers and waiting writers are maintained.
+ * When the last reader lock is released, the first waiting writer is released -- this behaviour
+ * respects the strict fairness property. However, when the writer lock is released, the implementation
+ * either releases all the waiting readers or the first waiting writer, choosing the policy by the
+ * round-robin strategy. Thus, if the choice differs from the strict fairness, it is guaranteed that
+ * the proper waiter(s) will be resumed on the next step. Simultaneously, we find it more efficient to
+ * resume all waiting readers even if it violates the strict fairness.
+ */
+@JsName("_ReadWriteMutex")
+public fun ReadWriteMutex(): ReadWriteMutex = ReadWriteMutexImpl()
+
+/**
+ * Executes the given [action] under a _reader_ lock of this readers-writer mutex.
+ *
+ * @return the return value of the [action].
+ */
+@OptIn(ExperimentalContracts::class)
+public suspend inline fun ReadWriteMutex.read(action: () -> T): T {
+ contract {
+ callsInPlace(action, InvocationKind.EXACTLY_ONCE)
+ }
+
+ readLock()
+ try {
+ return action()
+ } finally {
+ readUnlock()
+ }
+}
+
+/**
+ * Executes the given [action] under the _writer_ lock of this readers-writer mutex.
+ *
+ * @return the return value of the [action].
+ */
+public suspend inline fun ReadWriteMutex.write(action: () -> T): T =
+ write.withLock(null, action)
+
+/**
+ * This readers-writer mutex maintains the numbers of active and waiting readers,
+ * a flag on whether the writer lock is acquired, and the number of writers waiting
+ * for the lock. This tuple represents the current state of the readers-writer mutex and
+ * is split into [waitingReaders] and [state] fields -- it is impossible to store everything
+ * in a single register since its maximal capacity is 64 bit, and this is not sufficient
+ * for three counters and several flags. Additionally, separate [SegmentQueueSynchronizer]-s
+ * are used for waiting readers and writers.
+ *
+ * To acquire a reader lock, the algorithm checks whether the writer lock is held or there is a writer
+ * waiting for it, increasing the number of _active_ readers and grabbing a read lock immediately if not.
+ * Otherwise, it atomically decreases the number of _active_ readers and increases the number of _waiting_
+ * readers and suspends.
+ * As for the writer lock acquisition, the idea is the same -- the algorithm checks whether both reader and
+ * writer locks are not acquired and takes the lock immediately in this case. Otherwise, if the writer should
+ * wait for the lock, the algorithm increases the counter of waiting writers and suspends.
+ *
+ * When releasing a reader lock, the algorithm decrements the number of active readers.
+ * If the counter reaches zero, it checks whether a writer is waiting for the lock
+ * and resumes the first waiting one.
+ * On the writer lock release, the algorithm resumes either the next waiting writer
+ * (decrementing the counter of them) or all waiting readers (decrementing the counter of waiting
+ * readers and incrementing the counter of active ones).
+ *
+ * When there are both readers and writers waiting for a lock at the point of the writer lock release,
+ * the truly fair implementation would form a single queue where all waiters, both readers and writers,
+ * are stored. Instead of ensuring the strict fairness, this implementation provides a slightly relaxed
+ * but more efficient guarantee. In short, it maintains two separate queues, for waiting readers and
+ * waiting writers. When the writer lock is released, the algorithm either releases all the waiting readers
+ * or the first waiting writer, choosing the policy by the round-robin strategy. Thus, if the choice differs
+ * from the strict fairness, it is guaranteed that the proper waiter(s) will be resumed on the next step.
+ * Simultaneously, we find it more efficient to resume all waiting readers even if it violates the strict fairness.
+ *
+ * As for cancellation, the main idea is to revert the state update. However, possible logical races
+ * should be managed carefully, which makes the revert part non-trivial. The details are discussed in the code
+ * comments and appear almost everywhere.
+ */
+internal class ReadWriteMutexImpl : ReadWriteMutex, Mutex {
+ // The number of coroutines waiting for a reader lock in `sqsReaders`.
+ private val waitingReaders = atomic(0)
+ // This state field contains several counters and is always updated atomically by `CAS`:
+ // - `AR` (active readers) is a 30-bit counter which represents the number
+ // of coroutines holding a read lock;
+ // - `WLA` (writer lock acquired) is a flag which is `true` when
+ // the writer lock is acquired;
+ // - `WW` (waiting writers) is a 30-bit counter which represents the number
+ // of coroutines waiting for the writer lock in `sqsWriters`;
+ // - `RWR` (resuming waiting readers) is a flag which is `true` when waiting readers
+ // resumption is in progress.
+ private val state = atomic(0L)
+
+ private val sqsReaders = ReadersSQS() // the place where readers should suspend and be resumed
+ private val sqsWriters = WritersSQS() // the place where writers should suspend and be resumed
+
+ private var curUnlockPolicy = false // false -- prioritize readers on the writer lock release
+ // true -- prioritize writers on the writer lock release
+
+ @ExperimentalCoroutinesApi
+ override val write: Mutex get() = this // we do not create an extra object this way.
+ override val isLocked: Boolean get() = state.value.wla
+ override fun tryLock(owner: Any?): Boolean = error("ReadWriteMutex.write does not support `tryLock()`")
+ override suspend fun lock(owner: Any?) {
+ if (owner != null) error("ReadWriteMutex.write does not support owners")
+ writeLock()
+ }
+ override val onLock: SelectClause2 get() = error("ReadWriteMutex.write does not support `onLock`")
+ override fun holdsLock(owner: Any) = error("ReadWriteMutex.write does not support owners")
+ override fun unlock(owner: Any?) {
+ if (owner != null) error("ReadWriteMutex.write does not support owners")
+ writeUnlock()
+ }
+
+ override suspend fun readLock() {
+ // Try to acquire a reader lock without suspension.
+ if (tryReadLock()) return
+ // The attempt fails, invoke the slow-path. This slow-path
+ // part is implemented in a separate function to guarantee
+ // that the tail call optimization is applied here.
+ readLockSlowPath()
+ }
+
+ private fun tryReadLock(): Boolean {
+ while (true) {
+ // Read the current state.
+ val s = state.value
+ // Is the writer lock acquired or is there a waiting writer?
+ if (!s.wla && s.ww <= 0) {
+ // A reader lock is available to acquire, try to do it!
+ // Note that there can be a concurrent `write.unlock()` which is
+ // resuming readers now, so the `RWR` flag is set in this case.
+ if (state.compareAndSet(s, state(s.ar + 1, false, 0, s.rwr)))
+ return true
+ // CAS failed => the state has changed.
+ // Re-read it and try to acquire a reader lock again.
+ continue
+ } else return false
+ }
+ }
+
+ private suspend fun readLockSlowPath() {
+ // Increment the number of waiting readers at first.
+ // If the current invocation should not suspend,
+ // the counter will be decremented back later.
+ waitingReaders.incrementAndGet()
+ // Check whether this operation should suspend. If not, try
+ // to decrement the counter of waiting readers and restart.
+ while (true) {
+ // Read the current state.
+ val s = state.value
+ // Is there a writer holding the lock or waiting for it?
+ if (s.wla || s.ww > 0) {
+ // The number of waiting readers was incremented
+ // correctly, wait for a reader lock in `sqsReaders`.
+ suspendCancellableCoroutineReusable { cont ->
+ sqsReaders.suspend(cont)
+ }
+ return
+ } else {
+ // A race has been detected! The increment of the counter of
+ // waiting readers was wrong, try to decrement it back. However,
+ // it could already become zero due to a concurrent `write.unlock()`
+ // which reads the number of waiting readers, replaces it with `0`,
+ // and resumes all these readers. In this case, it is guaranteed
+ // that a reader lock will be provided via `sqsReaders`.
+ while (true) {
+ // Read the current number of waiting readers.
+ val wr = waitingReaders.value
+ // Is our invocation already handled by a concurrent
+ // `write.unlock()` and a reader lock is going to be
+ // passed via `sqsReaders`? Suspend in this case --
+ // it is guaranteed that the lock will be provided
+ // when this concurrent `write.unlock()` completes.
+ if (wr == 0) {
+ suspendCancellableCoroutineReusable { cont ->
+ sqsReaders.suspend(cont)
+ }
+ return
+ }
+ // Otherwise, try to decrement the number of waiting
+ // readers and retry the operation from the beginning.
+ if (waitingReaders.compareAndSet(wr, wr - 1)) {
+ // Try again starting from the fast path.
+ readLock()
+ return
+ }
+ }
+ }
+ }
+ }
+
+ override fun readUnlock() {
+ // When releasing a reader lock, the algorithm checks whether
+ // this reader lock is the last acquired one and resumes
+ // the first waiting writer (if applicable) in this case.
+ while (true) {
+ // Read the current state.
+ val s = state.value
+ check(!s.wla) { "Invalid `readUnlock` invocation: the writer lock is acquired. $INVALID_UNLOCK_INVOCATION_TIP" }
+ check(s.ar > 0) { "Invalid `readUnlock` invocation: no reader lock is acquired. $INVALID_UNLOCK_INVOCATION_TIP" }
+ // Is this reader the last one and is the `RWR` flag unset (=> it is valid to resume the next writer)?
+ if (s.ar == 1 && !s.rwr) {
+ // Check whether there is a waiting writer and resume it.
+ // Otherwise, simply change the state and finish.
+ if (s.ww > 0) {
+ // Try to decrement the number of waiting writers and set the `WLA` flag.
+ // Resume the first waiting writer on success.
+ if (state.compareAndSet(s, state(0, true, s.ww - 1, false))) {
+ sqsWriters.resume(Unit)
+ return
+ }
+ } else {
+ // There is no waiting writer according to the state.
+ // Try to clear the number of active readers and finish.
+ if (state.compareAndSet(s, state(0, false, 0, false)))
+ return
+ }
+ } else {
+ // Try to decrement the number of active readers and finish.
+ // Please note that the `RWR` flag can be set here if there is
+ // a concurrent unfinished `write.unlock()` operation which
+ // has resumed the current reader but the corresponding
+ // `readUnlock()` happened before this `write.unlock()` completion.
+ if (state.compareAndSet(s, state(s.ar - 1, false, s.ww, s.rwr)))
+ return
+ }
+ }
+ }
+
+ /**
+ * This customization of [SegmentQueueSynchronizer] for waiting readers
+ * use the asynchronous resumption mode and smart cancellation mode,
+ * so neither [suspend] nor [resume] fail. However, to support
+ * `tryReadLock()` the synchronous resumption mode should be used.
+ */
+ private inner class ReadersSQS : SegmentQueueSynchronizer() {
+ override val resumeMode get() = ASYNC
+ override val cancellationMode get() = SMART
+
+ override fun onCancellation(): Boolean {
+ // The cancellation logic here is pretty similar to
+ // the one in `readLock()` when the number of waiting
+ // readers has been incremented incorrectly.
+ while (true) {
+ // First, read the current number of waiting readers.
+ val wr = waitingReaders.value
+ // Check whether it has already reached zero -- in this
+ // case a concurrent `write.unlock()` will invoke `resume()`
+ // for this cancelled operation eventually, so `onCancellation()`
+ // should return `false` to refuse the granted lock.
+ if (wr == 0) return false
+ // Otherwise, try to decrement the number of waiting readers keeping
+ // the counter non-negative and successfully finish the cancellation.
+ if (waitingReaders.compareAndSet(wr, wr - 1)) return true
+ }
+ }
+
+ // When `onCancellation()` fails, the state keeps unchanged. Therefore,
+ // the reader lock should be returned back to the mutex in `returnValue(..)`.
+ override fun tryReturnRefusedValue(value: Unit) = false
+
+ // Returns the reader lock back to the mutex.
+ // This function is also used for prompt cancellation.
+ override fun returnValue(value: Unit) = readUnlock()
+ }
+
+ internal suspend fun writeLock() {
+ // The algorithm is straightforward -- it reads the current state,
+ // checks that there is no reader or writer lock acquired, and
+ // tries to change the state by atomically setting the `WLA` flag.
+ // Otherwise, if the writer lock cannot be acquired immediatelly,
+ // it increments the number of waiting writers and suspends in
+ // `sqsWriters` waiting for the lock.
+ while (true) {
+ // Read the current state.
+ val s = state.value
+ // Is there an active writer (the WLA flag is set), a concurrent `writeUnlock` operation,
+ // which is releasing readers now (the RWR flag is set), or an active reader (AR >= 0)?
+ if (!s.wla && !s.rwr && s.ar == 0) {
+ // Try to acquire the writer lock, re-try the operation if this CAS fails.
+ assert { s.ww == 0 }
+ if (state.compareAndSet(s, state(0, true, 0, false)))
+ return
+ } else {
+ // The lock cannot be acquired immediately, and this operation has to suspend.
+ // Try to increment the number of waiting writers and suspend in `sqsWriters`.
+ if (state.compareAndSet(s, state(s.ar, s.wla, s.ww + 1, s.rwr))) {
+ suspendCancellableCoroutineReusable { cont ->
+ sqsWriters.suspend(cont)
+ }
+ return
+ }
+ }
+ }
+ }
+
+ internal fun writeUnlock() {
+ // Since we store waiting readers and writers separately, it is not easy
+ // to determine whether the next readers or the next writer should be resumed.
+ // However, it is natural to have the following policies:
+ // - PRIORITIZE_READERS -- always resume all waiting readers at first;
+ // the next waiting writer is resumed only if no reader is waiting for a lock.
+ // - PRIORITIZE_WRITERS -- always resumed the next writer first;
+ // - ROUND_ROBIN -- switch between the policies above on every invocation.
+ //
+ // We find the round-robin strategy fair enough in practice, but the others are used
+ // in Lincheck tests. However, it could be useful to have `PRIORITIZE_WRITERS` policy
+ // in the public API for the cases when the writer lock is used for UI updates.
+ writeUnlock(ROUND_ROBIN)
+ }
+
+ internal fun writeUnlock(policy: WriteUnlockPolicy) {
+ // The algorithm for releasing the writer lock is straightforward by design,
+ // but has a lot of corner cases that should be properly managed.
+ // If the next writer should be resumed (see `PRIORITIZE_WRITERS` policy),
+ // the algorithm tries to atomically decrement the number of waiting writers
+ // and keep the `WLA` flag, resuming the first writer in `sqsWriters` after that.
+ // Otherwise, if the `PRIORITIZE_READERS` policy is used or there is no waiting writer,
+ // the algorithm sets the `RWR` (resuming waiting readers) flag and invokes a special
+ // `completeWaitingReadersResumption()` to resume all the waiting readers.
+ while (true) {
+ // Read the current state at first.
+ val s = state.value
+ check(s.wla) { "Invalid `writeUnlock` invocation: the writer lock is not acquired. $INVALID_UNLOCK_INVOCATION_TIP" }
+ assertNot { s.rwr }
+ assert { s.ar == 0 }
+ // Should we resume the next writer?
+ curUnlockPolicy = !curUnlockPolicy // change the unlock policy for the `ROUND_ROBIN` strategy
+ val resumeWriter = (s.ww > 0) && (policy == PRIORITIZE_WRITERS || policy == ROUND_ROBIN && curUnlockPolicy)
+ if (resumeWriter) {
+ // Resume the next writer - try to decrement the number of waiting
+ // writers and resume the first one in `sqsWriters` on success.
+ if (state.compareAndSet(s, state(0, true, s.ww - 1, false))) {
+ sqsWriters.resume(Unit)
+ return
+ }
+ } else {
+ // Resume waiting readers. Reset the `WLA` flag and set the `RWR` flag atomically,
+ // completing the resumption via `completeWaitingReadersResumption()` after that.
+ // Note that this function also checks whether the next waiting writer should be resumed
+ // on completion and does it if required. It also resets the `RWR` flag at the end.
+ // While it is possible that no reader is waiting for a lock, so that this CAS can be omitted,
+ // we do not add the corresponding code for simplicity since it does not improve the performance
+ // significantly but reduces the code readability.
+ if (state.compareAndSet(s, state(0, false, s.ww, true))) {
+ completeWaitingReadersResumption()
+ return
+ }
+ }
+ }
+ }
+
+ private fun completeWaitingReadersResumption() {
+ // This function is called after the `RWR` flag is set
+ // and completes the readers resumption process. Note that
+ // it also checks whether the next waiting writer should be
+ // resumed on completion and performs this resumption if needed.
+ assert { state.value.rwr }
+ // At first, atomically replace the number of waiting
+ // readers (to be resumed) with 0, retrieving the old value.
+ val wr = waitingReaders.getAndSet(0)
+ // After that, these waiting readers should be logically resumed
+ // by incrementing the corresponding counter in the `state` field.
+ // We also skip this step if the obtained number of waiting readers is zero.
+ if (wr > 0) { // should we update the state?
+ state.update { s ->
+ assertNot { s.wla } // the writer lock cannot be acquired now.
+ assert { s.rwr } // the `RWR` flag should still be set.
+ state(s.ar + wr, false, s.ww, true)
+ }
+ }
+ // After the readers are resumed logically, they should be resumed physically in `sqsReaders`.
+ repeat(wr) {
+ sqsReaders.resume(Unit)
+ }
+ // Once all the waiting readers are resumed, the `RWR` flag should be reset.
+ // It is possible that all the resumed readers have already completed their
+ // work and successfully invoked `readUnlock()` at this point, but since
+ // the `RWR` flag was set, they were unable to resume the next waiting writer.
+ // Similarly, it is possible that there were no waiting readers at all.
+ // Therefore, in the end, we check whether the number of active readers is 0
+ // and resume the next waiting writer in this case (if there exists one).
+ var resumeWriter = false
+ state.getAndUpdate { s ->
+ resumeWriter = s.ar == 0 && s.ww > 0
+ val wwUpd = if (resumeWriter) s.ww - 1 else s.ww
+ state(s.ar, resumeWriter, wwUpd, false)
+ }
+ if (resumeWriter) {
+ // Resume the next writer physically and finish
+ sqsWriters.resume(Unit)
+ return
+ }
+ // Meanwhile, it could be possible for a writer to come and suspend due to the `RWR` flag.
+ // After that, all the following readers suspend since a writer is waiting for the lock.
+ // However, if the writer becomes canceled, it cannot resume these suspended readers if the `RWR` flag
+ // is still set, so we have to help him with the resumption process. To detect such a situation, we re-read
+ // the number of waiting readers and try to start the resumption process again if the writer lock is not acquired.
+ if (waitingReaders.value > 0) { // Is there a waiting reader?
+ while (true) {
+ val s = state.value // Read the current state.
+ if (s.wla || s.ww > 0 || s.rwr) return // Check whether the readers resumption is valid.
+ // Try to set the `RWR` flag again and resume the waiting readers.
+ if (state.compareAndSet(s, state(s.ar, false, 0, true))) {
+ completeWaitingReadersResumption()
+ return
+ }
+ }
+ }
+ }
+
+ /**
+ * This customization of [SegmentQueueSynchronizer] for waiting writers
+ * uses the asynchronous resumption mode and smart cancellation mode,
+ * so neither [suspend] nor [resume] fail. However, in order to support
+ * `tryWriteLock()` the synchronous resumption mode should be used instead.
+ */
+ private inner class WritersSQS : SegmentQueueSynchronizer() {
+ override val resumeMode get() = ASYNC
+ override val cancellationMode get() = SMART
+
+ override fun onCancellation(): Boolean {
+ // In general, on cancellation, the algorithm tries to decrement the number of waiting writers.
+ // Similarly to the cancellation logic for readers, if the number of waiting writers has already reached 0,
+ // the current canceling writer will be resumed in `sqsWriters`. In this case, the function returns
+ // `false`, and the permit will be returned via `returnValue()`. Otherwise, if the number of waiting
+ // writers >= 1, the decrement is sufficient. However, if this canceling writer is the last waiting one,
+ // the algorithm sets the `RWR` flag and resumes waiting readers. This logic is similar to `writeUnlock(..)`.
+ while (true) {
+ val s = state.value // Read the current state.
+ if (s.ww == 0) return false // Is this writer going to be resumed in `sqsWriters`?
+ // Is this writer the last one and is the readers resumption valid?
+ if (s.ww == 1 && !s.wla && !s.rwr) {
+ // Set the `RWR` flag and resume the waiting readers.
+ // While it is possible that no reader is waiting for a lock, so that this CAS can be omitted,
+ // we do not add the corresponding code for simplicity since it does not improve the performance
+ // significantly but reduces the code readability. Note that the same logic appears in `writeUnlock(..)`,
+ // and the cancellation performance is less critical since the cancellation itself does not come for free.
+ if (state.compareAndSet(s, state(s.ar, false, 0, true))) {
+ completeWaitingReadersResumption()
+ return true
+ }
+ } else {
+ // There are multiple writers waiting for the lock. Try to decrement the number of them.
+ if (state.compareAndSet(s, state(s.ar, s.wla, s.ww - 1, s.rwr)))
+ return true
+ }
+ }
+ }
+
+ // Resumes the next waiting writer if the current `writeLock()` operation
+ // is already cancelled but the next writer is logically resumed
+ override fun tryReturnRefusedValue(value: Unit): Boolean {
+ writeUnlock(PRIORITIZE_WRITERS)
+ return true
+ }
+
+ // Returns the writer lock back to the mutex.
+ // This function is also used for prompt cancellation.
+ override fun returnValue(value: Unit) = writeUnlock()
+ }
+
+ // This state representation is used in Lincheck tests.
+ internal val stateRepresentation: String get() =
+ ""
+
+ internal enum class WriteUnlockPolicy { PRIORITIZE_READERS, PRIORITIZE_WRITERS, ROUND_ROBIN }
+}
+
+/**
+ * Constructs a value for [ReadWriteMutexImpl.state] field.
+ * The created state can be parsed via the extension functions below.
+ */
+private fun state(activeReaders: Int, writeLockAcquired: Boolean, waitingWriters: Int, resumingWaitingReaders: Boolean): Long =
+ (if (writeLockAcquired) WLA_BIT else 0) +
+ (if (resumingWaitingReaders) RWR_BIT else 0) +
+ activeReaders * AR_MULTIPLIER +
+ waitingWriters * WW_MULTIPLIER
+
+// Equals `true` if the `WLA` flag is set in this state.
+private val Long.wla: Boolean get() = this or WLA_BIT == this
+// Equals `true` if the `RWR` flag is set in this state.
+private val Long.rwr: Boolean get() = this or RWR_BIT == this
+// The number of waiting writers specified in this state.
+private val Long.ww: Int get() = ((this % AR_MULTIPLIER) / WW_MULTIPLIER).toInt()
+// The number of active readers specified in this state.
+private val Long.ar: Int get() = (this / AR_MULTIPLIER).toInt()
+
+private const val WLA_BIT = 1L
+private const val RWR_BIT = 1L shl 1
+private const val WW_MULTIPLIER = 1L shl 2
+private const val AR_MULTIPLIER = 1L shl 33
+
+private const val INVALID_UNLOCK_INVOCATION_TIP = "This can be caused by releasing the lock without acquiring it at first, " +
+ "or incorrectly putting the acquisition inside the \"try\" block of the \"try-finally\" section that safely releases " +
+ "the lock in the \"finally\" block - the acquisition should be performed right before this \"try\" block."
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt
index e8b28bc15c..879a7127d7 100644
--- a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt
+++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt
@@ -10,7 +10,6 @@ import kotlinx.coroutines.internal.*
import kotlin.contracts.*
import kotlin.coroutines.*
import kotlin.math.*
-import kotlin.native.concurrent.SharedImmutable
/**
* A counting semaphore for coroutines that logically maintains a number of available permits.
@@ -33,6 +32,9 @@ public interface Semaphore {
*
* This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
* function is suspended, this function immediately resumes with [CancellationException].
+ *
+ * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this
+ * function is suspended, this function immediately resumes with [CancellationException].
* There is a **prompt cancellation guarantee**. If the job was cancelled while this function was
* suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details.
* This function releases the semaphore if it was already acquired by this function before the [CancellationException]
@@ -43,12 +45,20 @@ public interface Semaphore {
* check for cancellation in tight loops if needed.
*
* Use [tryAcquire] to try acquire a permit of this semaphore without suspension.
+ *
+ * It is recommended to use [withPermit] for safety reasons, so that the acquired permit is always
+ * released at the end of your critical section and [release] is never invoked before a successful
+ * permit acquisition.
*/
public suspend fun acquire()
/**
* Tries to acquire a permit from this semaphore without suspension.
*
+ * It is recommended to use [withPermit] for safety reasons, so that the acquired permit is always
+ * released at the end of your critical section and [release] is never invoked before a successful
+ * permit acquisition.
+ *
* @return `true` if a permit was acquired, `false` otherwise.
*/
public fun tryAcquire(): Boolean
@@ -57,6 +67,10 @@ public interface Semaphore {
* Releases a permit, returning it into this semaphore. Resumes the first
* suspending acquirer if there is one at the point of invocation.
* Throws [IllegalStateException] if the number of [release] invocations is greater than the number of preceding [acquire].
+ *
+ * It is recommended to use [withPermit] for safety reasons, so that the acquired permit is always
+ * released at the end of your critical section and [release] is never invoked before a successful
+ * permit acquisition.
*/
public fun release()
}
@@ -90,214 +104,139 @@ public suspend inline fun Semaphore.withPermit(action: () -> T): T {
}
}
-private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Semaphore {
- /*
- The queue of waiting acquirers is essentially an infinite array based on the list of segments
- (see `SemaphoreSegment`); each segment contains a fixed number of slots. To determine a slot for each enqueue
- and dequeue operation, we increment the corresponding counter at the beginning of the operation
- and use the value before the increment as a slot number. This way, each enqueue-dequeue pair
- works with an individual cell. We use the corresponding segment pointers to find the required ones.
-
- Here is a state machine for cells. Note that only one `acquire` and at most one `release` operation
- can deal with each cell, and that `release` uses `getAndSet(PERMIT)` to perform transitions for performance reasons
- so that the state `PERMIT` represents different logical states.
-
- +------+ `acquire` suspends +------+ `release` tries +--------+ // if `cont.tryResume(..)` succeeds, then
- | NULL | -------------------> | cont | -------------------> | PERMIT | (cont RETRIEVED) // the corresponding `acquire` operation gets
- +------+ +------+ to resume `cont` +--------+ // a permit and the `release` one completes.
- | |
- | | `acquire` request is cancelled and the continuation is
- | `release` comes | replaced with a special `CANCEL` token to avoid memory leaks
- | to the slot before V
- | `acquire` and puts +-----------+ `release` has +--------+
- | a permit into the | CANCELLED | -----------------> | PERMIT | (RElEASE FAILED)
- | slot, waiting for +-----------+ failed +--------+
- | `acquire` after
- | that.
- |
- | `acquire` gets +-------+
- | +-----------------> | TAKEN | (ELIMINATION HAPPENED)
- V | the permit +-------+
- +--------+ |
- | PERMIT | -<
- +--------+ |
- | `release` has waited a bounded time, +--------+
- +---------------------------------------> | BROKEN | (BOTH RELEASE AND ACQUIRE FAILED)
- but `acquire` has not come +--------+
- */
-
- private val head: AtomicRef
- private val deqIdx = atomic(0L)
- private val tail: AtomicRef
- private val enqIdx = atomic(0L)
-
+internal open class SemaphoreImpl(
+ private val permits: Int,
+ acquiredPermits: Int
+) : SegmentQueueSynchronizer(), Semaphore {
init {
- require(permits > 0) { "Semaphore should have at least 1 permit, but had $permits" }
- require(acquiredPermits in 0..permits) { "The number of acquired permits should be in 0..$permits" }
- val s = SemaphoreSegment(0, null, 2)
- head = atomic(s)
- tail = atomic(s)
+ require(permits > 0) { "Semaphore must have at least 1 permit, but is initialized with $permits" }
+ require(acquiredPermits in 0..permits) { "The number of acquired permits should be in range [0..$permits]" }
}
/**
- * This counter indicates a number of available permits if it is non-negative,
- * or the size with minus sign otherwise. Note, that 32-bit counter is enough here
- * since the maximal number of available permits is [permits] which is [Int],
- * and the maximum number of waiting acquirers cannot be greater than 2^31 in any
- * real application.
+ * This counter indicates the number of available permits if it is positive,
+ * or the negated number of waiters on this semaphore otherwise.
+ * Note, that 32-bit counter is enough here since the maximal number of available
+ * permits is [permits] which is [Int], and the maximum number of waiting acquirers
+ * cannot be greater than 2^31 in any real application.
*/
private val _availablePermits = atomic(permits - acquiredPermits)
override val availablePermits: Int get() = max(_availablePermits.value, 0)
- private val onCancellationRelease = { _: Throwable -> release() }
-
override fun tryAcquire(): Boolean {
- _availablePermits.loop { p ->
+ while (true) {
+ // Get the current number of available permits.
+ val p = _availablePermits.value
+ // Is the number of available permits greater
+ // than the maximal one because of an incorrect
+ // `release()` call without a preceding `acquire()`?
+ // Change it to `permits` and start from the beginning.
+ if (p > permits) {
+ coerceAvailablePermitsAtMaximum()
+ continue
+ }
+ // Try to decrement the number of available
+ // permits if it is greater than zero.
if (p <= 0) return false
if (_availablePermits.compareAndSet(p, p - 1)) return true
}
}
override suspend fun acquire() {
- val p = _availablePermits.getAndDecrement()
- if (p > 0) return // permit acquired
- // While it looks better when the following function is inlined,
- // it is important to make `suspend` function invocations in a way
- // so that the tail-call optimization can be applied.
- acquireSlowPath()
- }
-
- private suspend fun acquireSlowPath() = suspendCancellableCoroutineReusable sc@ { cont ->
while (true) {
- if (addAcquireToQueue(cont)) return@sc
- val p = _availablePermits.getAndDecrement()
- if (p > 0) { // permit acquired
- cont.resume(Unit, onCancellationRelease)
- return@sc
- }
+ // Decrement the number of available permits.
+ val p = decPermits()
+ // Is the permit acquired?
+ if (p > 0) return
+ // Try to suspend otherwise.
+ // While it looks better when the following function is inlined,
+ // it is important to make `suspend` function invocations in a way
+ // so that the tail-call optimization can be applied here.
+ return acquireSlowPath()
}
}
- override fun release() {
+ private suspend fun acquireSlowPath() = suspendCancellableCoroutineReusable sc@{ cont ->
while (true) {
- val p = _availablePermits.getAndUpdate { cur ->
- check(cur < permits) { "The number of released permits cannot be greater than $permits" }
- cur + 1
+ // Try to suspend.
+ if (suspend(cont)) return@sc
+ // The suspension has been failed
+ // due to the synchronous resumption mode.
+ // Restart the whole `acquire`, and decrement
+ // the number of available permits at first.
+ val p = decPermits()
+ // Is the permit acquired?
+ if (p > 0) {
+ cont.resume(Unit)
+ return@sc
}
- if (p >= 0) return
- if (tryResumeNextFromQueue()) return
+ // Permit has not been acquired, go to
+ // the beginning of the loop and suspend.
}
}
/**
- * Returns `false` if the received permit cannot be used and the calling operation should restart.
+ * Decrements the number of available permits
+ * and ensures that it is not greater than [permits]
+ * at the point of decrement. The last may happen
+ * due to an incorrect `release()` call without
+ * a preceding `acquire()`.
*/
- private fun addAcquireToQueue(cont: CancellableContinuation): Boolean {
- val curTail = this.tail.value
- val enqIdx = enqIdx.getAndIncrement()
- val segment = this.tail.findSegmentAndMoveForward(id = enqIdx / SEGMENT_SIZE, startFrom = curTail,
- createNewSegment = ::createSegment).segment // cannot be closed
- val i = (enqIdx % SEGMENT_SIZE).toInt()
- // the regular (fast) path -- if the cell is empty, try to install continuation
- if (segment.cas(i, null, cont)) { // installed continuation successfully
- cont.invokeOnCancellation(CancelSemaphoreAcquisitionHandler(segment, i).asHandler)
- return true
- }
- // On CAS failure -- the cell must be either PERMIT or BROKEN
- // If the cell already has PERMIT from tryResumeNextFromQueue, try to grab it
- if (segment.cas(i, PERMIT, TAKEN)) { // took permit thus eliminating acquire/release pair
- /// This continuation is not yet published, but still can be cancelled via outer job
- cont.resume(Unit, onCancellationRelease)
- return true
+ private fun decPermits(): Int {
+ while (true) {
+ // Decrement the number of available permits.
+ val p = _availablePermits.getAndDecrement()
+ // Is the number of available permits greater
+ // than the maximal one due to an incorrect
+ // `release()` call without a preceding `acquire()`?
+ if (p > permits) continue
+ // The number of permits is correct, return it.
+ return p
}
- assert { segment.get(i) === BROKEN } // it must be broken in this case, no other way around it
- return false // broken cell, need to retry on a different cell
}
- @Suppress("UNCHECKED_CAST")
- private fun tryResumeNextFromQueue(): Boolean {
- val curHead = this.head.value
- val deqIdx = deqIdx.getAndIncrement()
- val id = deqIdx / SEGMENT_SIZE
- val segment = this.head.findSegmentAndMoveForward(id, startFrom = curHead,
- createNewSegment = ::createSegment).segment // cannot be closed
- segment.cleanPrev()
- if (segment.id > id) return false
- val i = (deqIdx % SEGMENT_SIZE).toInt()
- val cellState = segment.getAndSet(i, PERMIT) // set PERMIT and retrieve the prev cell state
- when {
- cellState === null -> {
- // Acquire has not touched this cell yet, wait until it comes for a bounded time
- // The cell state can only transition from PERMIT to TAKEN by addAcquireToQueue
- repeat(MAX_SPIN_CYCLES) {
- if (segment.get(i) === TAKEN) return true
- }
- // Try to break the slot in order not to wait
- return !segment.cas(i, PERMIT, BROKEN)
+ override fun release() {
+ while (true) {
+ // Increment the number of available permits.
+ val p = _availablePermits.getAndIncrement()
+ // Is this `release` call correct and does not
+ // exceed the maximal number of permits?
+ if (p >= permits) {
+ // Revert the number of available permits
+ // back to the correct one and fail with error.
+ coerceAvailablePermitsAtMaximum()
+ error("The number of released permits cannot be greater than $permits")
}
- cellState === CANCELLED -> return false // the acquire was already cancelled
- else -> return (cellState as CancellableContinuation).tryResumeAcquire()
+ // Is there a waiter that should be resumed?
+ if (p >= 0) return
+ // Try to resume the first waiter, and
+ // restart the operation if either this
+ // first waiter is cancelled or
+ // due to `SYNC` resumption mode.
+ if (resume(Unit)) return
}
}
- private fun CancellableContinuation.tryResumeAcquire(): Boolean {
- val token = tryResume(Unit, null, onCancellationRelease) ?: return false
- completeResume(token)
- return true
- }
-}
-
-private class CancelSemaphoreAcquisitionHandler(
- private val segment: SemaphoreSegment,
- private val index: Int
-) : CancelHandler() {
- override fun invoke(cause: Throwable?) {
- segment.cancel(index)
- }
-
- override fun toString() = "CancelSemaphoreAcquisitionHandler[$segment, $index]"
-}
-
-private fun createSegment(id: Long, prev: SemaphoreSegment?) = SemaphoreSegment(id, prev, 0)
-
-private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?, pointers: Int) : Segment(id, prev, pointers) {
- val acquirers = atomicArrayOfNulls(SEGMENT_SIZE)
- override val maxSlots: Int get() = SEGMENT_SIZE
-
- @Suppress("NOTHING_TO_INLINE")
- inline fun get(index: Int): Any? = acquirers[index].value
-
- @Suppress("NOTHING_TO_INLINE")
- inline fun set(index: Int, value: Any?) {
- acquirers[index].value = value
+ override fun returnValue(value: Unit) {
+ // Return the permit if the current continuation
+ // is cancelled after the `tryResume` invocation
+ // because of the prompt cancellation.
+ // Note that this `release()` call can throw
+ // exception if there was a successful concurrent
+ // `release()` invoked without acquiring a permit.
+ release()
}
- @Suppress("NOTHING_TO_INLINE")
- inline fun cas(index: Int, expected: Any?, value: Any?): Boolean = acquirers[index].compareAndSet(expected, value)
-
- @Suppress("NOTHING_TO_INLINE")
- inline fun getAndSet(index: Int, value: Any?) = acquirers[index].getAndSet(value)
-
- // Cleans the acquirer slot located by the specified index
- // and removes this segment physically if all slots are cleaned.
- fun cancel(index: Int) {
- // Clean the slot
- set(index, CANCELLED)
- // Remove this segment if needed
- onSlotCleaned()
+ /**
+ * Changes the number of available permits to
+ * [permits] if it became greater due to an
+ * incorrect [release] call.
+ */
+ private fun coerceAvailablePermitsAtMaximum() {
+ while (true) {
+ val cur = _availablePermits.value
+ if (cur <= permits) break
+ if (_availablePermits.compareAndSet(cur, permits)) break
+ }
}
-
- override fun toString() = "SemaphoreSegment[id=$id, hashCode=${hashCode()}]"
-}
-@SharedImmutable
-private val MAX_SPIN_CYCLES = systemProp("kotlinx.coroutines.semaphore.maxSpinCycles", 100)
-@SharedImmutable
-private val PERMIT = Symbol("PERMIT")
-@SharedImmutable
-private val TAKEN = Symbol("TAKEN")
-@SharedImmutable
-private val BROKEN = Symbol("BROKEN")
-@SharedImmutable
-private val CANCELLED = Symbol("CANCELLED")
-@SharedImmutable
-private val SEGMENT_SIZE = systemProp("kotlinx.coroutines.semaphore.segmentSize", 16)
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/common/test/TestBase.common.kt b/kotlinx-coroutines-core/common/test/TestBase.common.kt
index 71c45769cb..72cbe2faf2 100644
--- a/kotlinx-coroutines-core/common/test/TestBase.common.kt
+++ b/kotlinx-coroutines-core/common/test/TestBase.common.kt
@@ -100,3 +100,12 @@ class BadClass {
override fun hashCode(): Int = error("hashCode")
override fun toString(): String = error("toString")
}
+
+/**
+ * Tries to resume this continuation, returns `true` if succeeds and `false` otherwise.
+ */
+internal fun CancellableContinuation.tryResume0(value: T, onCancellation: ((cause: Throwable) -> Unit)?): Boolean {
+ val token = tryResume(value, null, onCancellation) ?: return false
+ completeResume(token)
+ return true
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/common/test/sync/ReadWriteMutexTest.kt b/kotlinx-coroutines-core/common/test/sync/ReadWriteMutexTest.kt
new file mode 100644
index 0000000000..010e2ef28b
--- /dev/null
+++ b/kotlinx-coroutines-core/common/test/sync/ReadWriteMutexTest.kt
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.sync
+
+import kotlinx.coroutines.*
+import kotlin.test.*
+
+class ReadWriteMutexTest : TestBase() {
+ @Test
+ fun simpleSingleCoroutineTest() = runTest {
+ val m = ReadWriteMutex()
+ m.readLock()
+ m.readLock()
+ m.readUnlock()
+ m.readUnlock()
+ m.write.lock()
+ m.write.unlock()
+ m.readLock()
+ }
+
+ @Test
+ fun multipleCoroutinesTest() = runTest {
+ val m = ReadWriteMutex()
+ m.readLock()
+ expect(1)
+ launch {
+ expect(2)
+ m.readLock()
+ expect(3)
+ }
+ yield()
+ expect(4)
+ launch {
+ expect(5)
+ m.write.lock()
+ expect(8)
+ }
+ yield()
+ expect(6)
+ m.readUnlock()
+ yield()
+ expect(7)
+ m.readUnlock()
+ yield()
+ finish(9)
+ }
+
+ @Test
+ fun acquireReadSucceedsAfterCancelledAcquireWrite() = runTest {
+ val m = ReadWriteMutex()
+ m.readLock()
+ val wJob = launch {
+ expect(1)
+ m.write.lock()
+ expectUnreached()
+ }
+ yield()
+ expect(2)
+ wJob.cancel()
+ launch {
+ expect(3)
+ m.readLock()
+ expect(4)
+ }
+ yield()
+ finish(5)
+ }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/jvm/test/lincheck/ReadWriteMutexLincheckTests.kt b/kotlinx-coroutines-core/jvm/test/lincheck/ReadWriteMutexLincheckTests.kt
new file mode 100644
index 0000000000..742e1c30ae
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/lincheck/ReadWriteMutexLincheckTests.kt
@@ -0,0 +1,209 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+@file:Suppress("unused")
+
+package kotlinx.coroutines.lincheck
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.sync.*
+import kotlinx.coroutines.sync.ReadWriteMutexImpl.WriteUnlockPolicy.*
+import org.jetbrains.kotlinx.lincheck.*
+import org.jetbrains.kotlinx.lincheck.annotations.*
+import org.jetbrains.kotlinx.lincheck.annotations.Operation
+import org.jetbrains.kotlinx.lincheck.paramgen.*
+import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.*
+import org.jetbrains.kotlinx.lincheck.verifier.*
+
+class ReadWriteMutexLincheckTest : AbstractLincheckTest() {
+ private val m = ReadWriteMutexImpl()
+ private val readLockAcquired = IntArray(6)
+ private val writeLockAcquired = BooleanArray(6)
+
+ @Operation(allowExtraSuspension = true, promptCancellation = false)
+ suspend fun readLock(@Param(gen = ThreadIdGen::class) threadId: Int) {
+ m.readLock()
+ readLockAcquired[threadId]++
+ }
+
+ @Operation
+ fun readUnlock(@Param(gen = ThreadIdGen::class) threadId: Int): Boolean {
+ if (readLockAcquired[threadId] == 0) return false
+ m.readUnlock()
+ readLockAcquired[threadId]--
+ return true
+ }
+
+ @Operation(allowExtraSuspension = true, promptCancellation = false)
+ suspend fun writeLock(@Param(gen = ThreadIdGen::class) threadId: Int) {
+ m.writeLock()
+ assert(!writeLockAcquired[threadId]) {
+ "The mutex is not reentrant, this `writeLock()` invocation had to suspend"
+ }
+ writeLockAcquired[threadId] = true
+ }
+
+ @Operation
+ fun writeUnlock(@Param(gen = ThreadIdGen::class) threadId: Int, prioritizeWriters: Boolean): Boolean {
+ if (!writeLockAcquired[threadId]) return false
+ m.writeUnlock(if (prioritizeWriters) PRIORITIZE_WRITERS else PRIORITIZE_READERS)
+ writeLockAcquired[threadId] = false
+ return true
+ }
+
+ @StateRepresentation
+ fun stateRepresentation() = m.stateRepresentation
+
+ override fun > O.customize(isStressTest: Boolean) =
+ actorsBefore(0)
+ .actorsAfter(0)
+ .sequentialSpecification(ReadWriteMutexLincheckTestSequential::class.java)
+
+ override fun ModelCheckingOptions.customize(isStressTest: Boolean) =
+ checkObstructionFreedom()
+}
+
+class ReadWriteMutexLincheckTestSequential : VerifierState() {
+ private val m = ReadWriteMutexSequential()
+ private val readLockAcquired = IntArray(6)
+ private val writeLockAcquired = BooleanArray(6)
+
+ fun tryReadLock(threadId: Int): Boolean =
+ m.tryReadLock().also { success ->
+ if (success) readLockAcquired[threadId]++
+ }
+
+ suspend fun readLock(threadId: Int) {
+ m.readLock()
+ readLockAcquired[threadId]++
+ }
+
+ fun readUnlock(threadId: Int): Boolean {
+ if (readLockAcquired[threadId] == 0) return false
+ m.readUnlock()
+ readLockAcquired[threadId]--
+ return true
+ }
+
+ fun tryWriteLock(threadId: Int): Boolean =
+ m.tryWriteLock().also { success ->
+ if (success) writeLockAcquired[threadId] = true
+ }
+
+ suspend fun writeLock(threadId: Int) {
+ m.writeLock()
+ writeLockAcquired[threadId] = true
+ }
+
+ fun writeUnlock(threadId: Int, prioritizeWriters: Boolean): Boolean {
+ if (!writeLockAcquired[threadId]) return false
+ m.writeUnlock(prioritizeWriters)
+ writeLockAcquired[threadId] = false
+ return true
+ }
+
+ override fun extractState() =
+ "mutex=${m.state},rlaPerThread=${readLockAcquired.contentToString()},wlaPerThread=${writeLockAcquired.contentToString()}"
+}
+
+internal class ReadWriteMutexSequential {
+ private var ar = 0
+ private var wla = false
+ private val wr = ArrayList>()
+ private val ww = ArrayList>()
+
+ fun tryReadLock(): Boolean {
+ if (wla || ww.isNotEmpty()) return false
+ ar++
+ return true
+ }
+
+ suspend fun readLock() {
+ if (wla || ww.isNotEmpty()) {
+ suspendCancellableCoroutine { cont ->
+ wr += cont
+ cont.invokeOnCancellation { wr -= cont }
+ }
+ } else {
+ ar++
+ }
+ }
+
+ fun readUnlock() {
+ ar--
+ if (ar == 0 && ww.isNotEmpty()) {
+ wla = true
+ val w = ww.removeAt(0)
+ w.resume(Unit) { writeUnlock(true) }
+ }
+ }
+
+ fun tryWriteLock(): Boolean {
+ if (wla || ar > 0) return false
+ wla = true
+ return true
+ }
+
+ suspend fun writeLock() {
+ if (wla || ar > 0) {
+ suspendCancellableCoroutine { cont ->
+ ww += cont
+ cont.invokeOnCancellation {
+ ww -= cont
+ if (!wla && ww.isEmpty()) {
+ ar += wr.size
+ wr.forEach { it.resume(Unit) { readUnlock() } }
+ wr.clear()
+ }
+ }
+ }
+ } else {
+ wla = true
+ }
+ }
+
+ fun writeUnlock(prioritizeWriters: Boolean) {
+ if (ww.isNotEmpty() && prioritizeWriters) {
+ val w = ww.removeAt(0)
+ w.resume(Unit) { writeUnlock(prioritizeWriters) }
+ } else {
+ wla = false
+ ar = wr.size
+ wr.forEach { it.resume(Unit) { readUnlock() } }
+ wr.clear()
+ }
+ }
+
+ val state get() = "ar=$ar,wla=$wla,wr=${wr.size},ww=${ww.size}"
+}
+
+// This is an additional test to check the [ReadWriteMutex] synchronization contract.
+internal class ReadWriteMutexCounterLincheckTest : AbstractLincheckTest() {
+ private val m = ReadWriteMutexImpl()
+ private var c = 0
+
+ @Operation(allowExtraSuspension = true, promptCancellation = false)
+ suspend fun inc(): Int = m.write { c++ }
+
+ @Operation(allowExtraSuspension = true, promptCancellation = false)
+ suspend fun get(): Int = m.read { c }
+
+ @StateRepresentation
+ fun stateRepresentation(): String = "$c + ${m.stateRepresentation}"
+
+ override fun > O.customize(isStressTest: Boolean): O =
+ actorsBefore(0)
+ .actorsAfter(0)
+ .sequentialSpecification(ReadWriteMutexCounterSequential::class.java)
+}
+
+@Suppress("RedundantSuspendModifier")
+class ReadWriteMutexCounterSequential : VerifierState() {
+ private var c = 0
+
+ fun incViaTryLock() = c++
+ suspend fun inc() = c++
+ suspend fun get() = c
+
+ override fun extractState() = c
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/jvm/test/lincheck/SegmentQueueSynchronizerLincheckTests.kt b/kotlinx-coroutines-core/jvm/test/lincheck/SegmentQueueSynchronizerLincheckTests.kt
new file mode 100644
index 0000000000..7d08adec0d
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/lincheck/SegmentQueueSynchronizerLincheckTests.kt
@@ -0,0 +1,927 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+@file:Suppress("unused", "MemberVisibilityCanBePrivate")
+package kotlinx.coroutines.lincheck
+
+import kotlinx.atomicfu.*
+import kotlinx.coroutines.*
+import kotlinx.coroutines.internal.*
+import kotlinx.coroutines.internal.SegmentQueueSynchronizer.CancellationMode.*
+import kotlinx.coroutines.internal.SegmentQueueSynchronizer.ResumeMode.*
+import kotlinx.coroutines.sync.Semaphore
+import org.jetbrains.kotlinx.lincheck.*
+import org.jetbrains.kotlinx.lincheck.annotations.*
+import org.jetbrains.kotlinx.lincheck.annotations.Operation
+import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.*
+import org.jetbrains.kotlinx.lincheck.verifier.*
+import java.util.concurrent.*
+import kotlin.collections.ArrayList
+import kotlin.coroutines.*
+import kotlin.reflect.*
+
+// This test suit serves two purposes. First of all, it tests the `SegmentQueueSynchronizer`
+// implementation under different use-cases and workloads. On the other side, this test suite
+// provides different well-known synchronization and communication primitive implementations
+// via `SegmentQueueSynchronizer`, which can be considered as an API richness check as well as
+// a collection of examples on how to use `SegmentQueueSynchronizer` to build new primitives.
+
+// ##############
+// # SEMAPHORES #
+// ##############
+
+/**
+ * This [Semaphore] implementation is similar to the one in the library,
+ * but uses the [asynchronous][ASYNC] resumption mode. However, it is non-trivial
+ * to make [tryAcquire] linearizable in this case, so it is not supported here.
+ */
+internal class AsyncSemaphore(permits: Int) : SegmentQueueSynchronizer(), Semaphore {
+ override val resumeMode get() = ASYNC
+
+ private val _availablePermits = atomic(permits)
+ override val availablePermits get() = _availablePermits.value.coerceAtLeast(0)
+
+ override fun tryAcquire() = error("Not implemented") // Not supported in the ASYNC version
+
+ override suspend fun acquire() {
+ // Decrement the number of available permits.
+ val p = _availablePermits.getAndDecrement()
+ // Is the permit successfully acquired?
+ if (p > 0) return
+ // Suspend otherwise.
+ suspendCancellableCoroutine { cont ->
+ check(suspend(cont)) { "Should not fail in ASYNC mode" }
+ }
+ }
+
+ override fun release() {
+ while (true) {
+ // Increment the number of available permits.
+ val p = _availablePermits.getAndIncrement()
+ // Is there a waiter that should be resumed?
+ if (p >= 0) return
+ // Try to resume the first waiter, and
+ // restart the operation if it is cancelled.
+ if (resume(Unit)) return
+ }
+ }
+
+ // For prompt cancellation.
+ override fun returnValue(value: Unit) = release()
+}
+
+/**
+ * This semaphore implementation is correct only if [release] is always
+ * invoked after a successful [acquire]; in other words, when semaphore
+ * is used properly, without unexpected [release] invocations. The main
+ * advantage is using smart cancellation, so [release] always works
+ * in constant time under no contention, and the cancelled [acquire]
+ * requests do not play any role. It is worth noting, that it is possible
+ * to make this implementation correct under the prompt cancellation model
+ * even with unexpected [release]-s.
+ */
+internal class AsyncSemaphoreSmart(permits: Int) : SegmentQueueSynchronizer(), Semaphore {
+ override val resumeMode get() = ASYNC
+ override val cancellationMode get() = SMART
+
+ private val _availablePermits = atomic(permits)
+ override val availablePermits get() = _availablePermits.value.coerceAtLeast(0)
+
+ override fun tryAcquire() = error("Not implemented") // Not supported in the ASYNC version.
+
+ override suspend fun acquire() {
+ // Decrement the number of available permits.
+ val p = _availablePermits.getAndDecrement()
+ // Is the permit acquired?
+ if (p > 0) return
+ // Suspend otherwise.
+ suspendCancellableCoroutine { cont ->
+ check(suspend(cont)) { "Should not fail in ASYNC mode" }
+ }
+ }
+
+ override fun release() {
+ // Increment the number of available permits.
+ val p = _availablePermits.getAndIncrement()
+ // Is there a waiter that should be resumed?
+ if (p >= 0) return
+ // Resume the first waiter. Due to the smart
+ // cancellation mode it is possible that this
+ // permit will be refused, so this release
+ // invocation can take effect with a small lag
+ // and with an extra suspension, but it is guaranteed
+ // that the permit will be refused eventually.
+ resume(Unit)
+ }
+
+ override fun onCancellation(): Boolean {
+ // Increment the number of available permits.
+ val p = _availablePermits.getAndIncrement()
+ // Return `true` if there is no `release` that
+ // is going to resume this cancelling `acquire()`,
+ // or `false` if there is one, and this permit
+ // should be refused.
+ return p < 0
+ }
+
+ // For prompt cancellation.
+ override fun returnValue(value: Unit) = release()
+}
+
+/**
+ * This implementation is similar to the previous one, but with [synchronous][SYNC]
+ * resumption mode, so it is possible to implement [tryAcquire] correctly.
+ * The only notable difference happens when a permit to be released is refused,
+ * and the following [resume] attempt in the cancellation handler fails due to
+ * the synchronization on resumption, so the permit is going to be returned
+ * back to the semaphore in [returnValue] function. It is worth noting, that it
+ * is possible to make this implementation correct with prompt cancellation.
+ */
+internal class SyncSemaphoreSmart(permits: Int) : SegmentQueueSynchronizer(), Semaphore {
+ override val resumeMode get() = SYNC
+ override val cancellationMode get() = SMART
+
+ private val _availablePermits = atomic(permits)
+ override val availablePermits get() = _availablePermits.value.coerceAtLeast(0)
+
+ override suspend fun acquire() {
+ while (true) {
+ // Decrement the number of available permits.
+ val p = _availablePermits.getAndDecrement()
+ // Is the permit acquired?
+ if (p > 0) return
+ // Try to suspend otherwise.
+ val acquired = suspendCancellableCoroutine { cont ->
+ if (!suspend(cont)) cont.resume(false)
+ }
+ if (acquired) return
+ }
+ }
+
+ override fun tryAcquire(): Boolean = _availablePermits.loop { cur ->
+ // Try to decrement the number of available
+ // permits if it is greater than zero.
+ if (cur <= 0) return false
+ if (_availablePermits.compareAndSet(cur, cur -1)) return true
+ }
+
+ override fun release() {
+ while (true) {
+ // Increment the number of available permits.
+ val p = _availablePermits.getAndIncrement()
+ // Is there a waiter that should be resumed?
+ if (p >= 0) return
+ // Try to resume the first waiter, can fail
+ // according to the SYNC mode contract.
+ if (resume(true)) return
+ }
+ }
+
+ override fun onCancellation(): Boolean {
+ // Increment the number of available permits.
+ val p = _availablePermits.getAndIncrement()
+ // Return `true` if there is no `release` which
+ // is going to resume us and cannot skip us and
+ // resume the next waiter.
+ return p < 0
+ }
+
+ override fun returnValue(value: Boolean) {
+ // Simply release the permit.
+ release()
+ }
+}
+
+class SemaphoreUnboundedSequential1 : SemaphoreSequential(1, false)
+class SemaphoreUnboundedSequential2 : SemaphoreSequential(2, false)
+
+// Comparing to `SemaphoreLincheckTestBase`, it does not support `tryAcquire()`.
+abstract class AsyncSemaphoreLincheckTestBase(
+ semaphore: Semaphore,
+ private val seqSpec: KClass<*>
+) : AbstractLincheckTest() {
+ private val s = semaphore
+
+ @Operation(allowExtraSuspension = true, promptCancellation = false)
+ suspend fun acquire() = s.acquire()
+
+ @Operation(handleExceptionsAsResult = [IllegalStateException::class])
+ fun release() = s.release()
+
+ override fun > O.customize(isStressTest: Boolean): O =
+ actorsBefore(0)
+ .sequentialSpecification(seqSpec.java)
+
+ override fun ModelCheckingOptions.customize(isStressTest: Boolean) =
+ checkObstructionFreedom()
+}
+
+class AsyncSemaphore1LincheckTest : AsyncSemaphoreLincheckTestBase(AsyncSemaphore(1), SemaphoreUnboundedSequential1::class)
+class AsyncSemaphore2LincheckTest : AsyncSemaphoreLincheckTestBase(AsyncSemaphore(2), SemaphoreUnboundedSequential2::class)
+
+class AsyncSemaphoreSmart1LincheckTest : AsyncSemaphoreLincheckTestBase(AsyncSemaphoreSmart(1), SemaphoreUnboundedSequential1::class)
+class AsyncSemaphoreSmart2LincheckTest : AsyncSemaphoreLincheckTestBase(AsyncSemaphoreSmart(2), SemaphoreUnboundedSequential2::class)
+
+class SyncSemaphoreSmart1LincheckTest : SemaphoreLincheckTestBase(SyncSemaphoreSmart(1), SemaphoreUnboundedSequential1::class) {
+ override fun ModelCheckingOptions.customize(isStressTest: Boolean) = checkObstructionFreedom(false)
+}
+class SyncSemaphoreSmart2LincheckTest : SemaphoreLincheckTestBase(SyncSemaphoreSmart(2), SemaphoreUnboundedSequential2::class) {
+ override fun ModelCheckingOptions.customize(isStressTest: Boolean) = checkObstructionFreedom(false)
+}
+
+
+// ####################################
+// # COUNT-DOWN-LATCH SYNCHRONIZATION #
+// ####################################
+
+/**
+ * This primitive allows to wait until several operation are completed.
+ * It is initialized with a given count, and each [countDown] invocation
+ * decrements the count of remaining operations to be completed. At the
+ * same time, [await] suspends until this count reaches zero.
+ *
+ * This implementation uses simple cancellation, so the [countDown] invocation
+ * that reaches the counter zero works in a linear of the number of [await]
+ * invocations, including the ones that are already cancelled.
+ */
+internal open class CountDownLatch(count: Int) : SegmentQueueSynchronizer() {
+ override val resumeMode get() = ASYNC
+
+ private val count = atomic(count)
+ // The number of suspended `await` invocations.
+ // `DONE_MARK` should be set when the count reaches zero,
+ // so the following suspension attempts detect this change by
+ // checking the mark and complete immediately in this case.
+ private val waiters = atomic(0)
+
+ protected fun decWaiters() = waiters.decrementAndGet()
+
+ /**
+ * Decrements the count and resumes waiting
+ * [await] invocations if it reaches zero.
+ */
+ fun countDown() {
+ // Decrement the count.
+ val r = count.decrementAndGet()
+ // Should the waiters be resumed?
+ if (r <= 0) resumeWaiters()
+ }
+
+ private fun resumeWaiters() {
+ val w = waiters.getAndUpdate { cur ->
+ // Is the done mark set?
+ if (cur and DONE_MARK != 0) return
+ cur or DONE_MARK
+ }
+ // This thread has successfully set
+ // the mark, resume the waiters.
+ repeat(w) { resume(Unit) }
+ }
+
+ /**
+ * Waits until the count reaches zero,
+ * completes immediately if it is already zero.
+ */
+ suspend fun await() {
+ // Check whether the count has already reached zero;
+ // this check can be considered as an optimization.
+ if (remaining() == 0) return
+ // Increment the number of waiters and check
+ // that `DONE_MARK` is not set, finish otherwise.
+ val w = waiters.incrementAndGet()
+ if (w and DONE_MARK != 0) return
+ // The number of waiters is
+ // successfully incremented, suspend.
+ suspendCancellableCoroutine { suspend(it) }
+ }
+
+ /**
+ * Returns the current count.
+ */
+ fun remaining(): Int = count.value.coerceAtLeast(0)
+
+ protected companion object {
+ const val DONE_MARK = 1 shl 31
+ }
+}
+
+/**
+ * This implementation uses the smart cancellation mode, so the [countDown]
+ * invocation that reaches the counter zero works in linear of the number
+ * of non-cancelled [await] invocations. This way, it does not matter
+ * how many [await] requests has been cancelled - they do not play any role.
+ */
+internal class CountDownLatchSmart(count: Int) : CountDownLatch(count) {
+ override val resumeMode: ResumeMode get() = ASYNC
+ override val cancellationMode get() = SMART
+
+ override fun onCancellation(): Boolean {
+ // Decrement the number of waiters.
+ val w = decWaiters()
+ // Succeed if the `DONE_MARK` is not set yet.
+ return (w and DONE_MARK) == 0
+ }
+}
+
+internal abstract class CountDownLatchLincheckTestBase(
+ private val cdl: CountDownLatch,
+ private val seqSpec: KClass<*>
+) : AbstractLincheckTest() {
+ @Operation
+ fun countDown() = cdl.countDown()
+
+ @Operation
+ fun remaining() = cdl.remaining()
+
+ @Operation(promptCancellation = false)
+ suspend fun await() = cdl.await()
+
+ override fun > O.customize(isStressTest: Boolean): O =
+ actorsBefore(0)
+ .actorsAfter(0)
+ .sequentialSpecification(seqSpec.java)
+
+ override fun ModelCheckingOptions.customize(isStressTest: Boolean) =
+ checkObstructionFreedom()
+}
+
+class CountDownLatchSequential1 : CountDownLatchSequential(1)
+class CountDownLatchSequential2 : CountDownLatchSequential(2)
+
+internal class CountDownLatch1LincheckTest : CountDownLatchLincheckTestBase(CountDownLatch(1), CountDownLatchSequential1::class)
+internal class CountDownLatch2LincheckTest : CountDownLatchLincheckTestBase(CountDownLatch(2), CountDownLatchSequential2::class)
+
+internal class CountDownLatchSmart1LincheckTest : CountDownLatchLincheckTestBase(CountDownLatchSmart(1), CountDownLatchSequential1::class)
+internal class CountDownLatchSmart2LincheckTest : CountDownLatchLincheckTestBase(CountDownLatchSmart(2), CountDownLatchSequential2::class)
+
+open class CountDownLatchSequential(initialCount: Int) : VerifierState() {
+ private var count = initialCount
+ private val waiters = ArrayList>()
+
+ fun countDown() {
+ if (--count == 0) {
+ waiters.forEach { it.tryResume0(Unit, {}) }
+ waiters.clear()
+ }
+ }
+
+ suspend fun await() {
+ if (count <= 0) return
+ suspendCancellableCoroutine { cont ->
+ waiters.add(cont)
+ }
+ }
+
+ fun remaining(): Int = count.coerceAtLeast(0)
+
+ override fun extractState() = remaining()
+}
+
+
+// ###########################
+// # BARRIER SYNCHRONIZATION #
+// ###########################
+
+/**
+ * This synchronization primitive allows a set of coroutines to
+ * all wait for each other to reach a common barrier point.
+ *
+ * The implementation is straightforward: it maintains a counter
+ * of arrived coroutines and increments it in the beginning of
+ * [arrive] operation. The last coroutine should resume all the
+ * previously arrived ones.
+ *
+ * In case of cancellation, the handler decrements the counter if
+ * not all the parties are arrived. However, it is impossible to
+ * make cancellation atomic (e.g., Java's implementation simply
+ * does not work in case of thread interruption) since there is
+ * no way to resume a set of coroutines atomically. However,
+ * this implementation is correct with prompt cancellation.
+ */
+internal class Barrier(private val parties: Int) : SegmentQueueSynchronizer() {
+ override val resumeMode get() = ASYNC
+ override val cancellationMode get() = SMART
+
+ // The number of coroutines arrived to this barrier point.
+ private val arrived = atomic(0L)
+
+ /**
+ * Waits for other parties and returns `true`.
+ * Fails if this invocation exceeds the number
+ * of parties, returns `false` in this case.
+ *
+ * It is also possible to make this barrier
+ * implementation cyclic after introducing
+ * `resume(count, value)` operation on the
+ * `SegmentQueueSynchronizer`, which resumes
+ * the specified number of coroutines and
+ * with the same value atomically.
+ */
+ suspend fun arrive(): Boolean {
+ // Are all parties has already arrived?
+ if (arrived.value > parties)
+ return false // fail this `arrive()`.
+ // Increment the number of arrived parties.
+ val a = arrived.incrementAndGet()
+ return when {
+ // Should we suspend?
+ a < parties -> {
+ suspendCoroutine { cont -> suspend(cont) }
+ true
+ }
+ // Are we the last party?
+ a == parties.toLong() -> {
+ // Resume all waiters.
+ repeat(parties - 1) {
+ resume(Unit)
+ }
+ true
+ }
+ // Should we fail?
+ else -> false
+ }
+ }
+
+ override fun onCancellation(): Boolean {
+ // Decrement the number of arrived parties if possible.
+ arrived.loop { cur ->
+ // Are we going to be resumed?
+ // The resumption permit should be refused in this case.
+ if (cur == parties.toLong()) return false
+ // Successful cancellation, return `true`.
+ if (arrived.compareAndSet(cur, cur - 1)) return true
+ }
+ }
+}
+
+abstract class BarrierLincheckTestBase(parties: Int, val seqSpec: KClass<*>) : AbstractLincheckTest() {
+ private val b = Barrier(parties)
+
+ @Operation(promptCancellation = false)
+ suspend fun arrive() = b.arrive()
+
+ override fun > O.customize(isStressTest: Boolean) =
+ actorsBefore(0)
+ .actorsAfter(0)
+ .threads(3)
+ .sequentialSpecification(seqSpec.java)
+
+ override fun ModelCheckingOptions.customize(isStressTest: Boolean) =
+ checkObstructionFreedom()
+}
+
+class BarrierSequential1 : BarrierSequential(1)
+class Barrier1LincheckTest : BarrierLincheckTestBase(1, BarrierSequential1::class)
+class BarrierSequential2 : BarrierSequential(2)
+class Barrier2LincheckTest : BarrierLincheckTestBase(2, BarrierSequential2::class)
+class BarrierSequential3 : BarrierSequential(3)
+class Barrier3LincheckTest : BarrierLincheckTestBase(3, BarrierSequential3::class)
+
+open class BarrierSequential(parties: Int) : VerifierState() {
+ private var remaining = parties
+ private val waiters = ArrayList>()
+
+ suspend fun arrive(): Boolean {
+ val r = --remaining
+ return when {
+ r > 0 -> {
+ suspendCancellableCoroutine { cont ->
+ waiters.add(cont)
+ cont.invokeOnCancellation {
+ remaining++
+ waiters.remove(cont)
+ }
+ }
+ true
+ }
+ r == 0 -> {
+ waiters.forEach { it.resume(Unit) }
+ true
+ }
+ else -> false
+ }
+ }
+
+ override fun extractState() = remaining > 0
+}
+
+
+// ##################
+// # BLOCKING POOLS #
+// ##################
+
+/**
+ * While using resources such as database connections, sockets, etc.,
+ * it is common to reuse them; that requires a fast and handy mechanism.
+ * This [BlockingPool] abstraction maintains a set of elements that can be put
+ * into the pool for further reuse or be retrieved to process the current operation.
+ * When [retrieve] comes to an empty pool, it blocks, and the following [put] operation
+ * resumes it; all the waiting requests are processed in the first-in-first-out (FIFO) order.
+ *
+ * In our tests we consider two pool implementations: the [queue-based][BlockingQueuePool]
+ * and the [stack-based][BlockingStackPool]. Intuitively, the queue-based implementation is
+ * faster since it is built on arrays and uses `Fetch-And-Add`-s on the contended path,
+ * while the stack-based pool retrieves the last inserted, thus, the "hottest", element.
+ *
+ * Please note that both these implementations are not linearizable and can retrieve elements
+ * out-of-order under some races. However, since pools by themselves do not guarantee
+ * that the stored elements are ordered (the one may consider them as bags),
+ * these queue- and stack-based versions should be considered as pools with specific heuristics.
+ */
+interface BlockingPool {
+ /**
+ * Either resumes the first waiting [retrieve] operation
+ * and passes the [element] to it, or simply puts the
+ * [element] into the pool.
+ */
+ fun put(element: T)
+
+ /**
+ * Retrieves one of the elements from the pool
+ * (the order is not specified), or suspends if it is
+ * empty -- the following [put] operations resume
+ * waiting [retrieve]-s in the first-in-first-out order.
+ */
+ suspend fun retrieve(): T
+}
+
+/**
+ * This pool uses queue under the hood and is implemented with simple cancellation.
+ */
+internal class BlockingQueuePool : SegmentQueueSynchronizer(), BlockingPool {
+ override val resumeMode get() = ASYNC
+
+ // > 0 -- number of elements;
+ // = 0 -- empty pool;
+ // < 0 -- number of waiters.
+ private val availableElements = atomic(0L)
+
+ // This is an infinite array by design, a plain array is used for simplicity.
+ private val elements = atomicArrayOfNulls(100)
+
+ // Indices in `elements` for the next `tryInsert()` and `tryRetrieve()` operations.
+ // Each `tryInsert()`/`tryRetrieve()` pair works on a separate slot. When `tryRetrieve()`
+ // comes earlier, it marks the slot as `BROKEN` so both this operation and the
+ // corresponding `tryInsert()` fail.
+ private val insertIdx = atomic(0)
+ private val retrieveIdx = atomic(0)
+
+ override fun put(element: T) {
+ while (true) {
+ // Increment the number of elements in advance.
+ val b = availableElements.getAndIncrement()
+ // Is there a waiting `retrieve`?
+ if (b < 0) {
+ // Try to resume the first waiter,
+ // can fail if it is already cancelled.
+ if (resume(element)) return
+ } else {
+ // Try to insert the element into the
+ // queue, can fail if the slot is broken.
+ if (tryInsert(element)) return
+ }
+ }
+ }
+
+ /**
+ * Tries to insert the [element] into the next
+ * [elements] array slot. Returns `true` if
+ * succeeds, or `false` if the slot is [broken][BROKEN].
+ */
+ private fun tryInsert(element: T): Boolean {
+ val i = insertIdx.getAndIncrement()
+ return elements[i].compareAndSet(null, element)
+ }
+
+ override suspend fun retrieve(): T {
+ while (true) {
+ // Decrements the number of elements.
+ val b = availableElements.getAndDecrement()
+ // Is there an element in the pool?
+ if (b > 0) {
+ // Try to retrieve the first element,
+ // can fail if the first slot
+ // is empty due to a race.
+ val x = tryRetrieve()
+ if (x != null) return x
+ } else {
+ // The pool is empty, suspend.
+ return suspendCancellableCoroutine { cont ->
+ suspend(cont)
+ }
+ }
+ }
+ }
+
+ /**
+ * Tries to retrieve the first element from
+ * the [elements] array. Returns the element if
+ * succeeds, or `null` if the first slot is empty
+ * due to a race -- it marks the slot as [broken][BROKEN]
+ * in this case, so the corresponding [tryInsert]
+ * invocation fails.
+ */
+ @Suppress("UNCHECKED_CAST")
+ private fun tryRetrieve(): T? {
+ val i = retrieveIdx.getAndIncrement()
+ return elements[i].getAndSet(BROKEN) as T?
+ }
+
+ fun stateRepresentation(): String {
+ val elementsBetweenIndices = mutableListOf()
+ val first = kotlin.math.min(retrieveIdx.value, insertIdx.value)
+ val last = kotlin.math.max(retrieveIdx.value, insertIdx.value)
+ for (i in first until last) {
+ elementsBetweenIndices.add(elements[i].value)
+ }
+ return "availableElements=${availableElements.value}," +
+ "insertIdx=${insertIdx.value}," +
+ "retrieveIdx=${retrieveIdx.value}," +
+ "elements=$elementsBetweenIndices," +
+ "sqs=<${super.toString()}>"
+ }
+
+ companion object {
+ @JvmStatic
+ val BROKEN = Symbol("BROKEN")
+ }
+}
+
+/**
+ * This pool uses stack under the hood and shows how to use
+ * smart cancellation for data structures that store resources.
+ */
+internal class BlockingStackPool : SegmentQueueSynchronizer(), BlockingPool {
+ override val resumeMode get() = ASYNC
+ override val cancellationMode get() = SMART
+
+ // The stack is implemented via a concurrent linked list,
+ // this is its head; `null` means that the stack is empty.
+ private val head = atomic?>(null)
+
+ // > 0 -- number of elements;
+ // = 0 -- empty pool;
+ // < 0 -- number of waiters.
+ private val availableElements = atomic(0)
+
+ override fun put(element: T) {
+ while (true) {
+ // Increment the number of elements in advance.
+ val b = availableElements.getAndIncrement()
+ // Is there a waiting retrieve?
+ if (b < 0) {
+ // Resume the first waiter, never fails
+ // in the smart cancellation mode.
+ resume(element)
+ return
+ } else {
+ // Try to insert the element into the
+ // stack, can fail if a concurrent [tryRetrieve]
+ // came earlier and marked it with a failure node.
+ if (tryInsert(element)) return
+ }
+ }
+ }
+
+ /**
+ * Tries to insert the [element] into the stack.
+ * Returns `true` on success`, or `false` if the
+ * stack is marked with a failure node, retrieving
+ * it in this case.
+ */
+ private fun tryInsert(element: T): Boolean = head.loop { h ->
+ // Is the stack marked with a failure node?
+ if (h != null && h.element == null) {
+ // Try to retrieve the failure node.
+ if (head.compareAndSet(h, h.next)) return false
+ } else {
+ // Try to insert the element.
+ val newHead = StackNode(element, h)
+ if (head.compareAndSet(h, newHead)) return true
+ }
+ }
+
+ override suspend fun retrieve(): T {
+ while (true) {
+ // Decrement the number of elements.
+ val b = availableElements.getAndDecrement()
+ // Is there an element in the pool?
+ if (b > 0) {
+ // Try to retrieve the top element,
+ // can fail if the stack if empty
+ // due to a race.
+ val x = tryRetrieve()
+ if (x != null) return x
+ } else {
+ // The pool is empty, suspend.
+ return suspendCancellableCoroutine { cont ->
+ suspend(cont)
+ }
+ }
+ }
+ }
+
+ /**
+ * Tries to retrieve the top (last) element and return `true`
+ * if the stack is not empty; returns `false` and
+ * inserts a failure node otherwise.
+ */
+ @Suppress("NullChecksToSafeCall")
+ private fun tryRetrieve(): T? = head.loop { h ->
+ // Is the queue empty or full of failure nodes?
+ if (h == null || h.element == null) {
+ // Try to add one more failure node and fail.
+ val failNode = StackNode(null, h)
+ if (head.compareAndSet(h, failNode)) return null
+ } else {
+ // Try to retrieve the top element.
+ if (head.compareAndSet(h, h.next)) return h.element
+ }
+ }
+
+ // The logic of cancellation is very similar to the one
+ // in semaphore, with the only difference that elements
+ // should be physically returned to the pool.
+ override fun onCancellation(): Boolean {
+ val b = availableElements.getAndIncrement()
+ return b < 0
+ }
+
+ // If an element is refused, it should be inserted back to the stack.
+ override fun tryReturnRefusedValue(value: T) = tryInsert(value)
+
+ // In order to return the value back
+ // to the pool, [put] is naturally used.
+ override fun returnValue(value: T) = put(value)
+
+ internal fun stateRepresentation(): String {
+ val elements = ArrayList()
+ var curNode = head.value
+ while (curNode != null) {
+ elements += curNode.element
+ curNode = curNode.next
+ }
+ return "availableElements=${availableElements.value},elements=$elements,sqs=<${super.toString()}>"
+ }
+
+ class StackNode(val element: T?, val next: StackNode?)
+}
+
+abstract class BlockingPoolLincheckTestBase(val p: BlockingPool) : AbstractLincheckTest() {
+ @Operation
+ fun put() = p.put(Unit)
+
+ @Suppress("NullChecksToSafeCall")
+ @Operation(allowExtraSuspension = true, promptCancellation = false)
+ suspend fun retrieve() = p.retrieve()
+
+ @StateRepresentation
+ fun stateRepresentation() = when(p) {
+ is BlockingStackPool<*> -> p.stateRepresentation()
+ is BlockingQueuePool<*> -> p.stateRepresentation()
+ else -> error("Unknown pool type: ${p::class.simpleName}")
+ }
+
+ override fun > O.customize(isStressTest: Boolean) =
+ sequentialSpecification(BlockingPoolUnitSequential::class.java)
+
+ override fun ModelCheckingOptions.customize(isStressTest: Boolean) =
+ checkObstructionFreedom()
+}
+class BlockingQueuePoolLincheckTest : BlockingPoolLincheckTestBase(BlockingQueuePool())
+class BlockingStackPoolLincheckTest : BlockingPoolLincheckTestBase(BlockingStackPool())
+
+class BlockingPoolUnitSequential : VerifierState() {
+ private var elements = 0
+ private val waiters = ArrayList>()
+
+ fun put() {
+ while (true) {
+ if (waiters.isNotEmpty()) {
+ val w = waiters.removeAt(0)
+ if (w.tryResume0(Unit, { put() })) return
+ } else {
+ elements ++
+ return
+ }
+ }
+ }
+
+ suspend fun retrieve() {
+ if (elements > 0) {
+ elements--
+ } else {
+ suspendCancellableCoroutine { cont ->
+ waiters.add(cont)
+ }
+ }
+ }
+
+ override fun extractState() = elements
+}
+
+
+// ##################
+// # BLOCKING QUEUE #
+// ##################
+
+/**
+ * This algorithm maintains a separate queue for elements so blocked [receive] operations
+ * retrieve elements _after_ resumption from this queue. Similar to other data structures,
+ * an atomic [size] counter is maintained to decide whether the queue is empty for [receive]
+ * and whether there is a waiting receiver for [send].
+ *
+ * Comparing to the blocking pools above, this implementation is fully linearizable but requires
+ * the waiters to retrieve elements from separate storage after the resumption. However, we find
+ * this implementation more flexible since any container (not only the FIFO queue) can be used
+ * under the hood. It is also safe in case of prompt cancellation since the elements cannot be lost.
+ */
+internal class BlockingQueueSmart : SegmentQueueSynchronizer() {
+ private val size = atomic(0) // #send - #receive
+ private val elements = ConcurrentLinkedQueue() // any queue can be here, even not FIFO.
+
+ override val resumeMode get() = ASYNC
+ override val cancellationMode get() = SMART
+
+ /**
+ * Puts the specified [element] into the queue
+ * or transfers it to the first waiting receiver.
+ */
+ fun send(element: E) {
+ elements.add(element) // store the element into queue at first.
+ val s = size.getAndIncrement() // increment the number of elements.
+ if (s < 0) { // is there a waiting receiver?
+ resume(Unit) // resume the first waiter.
+ }
+ }
+
+ suspend fun receive(): E {
+ val s = size.getAndDecrement() // decrement the number of available elements.
+ if (s <= 0) { // should this `receive()` suspend?
+ suspendCancellableCoroutine { suspend(it) }
+ }
+ return elements.remove() // retrieve the first element.
+ }
+
+ override fun onCancellation(): Boolean {
+ val s = size.getAndIncrement() // decrement the number of waiting `receive()`-s.
+ return s < 0 // succeed if this `receive()` is not already resumed.
+ }
+
+ // For prompt cancellation; no need to return elements since
+ // they are not transferred via SQS but stored separately.
+ override fun returnValue(value: Unit) {
+ val b = size.getAndIncrement()
+ if (b < 0) resume(Unit)
+ }
+}
+
+class BlockingQueueSmartLincheckTest : AbstractLincheckTest() {
+ private val c = BlockingQueueSmart()
+
+ @Operation
+ fun send(element: Int) = c.send(element)
+
+ @Operation(allowExtraSuspension=true, promptCancellation = false)
+ suspend fun receive() = c.receive()
+
+ override fun > O.customize(isStressTest: Boolean): O =
+ logLevel(LoggingLevel.INFO)
+ .sequentialSpecification(BlockingQueueSequential::class.java)
+
+ override fun ModelCheckingOptions.customize(isStressTest: Boolean) =
+ checkObstructionFreedom()
+}
+
+class BlockingQueueSequential : VerifierState() {
+ private val receivers = ArrayList>()
+ private val elements = ArrayList()
+
+ fun send(element: Int) {
+ if (receivers.isNotEmpty()) {
+ val r = receivers.removeAt(0)
+ elements += element
+ r.resume(Unit)
+ } else {
+ elements += element
+ }
+ }
+
+ suspend fun receive(): Int =
+ if (elements.isNotEmpty()) {
+ retrieveFirstElement()
+ } else {
+ suspendCancellableCoroutine { cont ->
+ receivers += cont
+ cont.invokeOnCancellation { receivers.remove(cont) }
+ }
+ retrieveFirstElement()
+ }
+
+ private fun retrieveFirstElement(): Int {
+ return elements.removeAt(0)
+ }
+
+ override fun extractState() = elements
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-core/jvm/test/lincheck/SemaphoreLincheckTest.kt b/kotlinx-coroutines-core/jvm/test/lincheck/SemaphoreLincheckTest.kt
index 2b471d7f26..a93204a66c 100644
--- a/kotlinx-coroutines-core/jvm/test/lincheck/SemaphoreLincheckTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/lincheck/SemaphoreLincheckTest.kt
@@ -2,6 +2,7 @@
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:Suppress("unused")
+
package kotlinx.coroutines.lincheck
import kotlinx.coroutines.*
@@ -9,27 +10,68 @@ import kotlinx.coroutines.sync.*
import org.jetbrains.kotlinx.lincheck.*
import org.jetbrains.kotlinx.lincheck.annotations.Operation
import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.*
+import org.jetbrains.kotlinx.lincheck.verifier.*
+import kotlin.reflect.*
-abstract class SemaphoreLincheckTestBase(permits: Int) : AbstractLincheckTest() {
- private val semaphore = Semaphore(permits)
-
+abstract class SemaphoreLincheckTestBase(
+ private val semaphore: Semaphore,
+ private val seqSpec: KClass<*>
+) : AbstractLincheckTest() {
@Operation
- fun tryAcquire() = semaphore.tryAcquire()
+ fun tryAcquire() = this.semaphore.tryAcquire()
- @Operation(promptCancellation = true, allowExtraSuspension = true)
- suspend fun acquire() = semaphore.acquire()
+ @Operation(promptCancellation = false, allowExtraSuspension = true)
+ suspend fun acquire() = this.semaphore.acquire()
@Operation(handleExceptionsAsResult = [IllegalStateException::class])
- fun release() = semaphore.release()
+ fun release() = this.semaphore.release()
override fun > O.customize(isStressTest: Boolean): O =
actorsBefore(0)
-
- override fun extractState() = semaphore.availablePermits
+ .sequentialSpecification(seqSpec.java)
override fun ModelCheckingOptions.customize(isStressTest: Boolean) =
checkObstructionFreedom()
}
-class Semaphore1LincheckTest : SemaphoreLincheckTestBase(1)
-class Semaphore2LincheckTest : SemaphoreLincheckTestBase(2)
\ No newline at end of file
+open class SemaphoreSequential(
+ private val permits: Int,
+ private val boundMaxPermits: Boolean
+) : VerifierState() {
+ private var availablePermits = permits
+ private val waiters = ArrayList>()
+
+ open fun tryAcquire() = tryAcquireImpl()
+
+ private fun tryAcquireImpl(): Boolean {
+ if (availablePermits <= 0) return false
+ availablePermits--
+ return true
+ }
+
+ suspend fun acquire() {
+ if (tryAcquireImpl()) return
+ availablePermits--
+ suspendCancellableCoroutine { cont ->
+ waiters.add(cont)
+ }
+ }
+
+ fun release() {
+ while (true) {
+ if (boundMaxPermits) check(availablePermits < permits)
+ availablePermits++
+ if (availablePermits > 0) return
+ val w = waiters.removeAt(0)
+ if (w.tryResume0(Unit, { release() })) return
+ }
+ }
+
+ override fun extractState() = availablePermits.coerceAtLeast(0)
+}
+
+class SemaphoreSequential1 : SemaphoreSequential(1, true)
+class Semaphore1LincheckTest : SemaphoreLincheckTestBase(Semaphore(1), SemaphoreSequential1::class)
+
+class SemaphoreSequential2 : SemaphoreSequential(2, true)
+class Semaphore2LincheckTest : SemaphoreLincheckTestBase(Semaphore(2), SemaphoreSequential2::class)
\ No newline at end of file