Skip to content

Commit

Permalink
Unlock Mutex and release Semaphore during cancellation on a fast bran…
Browse files Browse the repository at this point in the history
…ch of slow-path in Mutex/Semaphore (#2396)


Fixes #2390

Co-authored-by: Gareth Pearce <Tilps@users.noreply.github.com>
  • Loading branch information
qwwdfsad and Tilps committed Nov 18, 2020
1 parent 8ca5296 commit 4fe809f
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 8 deletions.
3 changes: 2 additions & 1 deletion kotlinx-coroutines-core/common/src/sync/Mutex.kt
Expand Up @@ -201,7 +201,8 @@ internal class MutexImpl(locked: Boolean) : Mutex, SelectClause2<Any?, Mutex> {
// try lock
val update = if (owner == null) EMPTY_LOCKED else Empty(owner)
if (_state.compareAndSet(state, update)) { // locked
cont.resume(Unit)
// TODO implement functional type in LockCont as soon as we get rid of legacy JS
cont.resume(Unit) { unlock(owner) }
return@sc
}
}
Expand Down
7 changes: 3 additions & 4 deletions kotlinx-coroutines-core/common/src/sync/Semaphore.kt
Expand Up @@ -172,7 +172,7 @@ private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Se
if (addAcquireToQueue(cont)) return@sc
val p = _availablePermits.getAndDecrement()
if (p > 0) { // permit acquired
cont.resume(Unit)
cont.resume(Unit, onCancellationRelease)
return@sc
}
}
Expand Down Expand Up @@ -206,9 +206,8 @@ private class SemaphoreImpl(private val permits: Int, acquiredPermits: Int) : Se
// On CAS failure -- the cell must be either PERMIT or BROKEN
// If the cell already has PERMIT from tryResumeNextFromQueue, try to grab it
if (segment.cas(i, PERMIT, TAKEN)) { // took permit thus eliminating acquire/release pair
// The following resume must always succeed, since continuation was not published yet and we don't have
// to pass onCancellationRelease handle, since the coroutine did not suspend yet and cannot be cancelled
cont.resume(Unit)
/// This continuation is not yet published, but still can be cancelled via outer job
cont.resume(Unit, onCancellationRelease)
return true
}
assert { segment.get(i) === BROKEN } // it must be broken in this case, no other way around it
Expand Down
3 changes: 2 additions & 1 deletion kotlinx-coroutines-core/common/test/sync/MutexTest.kt
Expand Up @@ -4,6 +4,7 @@

package kotlinx.coroutines.sync

import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlin.test.*

Expand Down Expand Up @@ -106,4 +107,4 @@ class MutexTest : TestBase() {
assertFalse(mutex.holdsLock(firstOwner))
assertFalse(mutex.holdsLock(secondOwner))
}
}
}
19 changes: 18 additions & 1 deletion kotlinx-coroutines-core/jvm/test/sync/MutexStressTest.kt
Expand Up @@ -90,4 +90,21 @@ class MutexStressTest : TestBase() {
}
}
}
}

@Test
fun testShouldBeUnlockedOnCancellation() = runTest {
val mutex = Mutex()
val n = 1000 * stressTestMultiplier
repeat(n) {
val job = launch(Dispatchers.Default) {
mutex.lock()
mutex.unlock()
}
mutex.withLock {
job.cancel()
}
job.join()
assertFalse { mutex.isLocked }
}
}
}
19 changes: 18 additions & 1 deletion kotlinx-coroutines-core/jvm/test/sync/SemaphoreStressTest.kt
Expand Up @@ -2,7 +2,7 @@ package kotlinx.coroutines.sync

import kotlinx.coroutines.*
import org.junit.Test
import kotlin.test.assertEquals
import kotlin.test.*

class SemaphoreStressTest : TestBase() {
@Test
Expand Down Expand Up @@ -90,4 +90,21 @@ class SemaphoreStressTest : TestBase() {
}
}
}

@Test
fun testShouldBeUnlockedOnCancellation() = runTest {
val semaphore = Semaphore(1)
val n = 1000 * stressTestMultiplier
repeat(n) {
val job = launch(Dispatchers.Default) {
semaphore.acquire()
semaphore.release()
}
semaphore.withPermit {
job.cancel()
}
job.join()
assertTrue { semaphore.availablePermits == 1 }
}
}
}

0 comments on commit 4fe809f

Please sign in to comment.