Skip to content

Commit

Permalink
~ Fixed race in testRejectOnResumeInContext
Browse files Browse the repository at this point in the history
  • Loading branch information
elizarov committed Sep 14, 2020
1 parent fda826c commit f86636f
Showing 1 changed file with 20 additions and 3 deletions.
23 changes: 20 additions & 3 deletions kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt
Expand Up @@ -4,6 +4,7 @@

package kotlinx.coroutines

import kotlinx.coroutines.flow.*
import kotlinx.coroutines.scheduling.*
import org.junit.*
import org.junit.Test
Expand Down Expand Up @@ -68,17 +69,22 @@ class RejectedExecutionTest : TestBase() {
withContext(Dispatchers.Default) {
expect(3)
assertDefaultDispatcherThread()
// We have to wait until caller executor thread had already suspended (if not running task),
// so that we resume back to it a new task is posted
executor.awaitNotRunningTask()
expect(4)
assertDefaultDispatcherThread()
}
// cancelled on resume back
} finally {
expect(4)
expect(5)
assertIoThread()
}
expectUnreached()
}
}
assertEquals(2, executor.submittedTasks)
finish(5)
finish(6)
}

@Test
Expand Down Expand Up @@ -124,12 +130,23 @@ class RejectedExecutionTest : TestBase() {
private inner class RejectingExecutor : ScheduledThreadPoolExecutor(1, { r -> Thread(r, threadName) }) {
var acceptTasks = 0
var submittedTasks = 0
val runningTask = MutableStateFlow(false)

override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> {
submittedTasks++
if (submittedTasks > acceptTasks) throw RejectedExecutionException()
return super.schedule(command, delay, unit)
val wrapper = Runnable {
runningTask.value = true
try {
command.run()
} finally {
runningTask.value = false
}
}
return super.schedule(wrapper, delay, unit)
}

suspend fun awaitNotRunningTask() = runningTask.first { !it }
}

private fun assertExecutorThread() {
Expand Down

0 comments on commit f86636f

Please sign in to comment.