Skip to content

Commit

Permalink
Unlock Mutex and release Semaphore during cancellation on a fast bran…
Browse files Browse the repository at this point in the history
…ch of slow-path in Mutex/Semaphore

Fixes #2390
  • Loading branch information
qwwdfsad committed Nov 17, 2020
1 parent a5dfc23 commit 999a304
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 51 deletions.
13 changes: 2 additions & 11 deletions kotlinx-coroutines-core/common/src/sync/Mutex.kt
Expand Up @@ -201,17 +201,8 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
// try lock
val update = if (owner == null) EMPTY_LOCKED else Empty(owner)
if (_state.compareAndSet(state, update)) { // locked
val token = cont.tryResume(Unit, idempotent = null) {
// if this continuation gets cancelled during dispatch to the caller, then release
// the lock
unlock(owner)
}
if (token != null) {
cont.completeResume(token)
} else {
// failure to get token implies already cancelled
unlock(owner)
}
// TODO implement functional type in LockCont as soon as we get rid of legacy JS
cont.resume(Unit) { unlock(owner) }
return@sc
}
}
Expand Down
7 changes: 3 additions & 4 deletions kotlinx-coroutines-core/common/src/sync/Semaphore.kt
Expand Up @@ -172,7 +172,7 @@ private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Se
if (addAcquireToQueue(cont)) return@sc
val p = _availablePermits.getAndDecrement()
if (p > 0) { // permit acquired
cont.resume(Unit)
cont.resume(Unit, onCancellationRelease)
return@sc
}
}
Expand Down Expand Up @@ -206,9 +206,8 @@ private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Se
// On CAS failure -- the cell must be either PERMIT or BROKEN
// If the cell already has PERMIT from tryResumeNextFromQueue, try to grab it
if (segment.cas(i, PERMIT, TAKEN)) { // took permit thus eliminating acquire/release pair
// The following resume must always succeed, since continuation was not published yet and we don't have
// to pass onCancellationRelease handle, since the coroutine did not suspend yet and cannot be cancelled
cont.resume(Unit)
/// This continuation is not yet published, but still can be cancelled via outer job
cont.resume(Unit, onCancellationRelease)
return true
}
assert { segment.get(i) === BROKEN } // it must be broken in this case, no other way around it
Expand Down
34 changes: 0 additions & 34 deletions kotlinx-coroutines-core/common/test/sync/MutexTest.kt
Expand Up @@ -9,9 +9,6 @@ import kotlinx.coroutines.*
import kotlin.test.*

class MutexTest : TestBase() {
private val enterCount = atomic(0)
private val releasedCount = atomic(0)

@Test
fun testSimple() = runTest {
val mutex = Mutex()
Expand Down Expand Up @@ -110,35 +107,4 @@ class MutexTest : TestBase() {
assertFalse(mutex.holdsLock(firstOwner))
assertFalse(mutex.holdsLock(secondOwner))
}

@Test
fun cancelLock() = runTest() {
val mutex = Mutex()
enterCount.value = 0
releasedCount.value = 0
repeat(1000) {
val job = launch(Dispatchers.Default) {
val owner = Any()
try {
enterCount.incrementAndGet()
mutex.withLock(owner) {}
// repeat to give an increase in race probability
mutex.withLock(owner) {}
} finally {
// should be no way lock is still held by owner here
if (mutex.holdsLock(owner)) {
// if it is held, ensure test case doesn't lockup
mutex.unlock(owner)
} else {
releasedCount.incrementAndGet()
}
}
}
mutex.withLock {
job.cancel()
}
job.join()
}
assertEquals(enterCount.value, releasedCount.value)
}
}
19 changes: 18 additions & 1 deletion kotlinx-coroutines-core/jvm/test/sync/MutexStressTest.kt
Expand Up @@ -90,4 +90,21 @@ class MutexStressTest : TestBase() {
}
}
}
}

@Test
fun testShouldBeUnlockedOnCancellation() = runTest {
val mutex = Mutex()
val n = 1000 * stressTestMultiplier
repeat(n) {
val job = launch(Dispatchers.Default) {
mutex.lock()
mutex.unlock()
}
mutex.withLock {
job.cancel()
}
job.join()
assertFalse { mutex.isLocked }
}
}
}
20 changes: 19 additions & 1 deletion kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt
Expand Up @@ -2,7 +2,7 @@ package kotlinx.coroutines.sync

import kotlinx.coroutines.*
import org.junit.Test
import kotlin.test.assertEquals
import kotlin.test.*

class SemaphoreStressTest : TestBase() {
@Test
Expand Down Expand Up @@ -90,4 +90,22 @@ class SemaphoreStressTest : TestBase() {
}
}
}

@Test
fun testShouldBeUnlockedOnCancellation() = runTest {
val semaphore = Semaphore(1)
val n = 1000 * stressTestMultiplier
repeat(n) {
val job = launch(Dispatchers.Default) {
semaphore.acquire()
semaphore.release()
}
semaphore.withPermit {
job.cancel()

}
job.join()
assertTrue { semaphore.availablePermits == 1 }
}
}
}

0 comments on commit 999a304

Please sign in to comment.