Skip to content

Commit

Permalink
Introduce SegmentQueueSynchronizer abstraction for synchronization …
Browse files Browse the repository at this point in the history
…primitives and `ReadWriteMutex`
  • Loading branch information
ndkoval committed Jul 29, 2021
1 parent ea440c5 commit 54d5f05
Show file tree
Hide file tree
Showing 15 changed files with 2,731 additions and 207 deletions.
1 change: 1 addition & 0 deletions .idea/dictionaries/shared.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 5 additions & 6 deletions benchmarks/src/jmh/kotlin/benchmarks/SemaphoreBenchmark.kt
Expand Up @@ -14,8 +14,8 @@ import org.openjdk.jmh.annotations.*
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.TimeUnit

@Warmup(iterations = 3, time = 500, timeUnit = TimeUnit.MICROSECONDS)
@Measurement(iterations = 10, time = 500, timeUnit = TimeUnit.MICROSECONDS)
@Warmup(iterations = 2, time = 500, timeUnit = TimeUnit.MICROSECONDS)
@Measurement(iterations = 5, time = 500, timeUnit = TimeUnit.MICROSECONDS)
@Fork(value = 1)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
Expand All @@ -31,7 +31,6 @@ open class SemaphoreBenchmark {
private var _3_maxPermits: Int = 0

@Param("1", "2", "4") // local machine
// @Param("1", "2", "4", "8", "16", "32", "64", "128", "144") // dasquad
// @Param("1", "2", "4", "8", "16", "32", "64", "96") // Google Cloud
private var _4_parallelism: Int = 0

Expand All @@ -51,7 +50,7 @@ open class SemaphoreBenchmark {
val semaphore = Semaphore(_3_maxPermits)
val jobs = ArrayList<Job>(coroutines)
repeat(coroutines) {
jobs += GlobalScope.launch {
jobs += GlobalScope.launch(dispatcher) {
repeat(n) {
semaphore.withPermit {
doGeomDistrWork(WORK_INSIDE)
Expand All @@ -69,7 +68,7 @@ open class SemaphoreBenchmark {
val semaphore = Channel<Unit>(_3_maxPermits)
val jobs = ArrayList<Job>(coroutines)
repeat(coroutines) {
jobs += GlobalScope.launch {
jobs += GlobalScope.launch(dispatcher) {
repeat(n) {
semaphore.send(Unit) // acquire
doGeomDistrWork(WORK_INSIDE)
Expand All @@ -89,4 +88,4 @@ enum class SemaphoreBenchDispatcherCreator(val create: (parallelism: Int) -> Cor

private const val WORK_INSIDE = 80
private const val WORK_OUTSIDE = 40
private const val BATCH_SIZE = 1000000
private const val BATCH_SIZE = 1_000_000
12 changes: 12 additions & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -1271,6 +1271,18 @@ public final class kotlinx/coroutines/sync/MutexKt {
public static synthetic fun withLock$default (Lkotlinx/coroutines/sync/Mutex;Ljava/lang/Object;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
}

public abstract interface class kotlinx/coroutines/sync/ReadWriteMutex {
public abstract fun getWrite ()Lkotlinx/coroutines/sync/Mutex;
public abstract fun readLock (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun readUnlock ()V
}

public final class kotlinx/coroutines/sync/ReadWriteMutexKt {
public static final fun ReadWriteMutex ()Lkotlinx/coroutines/sync/ReadWriteMutex;
public static final fun read (Lkotlinx/coroutines/sync/ReadWriteMutex;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun write (Lkotlinx/coroutines/sync/ReadWriteMutex;Lkotlin/jvm/functions/Function0;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class kotlinx/coroutines/sync/Semaphore {
public abstract fun acquire (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun getAvailablePermits ()I
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/build.gradle
Expand Up @@ -249,8 +249,8 @@ static void configureJvmForLincheck(task) {
task.maxHeapSize = '6g' // we may need more space for building an interleaving tree in the model checking mode
task.jvmArgs = ['--add-opens', 'java.base/jdk.internal.misc=ALL-UNNAMED', // required for transformation
'--add-exports', 'java.base/jdk.internal.util=ALL-UNNAMED'] // in the model checking mode
task.systemProperty 'kotlinx.coroutines.semaphore.segmentSize', '2'
task.systemProperty 'kotlinx.coroutines.semaphore.maxSpinCycles', '1' // better for the model checking mode
task.systemProperty 'kotlinx.coroutines.sqs.segmentSize', '2'
task.systemProperty 'kotlinx.coroutines.sqs.maxSpinCycles', '1' // better for the model checking mode
}

task jdk16Test(type: Test, dependsOn: [compileTestKotlinJvm, checkJdk16]) {
Expand Down
1 change: 1 addition & 0 deletions kotlinx-coroutines-core/common/src/Debug.common.kt
Expand Up @@ -8,6 +8,7 @@ internal expect val DEBUG: Boolean
internal expect val Any.hexAddress: String
internal expect val Any.classSimpleName: String
internal expect fun assert(value: () -> Boolean)
internal inline fun assertNot(crossinline value: () -> Boolean) = assert { !value() }

/**
* Throwable which can be cloned during stacktrace recovery in a class-specific way.
Expand Down
23 changes: 13 additions & 10 deletions kotlinx-coroutines-core/common/src/internal/ConcurrentLinkedList.kt
Expand Up @@ -150,11 +150,13 @@ internal abstract class ConcurrentLinkedListNode<N : ConcurrentLinkedListNode<N>
*/
fun remove() {
assert { removed } // The node should be logically removed at first.
assert { !isTail } // The physical tail cannot be removed.
// The physical tail cannot be removed. Instead, we remove it when
// a new segment is added and this segment is not the tail one anymore.
if (isTail) return
while (true) {
// Read `next` and `prev` pointers ignoring logically removed nodes.
val prev = leftmostAliveNode
val next = rightmostAliveNode
val prev = aliveSegmentLeft
val next = aliveSegmentRight
// Link `next` and `prev`.
next._prev.value = prev
if (prev !== null) prev._next.value = next
Expand All @@ -166,17 +168,17 @@ internal abstract class ConcurrentLinkedListNode<N : ConcurrentLinkedListNode<N>
}
}

private val leftmostAliveNode: N? get() {
private val aliveSegmentLeft: N? get() {
var cur = prev
while (cur !== null && cur.removed)
cur = cur._prev.value
return cur
}

private val rightmostAliveNode: N get() {
private val aliveSegmentRight: N get() {
assert { !isTail } // Should not be invoked on the tail node
var cur = next!!
while (cur.removed)
while (cur.removed && !cur.isTail)
cur = cur.next!!
return cur
}
Expand Down Expand Up @@ -204,19 +206,20 @@ internal abstract class Segment<S : Segment<S>>(val id: Long, prev: S?, pointers
* There are no pointers to this segment from outside, and
* it is not a physical tail in the linked list of segments.
*/
override val removed get() = cleanedAndPointers.value == maxSlots && !isTail
override val removed get() = cleanedAndPointers.value == maxSlots

// increments the number of pointers if this segment is not logically removed.
internal fun tryIncPointers() = cleanedAndPointers.addConditionally(1 shl POINTERS_SHIFT) { it != maxSlots || isTail }
internal fun tryIncPointers() = cleanedAndPointers.addConditionally(1 shl POINTERS_SHIFT) { it != maxSlots }

// returns `true` if this segment is logically removed after the decrement.
internal fun decPointers() = cleanedAndPointers.addAndGet(-(1 shl POINTERS_SHIFT)) == maxSlots && !isTail
internal fun decPointers() = cleanedAndPointers.addAndGet(-(1 shl POINTERS_SHIFT)) == maxSlots

/**
* Invoked on each slot clean-up; should not be invoked twice for the same slot.
*/
fun onSlotCleaned() {
if (cleanedAndPointers.incrementAndGet() == maxSlots && !isTail) remove()
if (cleanedAndPointers.incrementAndGet() < maxSlots) return
if (removed) remove()
}
}

Expand Down

0 comments on commit 54d5f05

Please sign in to comment.