From 999a304f57bd7e2eb93cdf9940fc3bb1ebcb93f6 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 17 Nov 2020 18:13:56 +0300 Subject: [PATCH] Unlock Mutex and release Semaphore during cancellation on a fast branch of slow-path in Mutex/Semaphore Fixes #2390 --- .../common/src/sync/Mutex.kt | 13 ++----- .../common/src/sync/Semaphore.kt | 7 ++-- .../common/test/sync/MutexTest.kt | 34 ------------------- .../jvm/test/sync/MutexStressTest.kt | 19 ++++++++++- .../jvm/test/sync/SemaphoreStressTest.kt | 20 ++++++++++- 5 files changed, 42 insertions(+), 51 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/sync/Mutex.kt b/kotlinx-coroutines-core/common/src/sync/Mutex.kt index 36f62acd3b..707c4640bc 100644 --- a/kotlinx-coroutines-core/common/src/sync/Mutex.kt +++ b/kotlinx-coroutines-core/common/src/sync/Mutex.kt @@ -201,17 +201,8 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2 { // try lock val update = if (owner == null) EMPTY_LOCKED else Empty(owner) if (_state.compareAndSet(state, update)) { // locked - val token = cont.tryResume(Unit, idempotent = null) { - // if this continuation gets cancelled during dispatch to the caller, then release - // the lock - unlock(owner) - } - if (token != null) { - cont.completeResume(token) - } else { - // failure to get token implies already cancelled - unlock(owner) - } + // TODO implement functional type in LockCont as soon as we get rid of legacy JS + cont.resume(Unit) { unlock(owner) } return@sc } } diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt index 84b7f4f8a2..c342bb3009 100644 --- a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt +++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt @@ -172,7 +172,7 @@ private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Se if (addAcquireToQueue(cont)) return@sc val p = _availablePermits.getAndDecrement() if (p > 0) { // permit acquired - cont.resume(Unit) + cont.resume(Unit, onCancellationRelease) return@sc } } @@ -206,9 +206,8 @@ private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Se // On CAS failure -- the cell must be either PERMIT or BROKEN // If the cell already has PERMIT from tryResumeNextFromQueue, try to grab it if (segment.cas(i, PERMIT, TAKEN)) { // took permit thus eliminating acquire/release pair - // The following resume must always succeed, since continuation was not published yet and we don't have - // to pass onCancellationRelease handle, since the coroutine did not suspend yet and cannot be cancelled - cont.resume(Unit) + /// This continuation is not yet published, but still can be cancelled via outer job + cont.resume(Unit, onCancellationRelease) return true } assert { segment.get(i) === BROKEN } // it must be broken in this case, no other way around it diff --git a/kotlinx-coroutines-core/common/test/sync/MutexTest.kt b/kotlinx-coroutines-core/common/test/sync/MutexTest.kt index 0cfd48031a..4f428bc4b0 100644 --- a/kotlinx-coroutines-core/common/test/sync/MutexTest.kt +++ b/kotlinx-coroutines-core/common/test/sync/MutexTest.kt @@ -9,9 +9,6 @@ import kotlinx.coroutines.* import kotlin.test.* class MutexTest : TestBase() { - private val enterCount = atomic(0) - private val releasedCount = atomic(0) - @Test fun testSimple() = runTest { val mutex = Mutex() @@ -110,35 +107,4 @@ class MutexTest : TestBase() { assertFalse(mutex.holdsLock(firstOwner)) assertFalse(mutex.holdsLock(secondOwner)) } - - @Test - fun cancelLock() = runTest() { - val mutex = Mutex() - enterCount.value = 0 - releasedCount.value = 0 - repeat(1000) { - val job = launch(Dispatchers.Default) { - val owner = Any() - try { - enterCount.incrementAndGet() - mutex.withLock(owner) {} - // repeat to give an increase in race probability - mutex.withLock(owner) {} - } finally { - // should be no way lock is still held by owner here - if (mutex.holdsLock(owner)) { - // if it is held, ensure test case doesn't lockup - mutex.unlock(owner) - } else { - releasedCount.incrementAndGet() - } - } - } - mutex.withLock { - job.cancel() - } - job.join() - } - assertEquals(enterCount.value, releasedCount.value) - } } diff --git a/kotlinx-coroutines-core/jvm/test/sync/MutexStressTest.kt b/kotlinx-coroutines-core/jvm/test/sync/MutexStressTest.kt index bb713b258d..027f3c514d 100644 --- a/kotlinx-coroutines-core/jvm/test/sync/MutexStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/sync/MutexStressTest.kt @@ -90,4 +90,21 @@ class MutexStressTest : TestBase() { } } } -} \ No newline at end of file + + @Test + fun testShouldBeUnlockedOnCancellation() = runTest { + val mutex = Mutex() + val n = 1000 * stressTestMultiplier + repeat(n) { + val job = launch(Dispatchers.Default) { + mutex.lock() + mutex.unlock() + } + mutex.withLock { + job.cancel() + } + job.join() + assertFalse { mutex.isLocked } + } + } +} diff --git a/kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt b/kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt index 374a1e3d7c..672b743dc7 100644 --- a/kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt @@ -2,7 +2,7 @@ package kotlinx.coroutines.sync import kotlinx.coroutines.* import org.junit.Test -import kotlin.test.assertEquals +import kotlin.test.* class SemaphoreStressTest : TestBase() { @Test @@ -90,4 +90,22 @@ class SemaphoreStressTest : TestBase() { } } } + + @Test + fun testShouldBeUnlockedOnCancellation() = runTest { + val semaphore = Semaphore(1) + val n = 1000 * stressTestMultiplier + repeat(n) { + val job = launch(Dispatchers.Default) { + semaphore.acquire() + semaphore.release() + } + semaphore.withPermit { + job.cancel() + + } + job.join() + assertTrue { semaphore.availablePermits == 1 } + } + } }