diff --git a/benchmarks/scripts/generate_plots_semaphore_jvm.py b/benchmarks/scripts/generate_plots_semaphore_jvm.py new file mode 100644 index 0000000000..5daf34e8ab --- /dev/null +++ b/benchmarks/scripts/generate_plots_semaphore_jvm.py @@ -0,0 +1,74 @@ +# To run this script run the command 'python3 scripts/generate_plots_semaphore_jvm.py' in the /benchmarks folder + + +import pandas as pd +import sys +import locale +import matplotlib.pyplot as plt +from matplotlib.ticker import FormatStrFormatter + +input_file = "build/reports/jmh/results.csv" +output_file = "out/semaphore_jvm.svg" +# Please change the value of this variable according to the FlowFlattenMergeBenchmarkKt.ELEMENTS +operations = 1000000 +csv_columns = ["Score", "Param: parallelism", "Param: maxPermits", "Param: algo"] +rename_columns = {"Score": "score", "Param: parallelism" : "threads", "Param: maxPermits" : "permits", "Param: algo": "algo"} + +markers = ['v', 'P', 'x', '8', 'd', '1', '2', '8', 'p'] +# markers = ['.', 'v', 'P', 'x', '8', 'd', '1', '2', '8', 'p'] +colours = ["darkorange", "seagreen", "red", "blueviolet", "sienna"] +# colours = ["royalblue", "darkorange", "seagreen", "red", "blueviolet", "sienna"] + +def next_colour(): + i = 0 + while True: + yield colours[i % len(colours)] + i += 1 + +def next_marker(): + i = 0 + while True: + yield markers[i % len(markers)] + i += 1 + +def draw(data, plt): + plt.xscale('log', basex=2) + plt.gca().xaxis.set_major_formatter(FormatStrFormatter('%0.f')) + plt.grid(linewidth='0.5', color='lightgray') + plt.ylabel("us / op") + plt.xlabel('threads') + plt.xticks(data.threads.unique()) + + colour_gen = next_colour() + marker_gen = next_marker() + for algo in data.algo.unique(): + gen_colour = next(colour_gen) + gen_marker = next(marker_gen) + res = data[(data.algo == algo)] + plt.plot(res.threads, res.score * 1000 / operations, label="{}".format(algo), color=gen_colour, marker=gen_marker) + # plt.errorbar(x=res.concurrency, y=res.score*elements/1000, yerr=res.score_error*elements/1000, solid_capstyle='projecting', + # label="flows={}".format(flows), capsize=4, color=gen_colour, linewidth=2.2) + +langlocale = locale.getdefaultlocale()[0] +locale.setlocale(locale.LC_ALL, langlocale) +dp = locale.localeconv()['decimal_point'] +if dp == ",": + csv_columns.append("Score Error (99,9%)") + rename_columns["Score Error (99,9%)"] = "score_error" +elif dp == ".": + csv_columns.append("Score Error (99.9%)") + rename_columns["Score Error (99.9%)"] = "score_error" +else: + print("Unexpected locale delimeter: " + dp) + sys.exit(1) +data = pd.read_csv(input_file, sep=",", decimal=dp) +data = data[csv_columns].rename(columns=rename_columns) +data = data[(data.permits==8)] +data = data[(data.algo!="Java ReentrantLock")] +data = data[(data.algo!="Java Semaphore")] +plt.rcParams.update({'font.size': 15}) +plt.figure(figsize=(12, 12)) +draw(data, plt) +plt.legend(loc='lower center', borderpad=0, bbox_to_anchor=(0.5, 1.3), ncol=2, frameon=False, borderaxespad=2, prop={'size': 15}) +plt.tight_layout(pad=12, w_pad=2, h_pad=1) +plt.savefig(output_file, bbox_inches='tight') diff --git a/gradle.properties b/gradle.properties index a4057dfcc3..b0f14a5223 100644 --- a/gradle.properties +++ b/gradle.properties @@ -12,7 +12,7 @@ junit_version=4.12 atomicfu_version=0.14.4 knit_version=0.2.2 html_version=0.6.8 -lincheck_version=2.7.1 +lincheck_version=2.9-SNAPSHOT dokka_version=0.9.16-rdev-2-mpp-hacks byte_buddy_version=1.10.9 reactor_version=3.2.5.RELEASE diff --git a/kotlinx-coroutines-core/build.gradle b/kotlinx-coroutines-core/build.gradle index f98f6a529c..a470ebf700 100644 --- a/kotlinx-coroutines-core/build.gradle +++ b/kotlinx-coroutines-core/build.gradle @@ -201,8 +201,8 @@ task jvmStressTest(type: Test, dependsOn: compileTestKotlinJvm) { enableAssertions = true testLogging.showStandardStreams = true systemProperty 'kotlinx.coroutines.scheduler.keep.alive.sec', '100000' // any unpark problem hangs test - systemProperty 'kotlinx.coroutines.semaphore.segmentSize', '2' - systemProperty 'kotlinx.coroutines.semaphore.maxSpinCycles', '10' + systemProperty 'kotlinx.coroutines.sqs.segmentSize', '2' + systemProperty 'kotlinx.coroutines.sqs.maxSpinCycles', '10' } task jdk16Test(type: Test, dependsOn: [compileTestKotlinJvm, checkJdk16]) { diff --git a/kotlinx-coroutines-core/common/src/Annotations.kt b/kotlinx-coroutines-core/common/src/Annotations.kt index 5475c6b10e..11e12f1109 100644 --- a/kotlinx-coroutines-core/common/src/Annotations.kt +++ b/kotlinx-coroutines-core/common/src/Annotations.kt @@ -64,3 +64,12 @@ public annotation class ObsoleteCoroutinesApi "so stable API could be provided instead" ) public annotation class InternalCoroutinesApi + +@MustBeDocumented +@Retention(value = AnnotationRetention.BINARY) +@RequiresOptIn( + level = RequiresOptIn.Level.ERROR, + message = "TODO" +) +@Target(AnnotationTarget.FUNCTION, AnnotationTarget.PROPERTY) +public annotation class HazardousConcurrentApi \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/src/internal/SegmentQueueSynchronizer.kt b/kotlinx-coroutines-core/common/src/internal/SegmentQueueSynchronizer.kt new file mode 100644 index 0000000000..9acad8ea04 --- /dev/null +++ b/kotlinx-coroutines-core/common/src/internal/SegmentQueueSynchronizer.kt @@ -0,0 +1,606 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.internal + +import kotlinx.atomicfu.* +import kotlinx.coroutines.* +import kotlinx.coroutines.internal.SegmentQueueSynchronizer.ResumeMode.ASYNC +import kotlinx.coroutines.internal.SegmentQueueSynchronizer.ResumeMode.SYNC +import kotlinx.coroutines.internal.SegmentQueueSynchronizer.CancellationMode.* +import kotlinx.coroutines.sync.* +import kotlin.coroutines.* +import kotlin.native.concurrent.* + +/** + * [SegmentQueueSynchronizer] is an abstraction for implementing _fair_ synchronization + * and communication primitives. It maintains a FIFO queue of waiting requests and is + * provided with two main functions: + * + [suspend] that stores the specified waiter into the queue, and + * + [resume] that tries to retrieve and resume the first waiter with the specified value. + * + * One may consider this abstraction as an infinite array with two counters that reference the next cells + * for enqueueing a continuation in [suspend] and for retrieving it in [resume]. To be short, when + * [suspend] is invoked, it increments the corresponding counter via fast `Fetch-And-Add` and stores the + * continuation into the cell. At the same time, [resume] increments its own counter and comes to the + * corresponding cell. + * + * A typical implementation via [SegmentQueueSynchronizer] performs some synchronization at first, + * (e.g., [Semaphore] modifies the number of available permits), and invokes [suspend] or [resume] + * after that. Following this pattern, it is possible in a concurrent environment that [resume] + * is executed before [suspend] (similarly to the race between `park` and `unpark` for threads), + * so that [resume] comes to a cell in the empty state. This race can be solved with two [strategies][ResumeMode]: + * [asynchronous][ASYNC] and [synchronous][SYNC]. In the [asynchronous][ASYNC] mode, [resume] puts + * the value into the cell if it is still empty and finishes immediately; this way, the following [suspend] comes + * to this cell and simply grabs the element without suspension. At the same time, in the [synchronous][SYNC] mode, + * [resume] waits in a bounded spin-loop cycle until the put element is taken by a concurrent [suspend], + * marking the cell as broken if it is not taken after the spin-loop is finished. In this case, both the current + * [resume] and the [suspend] that comes to this broken cell fail and the corresponding operations on the data + * structure are typically restarted from the beginning. + * + * Since [suspend] can store [CancellableContinuation]-s, it is possible for [resume] to fail if the + * continuation is already cancelled. In this case, most of the algorithms retry the whole operation. + * However, if there are many consecutive cancelled waiters, it seems more efficient to skip them + * somehow. Thus, there are different [cancellation modes][cancellationMode] that can be used. + * In the simple mode ([SIMPLE]), which is used by default, [resume] simply fails on cancelled waiters as + * described above, while in smart modes ([SMART_SYNC] and [SMART_ASYNC]) [resume] skips cells in the + * [cancelled][CANCELLED] state. However, if cancellation happens concurrently with [resume], it can be illegal + * to simply skip the cell and resume the next waiter, e.g., if this cancelled waiter is the last one. + * Thus, it is possible for [SegmentQueueSynchronizer] to refuse this [resume]. In order to support this logic, + * users should implement [onCancellation] function, which returns `true` if the cell can be + * moved to the [cancelled][CANCELLED] state, or `false` if the [resume] that comes to this cell + * should be refused. In the last case, [tryReturnRefusedValue] is invoked, so that the value + * can be put back to the data structure. However, it is possible for [tryReturnRefusedValue] to + * fail, and [returnValue] is called in this case. Typically, this [returnValue] function coincide + * with the one that resumes waiters (e.g., [release][Semaphore.release] in [Semaphore]). The difference + * between [SMART_SYNC] and [SMART_ASYNC] modes is that in the first one the [resume] that comes to the + * cell with cancelled waiter waits in a spin-loop until the cell is marked either with [CANCELLED] + * or [REFUSE] mark, while in [SMART_ASYNC] mode it replaces the cancelled waiter with the value of + * the resumption and finishes, so that the cancellation handler completes this [resume] eventually. + * This way, in [SMART_ASYNC] mode the value passed to [resume] can be out of the data structure for a while. + * + * Here is a state machine for cells. Note that only one [suspend] + * and at most one [resume] operation can deal with each cell. + * + * +-------+ `suspend` succeeds. +--------------+ `resume` is +---------------+ store `RESUMED` to +---------+ ( `cont` HAS BEEN ) + * | NULL | ----------------------> | cont: active | -------------> | cont: resumed | --------------------> | RESUMED | ( RESUMED AND THIS ) + * +-------+ +--------------+ successful. +---------------+ avoid memory leaks. +---------+ ( `resume` SUCCEEDS ) + * | | + * | | The continuation + * | `resume` comes to | is cancelled. + * | the cell before | + * | `suspend` and puts V ( THE CORRESPONDING `resume` SHOULD BE ) + * | the element into +-----------------+ The concurrent `resume` should be refused, +--------+ ( REFUSED AND `tryReturnRefusedValue` OR ) + * | the cell, waiting | cont: cancelled | -----------------------------------------------> | REFUSE | ( `returnValue` IF IT FAILS IS USED TO ) + * | for `suspend` in +-----------------+ `onCancellation` has returned `false`. +--------+ ( RETURN THE VALUE BACK TO THE ORIGINAL ) + * | SYNC mode. | \ ^ ( SYNCHRONIZATION PRIMITIVE ) + * | | \ | + * | Mark the cell as `CANCELLED` | \ | + * | if the cancellation mode is | \ `resume` delegates its completing to | `onCancellation` returned `false, + * | `SIMPLE` or `onCancellation` | \ the concurrent cancellation handler if | mark the state accordingly and + * | has returned `true`. | \ `SMART_ASYNC` cancellation mode is used. | complete the hung `resume`. + * | | +------------------------------------------+ | + * | | \ | + * | ( THE CONTINUATION IS ) V V | + * | ( CANCELLED AND `resume` ) +-----------+ +-------+ + * | ( EITHER FAILS IN THE ) | CANCELLED | <-------------------------------------------------- | value | + * | ( SIMPLE CANCELLATION ) +-----------+ Mark the cell as `CANCELLED` if `onCancellation` +-------+ + * | ( MODE OR SKIPS THIS ) returned true, complete the hung `resume` accordingly. + * | ( CELL IN THE SMART ONE ) + * | + * | + * | `suspend` gets +-------+ ( RENDEZVOUS HAPPENED, ) + * | +-----------------> | TAKEN | ( BOTH `resume` and ) + * V | the element. +-------+ ( `suspend` SUCCEED ) + * +-------+ | + * | value | --< + * +-------+ | + * | `tryResume` has waited a bounded time, +--------+ + * +---------------------------------------> | BROKEN | (BOTH `suspend` AND `resume` FAIL) + * but `suspend` has not come. +--------+ + * + * As for the infinite array implementation, it is organized as a linked list of [segments][SQSSegment]; + * each segment contains a fixed number of cells. To determine the cell for each [suspend] and [resume] + * invocation, the algorithm reads the current [tail] or [head], increments [enqIdx] or [deqIdx], and + * finds the required segment starting from the initially read one. + */ +@InternalCoroutinesApi +internal abstract class SegmentQueueSynchronizer { + private val head: AtomicRef + private val deqIdx = atomic(0L) + private val tail: AtomicRef + private val enqIdx = atomic(0L) + + init { + val s = SQSSegment(0, null, 2) + head = atomic(s) + tail = atomic(s) + } + + /** + * Specifies whether [resume] should work in + * [synchronous][SYNC] or [asynchronous][ASYNC] mode. + */ + protected open val resumeMode: ResumeMode get() = SYNC + + /** + * Specifies whether [resume] should fail on cancelled waiters ([SIMPLE]), + * or skip them in either [synchronous][SMART_SYNC] or [asynchronous][SMART_ASYNC] + * way. In the asynchronous mode [resume] may pass the element to the + * cancellation handler in order not to wait, so that the element can be "hung" + * for a while, but it is guaranteed that this [resume] will be completed eventually. + */ + protected open val cancellationMode: CancellationMode get() = SIMPLE + + /** + * This function is invoked when waiter is cancelled and smart + * cancellation mode is used (so that cancelled cells are skipped by + * [resume]). Typically, this function performs the logical cancellation. + * It returns `true` if the cancellation succeeds and the cell can be + * marked as [CANCELLED]. This way, a concurrent [resume] skips this cell, + * and the value stays in the waiting queue. However, if the concurrent + * [resume] should be refused by this [SegmentQueueSynchronizer], + * [onCancellation] should return false. In this case, [tryReturnRefusedValue] + * is invoked with the value of [resume], following by [returnValue] + * if the attempt fails. + */ + protected open fun onCancellation() : Boolean = false + + /** + * This function specifies how the refused by + * this [SegmentQueueSynchronizer] value should + * be returned back to the data structure. It + * returns `true` if succeeds or `false` if the + * attempt failed, so that [returnValue] should + * be used to complete the returning. + */ + protected open fun tryReturnRefusedValue(value: T): Boolean = true + + /** + * This function specifies how the value from + * a failed [resume] should be returned back to + * the data structure. It is typically the function + * that invokes [resume] (e.g., [release][Semaphore.release] + * in [Semaphore]). + */ + protected open fun returnValue(value: T) {} + + /** + * This is a short-cut for [tryReturnRefusedValue] and + * the following [returnValue] invocation if it fails. + */ + private fun returnRefusedValue(value: T) { + if (tryReturnRefusedValue(value)) return + returnValue(value) + } + + /** + * Puts the specified continuation into the waiting queue, and returns `true` on success. + * Since [suspend] and [resume] can be invoked concurrently (similarly to `park` and `unpark` + * for threads), it is possible that [resume] comes earlier. In this case, the [resume] comes + * to the first cell and puts it value into it. After that, this [suspend] should come and + * grab the value. However, if the [synchronous][SYNC] resumption mode is used, the concurrent + * [resume] can mark its cell as [broken][BROKEN]; thus, this [suspend] invocation comes to the + * broken cell and fails. The typical patter is retrying both operations, the one that + * failed on [suspend] and the one that failed on [resume], from the beginning. + */ + @Suppress("UNCHECKED_CAST") + fun suspend(cont: Continuation): Boolean { + // Increment `enqIdx` and find the segment + // with the corresponding id. It is guaranteed + // that this segment is not removed since at + // least the cell for this [suspend] invocation + // is not in the `CANCELLED` state. + val curTail = this.tail.value + val enqIdx = enqIdx.getAndIncrement() + val segment = this.tail.findSegmentAndMoveForward(id = enqIdx / SEGMENT_SIZE, startFrom = curTail, + createNewSegment = ::createSegment).segment + assert { segment.id == enqIdx / SEGMENT_SIZE } + // Try to install the continuation in the cell, + // this is the regular path. + val i = (enqIdx % SEGMENT_SIZE).toInt() + if (segment.cas(i, null, cont)) { + // The continuation is successfully installed, + // add a cancellation handler if it is cancellable + // and complete successfully. + if (cont is CancellableContinuation<*>) { + cont.invokeOnCancellation(SQSCancellationHandler(segment, i).asHandler) + } + return true + } + // The continuation installation failed. This can happen only + // if a concurrent `resume` comes earlier to this cell and put + // its value into it. Note, that in `SYNC` resumption mode + // this concurrent `resume` can mark the cell as broken. + // + // Try to grab the value if the cell is not in the `BROKEN` state. + val value = segment.get(i) + if (value !== BROKEN && segment.cas(i, value, TAKEN)) { + // The elimination is successfully performed, + // resume the continuation with the value and complete. + cont.resume(value as T) + return true + } + // The cell is broken, this can happen only in `SYNC` resumption mode. + assert { resumeMode == SYNC && segment.get(i) === BROKEN } + return false + } + + /** + * Tries to resume the next waiter and returns `true` if + * the resumption succeeds. However, it can fail due to + * several reasons. First of all, if the [resumption mode][resumeMode] + * is [synchronous][SYNC], this [resume] invocation may come + * before [suspend] and mark the cell as [broken][BROKEN]; + * `false` is returned in this case. At the same time, according + * to the [simple cancellation mode][SIMPLE], this [resume] fails + * if the next waiter is cancelled, and returns `false` in this case. + * + * Note that when smart cancellation ([SMART_SYNC] or [SMART_ASYNC]) + * is used, [resume] skips cancelled waiters and can fail only in + * case of unsuccessful elimination due to [synchronous][SYNC] + * resumption mode. + */ + fun resume(value: T): Boolean { + // Should we skip cancelled cells? + val skipCancelled = cancellationMode != SIMPLE + while (true) { + // Try to resume the next waiter, adjust [deqIdx] if + // cancelled cells should be skipped anyway. + when (tryResumeImpl(value, adjustDeqIdx = skipCancelled)) { + TRY_RESUME_SUCCESS -> return true + TRY_RESUME_FAIL_CANCELLED -> if (!skipCancelled) return false + TRY_RESUME_FAIL_BROKEN -> return false + } + } + } + + /** + * Tries to resume the next waiter, and returns [TRY_RESUME_SUCCESS] on + * success, [TRY_RESUME_FAIL_CANCELLED] if the next waiter is cancelled, + * or [TRY_RESUME_FAIL_BROKEN] if the next cell is marked as broken by + * this [tryResumeImpl] invocation due to the [SYNC] resumption mode. + * + * In the smart cancellation modes ([SMART_SYNC] and [SMART_ASYNC]) the + * cells marked as [cancelled][CANCELLED] should be skipped, so that + * there is no need to increment [deqIdx] one-by-one is there is a + * removed segment (logically full of [cancelled][CANCELLED] cells); + * it is faster to point [deqIdx] to the first possibly non-cancelled + * cell instead, i.e. to the first segment id multiplied by the + * [segment size][SEGMENT_SIZE]. + */ + @Suppress("UNCHECKED_CAST") + private fun tryResumeImpl(value: T, adjustDeqIdx: Boolean): Int { + // Check that `adjustDeqIdx` is `false` + // in the simple cancellation mode. + assert { !(cancellationMode == SIMPLE && adjustDeqIdx) } + // Increment `deqIdx` and find the first segment with + // the corresponding or higher (if the required segment + // is physically removed) id. + val curHead = this.head.value + val deqIdx = deqIdx.getAndIncrement() + val id = deqIdx / SEGMENT_SIZE + val segment = this.head.findSegmentAndMoveForward(id, startFrom = curHead, + createNewSegment = ::createSegment).segment + // The previous segments can be safely collected + // by GC, clean the pointer to them. + segment.cleanPrev() + // Is the required segment physically removed? + if (segment.id > id) { + // Adjust `deqIdx` to the first + // non-removed segment if needed. + if (adjustDeqIdx) adjustDeqIdx(segment.id * SEGMENT_SIZE) + // The cell #deqIdx is in the cancelled state, + // return the corresponding failure. + return TRY_RESUME_FAIL_CANCELLED + } + // Modify the cell according to the state machine, + // all the transitions are performed atomically. + val i = (deqIdx % SEGMENT_SIZE).toInt() + modify_cell@while (true) { + val cellState = segment.get(i) + when { + // Is the cell empty? + cellState === null -> { + // Try to perform an elimination by putting the + // value to the empty cell and wait until it is + // taken by a concurrent `suspend` in case of + // using the synchronous resumption mode. + if (!segment.cas(i, null, value)) continue@modify_cell + // Finish immediately in the async mode. + if (resumeMode == ASYNC) return TRY_RESUME_SUCCESS + // Wait for a concurrent `suspend` for a bounded + // time; it should mark the cell as taken. + repeat(MAX_SPIN_CYCLES) { + if (segment.get(i) === TAKEN) return TRY_RESUME_SUCCESS + } + // The value is still not taken, try to + // atomically mark the cell as broken. + // A failure indicates that the value is taken. + return if (segment.cas(i, value, BROKEN)) TRY_RESUME_FAIL_BROKEN else TRY_RESUME_SUCCESS + } + // Is the waiter cancelled? + cellState === CANCELLED -> { + // Return the corresponding failure. + return TRY_RESUME_FAIL_CANCELLED + } + // Should the current `resume` be refused + // by this `SegmentQueueSynchronizer`? + cellState === REFUSE -> { + // This state should not occur + // in the simple cancellation mode. + assert { cancellationMode != SIMPLE } + // Return the refused value back to + // the data structure and succeed. + returnRefusedValue(value) + return TRY_RESUME_SUCCESS + } + // Does the cell store a cancellable continuation? + cellState is CancellableContinuation<*> -> { + // Try to resume the continuation. + val success = (cellState as CancellableContinuation).tryResume0(value, { returnValue(value) }) + // Is the resumption successful? + if (success) { + // Mark the cell as `DONE` to avoid + // memory leaks and complete successfully. + segment.set(i, RESUMED) + return TRY_RESUME_SUCCESS + } + // The continuation is cancelled, the handling + // logic depends on the cancellation mode. + when (cancellationMode) { + // Fail correspondingly in the simple mode. + SIMPLE -> return TRY_RESUME_FAIL_CANCELLED + // In the smart cancellation mode this cell + // can be either skipped (if it is going to + // be marked as cancelled) or this `resume` + // should be refused. The `SMART_SYNC` mode + // waits in a an infinite spin-loop until + // the state of this cell is changed to + // either `CANCELLED` or `REFUSE`. + SMART_SYNC -> continue@modify_cell + // At the same time, in `SMART_ASYNC` mode + // `resume` replaces the cancelled continuation + // with the resumption value and completes. + // Thus, the concurrent cancellation handler + // notices this value and completes this `resume`. + SMART_ASYNC -> { + // Try to put the value into the cell if there is + // no decision on whether the cell is in the `CANCELLED` + // or `REFUSE` state, and finish if the put is performed. + val valueToStore: Any? = if (value is Continuation<*>) WrappedContinuationValue(value) else value + if (segment.cas(i, cellState, valueToStore)) return TRY_RESUME_SUCCESS + } + } + } + // The cell stores a non-cancellable + // continuation, we can simply resume it. + else -> { + // Resume the continuation and mark the cell + // as `DONE` to avoid memory leaks. + (cellState as Continuation).resume(value) + segment.set(i, RESUMED) + return TRY_RESUME_SUCCESS + } + } + } + } + + /** + * Updates [deqIdx] to [newValue] if the current value is lower. + * Thus, it is guaranteed that either the update is successfully + * performed or the value of [deqIdx] is greater or equal to [newValue]. + */ + private fun adjustDeqIdx(newValue: Long): Unit = deqIdx.loop { cur -> + if (cur >= newValue) return + if (deqIdx.compareAndSet(cur, newValue)) return + } + + /** + * These modes define the strategy that [resume] should + * use if it comes to the cell before [suspend] and finds it empty. + * In the [asynchronous][ASYNC] mode, it puts the value into the cell, + * so that [suspend] grabs it and immediately resumes without actual + * suspension; in other words, an elimination happens in this case. + * However, this strategy produces an incorrect behavior when used for some + * data structures (e.g., for [tryAcquire][Semaphore.tryAcquire] in [Semaphore]), + * so that the [synchronous][SYNC] is introduced. Similarly to the asynchronous one, + * [resume] puts the value into the cell, but do not finish right after that. + * In opposite, it waits in a bounded spin-loop (see [MAX_SPIN_CYCLES]) until + * the value is taken, completes after that. If the value is not taken after + * this spin-loop is finished, [resume] marks the cell as [broken][BROKEN] + * and fails, so that the corresponding [suspend] invocation finds the cell + * [broken][BROKEN] and fails as well. + */ + internal enum class ResumeMode { SYNC, ASYNC } + + /** + * These modes define the mode that should be used for cancellation. + * Essentially, there are two modes: simple and smart. + * Specifies whether [resume] should fail on cancelled waiters ([SIMPLE]), + * or skip them in either [synchronous][SMART_SYNC] or [asynchronous][SMART_ASYNC] + * way. In the asynchronous skip mode [resume] may pass the element to the + * cancellation handler in order not to wait, so that the element can be "hung" + * for a while. + */ + internal enum class CancellationMode { SIMPLE, SMART_SYNC, SMART_ASYNC } + + /** + * This cancellation handler is invoked when + * the waiter located by ```segment[index]``` + * is cancelled. + */ + private inner class SQSCancellationHandler( + private val segment: SQSSegment, + private val index: Int + ) : CancelHandler() { + override fun invoke(cause: Throwable?) { + // Do we use simple or smart cancellation? + if (cancellationMode === SIMPLE) { + // In the simple cancellation mode the logic + // is straightforward -- mark the cell as + // cancelled to avoid memory leaks and complete. + segment.markCancelled(index) + return + } + // We are in the smart cancellation mode. + // Perform the cancellation-related logic and + // check whether the value of the `resume` that + // comes to this cell should be processed in the + // `SegmentQueueSynchronizer` or refused by it. + val cancelled = onCancellation() + if (cancelled) { + // The cell should be considered as cancelled. + // Mark the cell correspondingly and help a + // concurrent `resume` to process its value if + // needed (see `SMART_ASYNC` cancellation mode). + val value = segment.markCancelled(index) ?: return + // Try to resume the next waiter with the value + // provided by a concurrent `resume`. + if (resume(value as T)) return + // The resumption has been failed because of the + // `SYNC` resume mode. Return the value back to + // the original data structure. + returnValue(value) + } else { + // The value of the `resume` that comes to this + // cell should be refused by this `SegmentQueueSynchronizer`. + // Mark the cell correspondingly and help a concurrent + // `resume` to process its value if needed + // (see `SMART_ASYNC` cancellation mode). + val value = segment.markRefused(index) ?: return + returnRefusedValue(value as T) + } + } + + override fun toString() = "SQSCancellationHandler[$segment, $index]" + } +} + +/** + * Tries to resume this continuation atomically, + * returns `true` if succeeds and `false` otherwise. + */ +private fun CancellableContinuation.tryResume0(value: T, onCancellation: ((cause: Throwable) -> Unit)): Boolean { + val token = tryResume(value, onCancellation) ?: return false + completeResume(token) + return true +} + +private fun createSegment(id: Long, prev: SQSSegment?) = SQSSegment(id, prev, 0) + +/** + * The queue of waiters in [SegmentQueueSynchronizer] + * is represented as a linked list of these segments. + */ +private class SQSSegment(id: Long, prev: SQSSegment?, pointers: Int) : Segment(id, prev, pointers) { + private val waiters = atomicArrayOfNulls(SEGMENT_SIZE) + override val maxSlots: Int get() = SEGMENT_SIZE + + @Suppress("NOTHING_TO_INLINE") + inline fun get(index: Int): Any? = waiters[index].value + + @Suppress("NOTHING_TO_INLINE") + inline fun set(index: Int, value: Any?) { + waiters[index].value = value + } + + @Suppress("NOTHING_TO_INLINE") + inline fun cas(index: Int, expected: Any?, value: Any?): Boolean = waiters[index].compareAndSet(expected, value) + + @Suppress("NOTHING_TO_INLINE") + inline fun getAndSet(index: Int, value: Any?): Any? = waiters[index].getAndSet(value) + + /** + * Marks the cell as cancelled and returns `null`, so that + * the [resume] that comes to the cell should notice + * that the cell is cancelled and should fail or skip it + * depending on the cancellation mode. However, in [SMART_ASYNC] + * cancellation mode [resume] that comes to the cell with cancelled + * continuation asynchronously puts its value into the cell, + * and the cancellation handler completes the resumption. + * In this case, [markCancelled] returns this non-null value. + * + * If the whole segment contains [CANCELLED] markers after + * this invocation, [onSlotCleaned] is invoked and this segment + * is going to be removed if [head][SegmentQueueSynchronizer.head] + * and [tail][SegmentQueueSynchronizer.tail] do not reference it. + * Note that the segments that are not stored physically are still + * considered as logically stored but being full of cancelled waiters. + */ + fun markCancelled(index: Int): Any? = mark(index, CANCELLED).also { + onSlotCleaned() + } + + /** + * Marks the cell as refused and returns `null`, so that + * the [resume] that comes to the cell should notice + * that its value is refused by the [SegmentQueueSynchronizer], + * and [SegmentQueueSynchronizer.tryReturnRefusedValue] + * is invoked in this case (if it fails, the value is put back via + * [SegmentQueueSynchronizer.returnValue]). Since in [SMART_ASYNC] + * cancellation mode [resume] that comes to the cell with cancelled + * continuation asynchronously puts its value into the cell. + * In this case, [markRefused] returns this non-null value. + */ + fun markRefused(index: Int): Any? = mark(index, REFUSE) + + /** + * Marks the cell with the specified [marker] + * and returns `null` if the cell contains the + * cancelled continuation. However, in the [SMART_ASYNC] + * cancellation mode it is possible that [resume] comes + * to the cell with cancelled continuation and asynchronously + * puts its value into the cell, so that the cancellation + * handler decides whether this value should be used for + * resuming the next waiter or be refused. In the latter case, + * the corresponding non-null value is returned as a result. + */ + private fun mark(index: Int, marker: Any?): Any? = + when (val old = getAndSet(index, marker)) { + // Did the cell contain the cancelled continuation? + is Continuation<*> -> { + assert { if (old is CancellableContinuation<*>) old.isCancelled else true } + null + } + // Did the cell contain an asynchronously put value? + // (both branches deal with values) + is WrappedContinuationValue -> old.cont + else -> old + } + + override fun toString() = "SQSSegment[id=$id, hashCode=${hashCode()}]" +} + +/** + * In the [smart asynchronous cancellation mode][SegmentQueueSynchronizer.CancellationMode.SMART_ASYNC] + * it is possible that [resume] comes to the cell with cancelled continuation and + * asynchronously puts its value into the cell, so that the cancellation handler decides whether + * this value should be used for resuming the next waiter or be refused. When this + * value is a continuation, it is hard to distinguish it with the one related to the cancelled + * waiter. Thus, such values are wrapped with [WrappedContinuationValue] in this case. Note, that the + * wrapper is required only in [SegmentQueueSynchronizer.CancellationMode.SMART_ASYNC] mode + * and is used in the asynchronous race resolution logic between cancellation and [resume] + * invocation; this way, it is used relatively rare. + */ +private class WrappedContinuationValue(val cont: Continuation<*>) + +@SharedImmutable +private val SEGMENT_SIZE = systemProp("kotlinx.coroutines.sqs.segmentSize", 16) +@SharedImmutable +private val MAX_SPIN_CYCLES = systemProp("kotlinx.coroutines.sqs.maxSpinCycles", 100) +@SharedImmutable +private val TAKEN = Symbol("TAKEN") +@SharedImmutable +private val BROKEN = Symbol("BROKEN") +@SharedImmutable +private val CANCELLED = Symbol("CANCELLED") +@SharedImmutable +private val REFUSE = Symbol("REFUSE") +@SharedImmutable +private val RESUMED = Symbol("RESUMED") + +private const val TRY_RESUME_SUCCESS = 0 +private const val TRY_RESUME_FAIL_CANCELLED = 1 +private const val TRY_RESUME_FAIL_BROKEN = 2 \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/src/sync/ReadWriteMutex.kt b/kotlinx-coroutines-core/common/src/sync/ReadWriteMutex.kt new file mode 100644 index 0000000000..957a1479f1 --- /dev/null +++ b/kotlinx-coroutines-core/common/src/sync/ReadWriteMutex.kt @@ -0,0 +1,59 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.sync + +import kotlinx.coroutines.* + +public interface ReadWriteMutex { + /** + * + */ + @HazardousConcurrentApi + public suspend fun readLock() + + /** + * + */ + @HazardousConcurrentApi + public fun readUnlock() + + /** + * + */ + @HazardousConcurrentApi + public suspend fun writeLock() + + /** + * + */ + @HazardousConcurrentApi + public fun writeUnlock() +} + +/** + * + */ +@OptIn(HazardousConcurrentApi::class) +public suspend inline fun ReadWriteMutex.withReadLock(action: () -> T): T { + readLock() + try { + return action() + } finally { + readUnlock() + } +} + +/** + * + */ +@OptIn(HazardousConcurrentApi::class) +public suspend inline fun ReadWriteMutex.withWriteLock(action: () -> T): T { + writeLock() + try { + return action() + } finally { + writeUnlock() + } +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt index 84b7f4f8a2..9b807b46f3 100644 --- a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt +++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt @@ -10,7 +10,6 @@ import kotlinx.coroutines.internal.* import kotlin.contracts.* import kotlin.coroutines.* import kotlin.math.* -import kotlin.native.concurrent.SharedImmutable /** * A counting semaphore for coroutines that logically maintains a number of available permits. @@ -33,6 +32,9 @@ public interface Semaphore { * * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this * function is suspended, this function immediately resumes with [CancellationException]. + * + * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this + * function is suspended, this function immediately resumes with [CancellationException]. * There is a **prompt cancellation guarantee**. If the job was cancelled while this function was * suspended, it will not resume successfully. See [suspendCancellableCoroutine] documentation for low-level details. * This function releases the semaphore if it was already acquired by this function before the [CancellationException] @@ -90,53 +92,10 @@ public suspend inline fun Semaphore.withPermit(action: () -> T): T { } } -private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Semaphore { - /* - The queue of waiting acquirers is essentially an infinite array based on the list of segments - (see `SemaphoreSegment`); each segment contains a fixed number of slots. To determine a slot for each enqueue - and dequeue operation, we increment the corresponding counter at the beginning of the operation - and use the value before the increment as a slot number. This way, each enqueue-dequeue pair - works with an individual cell. We use the corresponding segment pointers to find the required ones. - - Here is a state machine for cells. Note that only one `acquire` and at most one `release` operation - can deal with each cell, and that `release` uses `getAndSet(PERMIT)` to perform transitions for performance reasons - so that the state `PERMIT` represents different logical states. - - +------+ `acquire` suspends +------+ `release` tries +--------+ // if `cont.tryResume(..)` succeeds, then - | NULL | -------------------> | cont | -------------------> | PERMIT | (cont RETRIEVED) // the corresponding `acquire` operation gets - +------+ +------+ to resume `cont` +--------+ // a permit and the `release` one completes. - | | - | | `acquire` request is cancelled and the continuation is - | `release` comes | replaced with a special `CANCEL` token to avoid memory leaks - | to the slot before V - | `acquire` and puts +-----------+ `release` has +--------+ - | a permit into the | CANCELLED | -----------------> | PERMIT | (RElEASE FAILED) - | slot, waiting for +-----------+ failed +--------+ - | `acquire` after - | that. - | - | `acquire` gets +-------+ - | +-----------------> | TAKEN | (ELIMINATION HAPPENED) - V | the permit +-------+ - +--------+ | - | PERMIT | -< - +--------+ | - | `release` has waited a bounded time, +--------+ - +---------------------------------------> | BROKEN | (BOTH RELEASE AND ACQUIRE FAILED) - but `acquire` has not come +--------+ - */ - - private val head: AtomicRef - private val deqIdx = atomic(0L) - private val tail: AtomicRef - private val enqIdx = atomic(0L) - +private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : SegmentQueueSynchronizer(), Semaphore { init { require(permits > 0) { "Semaphore should have at least 1 permit, but had $permits" } require(acquiredPermits in 0..permits) { "The number of acquired permits should be in 0..$permits" } - val s = SemaphoreSegment(0, null, 2) - head = atomic(s) - tail = atomic(s) } /** @@ -149,18 +108,19 @@ private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Se private val _availablePermits = atomic(permits - acquiredPermits) override val availablePermits: Int get() = max(_availablePermits.value, 0) - private val onCancellationRelease = { _: Throwable -> release() } - - override fun tryAcquire(): Boolean { - _availablePermits.loop { p -> - if (p <= 0) return false - if (_availablePermits.compareAndSet(p, p - 1)) return true - } + override fun tryAcquire(): Boolean = _availablePermits.loop { p -> + // Try to decrement the number of available + // permits if it is greater than zero. + if (p <= 0) return false + if (_availablePermits.compareAndSet(p, p - 1)) return true } override suspend fun acquire() { + // Decrement the number of available permits. val p = _availablePermits.getAndDecrement() - if (p > 0) return // permit acquired + // Is the permit acquired? + if (p > 0) return + // Try to suspend otherwise. // While it looks better when the following function is inlined, // it is important to make `suspend` function invocations in a way // so that the tail-call optimization can be applied. @@ -169,136 +129,43 @@ private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Se private suspend fun acquireSlowPath() = suspendCancellableCoroutineReusable sc@ { cont -> while (true) { - if (addAcquireToQueue(cont)) return@sc + // Try to suspend. + if (suspend(cont)) return@sc + // The suspension has been failed + // due to the synchronous resumption mode. + // Restart the whole acquire, and decrement + // the number of available permits at first. val p = _availablePermits.getAndDecrement() - if (p > 0) { // permit acquired + // Is the permit acquired? + if (p > 0) { cont.resume(Unit) return@sc } + // Otherwise, go to the beginning of + // the loop and try to suspend otherwise. } } override fun release() { while (true) { + // Increment the number of available permits + // if it does not exceed the maximum value. val p = _availablePermits.getAndUpdate { cur -> check(cur < permits) { "The number of released permits cannot be greater than $permits" } cur + 1 } + // Is there a waiter that should be resumed? if (p >= 0) return - if (tryResumeNextFromQueue()) return - } - } - - /** - * Returns `false` if the received permit cannot be used and the calling operation should restart. - */ - private fun addAcquireToQueue(cont: CancellableContinuation): Boolean { - val curTail = this.tail.value - val enqIdx = enqIdx.getAndIncrement() - val segment = this.tail.findSegmentAndMoveForward(id = enqIdx / SEGMENT_SIZE, startFrom = curTail, - createNewSegment = ::createSegment).segment // cannot be closed - val i = (enqIdx % SEGMENT_SIZE).toInt() - // the regular (fast) path -- if the cell is empty, try to install continuation - if (segment.cas(i, null, cont)) { // installed continuation successfully - cont.invokeOnCancellation(CancelSemaphoreAcquisitionHandler(segment, i).asHandler) - return true - } - // On CAS failure -- the cell must be either PERMIT or BROKEN - // If the cell already has PERMIT from tryResumeNextFromQueue, try to grab it - if (segment.cas(i, PERMIT, TAKEN)) { // took permit thus eliminating acquire/release pair - // The following resume must always succeed, since continuation was not published yet and we don't have - // to pass onCancellationRelease handle, since the coroutine did not suspend yet and cannot be cancelled - cont.resume(Unit) - return true - } - assert { segment.get(i) === BROKEN } // it must be broken in this case, no other way around it - return false // broken cell, need to retry on a different cell - } - - @Suppress("UNCHECKED_CAST") - private fun tryResumeNextFromQueue(): Boolean { - val curHead = this.head.value - val deqIdx = deqIdx.getAndIncrement() - val id = deqIdx / SEGMENT_SIZE - val segment = this.head.findSegmentAndMoveForward(id, startFrom = curHead, - createNewSegment = ::createSegment).segment // cannot be closed - segment.cleanPrev() - if (segment.id > id) return false - val i = (deqIdx % SEGMENT_SIZE).toInt() - val cellState = segment.getAndSet(i, PERMIT) // set PERMIT and retrieve the prev cell state - when { - cellState === null -> { - // Acquire has not touched this cell yet, wait until it comes for a bounded time - // The cell state can only transition from PERMIT to TAKEN by addAcquireToQueue - repeat(MAX_SPIN_CYCLES) { - if (segment.get(i) === TAKEN) return true - } - // Try to break the slot in order not to wait - return !segment.cas(i, PERMIT, BROKEN) - } - cellState === CANCELLED -> return false // the acquire was already cancelled - else -> return (cellState as CancellableContinuation).tryResumeAcquire() + // Try to resume the first waiter, and + // re-start the operation if it is cancelled. + if (resume(Unit)) return } } - private fun CancellableContinuation.tryResumeAcquire(): Boolean { - val token = tryResume(Unit, null, onCancellationRelease) ?: return false - completeResume(token) - return true - } -} - -private class CancelSemaphoreAcquisitionHandler( - private val segment: SemaphoreSegment, - private val index: Int -) : CancelHandler() { - override fun invoke(cause: Throwable?) { - segment.cancel(index) - } - - override fun toString() = "CancelSemaphoreAcquisitionHandler[$segment, $index]" -} - -private fun createSegment(id: Long, prev: SemaphoreSegment?) = SemaphoreSegment(id, prev, 0) - -private class SemaphoreSegment(id: Long, prev: SemaphoreSegment?, pointers: Int) : Segment(id, prev, pointers) { - val acquirers = atomicArrayOfNulls(SEGMENT_SIZE) - override val maxSlots: Int get() = SEGMENT_SIZE - - @Suppress("NOTHING_TO_INLINE") - inline fun get(index: Int): Any? = acquirers[index].value - - @Suppress("NOTHING_TO_INLINE") - inline fun set(index: Int, value: Any?) { - acquirers[index].value = value - } - - @Suppress("NOTHING_TO_INLINE") - inline fun cas(index: Int, expected: Any?, value: Any?): Boolean = acquirers[index].compareAndSet(expected, value) - - @Suppress("NOTHING_TO_INLINE") - inline fun getAndSet(index: Int, value: Any?) = acquirers[index].getAndSet(value) - - // Cleans the acquirer slot located by the specified index - // and removes this segment physically if all slots are cleaned. - fun cancel(index: Int) { - // Clean the slot - set(index, CANCELLED) - // Remove this segment if needed - onSlotCleaned() + override fun returnValue(value: Unit) { + // Return the permit if the current continuation + // is cancelled after the `tryResume` invocation + // because of the prompt cancellation. + release() } - - override fun toString() = "SemaphoreSegment[id=$id, hashCode=${hashCode()}]" -} -@SharedImmutable -private val MAX_SPIN_CYCLES = systemProp("kotlinx.coroutines.semaphore.maxSpinCycles", 100) -@SharedImmutable -private val PERMIT = Symbol("PERMIT") -@SharedImmutable -private val TAKEN = Symbol("TAKEN") -@SharedImmutable -private val BROKEN = Symbol("BROKEN") -@SharedImmutable -private val CANCELLED = Symbol("CANCELLED") -@SharedImmutable -private val SEGMENT_SIZE = systemProp("kotlinx.coroutines.semaphore.segmentSize", 16) +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/SegmentQueueSynchronizerLCStressTests.kt b/kotlinx-coroutines-core/jvm/test/linearizability/SegmentQueueSynchronizerLCStressTests.kt new file mode 100644 index 0000000000..f1a3b9b216 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/linearizability/SegmentQueueSynchronizerLCStressTests.kt @@ -0,0 +1,1512 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:Suppress("unused") +package kotlinx.coroutines.linearizability + +import kotlinx.atomicfu.* +import kotlinx.coroutines.* +import kotlinx.coroutines.check +import kotlinx.coroutines.internal.* +import kotlinx.coroutines.internal.SegmentQueueSynchronizer.CancellationMode.* +import kotlinx.coroutines.internal.SegmentQueueSynchronizer.ResumeMode.* +import kotlinx.coroutines.sync.* +import org.jetbrains.kotlinx.lincheck.* +import org.jetbrains.kotlinx.lincheck.annotations.* +import org.jetbrains.kotlinx.lincheck.annotations.Operation +import org.jetbrains.kotlinx.lincheck.paramgen.* +import org.jetbrains.kotlinx.lincheck.strategy.managed.modelchecking.* +import org.jetbrains.kotlinx.lincheck.strategy.stress.* +import org.jetbrains.kotlinx.lincheck.verifier.* +import org.junit.* +import kotlin.coroutines.* +import kotlin.reflect.* + +// This test suit serves two purposes. First of all, it tests the `SegmentQueueSynchronizer` +// implementation under different use-cases and workloads. On the other side, this test suite +// provides different well-known synchronization and communication primitive implementations +// via `SegmentQueueSynchronizer`, which can be considered as an API richness check as well as +// a collection of examples on how to use `SegmentQueueSynchronizer` to build new primitives. + +// ######################## +// # READERS-WRITER MUTEX # +// ######################## + +/** + * This readers-writer mutex maintains two atomic variables [R] and [W], and uses two + * separate [SegmentQueueSynchronizer]-s for waiting readers and writers. The 64-bit + * variable [R] maintains three mostly readers-related states atomically: + * - `AWF` (active writer flag) bit that is `true` if there is a writer holding the write lock. + * - `WWF` (waiting writer flag) bit that is `true` if there is a writer waiting for the write lock + * and the lock is not acquired due to active readers. + * - `AR` (active readers) 30-bit counter which represents the number of coroutines holding a read lock, + * - `WR` (waiting readers) 30-bit counter which represents the number of coroutines waiting for a + * read lock in the corresponding [SegmentQueueSynchronizer]. + * This way, when a reader comes for a lock, it atomically checks whether the `WF` flag is set and + * either increments the `AR` counter and acquires a lock if it is not set, or increments the + * `WR` counter and suspends otherwise. At the same time, when a reader releases the lock, it + * it checks whether it is the last active reader and resumes the first waiting writer if the `WF` + * flag is set. + * + * Writers, on their side, use an additional [W] field which represents the number of waiting + * writers as well as several internal flags: + * - `WW` (waiting writers) 30-bit counter that represents the number of coroutines waiting for + * the write lock in the corresponding [SegmentQueueSynchronizer], + * - `WLA` (the write lock is acquired) flag which is `true` when the write lock is acquired, + * and `WF` should be `true` as well in this case, + * - `WLRP` (write lock release is in progress) flag which is `true` when the [releaseWrite] + * invocation is in progress. Note, that `WLA` + * should already be `false` in this case. + * - `WRF` (writer is resumed) flag that can be set to `true` by [releaseRead] if there is a + * concurrent [releaseWrite], so that `WLRP` is set to true. This + * flag helps to manage the race when [releaseWrite] successfully + * resumed waiting readers, has not re-set `WF` flag in [R] yet, + * while there readers completed with [releaseRead] and the last + * one observes the `WF` flag set to `true`, so that it should try + * to resume the next waiting writer. However, it is better to tell + * the concurrent [releaseWrite] to check whether there is a writer + * and resume it. + * + */ +internal class ReadWriteMutex { + private val R = atomic(0L) + private val W = atomic(0) + + private val sqsForWriters = object : SegmentQueueSynchronizer() { + override val resumeMode: ResumeMode get() = ASYNC + override val cancellationMode: CancellationMode get() = SMART_ASYNC + + override fun onCancellation(): Boolean { + // Decrement the number of waiting writers + while (true) { + val w = curW + if (w.ww == 0) return false + if (w.ww > 1 || w.wla || w.wlrp) { + if (casW(w, constructW(w.ww - 1, w.wla, w.wlrp, w.wrf))) return true + } else { + // w.ww == 1, w.wla == false, w.wlrp == false, w.wrf == false + if (casW(w, constructW(1, false, true, false))) break + } + } + // The WLRP flag is successfully set. + // Re-set the WWF flag or complete if the AWF flag is set. + while (true) { + val r = R.value + val awf = r.awf + val wwf = r.wwf + val ar = r.ar + val wr = r.wr + assert(ar > 0 && wwf || awf) { "Either active writer or active reader should be here" } + // Should this last cancelled waiting writer be resumed by the last reader? + if (awf) { + // Re-set the WLRP flag + while (true) { + val w = W.value + val ww = w.ww + val wla = w.wla + val wlrp = w.wlrp + val wrf = w.wrf + assert(wlrp) { "WLRP flag should still be set" } + assert(!wla) { "WLA cannot be set here" } + if (wrf) { + // Are we the last waiting writer? + if (ww == 1) { + val updW = constructW(0, true, false, false) + if (W.compareAndSet(w, updW)) return false + } else { + val updW = constructW(ww - 2, true, false, false) + if (W.compareAndSet(w, updW)) { + resume(Unit) + return true + } + } + } else { + assert(ww > 0) { "Our cancelling writer should still be counted" } + val updW = constructW(ww, false, false, false) + if (W.compareAndSet(w, updW)) return true + } + } + } + // Try to reset the WWF flag and resume waiting readers. + if (R.compareAndSet(r, constructR(false, false, ar + wr, 0))) { + // Were there waiting readers? + // Resume them physically if needed. + repeat(wr) { + val success = sqsForReaders.resume(Unit) + assert(success) { "Reader resumption cannot fail because of the smart cancellation in the SQS" } + } + break + } + } + // Check whether the AWF or WWF flag should be set back due to new waiting writers. + // Phase 3. Try to re-set the WLRP flag + // if there is no waiting writer. + while (true) { + val w = W.value + val ww = w.ww + val wla = w.wla + val wlrp = w.wlrp + val wrf = w.wrf + assert(wlrp) { "WLRP flag should still be set" } + assert(!wla) { "There should be no active writer at this point: $this" } + assert(!wrf) { "WRF flag can be set only when the WF flag is put back"} + // Is there a waiting writer? + if (ww > 1) break + // No waiting writers, try + // to complete immediately. + val updW = constructW(0, false, false, false) + if (W.compareAndSet(w, updW)) return true + } + // Phase 4. There is a waiting writer, + // set the WF flag back and try to grab + // the write lock. + var acquired: Boolean + while (true) { + val r = R.value + val awf = r.awf + val wwf = r.wwf + val ar = r.ar + val wr = r.wr + assert(!awf && !wwf) { "The WF flag should not be set at this point" } + assert(wr == 0) { "The number of waiting readers shold be 0 when the WF flag is not set" } + acquired = ar == 0 + val updR = constructR(acquired, !acquired, ar, 0) + // Try to set the WF flag. + if (R.compareAndSet(r, updR)) break + } + // Phase 5. Re-set the WLRP flag and try to resume + // the next waiting writer if the write lock is grabbed. + while (true) { + val w = W.value + val ww = w.ww + val wla = w.wla + val wlrp = w.wlrp + val wrf = w.wrf + assert(ww > 0) { "WW cannot decrease without resumptions" } + assert(wlrp) { "WLRP flag should still be set" } + assert(!wla) { "WLA cannot be set here" } + val resume = acquired || wrf + val updW = constructW(if (resume) ww - 1 else ww, resume, false, false) + if (!W.compareAndSet(w, updW)) continue + if (resume) resume(Unit) + return true + } + } + + override fun tryReturnRefusedValue(value: Unit): Boolean { + releaseWrite() + return true + } + } + + private val curR get() = R.value + private val curW get() = W.value + private fun casR(cur: Long, new: Long) = R.compareAndSet(cur, new) + private fun casW(cur: Int, new: Int) = W.compareAndSet(cur, new) + + private val sqsForReaders = object : SegmentQueueSynchronizer() { + override val resumeMode: ResumeMode get() = ASYNC + override val cancellationMode: CancellationMode get() = SMART_ASYNC + + override fun onCancellation(): Boolean { + while (true) { + val r = curR + if (r.wr == 0) return false + if (casR(r, constructR(r.awf, r.wwf, r.ar, r.wr - 1))) { + return true + } + } + } + + override fun tryReturnRefusedValue(value: Unit): Boolean { + releaseRead() + return true + } + } + + suspend fun acquireRead(): Unit = R.loop { r -> + // Read the current [R] state. + val awf = r.awf + val wwf = r.wwf + val ar = r.ar + val wr = r.wr + // Is there an active or waiting writer? + if (!awf && !wwf) { + // There is no writer, try to grab a read lock! + assert(wr == 0) + val upd = constructR(false, false, ar + 1, 0) + if (R.compareAndSet(r, upd)) return + } else { + // This reader should wait for a lock, try to + // increment the number of waiting readers and suspend. + val upd = constructR(awf, wwf, ar, wr + 1) + if (R.compareAndSet(r, upd)) { + suspendCancellableCoroutine { sqsForReaders.suspend(it) } + return + } + } + } + + suspend fun acquireWrite() { + try_again@while (true) { + // Increment the number of waiting writers at first. + val w = W.getAndIncrement() + val ww = w.ww + val wla = w.wla + val wlrp = w.wlrp + // Is this writer the first one? Check whether there are other + // writers waiting for the lock, an active one, or a concurrent + // [releaseWrite] invocation is in progress. + if (wla || wlrp) { + // Try to suspend and re-try the whole operation on failure. + suspendCancellableCoroutine { cont -> + sqsForWriters.suspend(cont) + } + return + } + // This writer is the first one. Set the `WF` flag and + // complete immediately if there is no reader. + while (true) { + val r = R.value + val awf = r.awf + val wwf = r.wwf + val ar = r.ar + val wr = r.wr + if (awf || wwf) { + // Try to suspend and re-try the whole operation on failure. + suspendCancellableCoroutine { cont -> + sqsForWriters.suspend(cont) + } + return + } + assert(wr == 0) { "The number of waiting readers should be 0 when the WF flag is not set"} + val acquired = ar == 0 + val rUpd = constructR(acquired, !acquired, ar, wr) + if (R.compareAndSet(r, rUpd)) { + // Is the lock acquired? Check the number of readers. + if (acquired) { + // Yes! The write lock is just acquired! + // Update `W` correspondingly. + W.update { w1 -> + val ww1 = w1.ww + val wla1 = w1.wla + val wlrp1 = w1.wlrp + val wrf1 = w1.wrf + assert(ww1 > 0) { "WW should be greater than 0 at least because of this `acquireWrite` invocation" } + assert(!wla1 && !wlrp1 && !wrf1) { "WLA, WLRP, and WRF flags should not be set here" } + constructW(ww1 - 1, true, false, false) + } + return + } + // There were active readers at the point of `WF` flag placing. + // Try to suspend and re-try the whole operation on failure. + // Note, that the thread that fails on resumption should + // re-set the WF flag if required. + suspendCancellableCoroutine { cont -> + sqsForWriters.suspend(cont) + } + return + } + } + } + } + + fun releaseRead() { + while (true) { + // Read the current [R] state + val r = R.value + val wwf = r.wwf + val ra = r.ar + val wr = r.wr + assert(ra > 0) { "No active reader to release" } + assert(!r.awf) { "Write lock cannot be acquired when there is an active reader" } + // Try to decrement the number of active readers + val awfUpd = wwf && ra == 1 + val wwfUpd = wwf && !awfUpd + val upd = constructR(awfUpd, wwfUpd, ra - 1, wr) + if (!R.compareAndSet(r, upd)) continue + // Check whether the current reader is the last one, + // and resume the first writer if the `WF` flag is set. + if (!awfUpd) return + while (true) { + // Either WLA, WLA or WLRP should be + // non-zero when the WF flag is set. + val w = W.value + val ww = w.ww + val wla = w.wla + val wlrp = w.wlrp + val wrf = w.wrf + assert(!wla) { "There should be no active writer at this point" } + assert(!wrf) { "The WRF flag cannot be set at this point" } + // Is there still a concurrent [releaseWrite]? + // Try to delegate the resumption work in this case. + if (wlrp) { + val updW = constructW(ww, wla, true, true) + if (W.compareAndSet(w, updW)) return + } else { + if (ww == 0) return // cancellation is happening TODO + assert(ww > 0) { "At most one waiting writer should be registered at this point" } + // Try to set the `WLA` flag and decrement + // the number of waiting writers. + val updW = constructW(ww - 1, true, false, false) + if (!W.compareAndSet(w, updW)) continue + // Try to resume the first waiting writer. + sqsForWriters.resume(Unit) + return + } + } + } + } + + fun releaseWrite() { + // Phase 1. Try to resume the next waiting writer + // or re-set the WLA flag and set the WLRP one. + while (true) { + val w = W.value + val ww = w.ww + val wla = w.wla + val wlrp = w.wlrp + val wrf = w.wrf + assert(wla) { "Write lock is not acquired" } + assert(!wlrp && !wrf) { "WLRP and WRF flags should not be set in the beginning" } + // Is there a waiting writer? + if (ww > 0) { + val updW = constructW(ww - 1, true, false, false) + if (W.compareAndSet(w, updW) && sqsForWriters.resume(Unit)) return + } else { + assert(ww == 0) { "WW can be negative" } + // Re-set the WLA flag and set the WLRP one. + val updW = constructW(ww, false, true, false) + if (W.compareAndSet(w, updW)) break + } + } + // Phase 2. Re-set the WF flag + // and resume readers if needed. + while (true) { + val r = R.value + val awf = r.awf + val wwf = r.wwf + val ar = r.ar + val wr = r.wr + assert(ar == 0) { "There should be no active reader while the write lock is acquired" } + assert(awf) { "AWF should be set here" } + assert(!wwf) { "WWF should not be set here" } + // Re-set the WF flag and resume the waiting readers logically. + if (R.compareAndSet(r, constructR(false, false, wr, 0))) { + // Were there waiting readers? + // Resume them physically if needed. + repeat(wr) { + val success = sqsForReaders.resume(Unit) + assert(success) { "Reader resumption cannot fail because of the smart cancellation in the SQS" } + } + break + } + } + // Phase 3. Try to re-set the WLRP flag + // if there is no waiting writer. + while (true) { + val w = W.value + val ww = w.ww + val wla = w.wla + val wlrp = w.wlrp + val wrf = w.wrf + assert(wlrp) { "WLRP flag should still be set" } + assert(!wla) { "There should be no active writer at this point: $this" } + assert(!wrf) { "WRF flag can be set only when the WF flag is put back"} + // Is there a waiting writer? + if (ww != 0) break + // No waiting writers, try + // to complete immediately. + val updW = constructW(0, false, false, false) + if (W.compareAndSet(w, updW)) return + } + // Phase 4. There is a waiting writer, + // set the WF flag back and try to grab + // the write lock. + var acquired: Boolean + while (true) { + val r = R.value + val awf = r.awf + val wwf = r.wwf + val ar = r.ar + val wr = r.wr + assert(!awf && !wwf) { "The WF flag should not be set at this point" } + assert(wr == 0) { "The number of waiting readers shold be 0 when the WF flag is not set" } + acquired = ar == 0 + val updR = constructR(acquired, !acquired, ar, 0) + // Try to set the WF flag. + if (R.compareAndSet(r, updR)) break + } + // Phase 5. Re-set the WLRP flag and try to resume + // the next waiting writer if the write lock is grabbed. + while (true) { + val w = W.value + val ww = w.ww + val wla = w.wla + val wlrp = w.wlrp + val wrf = w.wrf + assert(wlrp) { "WLRP flag should still be set" } + assert(!wla) { "WLA cannot be set here" } + val resume = acquired || wrf + if (resume && ww == 0) { + val updW = constructW(0, true, false, false) + if (!W.compareAndSet(w, updW)) continue + releaseWrite() + return + } else { + val updW = constructW(if (resume) ww - 1 else ww, resume, false, false) + if (!W.compareAndSet(w, updW)) continue + if (resume) sqsForWriters.resume(Unit) + return + } + } + } + + override fun toString() = "R=<${R.value.awf},${R.value.wwf},${R.value.ar},${R.value.wr}>," + + "W=<${W.value.ww},${W.value.wla},${W.value.wlrp},${W.value.wrf}>" + + companion object { + inline val Long.awf: Boolean get() = this and AWF_BIT != 0L + const val AWF_BIT = 1L shl 62 + + inline val Long.wwf: Boolean get() = this and WWF_BIT != 0L + const val WWF_BIT = 1L shl 61 + + inline val Long.wr: Int get() = (this and WAITERS_MASK).toInt() + const val WAITERS_MASK = (1L shl 30) - 1L + + inline val Long.ar: Int get() = ((this and ACTIVE_MASK) shr 30).toInt() + const val ACTIVE_MASK = (1L shl 60) - 1L - WAITERS_MASK + + @Suppress("NOTHING_TO_INLINE") + inline fun constructR(awf: Boolean, wwf: Boolean, ar: Int, wr: Int): Long { + var res = 0L + if (awf) res += AWF_BIT + if (wwf) res += WWF_BIT + res += ar.toLong() shl 30 + res += wr.toLong() + return res + } + + const val WRITER_ACTIVE_FLAG: Int = 10_000_000 + const val RELEASE_IN_PROGRESS_FLAG = 100_000_000 + const val WRITER_RESUMED_FLAG = 1_000_000_000 + + inline val Int.ww : Int get() = this % WRITER_ACTIVE_FLAG + inline val Int.wla : Boolean get() = (this / WRITER_ACTIVE_FLAG) % 10 == 1 + inline val Int.wlrp : Boolean get() = (this / RELEASE_IN_PROGRESS_FLAG) % 10 == 1 + inline val Int.wrf : Boolean get() = (this / WRITER_RESUMED_FLAG) % 10 == 1 + + inline fun constructW(ww: Int, wla: Boolean, wlrp: Boolean, wrf: Boolean): Int { + var res = ww + if (wla) res += WRITER_ACTIVE_FLAG + if (wlrp) res += RELEASE_IN_PROGRESS_FLAG + if (wrf) res += WRITER_RESUMED_FLAG + return res + } + } +} + +internal class ReadWriteMutexTest : TestBase() { + @Test + fun `simple single-coroutine test`() = runTest { + val m = ReadWriteMutex() + m.acquireRead() + m.acquireRead() + m.releaseRead() + m.releaseRead() + m.acquireWrite() + m.releaseWrite() + m.acquireRead() + } + + @Test + fun `simple multiple coroutines test`() = runTest { + val m = ReadWriteMutex() + m.acquireRead() + expect(1) + launch { + expect(2) + m.acquireRead() + expect(3) + } + yield() + expect(4) + launch { + expect(5) + m.acquireWrite() + expect(8) + } + yield() + expect(6) + m.releaseRead() + yield() + expect(7) + m.releaseRead() + yield() + finish(9) + } + + @Test + fun `acquireRead does not suspend after cancelled acquireWrite`() = runTest { + val m = ReadWriteMutex() + m.acquireRead() + val wJob = launch { + expect(1) + m.acquireWrite() + expectUnreached() + } + yield() + expect(2) + wJob.cancel() + launch { + expect(3) + m.acquireRead() + expect(4) + } + yield() + finish(5) + } +} + +internal class ReadWriteMutexCounterLCStressTest { + val m = ReadWriteMutex() + var c = 0 + + @Operation(cancellableOnSuspension = true, allowExtraSuspension = true) + suspend fun inc(): Int = try { + m.acquireWrite() + c++ + } finally { + m.releaseWrite() + } + + @Operation(cancellableOnSuspension = true, allowExtraSuspension = true) + suspend fun get(): Int = try { + m.acquireRead() + c + } finally { + m.releaseRead() + } + + @StateRepresentation + fun stateRepresentation(): String = "$c+$m" + + @Test + fun test2() = ModelCheckingOptions() + .iterations(50) + .actorsBefore(0) + .actorsAfter(0) + .threads(2) + .actorsPerThread(3) + .logLevel(LoggingLevel.INFO) + .invocationsPerIteration(100_000) + .sequentialSpecification(ReadWriteMutexCounterSequential::class.java) + .check(this::class) + + @Test + fun test() = StressOptions() + .actorsBefore(0) + .actorsAfter(0) + .threads(3) + .actorsPerThread(5) + .invocationsPerIteration(100_000) + .logLevel(LoggingLevel.INFO) + .sequentialSpecification(ReadWriteMutexCounterSequential::class.java) + .check(this::class) +} + +class ReadWriteMutexCounterSequential : VerifierState() { + var c = 0 + suspend fun inc() = c++ + suspend fun get() = c + + override fun extractState() = c +} + +class ReadWriteMutexLCStressTest { + private val m = ReadWriteMutex() + private val readLockAcquired = IntArray(6) + private val writeLockAcquired = BooleanArray(6) + + @Operation(cancellableOnSuspension = true, allowExtraSuspension = true) + suspend fun acquireRead(@Param(gen = ThreadIdGen::class) threadId: Int) { + m.acquireRead() + readLockAcquired[threadId]++ + } + + @Operation(cancellableOnSuspension = true, allowExtraSuspension = true) + suspend fun acquireWrite(@Param(gen = ThreadIdGen::class) threadId: Int) { + m.acquireWrite() + assert(!writeLockAcquired[threadId]) { "The mutex is not reentrant" } + writeLockAcquired[threadId] = true + } + + @Operation + fun releaseRead(@Param(gen = ThreadIdGen::class) threadId: Int): Boolean { + if (readLockAcquired[threadId] == 0) return false + m.releaseRead() + readLockAcquired[threadId]-- + return true + } + + @Operation + fun releaseWrite(@Param(gen = ThreadIdGen::class) threadId: Int): Boolean { + if (!writeLockAcquired[threadId]) return false + m.releaseWrite() + writeLockAcquired[threadId] = false + return true + } + + @Test + fun test() = LCStressOptionsDefault() + .actorsBefore(0) + .actorsAfter(0) + .threads(3) + .actorsPerThread(5) + .invocationsPerIteration(500_000) + .logLevel(LoggingLevel.INFO) + .sequentialSpecification(ReadWriteMutexSequential::class.java) + .check(this::class) + + @Test + fun test2() = ModelCheckingOptions() + .iterations(50) + .actorsBefore(0) + .actorsAfter(0) + .threads(3) + .actorsPerThread(3) + .logLevel(LoggingLevel.INFO) + .invocationsPerIteration(500_000) + .sequentialSpecification(ReadWriteMutexSequential::class.java) + .check(this::class) + +// @StateRepresentation +// fun stateRepresentation() = m.toString() +} + +class ReadWriteMutexSequential : VerifierState() { + private var activeReaders = 0 + private var writeLockedOrWaiting = false + private val waitingReaders = ArrayList>() + private val waitingWriters = ArrayList>() + // Thread-local info + private val readLockAcquired = IntArray(6) + private val writeLockAcquired = BooleanArray(6) + + suspend fun acquireRead(threadId: Int) { + if (writeLockedOrWaiting) { + suspendCancellableCoroutine { cont -> + waitingReaders.add(cont) + cont.invokeOnCancellation { + waitingReaders.remove(cont) + } + } + } else { + activeReaders++ + } + readLockAcquired[threadId]++ + } + + suspend fun acquireWrite(threadId: Int) { + if (activeReaders > 0 || writeLockedOrWaiting) { + writeLockedOrWaiting = true + suspendCancellableCoroutine { cont -> + waitingWriters.add(cont) + cont.invokeOnCancellation { + waitingWriters.remove(cont) + if (waitingWriters.isEmpty() && writeLockAcquired.all { !it }) { + writeLockedOrWaiting = false + activeReaders += waitingReaders.size + waitingReaders.forEach { it.resume(Unit) } + waitingReaders.clear() + } + } + } + } else { + writeLockedOrWaiting = true + } + writeLockAcquired[threadId] = true + } + + fun releaseRead(threadId: Int): Boolean { + if (readLockAcquired[threadId] == 0) return false + readLockAcquired[threadId]-- + activeReaders-- + if (activeReaders == 0 && writeLockedOrWaiting) { + while (waitingWriters.isNotEmpty()) { + val w = waitingWriters.removeAt(0) + if (w.tryResume0(Unit)) return true + } + writeLockedOrWaiting = false + val resumedReaders = waitingReaders.map { it.tryResume0(Unit) }.filter { it }.count() + waitingReaders.clear() + activeReaders = resumedReaders + return true + } + return true + } + + fun releaseWrite(threadId: Int): Boolean { + if (!writeLockAcquired[threadId]) return false + writeLockAcquired[threadId] = false + while (waitingWriters.isNotEmpty()) { + val w = waitingWriters.removeAt(0) + if (w.tryResume0(Unit)) return true + } + writeLockedOrWaiting = false + val resumedReaders = waitingReaders.map { it.tryResume0(Unit) }.filter { it }.count() + waitingReaders.clear() + activeReaders = resumedReaders + return true + } + + override fun extractState() = "$activeReaders, $writeLockedOrWaiting, ${readLockAcquired.contentToString()}, ${writeLockAcquired.contentToString()}" +} + +// ############## +// # SEMAPHORES # +// ############## + +/** + * This [Semaphore] implementation is similar to the one in the library, + * but uses the [asynchronous][ASYNC] mode for resumptions. However, + * it is hard to make [tryAcquire] linearizable in this case, so that + * it is simply not supported here. + */ +internal class AsyncSemaphore(permits: Int) : SegmentQueueSynchronizer(), Semaphore { + override val resumeMode get() = ASYNC + + private val _availablePermits = atomic(permits) + override val availablePermits get() = _availablePermits.value.coerceAtLeast(0) + + override fun tryAcquire() = error("Not supported in the ASYNC version") + + override suspend fun acquire() { + // Decrement the number of available permits. + val p = _availablePermits.getAndDecrement() + // Is the permit successfully acquired? + if (p > 0) return + // Suspend otherwise. + suspendCancellableCoroutine { cont -> + check(suspend(cont)) { "Should not fail in ASYNC mode" } + } + } + + override fun release() { + while (true) { + // Increment the number of available permits. + val p = _availablePermits.getAndIncrement() + // Is there a waiter that should be resumed? + if (p >= 0) return + // Try to resume the first waiter, and + // re-start the operation if it is cancelled. + if (resume(Unit)) return + } + } +} + +/** + * This semaphore implementation is correct only if [release] is always + * invoked after a successful [acquire]; in other words, when semaphore + * is used properly, without unexpected [release] invocations. The main + * advantage is using smart cancellation, so that [release] always works + * in constant time under no contention, and the cancelled [acquire] + * requests do not play any role. It is worth noting, that it is possible + * to make this implementation correct under not atomic but strong cancellation + * model, when continuation can be cancelled if it is logically resumed + * but not dispatched yet. + */ +internal class AsyncSemaphoreSmart(permits: Int) : SegmentQueueSynchronizer(), Semaphore { + override val resumeMode get() = ASYNC + override val cancellationMode get() = SMART_SYNC + + private val _availablePermits = atomic(permits) + override val availablePermits get() = _availablePermits.value.coerceAtLeast(0) + + override fun tryAcquire() = error("Not supported in the ASYNC version") + + override suspend fun acquire() { + // Decrement the number of available permits. + val p = _availablePermits.getAndDecrement() + // Is the permit acquired? + if (p > 0) return + // Suspend otherwise. + suspendCancellableCoroutine { cont -> + check(suspend(cont)) { "Should not fail in ASYNC mode" } + } + } + + override fun release() { + // Increment the number of available permits. + val p = _availablePermits.getAndIncrement() + // Is there a waiter that should be resumed? + if (p >= 0) return + // Resume the first waiter. Due to the smart + // cancellation it is possible that this + // permit will be refused, so that the real + // release can come with a small lag, but it + // is guaranteed to be processed eventually. + resume(Unit) + } + + override fun onCancellation(): Boolean { + // Increment the number of available permits. + val p = _availablePermits.getAndIncrement() + // Return `true` if there is no `release` which + // is going to resume us and cannot skip us and + // resume the next waiter. + return p < 0 + } +} + +/** + * This implementation is similar to the previous one, but uses [synchronous][SYNC] + * resumption mode, so that it is possible to implement [tryAcquire] atomically. + * The only notable difference happens when a permit to be released is refused, + * and the following [resume] attempt in the cancellation handler fails due to + * the synchronization on resumption, so that the permit is going to be returned + * back to the semaphore in [returnValue] function. + */ +internal class SyncSemaphoreSmart(permits: Int) : SegmentQueueSynchronizer(), Semaphore { + override val resumeMode get() = SYNC + override val cancellationMode get() = SMART_SYNC + + private val _availablePermits = atomic(permits) + override val availablePermits get() = _availablePermits.value.coerceAtLeast(0) + + override suspend fun acquire() { + while (true) { + // Decrement the number of available permits. + val p = _availablePermits.getAndDecrement() + // Is the permit acquired? + if (p > 0) return + // Try to suspend otherwise. + val acquired = suspendCancellableCoroutine { cont -> + if (!suspend(cont)) cont.resume(false) + } + if (acquired) return + } + } + + override fun tryAcquire(): Boolean = _availablePermits.loop { cur -> + // Try to decrement the number of available + // permits if it is greater than zero. + if (cur <= 0) return false + if (_availablePermits.compareAndSet(cur, cur -1)) return true + } + + override fun release() { + while (true) { + // Increment the number of available permits. + val p = _availablePermits.getAndIncrement() + // Is there a waiter that should be resumed? + if (p >= 0) return + // Try to resume the first waiter, can fail + // according to the SYNC mode contract. + if (resume(true)) return + } + } + + override fun onCancellation(): Boolean { + // Increment the number of available permits. + val p = _availablePermits.getAndIncrement() + // Return `true` if there is no `release` which + // is going to resume us and cannot skip us and + // resume the next waiter. + return p < 0 + } + + override fun returnValue(value: Boolean) { + // Simply release the permit. + release() + } +} + +abstract class AsyncSemaphoreLCStressTestBase(semaphore: Semaphore, val seqSpec: KClass<*>) { + private val s = semaphore + + @Operation + suspend fun acquire() = s.acquire() + + @Operation + fun release() = s.release() + + @Test + fun test() = LCStressOptionsDefault() + .actorsBefore(0) + .sequentialSpecification(seqSpec.java) + .check(this::class) +} + +class SemaphoreUnboundedSequential1 : SemaphoreSequential(1, false) +class SemaphoreUnboundedSequential2 : SemaphoreSequential(2, false) + +class AsyncSemaphore1LCStressTest : AsyncSemaphoreLCStressTestBase(AsyncSemaphore(1), SemaphoreUnboundedSequential1::class) +class AsyncSemaphore2LCStressTest : AsyncSemaphoreLCStressTestBase(AsyncSemaphore(2), SemaphoreUnboundedSequential2::class) + +class AsyncSemaphoreSmart1LCStressTest : AsyncSemaphoreLCStressTestBase(AsyncSemaphoreSmart(1), SemaphoreUnboundedSequential1::class) +class AsyncSemaphoreSmart2LCStressTest : AsyncSemaphoreLCStressTestBase(AsyncSemaphoreSmart(2), SemaphoreUnboundedSequential2::class) + +class SyncSemaphoreSmart1LCStressTest : SemaphoreLCStressTestBase(SyncSemaphoreSmart(1), SemaphoreUnboundedSequential1::class) +class SyncSemaphoreSmart2LCStressTest : SemaphoreLCStressTestBase(SyncSemaphoreSmart(2), SemaphoreUnboundedSequential2::class) + + +// #################################### +// # COUNT-DOWN-LATCH SYNCHRONIZATION # +// #################################### + +/** + * This primitive allows to wait until several operation are completed. + * It is initialized with a given count, and each [countDown] invocation + * decrements the count of remaining operations to be completed. At the + * same time, [await] waits until the count reaches zero. + * + * This implementation uses simple cancellation, so that the [countDown] + * invocation that reaches the counter zero works in a linear of the number of [await] + * invocations, including the ones that are already cancelled. + */ +internal open class CountDownLatch(count: Int) : SegmentQueueSynchronizer() { + override val resumeMode get() = ASYNC + + private val count = atomic(count) + // The number of suspended `await` invocation. + // DONE_MARK should be set when the count reaches + // zero, so that the following suspension attempts + // can fail and complete immediately. + private val waiters = atomic(0) + + protected fun decWaiters() = waiters.decrementAndGet() + + /** + * Decrements the count and resumes waiting + * [await] invocations if it reaches zero. + */ + fun countDown() { + // Decrement the count. + val r = count.decrementAndGet() + // Should the waiters be resumed? + if (r <= 0) resumeWaiters() + } + + private fun resumeWaiters() { + val w = waiters.getAndUpdate { cur -> + // Is the done mark set? + if (cur and DONE_MARK != 0) return + cur or DONE_MARK + } + // This thread has successfully set + // the mark, resume the waiters. + repeat(w) { resume(Unit) } + } + + /** + * Waits until the count reaches zero, + * completes immediately if it is already zero. + */ + suspend fun await() { + // Check whether the count has been reached zero, + // this can be considered as an optimization. + if (remaining() == 0) return + // Increment the number of waiters and check + // that DONE_MARK is not set, finish otherwise. + val w = waiters.incrementAndGet() + if (w and DONE_MARK != 0) return + // The number of waiters is + // successfully incremented, suspend. + suspendCancellableCoroutine { suspend(it) } + } + + /** + * Return the current count. + */ + fun remaining(): Int = count.value.coerceAtLeast(0) + + protected companion object { + const val DONE_MARK = 1 shl 31 + } +} + +/** + * This implementation uses a smarter cancellation mechanism, so that the + * [countDown] invocation that reaches the counter zero works in linear of + * the number of non-cancelled [await] invocations. This way, it does not matter + * how many [await] requests has been cancelled -- they do not play any role. + */ +internal class CountDownLatchSmart(count: Int) : CountDownLatch(count) { + override val cancellationMode get() = SMART_ASYNC + + override fun onCancellation(): Boolean { + // Decrement the number of waiters. + val w = decWaiters() + // Succeed if the DONE_MARK is not set yet. + return (w and DONE_MARK) == 0 + } +} + +internal abstract class CountDownLatchLCStressTestBase(val cdl: CountDownLatch, val seqSpec: KClass<*>) { + @Operation + fun countDown() = cdl.countDown() + + @Operation + fun remaining() = cdl.remaining() + + @Operation + suspend fun await() = cdl.await() + + @Test + fun test() = LCStressOptionsDefault() + .actorsBefore(0) + .actorsAfter(0) + .sequentialSpecification(seqSpec.java) + .check(this::class) +} + +class CountDownLatchSequential1 : CountDownLatchSequential(1) +class CountDownLatchSequential2 : CountDownLatchSequential(2) + +internal class CountDownLatch1LCStressTest : CountDownLatchLCStressTestBase(CountDownLatch(1), CountDownLatchSequential1::class) +internal class CountDownLatch2LCStressTest : CountDownLatchLCStressTestBase(CountDownLatch(2), CountDownLatchSequential2::class) + +internal class CountDownLatchSmart1LCStressTest : CountDownLatchLCStressTestBase(CountDownLatchSmart(1), CountDownLatchSequential1::class) +internal class CountDownLatchSmart2LCStressTest : CountDownLatchLCStressTestBase(CountDownLatchSmart(2), CountDownLatchSequential2::class) + +open class CountDownLatchSequential(initialCount: Int) : VerifierState() { + private var count = initialCount + private val waiters = ArrayList>() + + fun countDown() { + if (--count == 0) { + waiters.forEach { it.tryResume0(Unit) } + waiters.clear() + } + } + + suspend fun await() { + if (count <= 0) return + suspendCancellableCoroutine { cont -> + waiters.add(cont) + } + } + + fun remaining(): Int = count.coerceAtLeast(0) + + override fun extractState() = remaining() +} + + +// ########################### +// # BARRIER SYNCHRONIZATION # +// ########################### + +/** + * This synchronization primitive allows a set of coroutines to + * all wait for each other to reach a common barrier point. + * + * The implementation is straightforward: it maintains a counter + * of arrived coroutines and increments it in the beginning of + * [arrived] operation. The last coroutines should resume all the + * previous ones. + * + * In case of cancellation, the handler decrements the counter if + * not all the parties are arrived. However, it is impossible to + * make cancellation atomic (e.g., Java's implementation simply + * does not work in case of thread interruption) since there is + * no way to resume a set of coroutines atomically. Thus, + * this implementation is non-atomic if cancellation happens + * simultaneously to the last [arrive], but is correct under + * the strong cancellation model, when continuation can be + * cancelled if it is logically resumed but not dispatched yet. + */ +internal class Barrier(private val parties: Int) : SegmentQueueSynchronizer() { + override val resumeMode get() = ASYNC + override val cancellationMode get() = SMART_ASYNC + + // The number of coroutines arrived to this barrier point. + private val arrived = atomic(0L) + + /** + * Waits for other parties and returns `true`. + * Fails if this invocation exceeds the number + * of parties, returns `false` in this case. + */ + suspend fun arrive(): Boolean { + // Increment the number of arrived parties. + val a = arrived.incrementAndGet() + return when { + // Should we suspend? + a < parties -> { + suspendCoroutine { cont -> suspend(cont) } + true + } + // Are we the last party? + a == parties.toLong() -> { + // Resume all waiters. + repeat(parties - 1) { + resume(Unit) + } + true + } + // Should we fail? + else -> false + } + } + + override fun onCancellation(): Boolean { + // Decrement the number of arrived parties if possible. + arrived.loop { cur -> + // Are we going to be resumed? + // The resumption permit should be refused in this case. + if (cur == parties.toLong()) return false + // Successful cancellation, return `true`. + if (arrived.compareAndSet(cur, cur - 1)) return true + } + } +} + +// TODO: non-atomic cancellation test, the corresponding feature in lincheck is required. +abstract class BarrierLCStressTestBase(parties: Int, val seqSpec: KClass<*>) { + private val b = Barrier(parties) + + @Operation(cancellableOnSuspension = false) + suspend fun arrive() = b.arrive() + + @Test + fun test() = LCStressOptionsDefault() + .actorsBefore(0) + .actorsAfter(0) + .threads(3) + .sequentialSpecification(seqSpec.java) + .check(this::class) +} + +class BarrierSequential1 : BarrierSequential(1) +class Barrier1LCStressTest : BarrierLCStressTestBase(1, BarrierSequential1::class) +class BarrierSequential2 : BarrierSequential(2) +class Barrier2LCStressTest : BarrierLCStressTestBase(2, BarrierSequential2::class) +class BarrierSequential3 : BarrierSequential(3) +class Barrier3LCStressTest : BarrierLCStressTestBase(3, BarrierSequential3::class) + +open class BarrierSequential(parties: Int) : VerifierState() { + private var remainig = parties + private val waiters = ArrayList>() + + suspend fun arrive(): Boolean { + val r = --remainig + return when { + r > 0 -> { + suspendCancellableCoroutine { cont -> + waiters.add(cont) + cont.invokeOnCancellation { + remainig++ + waiters.remove(cont) + } + } + true + } + r == 0 -> { + waiters.forEach { it.resume(Unit) } + true + } + else -> false + } + } + + override fun extractState() = remainig > 0 +} + + +// ################## +// # BLOCKING POOLS # +// ################## + +/** + * While using resources such as database connections, sockets, etc., + * it is typical to reuse them; that requires a fast and handy mechanism. + * This [BlockingPool] abstraction maintains a set of elements that can be put + * into the pool for further reuse or be retrieved to process the current operation. + * When [retrieve] comes to an empty pool, it blocks, and the following [put] operation + * resumes it; all the waiting requests are processed in the first-in-first-out (FIFO) order. + * + * In our tests we consider two pool implementations: the [queue-based][BlockingQueuePool] + * and the [stack-based][BlockingStackPool]. Intuitively, the queue-based implementation is + * faster since it is built on arrays and uses `Fetch-And-Add`-s on the contended path, + * while the stack-based pool retrieves the last inserted, thus the "hottest", elements. + * + * Please note that both these implementations are not atomic and can retrieve elements + * out-of-order under some races. However, since pools by themselves do not guarantee + * that the stored elements are ordered (the one may consider them as bags), + * these queue- and stack-based versions should be considered as pools with specific heuristics. + */ +interface BlockingPool { + /** + * Either resumes the first waiting [retrieve] operation + * and passes the [element] to it, or simply put the + * [element] into the pool. + */ + fun put(element: T) + + /** + * Retrieves one of the elements from the pool + * (the order is not specified), or suspends if it is + * empty -- the following [put] operations resume + * waiting [retrieve]-s in the first-in-first-out order. + */ + suspend fun retrieve(): T +} + +/** + * This pool uses queue under the hood and implemented withing the simple cancellation technique. + */ +internal class BlockingQueuePool : SegmentQueueSynchronizer(), BlockingPool { + override val resumeMode get() = ASYNC + + // > 0 -- number of elements; + // = 0 -- empty pool; + // < 0 -- number of waiters. + private val availableElements = atomic(0L) + + // This is an infinite array by design, a plain array is used for simplicity. + private val elements = atomicArrayOfNulls(100) + + // Indices in [elements] for the next [tryInsert] and [tryRetrieve] operations. + // Each [tryInsert]/[tryRetrieve] pair works on a separate slot. When [tryRetrieve] + // comes earlier, it marks the slot as [BROKEN] so that both this operation and the + // corresponding [tryInsert] fail. + private val insertIdx = atomic(0) + private val retrieveIdx = atomic(0) + + override fun put(element: T) { + while (true) { + // Increment the number of elements in advance. + val b = availableElements.getAndIncrement() + // Is there a waiting `retrieve`? + if (b < 0) { + // Try to resume the first waiter, + // can fail if it is already cancelled. + if (resume(element)) return + } else { + // Try to insert the element into the + // queue, can fail if the slot is broken. + if (tryInsert(element)) return + } + } + } + + /** + * Tries to insert the [element] into the next + * [elements] array slot. Returns `true` if + * succeeds, or `false` if the slot is [broken][BROKEN]. + */ + private fun tryInsert(element: T): Boolean { + val i = insertIdx.getAndIncrement() + return elements[i].compareAndSet(null, element) + } + + override suspend fun retrieve(): T { + while (true) { + // Decrements the number of elements. + val b = availableElements.getAndDecrement() + // Is there an element in the pool? + if (b > 0) { + // Try to retrieve the first element, + // can fail if the first [elements] slot + // is empty due to a race. + val x = tryRetrieve() + if (x != null) return x + } else { + // The pool is empty, suspend. + return suspendCancellableCoroutine { cont -> + suspend(cont) + } + } + } + } + + /** + * Tries to retrieve the first element from + * the [elements] array. Returns the element if + * succeeds, or `null` if the first slot is empty + * due to a race -- it marks the slot as [broken][BROKEN] + * in this case, so that the corresponding [tryInsert] + * invocation fails. + */ + private fun tryRetrieve(): T? { + val i = retrieveIdx.getAndIncrement() + return elements[i].getAndSet(BROKEN) as T? + } + + companion object { + @JvmStatic + val BROKEN = Symbol("BROKEN") + } +} + +/** + * This pool uses stack under the hood and shows how to use smart cancellation + * for such data structures with resources. + */ +internal class BlockingStackPool : SegmentQueueSynchronizer(), BlockingPool { + override val resumeMode get() = ASYNC + override val cancellationMode get() = SMART_SYNC + + // The stack is implemented via a concurrent linked list, + // this is its head; `null` means that the stack is empty. + private val head = atomic?>(null) + + // > 0 -- number of elements; + // = 0 -- empty pool; + // < 0 -- number of waiters. + private val availableElements = atomic(0) + + override fun put(element: T) { + while (true) { + // Increment the number of elements in advance. + val b = availableElements.getAndIncrement() + // Is there a waiting retrieve? + if (b < 0) { + // Resume the first waiter, never fails + // in the smart cancellation mode. + resume(element) + return + } else { + // Try to insert the element into the + // stack, can fail if a concurrent [tryRetrieve] + // came earlier and marked it with a failure node. + if (tryInsert(element)) return + } + } + } + + /** + * Tries to insert the [element] into the stack. + * Returns `true` on success`, or `false` if the + * stack is marked with a failure node, retrieving + * it in this case. + */ + private fun tryInsert(element: T): Boolean = head.loop { h -> + // Is the stack marked with a failure node? + if (h != null && h.element == null) { + // Try to retrieve the failure node. + if (head.compareAndSet(h, h.next)) return false + } else { + // Try to insert the element. + val newHead = StackNode(element, h) + if (head.compareAndSet(h, newHead)) return true + } + } + + override suspend fun retrieve(): T { + while (true) { + // Decrement the number of elements. + val b = availableElements.getAndDecrement() + // Is there an element in the pool? + if (b > 0) { + // Try to retrieve the top element, + // can fail if the stack if empty + // due to a race. + val x = tryRetrieve() + if (x != null) return x + } else { + // The pool is empty, suspend. + return suspendCancellableCoroutine { cont -> + suspend(cont) + } + } + } + } + + /** + * Try to retrieve the top (last) element and return `true` + * if the stack is not empty, or return `false` and + * insert a failure node otherwise. + */ + private fun tryRetrieve(): T? = head.loop { h -> + // Is the queue empty or full of failure nodes? + if (h == null || h.element == null) { + // Try to add one more failure node and fail. + val failNode = StackNode(null, h) + if (head.compareAndSet(h, failNode)) return null + } else { + // Try to retrieve the top element. + if (head.compareAndSet(h, h.next)) return h.element + } + } + + // The logic of cancellation is very similar to the one + // in semaphore, with the only difference that elements + // should be physically returned to the pool. + override fun onCancellation(): Boolean { + val b = availableElements.getAndIncrement() + return b < 0 + } + + // If an element is refused, it should be inserted back to the stack. + override fun tryReturnRefusedValue(value: T) = tryInsert(value) + + // In order to return the value back + // to the pool [put] is naturally used. + override fun returnValue(value: T) = put(value) + + class StackNode(val element: T?, val next: StackNode?) +} + +abstract class BlockingPoolLCStressTestBase(val p: BlockingPool) { + @Operation + fun put() = p.put(Unit) + + @Operation + suspend fun retrieve() = p.retrieve() + + @Test + fun test() = LCStressOptionsDefault() + .sequentialSpecification(BlockingPoolUnitSequential::class.java) + .check(this::class) +} +class BlockingQueuePoolLCStressTest : BlockingPoolLCStressTestBase(BlockingQueuePool()) +class BlockingStackPoolLCStressTest : BlockingPoolLCStressTestBase(BlockingStackPool()) + +class BlockingPoolUnitSequential : VerifierState() { + private var elements = 0 + private val waiters = ArrayList>() + + fun put() { + while (true) { + if (waiters.isNotEmpty()) { + val w = waiters.removeAt(0) + if (w.tryResume0(Unit)) return + } else { + elements ++ + return + } + } + } + + suspend fun retrieve() { + if (elements > 0) { + elements-- + } else { + suspendCancellableCoroutine { cont -> + waiters.add(cont) + } + } + } + + override fun extractState() = elements +} + + +// ############# +// # UTILITIES # +// ############# + +/** + * Tries to resume this continuation atomically, + * returns `true` if succeeds and `false` otherwise. + */ +private fun CancellableContinuation.tryResume0(value: T): Boolean { + val token = tryResume(value) ?: return false + completeResume(token) + return true +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/linearizability/SemaphoreLCStressTest.kt b/kotlinx-coroutines-core/jvm/test/linearizability/SemaphoreLCStressTest.kt index 52902f4987..9fa3a3da9f 100644 --- a/kotlinx-coroutines-core/jvm/test/linearizability/SemaphoreLCStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/linearizability/SemaphoreLCStressTest.kt @@ -6,29 +6,70 @@ package kotlinx.coroutines.linearizability import kotlinx.coroutines.* import kotlinx.coroutines.sync.* +import org.jetbrains.kotlinx.lincheck.* import org.jetbrains.kotlinx.lincheck.annotations.Operation import org.jetbrains.kotlinx.lincheck.verifier.* import org.junit.* +import kotlin.reflect.* -abstract class SemaphoreLCStressTestBase(permits: Int) : VerifierState() { - private val semaphore = Semaphore(permits) +abstract class SemaphoreLCStressTestBase(semaphore: Semaphore, val seqSpec: KClass<*>) { + private val s = semaphore @Operation - fun tryAcquire() = semaphore.tryAcquire() + fun tryAcquire() = s.tryAcquire() @Operation - suspend fun acquire() = semaphore.acquire() + suspend fun acquire() = s.acquire() @Operation(handleExceptionsAsResult = [IllegalStateException::class]) - fun release() = semaphore.release() + fun release() = s.release() @Test fun test() = LCStressOptionsDefault() .actorsBefore(0) + .sequentialSpecification(seqSpec.java) .check(this::class) +} + +open class SemaphoreSequential(val permits: Int, val boundMaxPermits: Boolean) : VerifierState() { + private var availablePermits = permits + private val waiters = ArrayList>() + + fun tryAcquire(): Boolean { + if (availablePermits <= 0) return false + availablePermits-- + return true + } + + suspend fun acquire() { + if (tryAcquire()) return + availablePermits-- + suspendCancellableCoroutine { cont -> + waiters.add(cont) + } + } - override fun extractState() = semaphore.availablePermits + fun release() { + while (true) { + check(availablePermits < permits || !boundMaxPermits) + availablePermits++ + if (availablePermits > 0) return + val w = waiters.removeAt(0) + if (w.tryResume0(Unit)) return + } + } + + override fun extractState() = availablePermits.coerceAtLeast(0) } -class Semaphore1LCStressTest : SemaphoreLCStressTestBase(1) -class Semaphore2LCStressTest : SemaphoreLCStressTestBase(2) \ No newline at end of file +class SemaphoreSequential1 : SemaphoreSequential(1, true) +class Semaphore1LCStressTest : SemaphoreLCStressTestBase(Semaphore(1), SemaphoreSequential1::class) + +class SemaphoreSequential2 : SemaphoreSequential(2, true) +class Semaphore2LCStressTest : SemaphoreLCStressTestBase(Semaphore(2), SemaphoreSequential2::class) + +private fun CancellableContinuation.tryResume0(value: T): Boolean { + val token = tryResume(value) ?: return false + completeResume(token) + return true +} \ No newline at end of file