Skip to content

Commit

Permalink
Optimize mutex implementation (#2851)
Browse files Browse the repository at this point in the history
* Get rid of addLastIf and DCSS primitive during contention
* Leverage constants returned by tryResume* and simplify signatures
  • Loading branch information
qwwdfsad committed Aug 2, 2021
1 parent 8baa736 commit 7e762d3
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 23 deletions.
4 changes: 4 additions & 0 deletions kotlinx-coroutines-core/common/src/CancellableContinuation.kt
Expand Up @@ -81,6 +81,10 @@ public interface CancellableContinuation<in T> : Continuation<T> {
* Same as [tryResume] but with [onCancellation] handler that called if and only if the value is not
* delivered to the caller because of the dispatch in the process, so that atomicity delivery
* guaranteed can be provided by having a cancellation fallback.
*
* Implementation note: current implementation always returns RESUME_TOKEN or `null`
*
* @suppress **This is unstable API and it is subject to change.**
*/
@InternalCoroutinesApi
public fun tryResume(value: T, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any?
Expand Down
70 changes: 47 additions & 23 deletions kotlinx-coroutines-core/common/src/sync/Mutex.kt
Expand Up @@ -10,7 +10,6 @@ import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*
import kotlinx.coroutines.selects.*
import kotlin.contracts.*
import kotlin.coroutines.*
import kotlin.jvm.*
import kotlin.native.concurrent.*

Expand Down Expand Up @@ -124,8 +123,6 @@ private val LOCK_FAIL = Symbol("LOCK_FAIL")
@SharedImmutable
private val UNLOCK_FAIL = Symbol("UNLOCK_FAIL")
@SharedImmutable
private val SELECT_SUCCESS = Symbol("SELECT_SUCCESS")
@SharedImmutable
private val LOCKED = Symbol("LOCKED")
@SharedImmutable
private val UNLOCKED = Symbol("UNLOCKED")
Expand Down Expand Up @@ -191,7 +188,7 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
}

private suspend fun lockSuspend(owner: Any?) = suspendCancellableCoroutineReusable<Unit> sc@ { cont ->
val waiter = LockCont(owner, cont)
var waiter = LockCont(owner, cont)
_state.loop { state ->
when (state) {
is Empty -> {
Expand All @@ -210,11 +207,24 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
is LockedQueue -> {
val curOwner = state.owner
check(curOwner !== owner) { "Already locked by $owner" }
if (state.addLastIf(waiter) { _state.value === state }) {
// added to waiter list!

state.addLast(waiter)
/*
* If the state has been changed while we were adding the waiter,
* it means that 'unlock' has taken it and _either_ resumed it successfully or just overwritten.
* To rendezvous that, we try to "invalidate" our node and go for retry.
*
* Node has to be re-instantiated as we do not support node re-adding, even to
* another list
*/
if (_state.value === state || !waiter.take()) {
// added to waiter list
cont.removeOnCancellation(waiter)
return@sc
}

waiter = LockCont(owner, cont)
return@loop
}
is OpDescriptor -> state.perform(this) // help
else -> error("Illegal state $state")
Expand Down Expand Up @@ -252,8 +262,17 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
is LockedQueue -> {
check(state.owner !== owner) { "Already locked by $owner" }
val node = LockSelect(owner, select, block)
if (state.addLastIf(node) { _state.value === state }) {
// successfully enqueued
/*
* If the state has been changed while we were adding the waiter,
* it means that 'unlock' has taken it and _either_ resumed it successfully or just overwritten.
* To rendezvous that, we try to "invalidate" our node and go for retry.
*
* Node has to be re-instantiated as we do not support node re-adding, even to
* another list
*/
state.addLast(node)
if (_state.value === state || !node.take()) {
// added to waiter list
select.disposeOnSelect(node)
return
}
Expand Down Expand Up @@ -300,7 +319,7 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
}
}

public override fun unlock(owner: Any?) {
override fun unlock(owner: Any?) {
_state.loop { state ->
when (state) {
is Empty -> {
Expand All @@ -319,10 +338,9 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
val op = UnlockOp(state)
if (_state.compareAndSet(state, op) && op.perform(this) == null) return
} else {
val token = (waiter as LockWaiter).tryResumeLockWaiter()
if (token != null) {
if ((waiter as LockWaiter).tryResumeLockWaiter()) {
state.owner = waiter.owner ?: LOCKED
waiter.completeResumeLockWaiter(token)
waiter.completeResumeLockWaiter()
return
}
}
Expand Down Expand Up @@ -352,31 +370,37 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
private abstract inner class LockWaiter(
@JvmField val owner: Any?
) : LockFreeLinkedListNode(), DisposableHandle {
private val isTaken = atomic<Boolean>(false)
fun take(): Boolean = isTaken.compareAndSet(false, true)
final override fun dispose() { remove() }
abstract fun tryResumeLockWaiter(): Any?
abstract fun completeResumeLockWaiter(token: Any)
abstract fun tryResumeLockWaiter(): Boolean
abstract fun completeResumeLockWaiter()
}

private inner class LockCont(
owner: Any?,
@JvmField val cont: CancellableContinuation<Unit>
private val cont: CancellableContinuation<Unit>
) : LockWaiter(owner) {
override fun tryResumeLockWaiter() = cont.tryResume(Unit, idempotent = null) {
// if this continuation gets cancelled during dispatch to the caller, then release the lock
unlock(owner)

override fun tryResumeLockWaiter(): Boolean {
if (!take()) return false
return cont.tryResume(Unit, idempotent = null) {
// if this continuation gets cancelled during dispatch to the caller, then release the lock
unlock(owner)
} != null
}
override fun completeResumeLockWaiter(token: Any) = cont.completeResume(token)
override fun toString(): String = "LockCont[$owner, $cont] for ${this@MutexImpl}"

override fun completeResumeLockWaiter() = cont.completeResume(RESUME_TOKEN)
override fun toString(): String = "LockCont[$owner, ${cont}] for ${this@MutexImpl}"
}

private inner class LockSelect<R>(
owner: Any?,
@JvmField val select: SelectInstance<R>,
@JvmField val block: suspend (Mutex) -> R
) : LockWaiter(owner) {
override fun tryResumeLockWaiter(): Any? = if (select.trySelect()) SELECT_SUCCESS else null
override fun completeResumeLockWaiter(token: Any) {
assert { token === SELECT_SUCCESS }
override fun tryResumeLockWaiter(): Boolean = take() && select.trySelect()
override fun completeResumeLockWaiter() {
block.startCoroutineCancellable(receiver = this@MutexImpl, completion = select.completion) {
// if this continuation gets cancelled during dispatch to the caller, then release the lock
unlock(owner)
Expand Down

0 comments on commit 7e762d3

Please sign in to comment.