From 7e762d324aeeb34b53390dee931569013809660e Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Mon, 2 Aug 2021 13:36:34 +0300 Subject: [PATCH] Optimize mutex implementation (#2851) * Get rid of addLastIf and DCSS primitive during contention * Leverage constants returned by tryResume* and simplify signatures --- .../common/src/CancellableContinuation.kt | 4 ++ .../common/src/sync/Mutex.kt | 70 +++++++++++++------ 2 files changed, 51 insertions(+), 23 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt index b133b7935d..2c2f1b8ff6 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt @@ -81,6 +81,10 @@ public interface CancellableContinuation : Continuation { * Same as [tryResume] but with [onCancellation] handler that called if and only if the value is not * delivered to the caller because of the dispatch in the process, so that atomicity delivery * guaranteed can be provided by having a cancellation fallback. + * + * Implementation note: current implementation always returns RESUME_TOKEN or `null` + * + * @suppress **This is unstable API and it is subject to change.** */ @InternalCoroutinesApi public fun tryResume(value: T, idempotent: Any?, onCancellation: ((cause: Throwable) -> Unit)?): Any? diff --git a/kotlinx-coroutines-core/common/src/sync/Mutex.kt b/kotlinx-coroutines-core/common/src/sync/Mutex.kt index 7d0a343d95..19584e0981 100644 --- a/kotlinx-coroutines-core/common/src/sync/Mutex.kt +++ b/kotlinx-coroutines-core/common/src/sync/Mutex.kt @@ -10,7 +10,6 @@ import kotlinx.coroutines.internal.* import kotlinx.coroutines.intrinsics.* import kotlinx.coroutines.selects.* import kotlin.contracts.* -import kotlin.coroutines.* import kotlin.jvm.* import kotlin.native.concurrent.* @@ -124,8 +123,6 @@ private val LOCK_FAIL = Symbol("LOCK_FAIL") @SharedImmutable private val UNLOCK_FAIL = Symbol("UNLOCK_FAIL") @SharedImmutable -private val SELECT_SUCCESS = Symbol("SELECT_SUCCESS") -@SharedImmutable private val LOCKED = Symbol("LOCKED") @SharedImmutable private val UNLOCKED = Symbol("UNLOCKED") @@ -191,7 +188,7 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2 { } private suspend fun lockSuspend(owner: Any?) = suspendCancellableCoroutineReusable sc@ { cont -> - val waiter = LockCont(owner, cont) + var waiter = LockCont(owner, cont) _state.loop { state -> when (state) { is Empty -> { @@ -210,11 +207,24 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2 { is LockedQueue -> { val curOwner = state.owner check(curOwner !== owner) { "Already locked by $owner" } - if (state.addLastIf(waiter) { _state.value === state }) { - // added to waiter list! + + state.addLast(waiter) + /* + * If the state has been changed while we were adding the waiter, + * it means that 'unlock' has taken it and _either_ resumed it successfully or just overwritten. + * To rendezvous that, we try to "invalidate" our node and go for retry. + * + * Node has to be re-instantiated as we do not support node re-adding, even to + * another list + */ + if (_state.value === state || !waiter.take()) { + // added to waiter list cont.removeOnCancellation(waiter) return@sc } + + waiter = LockCont(owner, cont) + return@loop } is OpDescriptor -> state.perform(this) // help else -> error("Illegal state $state") @@ -252,8 +262,17 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2 { is LockedQueue -> { check(state.owner !== owner) { "Already locked by $owner" } val node = LockSelect(owner, select, block) - if (state.addLastIf(node) { _state.value === state }) { - // successfully enqueued + /* + * If the state has been changed while we were adding the waiter, + * it means that 'unlock' has taken it and _either_ resumed it successfully or just overwritten. + * To rendezvous that, we try to "invalidate" our node and go for retry. + * + * Node has to be re-instantiated as we do not support node re-adding, even to + * another list + */ + state.addLast(node) + if (_state.value === state || !node.take()) { + // added to waiter list select.disposeOnSelect(node) return } @@ -300,7 +319,7 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2 { } } - public override fun unlock(owner: Any?) { + override fun unlock(owner: Any?) { _state.loop { state -> when (state) { is Empty -> { @@ -319,10 +338,9 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2 { val op = UnlockOp(state) if (_state.compareAndSet(state, op) && op.perform(this) == null) return } else { - val token = (waiter as LockWaiter).tryResumeLockWaiter() - if (token != null) { + if ((waiter as LockWaiter).tryResumeLockWaiter()) { state.owner = waiter.owner ?: LOCKED - waiter.completeResumeLockWaiter(token) + waiter.completeResumeLockWaiter() return } } @@ -352,21 +370,28 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2 { private abstract inner class LockWaiter( @JvmField val owner: Any? ) : LockFreeLinkedListNode(), DisposableHandle { + private val isTaken = atomic(false) + fun take(): Boolean = isTaken.compareAndSet(false, true) final override fun dispose() { remove() } - abstract fun tryResumeLockWaiter(): Any? - abstract fun completeResumeLockWaiter(token: Any) + abstract fun tryResumeLockWaiter(): Boolean + abstract fun completeResumeLockWaiter() } private inner class LockCont( owner: Any?, - @JvmField val cont: CancellableContinuation + private val cont: CancellableContinuation ) : LockWaiter(owner) { - override fun tryResumeLockWaiter() = cont.tryResume(Unit, idempotent = null) { - // if this continuation gets cancelled during dispatch to the caller, then release the lock - unlock(owner) + + override fun tryResumeLockWaiter(): Boolean { + if (!take()) return false + return cont.tryResume(Unit, idempotent = null) { + // if this continuation gets cancelled during dispatch to the caller, then release the lock + unlock(owner) + } != null } - override fun completeResumeLockWaiter(token: Any) = cont.completeResume(token) - override fun toString(): String = "LockCont[$owner, $cont] for ${this@MutexImpl}" + + override fun completeResumeLockWaiter() = cont.completeResume(RESUME_TOKEN) + override fun toString(): String = "LockCont[$owner, ${cont}] for ${this@MutexImpl}" } private inner class LockSelect( @@ -374,9 +399,8 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2 { @JvmField val select: SelectInstance, @JvmField val block: suspend (Mutex) -> R ) : LockWaiter(owner) { - override fun tryResumeLockWaiter(): Any? = if (select.trySelect()) SELECT_SUCCESS else null - override fun completeResumeLockWaiter(token: Any) { - assert { token === SELECT_SUCCESS } + override fun tryResumeLockWaiter(): Boolean = take() && select.trySelect() + override fun completeResumeLockWaiter() { block.startCoroutineCancellable(receiver = this@MutexImpl, completion = select.completion) { // if this continuation gets cancelled during dispatch to the caller, then release the lock unlock(owner)