From 4fe809f80a259ff41a77ba0257e44802907ccb80 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 18 Nov 2020 00:40:50 -0800 Subject: [PATCH] Unlock Mutex and release Semaphore during cancellation on a fast branch of slow-path in Mutex/Semaphore (#2396) Fixes #2390 Co-authored-by: Gareth Pearce --- .../common/src/sync/Mutex.kt | 3 ++- .../common/src/sync/Semaphore.kt | 7 +++---- .../common/test/sync/MutexTest.kt | 3 ++- .../jvm/test/sync/MutexStressTest.kt | 19 ++++++++++++++++++- .../jvm/test/sync/SemaphoreStressTest.kt | 19 ++++++++++++++++++- 5 files changed, 43 insertions(+), 8 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/sync/Mutex.kt b/kotlinx-coroutines-core/common/src/sync/Mutex.kt index 73aaab5fbf..707c4640bc 100644 --- a/kotlinx-coroutines-core/common/src/sync/Mutex.kt +++ b/kotlinx-coroutines-core/common/src/sync/Mutex.kt @@ -201,7 +201,8 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2 { // try lock val update = if (owner == null) EMPTY_LOCKED else Empty(owner) if (_state.compareAndSet(state, update)) { // locked - cont.resume(Unit) + // TODO implement functional type in LockCont as soon as we get rid of legacy JS + cont.resume(Unit) { unlock(owner) } return@sc } } diff --git a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt index 84b7f4f8a2..c342bb3009 100644 --- a/kotlinx-coroutines-core/common/src/sync/Semaphore.kt +++ b/kotlinx-coroutines-core/common/src/sync/Semaphore.kt @@ -172,7 +172,7 @@ private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Se if (addAcquireToQueue(cont)) return@sc val p = _availablePermits.getAndDecrement() if (p > 0) { // permit acquired - cont.resume(Unit) + cont.resume(Unit, onCancellationRelease) return@sc } } @@ -206,9 +206,8 @@ private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Se // On CAS failure -- the cell must be either PERMIT or BROKEN // If the cell already has PERMIT from tryResumeNextFromQueue, try to grab it if (segment.cas(i, PERMIT, TAKEN)) { // took permit thus eliminating acquire/release pair - // The following resume must always succeed, since continuation was not published yet and we don't have - // to pass onCancellationRelease handle, since the coroutine did not suspend yet and cannot be cancelled - cont.resume(Unit) + /// This continuation is not yet published, but still can be cancelled via outer job + cont.resume(Unit, onCancellationRelease) return true } assert { segment.get(i) === BROKEN } // it must be broken in this case, no other way around it diff --git a/kotlinx-coroutines-core/common/test/sync/MutexTest.kt b/kotlinx-coroutines-core/common/test/sync/MutexTest.kt index c5d0ccf187..4f428bc4b0 100644 --- a/kotlinx-coroutines-core/common/test/sync/MutexTest.kt +++ b/kotlinx-coroutines-core/common/test/sync/MutexTest.kt @@ -4,6 +4,7 @@ package kotlinx.coroutines.sync +import kotlinx.atomicfu.* import kotlinx.coroutines.* import kotlin.test.* @@ -106,4 +107,4 @@ class MutexTest : TestBase() { assertFalse(mutex.holdsLock(firstOwner)) assertFalse(mutex.holdsLock(secondOwner)) } -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/jvm/test/sync/MutexStressTest.kt b/kotlinx-coroutines-core/jvm/test/sync/MutexStressTest.kt index bb713b258d..027f3c514d 100644 --- a/kotlinx-coroutines-core/jvm/test/sync/MutexStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/sync/MutexStressTest.kt @@ -90,4 +90,21 @@ class MutexStressTest : TestBase() { } } } -} \ No newline at end of file + + @Test + fun testShouldBeUnlockedOnCancellation() = runTest { + val mutex = Mutex() + val n = 1000 * stressTestMultiplier + repeat(n) { + val job = launch(Dispatchers.Default) { + mutex.lock() + mutex.unlock() + } + mutex.withLock { + job.cancel() + } + job.join() + assertFalse { mutex.isLocked } + } + } +} diff --git a/kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt b/kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt index 374a1e3d7c..2ceed64b95 100644 --- a/kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt @@ -2,7 +2,7 @@ package kotlinx.coroutines.sync import kotlinx.coroutines.* import org.junit.Test -import kotlin.test.assertEquals +import kotlin.test.* class SemaphoreStressTest : TestBase() { @Test @@ -90,4 +90,21 @@ class SemaphoreStressTest : TestBase() { } } } + + @Test + fun testShouldBeUnlockedOnCancellation() = runTest { + val semaphore = Semaphore(1) + val n = 1000 * stressTestMultiplier + repeat(n) { + val job = launch(Dispatchers.Default) { + semaphore.acquire() + semaphore.release() + } + semaphore.withPermit { + job.cancel() + } + job.join() + assertTrue { semaphore.availablePermits == 1 } + } + } }