diff --git a/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt b/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt index 67ed740767..a6f4dd6b18 100644 --- a/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt +++ b/kotlinx-coroutines-core/jvm/test/RejectedExecutionTest.kt @@ -4,6 +4,7 @@ package kotlinx.coroutines +import kotlinx.coroutines.flow.* import kotlinx.coroutines.scheduling.* import org.junit.* import org.junit.Test @@ -68,17 +69,22 @@ class RejectedExecutionTest : TestBase() { withContext(Dispatchers.Default) { expect(3) assertDefaultDispatcherThread() + // We have to wait until caller executor thread had already suspended (if not running task), + // so that we resume back to it a new task is posted + executor.awaitNotRunningTask() + expect(4) + assertDefaultDispatcherThread() } // cancelled on resume back } finally { - expect(4) + expect(5) assertIoThread() } expectUnreached() } } assertEquals(2, executor.submittedTasks) - finish(5) + finish(6) } @Test @@ -124,12 +130,23 @@ class RejectedExecutionTest : TestBase() { private inner class RejectingExecutor : ScheduledThreadPoolExecutor(1, { r -> Thread(r, threadName) }) { var acceptTasks = 0 var submittedTasks = 0 + val runningTask = MutableStateFlow(false) override fun schedule(command: Runnable, delay: Long, unit: TimeUnit): ScheduledFuture<*> { submittedTasks++ if (submittedTasks > acceptTasks) throw RejectedExecutionException() - return super.schedule(command, delay, unit) + val wrapper = Runnable { + runningTask.value = true + try { + command.run() + } finally { + runningTask.value = false + } + } + return super.schedule(wrapper, delay, unit) } + + suspend fun awaitNotRunningTask() = runningTask.first { !it } } private fun assertExecutorThread() {