/
SemaphoreStressTest.kt
110 lines (104 loc) · 3.38 KB
/
SemaphoreStressTest.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package kotlinx.coroutines.sync
import kotlinx.coroutines.*
import org.junit.Test
import kotlin.test.*
class SemaphoreStressTest : TestBase() {
@Test
fun stressTestAsMutex() = runBlocking(Dispatchers.Default) {
val n = 10_000 * stressTestMultiplier
val k = 100
var shared = 0
val semaphore = Semaphore(1)
val jobs = List(n) {
launch {
repeat(k) {
semaphore.acquire()
shared++
semaphore.release()
}
}
}
jobs.forEach { it.join() }
assertEquals(n * k, shared)
}
@Test
fun stressTest() = runBlocking(Dispatchers.Default) {
val n = 10_000 * stressTestMultiplier
val k = 100
val semaphore = Semaphore(10)
val jobs = List(n) {
launch {
repeat(k) {
semaphore.acquire()
semaphore.release()
}
}
}
jobs.forEach { it.join() }
}
@Test
fun stressCancellation() = runBlocking(Dispatchers.Default) {
val n = 10_000 * stressTestMultiplier
val semaphore = Semaphore(1)
semaphore.acquire()
repeat(n) {
val job = launch {
semaphore.acquire()
}
yield()
job.cancelAndJoin()
}
assertEquals(0, semaphore.availablePermits)
semaphore.release()
assertEquals(1, semaphore.availablePermits)
}
/**
* This checks if repeated releases that race with cancellations put
* the semaphore into an incorrect state where permits are leaked.
*/
@Test
fun stressReleaseCancelRace() = runTest {
val n = 10_000 * stressTestMultiplier
val semaphore = Semaphore(1, 1)
newSingleThreadContext("SemaphoreStressTest").use { pool ->
repeat (n) {
// Initially, we hold the permit and no one else can `acquire`,
// otherwise it's a bug.
assertEquals(0, semaphore.availablePermits)
var job1EnteredCriticalSection = false
val job1 = launch(start = CoroutineStart.UNDISPATCHED) {
semaphore.acquire()
job1EnteredCriticalSection = true
semaphore.release()
}
// check that `job1` didn't finish the call to `acquire()`
assertEquals(false, job1EnteredCriticalSection)
val job2 = launch(pool) {
semaphore.release()
}
// Because `job2` executes in a separate thread, this
// cancellation races with the call to `release()`.
job1.cancelAndJoin()
job2.join()
assertEquals(1, semaphore.availablePermits)
semaphore.acquire()
}
}
}
@Test
fun testShouldBeUnlockedOnCancellation() = runTest {
val semaphore = Semaphore(1)
val n = 1000 * stressTestMultiplier
repeat(n) {
val job = launch(Dispatchers.Default) {
semaphore.acquire()
semaphore.release()
}
semaphore.withPermit {
job.cancel()
}
job.join()
assertTrue { semaphore.availablePermits == 1 }
}
}
}