From baade879d56bf8b59ed3e2d94611375edd42dadd Mon Sep 17 00:00:00 2001 From: Sean McQuillan Date: Fri, 17 May 2019 18:00:20 -0700 Subject: [PATCH 1/5] Support completing the test coroutine from outside the test thread. This requires a call to invokeOnCompletion to get the completion state from the other thread. Changes: - runBlockingTest now supports completing the test coroutine from another thread - success and failure path tests While fixing this, a subtle non-determinism was discovered that would lead to flakey tests. If another thread completed the test coroutine *during* the cleanup checks it was possible to modify the state of the test during the cleanup checks in a way that could expose false-positive or false-negative results randomly. To resolve this, a non-completed coroutine immediately after advanceUntilIdle is now considered a failure, even if it completes before the more aggressive cleanup checks. There is still a very brief window (between advanceTimeBy and getResultIfKnown) that non-determinism may be introduced, but it will fail with a descriptive error message at on random executions directing the developer to resolve the non-determinstic behavior. Note: testWithOddlyCompletingJob_fails may fail when the implementation of runBlockingTest changes (it's very tightly coupled). --- kotlinx-coroutines-test/src/TestBuilders.kt | 44 ++++++++- .../test/TestRunBlockingOrderTest.kt | 92 +++++++++++++++++++ 2 files changed, 133 insertions(+), 3 deletions(-) diff --git a/kotlinx-coroutines-test/src/TestBuilders.kt b/kotlinx-coroutines-test/src/TestBuilders.kt index 7ef77bd643..884a6f0774 100644 --- a/kotlinx-coroutines-test/src/TestBuilders.kt +++ b/kotlinx-coroutines-test/src/TestBuilders.kt @@ -46,18 +46,56 @@ public fun runBlockingTest(context: CoroutineContext = EmptyCoroutineContext, te val (safeContext, dispatcher) = context.checkArguments() val startingJobs = safeContext.activeJobs() val scope = TestCoroutineScope(safeContext) + val deferred = scope.async { scope.testBody() } + + // run any outstanding coroutines that can be completed by advancing virtual-time dispatcher.advanceUntilIdle() - deferred.getCompletionExceptionOrNull()?.let { - throw it - } + + // fetch results from the coroutine - this may require a thread hop if some child coroutine was *completed* on + // another thread during this test so we must use an invokeOnCompletion handler to retrieve the result. + + // There are two code paths for fetching the error: + // + // 1. The job was already completed (happy path, normal test) + // - invokeOnCompletion was executed immediately and errorThrownByTestOrNull is already at it's final value so + // we can throw it + // 2. The job has not already completed (always fail the test due to error or time-based non-determinism) + // - invokeOnCompletion will not be triggered right away. To avoid introducing wall non-deterministic behavior + // (the deferred may complete between here and the call to activeJobs below) this will always be considered a + // test failure. + // - this will not happen if all coroutines are only waiting to complete due to thread hops, but may happen + // if another thread triggers completion concurrently with this cleanup code. + // + // give test code errors a priority in the happy path, throw here if the error is already known. + val (wasCompletedAfterTest, errorThrownByTestOrNull) = deferred.getResultIfKnown() + errorThrownByTestOrNull?.let { throw it } + scope.cleanupTestCoroutines() val endingJobs = safeContext.activeJobs() if ((endingJobs - startingJobs).isNotEmpty()) { throw UncompletedCoroutinesError("Test finished with active jobs: $endingJobs") } + + if (!wasCompletedAfterTest) { + // Handle path #2, we are going to fail the test in an opinionated way at this point so let the developer know + // how to fix it. + throw UncompletedCoroutinesError("Test completed all jobs after cleanup code started. This is " + + "thrown to avoid non-deterministic behavior in tests (the next execution may fail randomly). Ensure " + + "all threads started by the test are completed before returning from runBlockingTest.") + } +} + +private fun Deferred.getResultIfKnown(): Pair { + var testError: Throwable? = null + var wasExecuted = false + invokeOnCompletion { errorFromTestOrNull -> + testError = errorFromTestOrNull + wasExecuted = true + }.dispose() + return Pair(wasExecuted, testError) } private fun CoroutineContext.activeJobs(): Set { diff --git a/kotlinx-coroutines-test/test/TestRunBlockingOrderTest.kt b/kotlinx-coroutines-test/test/TestRunBlockingOrderTest.kt index 0013a654a6..7082396340 100644 --- a/kotlinx-coroutines-test/test/TestRunBlockingOrderTest.kt +++ b/kotlinx-coroutines-test/test/TestRunBlockingOrderTest.kt @@ -6,7 +6,9 @@ package kotlinx.coroutines.test import kotlinx.coroutines.* import org.junit.* +import kotlin.concurrent.thread import kotlin.coroutines.* +import kotlin.test.assertEquals class TestRunBlockingOrderTest : TestBase() { @Test @@ -68,6 +70,96 @@ class TestRunBlockingOrderTest : TestBase() { expect(2) } + @Test + fun testNewThread_inSuspendCancellableCoroutine() = runBlockingTest { + expect(1) + suspendCancellableCoroutine { cont -> + expect(2) + thread { + expect(3) + cont.resume(Unit) + } + } + finish(4) + } + + @Test(expected = UncompletedCoroutinesError::class) + fun testWithOddlyCompletingJob_fails() { + // this test is suspect since it relies upon the exact ordering of code in runBlockingTest + // however, it needs to ensure the job finishes *after* advanceUntilIdle is called in order + // to ensure that runBlockingTest errors when presented with threading non-determinism. + + // this test is stable and will always pass unless the implementation changes. + + // If this starts failing because the call to cleanupTestCoroutines changes it will need a similarly + // implementation driven test. + + class FakeDispatcher(val delegate: TestCoroutineDispatcher): + CoroutineDispatcher(), + Delay by delegate, + DelayController by delegate { + private var cleanupCallback: (() -> Unit)? = null + + override fun dispatch(context: CoroutineContext, block: Runnable) { + delegate.dispatch(context, block) + } + + fun onCleanup(block: () -> Unit) { + cleanupCallback = block + } + + override fun cleanupTestCoroutines() { + delegate.cleanupTestCoroutines() + cleanupCallback?.invoke() + } + } + + val dispatcher = FakeDispatcher(TestCoroutineDispatcher()) + val scope = TestCoroutineScope(dispatcher) + val resumeAfterTest = CompletableDeferred() + + scope.runBlockingTest { + expect(1) + dispatcher.onCleanup { + // after advanceTimeUntilIdle, complete the launched coroutine + expect(3) + resumeAfterTest.complete(Unit) + finish(5) + } + expect(2) + resumeAfterTest.await() // this will resume just before child jobs are checked + expect(4) + } + } + + @Test + fun testThrows_throws() { + val expected = IllegalStateException("expected") + val result = runCatching { + expect(1) + runBlockingTest { + expect(2) + throw expected + } + } + finish(3) + assertEquals(expected, result.exceptionOrNull()) + } + + @Test + fun testSuspendForever_fails() { + val uncompleted = CompletableDeferred() + val result = runCatching { + expect(1) + runBlockingTest { + expect(2) + uncompleted.await() + } + } + finish(3) + assertEquals(true, result.isFailure) + } + @Test fun testAdvanceUntilIdle_inRunBlocking() = runBlockingTest { expect(1) From d3f40a9a165e2785ecf21bb34fbd7842dd53107f Mon Sep 17 00:00:00 2001 From: Sean McQuillan Date: Wed, 19 Jun 2019 11:48:15 -0700 Subject: [PATCH 2/5] Implement an eventLoop for runBlockingTest. As a result, runBlockingTest is now correct in the presence of repeated time-delayed dispatch from other dispatchers. Changes: - runBlockingTest will now allow a 30 second timeout for other dispatchers to complete coroutines - Introduced WaitConfig, SingleDispatcherWaitConfig, and MultiDispatcherWaitConfig to configure runBlockingTest timeout behavior - Added DelayController.queueState as a ConflatedBroadcastChannel to observe or poll the queue status of a TestCoroutineDispatcher - Added queue status of Idle, HasCurrentTask and HasDelayedTask as public APIs for quering the status of a TestCoroutineDispatcher - Added dependency on runBlocking from runBlockingTest - Improved documentation for threading concerns around resumeDispatcher --- .../src/DelayController.kt | 67 +++++ kotlinx-coroutines-test/src/TestBuilders.kt | 214 +++++++++++---- .../src/TestCoroutineDispatcher.kt | 60 ++++- .../test/TestRunBlockingOrderTest.kt | 247 ++++++++++++++---- .../test/TestRunBlockingTest.kt | 10 +- 5 files changed, 486 insertions(+), 112 deletions(-) diff --git a/kotlinx-coroutines-test/src/DelayController.kt b/kotlinx-coroutines-test/src/DelayController.kt index 54e9c8ae5e..b4abfa097d 100644 --- a/kotlinx-coroutines-test/src/DelayController.kt +++ b/kotlinx-coroutines-test/src/DelayController.kt @@ -2,6 +2,7 @@ package kotlinx.coroutines.test import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.ConflatedBroadcastChannel /** * Control the virtual clock time of a [CoroutineDispatcher]. @@ -112,9 +113,75 @@ public interface DelayController { * Resumed dispatchers will automatically progress through all coroutines scheduled at the current time. To advance * time and execute coroutines scheduled in the future use, one of [advanceTimeBy], * or [advanceUntilIdle]. + * + * When the dispatcher is resumed, all execution be immediate in the thread that triggered it. This means + * that the following code will not switch back from Dispatchers.IO after `withContext` + * + * ``` + * runBlockingTest { + * withContext(Dispatchers.IO) { doIo() } + * // runBlockingTest is still on Dispatchers.IO here + * } + * ``` + * + * For tests that need accurate threading behavior, [pauseDispatcher] will ensure that the following test dispatches + * on the correct thread. + * + * ``` + * runBlockingTest { + * pauseDispatcher() + * withContext(Dispatchers.IO) { doIo() } + * // runBlockingTest has returned to it's starting thread here + * } + * ``` */ @ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0 public fun resumeDispatcher() + + /** + * Represents the queue state of a DelayController. + * + * Tests do not normally need to use this API. It is exposed for advanced situations like integrating multiple + * [TestCoroutineDispatcher] instances or creating alternatives to [runBlockingtest]. + */ + public sealed class QueueState { + /** + * A [DelayController] that is idle does not currently have any tasks to perform. + * + * This may happen if all coroutines in this [DelayController] have completed, or if they are all suspended + * waiting on other dispatchers. + */ + public object Idle: QueueState() { + override fun toString() = "Idle" + } + + /** + * A [DelayController] that has a task that will execute in response to a call to [runCurrent]. + * + * There may also be delayed tasks scheduled, in which case [HasCurrentTask] takes priority since current tasks + * will execute at an earlier virtual time. + */ + public object HasCurrentTask: QueueState() { + override fun toString() = "HasCurrentTask" + } + + /** + * A [DelayController] that has delayed tasks has a task scheduled for sometime in the future. + * + * If there are also tasks at the current time, [HasCurrentTask] will take priority. + */ + public object HasDelayedTask: QueueState(){ + override fun toString() = "HasDelayedTask" + } + } + + /** + * A ConflatedBroadcastChannel that is up to date with the current [QueueState]. + * + * Tests do not normally need to use this API. It is exposed for advanced situations like integrating multiple + * [TestCoroutineDispatcher] instances or creating alternatives to [runBlockingtest]. + */ + public val queueState: ConflatedBroadcastChannel } /** diff --git a/kotlinx-coroutines-test/src/TestBuilders.kt b/kotlinx-coroutines-test/src/TestBuilders.kt index 884a6f0774..59a2ccc9ea 100644 --- a/kotlinx-coroutines-test/src/TestBuilders.kt +++ b/kotlinx-coroutines-test/src/TestBuilders.kt @@ -5,8 +5,88 @@ package kotlinx.coroutines.test import kotlinx.coroutines.* +import kotlinx.coroutines.selects.select +import java.lang.StringBuilder import kotlin.coroutines.* +private const val DEFAULT_TEST_TIMEOUT = 30_000L + +/** + * A strategy for waiting on coroutines executed on other dispatchers inside a [runBlockingTest]. + * + * Most tests should use [MultiDispatcherWaitConfig]. As an optimization, a test that executes coroutines only on a + * [TestCoroutineDispatcher] and never interacts with other dispatchers may use [SingleDispatcherWaitConfig]. + * + * A test may subclass this to customize the wait in advanced cases. + */ +interface WaitConfig { + /** + * How long (in wall-clock time) to wait for other Dispatchers to complete coroutines during a [runBlockingTest]. + * + * This delay is not related to the virtual time of a [TestCoroutineDispatcher], but is how long a test should allow + * another dispatcher, like Dispatchers.IO, to perform a time consuming activity such as reading from a database. + */ + val wait: Long +} + +/** + * Do not wait for coroutines executing on another [Dispatcher] in [runBlockingTest]. + + * Always fails with an uncompleted coroutine when any coroutine in the test executes on any other dispatcher (including + * calls to [withContext]). It should not be used for most tests, instead use the default value of + * [MultiDispatcherWaitConfig]. + * + * This configuration should only be used as an optimization for tests that intentionally create an uncompleted + * coroutine and execute all coroutines on the [TestCoroutineDispatcher] used by [runBlockingTest]. + * + * If in doubt, prefer [MultiDispatcherWaitConfig]. + */ +object SingleDispatcherWaitConfig : WaitConfig { + /** + * This value is ignored by [runBlockingTest] on [SingleDispatcherWaitConfig] + */ + override val wait = 0L + + override fun toString() = "SingleDispatcherWaitConfig" +} + +/** + * Wait up to 30 seconds for any coroutines running on another [Dispatcher] to complete in [runBlockingTest]. + * + * This is the default value for [runBlockingTest] and the recommendation for most tests. This configuration will allow + * for coroutines to be launched on another dispatcher inside the test (e.g. calls to `withContext(Dispatchers.IO)`). + * + * This allows for code like the following to be tested correctly using [runBlockingTest]: + * + * ``` + * suspend fun delayOnDefault() = withContext(Dispatchers.Default) { + * // this delay does not use the virtual-time of runBlockingTest since it's executing on Dispatchers.Default + * delay(50) + * } + * + * runBlockingTest { + * // Note: This test takes at least 50ms (real time) to run + * + * // delayOnDefault will suspend the runBlockingTest coroutine for 50ms [real-time: 0; virtual-time: 0] + * delayOnDefault() + * // runBlockingTest resumes 50ms later (real time) [real-time: 50; virtual-time: 0] + * + * delay(10) + * //this delay will auto-progress since it's in runBlockingTest [real-time: 50; virtual-time: 10] + * } + * ``` + */ +object MultiDispatcherWaitConfig: WaitConfig { + /** + * Default wait is 30 seconds. + * + * {@inheritDoc} + */ + override val wait = DEFAULT_TEST_TIMEOUT + + override fun toString() = "MultiDispatcherWaitConfig[wait = 30s]" +} + /** * Executes a [testBody] inside an immediate execution dispatcher. * @@ -38,64 +118,104 @@ import kotlin.coroutines.* * (including coroutines suspended on join/await). * * @param context additional context elements. If [context] contains [CoroutineDispatcher] or [CoroutineExceptionHandler], - * then they must implement [DelayController] and [TestCoroutineExceptionHandler] respectively. + * then they must implement [DelayController] and [TestCoroutineExceptionHandler] respectively. + * @param waitConfig strategy for waiting on other dispatchers to complete during the test. [SingleDispatcherWaitConfig] + * will never wait, other values will wait for [WaitConfig.wait]ms. * @param testBody The code of the unit-test. */ @ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0 -public fun runBlockingTest(context: CoroutineContext = EmptyCoroutineContext, testBody: suspend TestCoroutineScope.() -> Unit) { +public fun runBlockingTest( + context: CoroutineContext = EmptyCoroutineContext, + waitConfig: WaitConfig = MultiDispatcherWaitConfig, + testBody: suspend TestCoroutineScope.() -> Unit +) { val (safeContext, dispatcher) = context.checkArguments() val startingJobs = safeContext.activeJobs() - val scope = TestCoroutineScope(safeContext) - val deferred = scope.async { - scope.testBody() + var testScope: TestCoroutineScope? = null + + val deferred = CoroutineScope(safeContext).async { + val localTestScope = TestCoroutineScope(coroutineContext) + testScope = localTestScope + localTestScope.testBody() } - // run any outstanding coroutines that can be completed by advancing virtual-time - dispatcher.advanceUntilIdle() - - // fetch results from the coroutine - this may require a thread hop if some child coroutine was *completed* on - // another thread during this test so we must use an invokeOnCompletion handler to retrieve the result. - - // There are two code paths for fetching the error: - // - // 1. The job was already completed (happy path, normal test) - // - invokeOnCompletion was executed immediately and errorThrownByTestOrNull is already at it's final value so - // we can throw it - // 2. The job has not already completed (always fail the test due to error or time-based non-determinism) - // - invokeOnCompletion will not be triggered right away. To avoid introducing wall non-deterministic behavior - // (the deferred may complete between here and the call to activeJobs below) this will always be considered a - // test failure. - // - this will not happen if all coroutines are only waiting to complete due to thread hops, but may happen - // if another thread triggers completion concurrently with this cleanup code. - // - // give test code errors a priority in the happy path, throw here if the error is already known. - val (wasCompletedAfterTest, errorThrownByTestOrNull) = deferred.getResultIfKnown() - errorThrownByTestOrNull?.let { throw it } - - scope.cleanupTestCoroutines() - val endingJobs = safeContext.activeJobs() - if ((endingJobs - startingJobs).isNotEmpty()) { - throw UncompletedCoroutinesError("Test finished with active jobs: $endingJobs") + val didTimeout = deferred.waitForCompletion(waitConfig, dispatcher) + + if (deferred.isCompleted) { + deferred.getCompletionExceptionOrNull()?.let { + throw it + } } - if (!wasCompletedAfterTest) { - // Handle path #2, we are going to fail the test in an opinionated way at this point so let the developer know - // how to fix it. - throw UncompletedCoroutinesError("Test completed all jobs after cleanup code started. This is " + - "thrown to avoid non-deterministic behavior in tests (the next execution may fail randomly). Ensure " + - "all threads started by the test are completed before returning from runBlockingTest.") + testScope!!.cleanupTestCoroutines() + val endingJobs = safeContext.activeJobs() + + // TODO: should these be separate exceptions to allow for tests to detect difference? + if (didTimeout) { + val message = """ + runBlockingTest timed out after waiting ${waitConfig.wait}ms for coroutines to complete due waitConfig = $waitConfig. + Active jobs after test (may be empty): $endingJobs + """.trimIndent() + throw UncompletedCoroutinesError(message) + } else if ((endingJobs - startingJobs).isNotEmpty()) { + val message = StringBuilder("Test finished with active jobs: ") + message.append(endingJobs) + if (waitConfig == SingleDispatcherWaitConfig) { + message.append(""" + + Note: runBlockingTest did not wait for other dispatchers due to argument waitConfig = $waitConfig + + Tip: If this runBlockingTest starts any code on another dispatcher (such as Dispatchers.Default, + Dispatchers.IO, etc) in any of the functions it calls it will never pass when configured with + SingleDispatcherWaitConfig. Please update your test to use the default value of MultiDispatcherWaitConfig + like: + + runBlockingTest { } + + """.trimIndent()) + } + throw UncompletedCoroutinesError(message.toString()) } } -private fun Deferred.getResultIfKnown(): Pair { - var testError: Throwable? = null - var wasExecuted = false - invokeOnCompletion { errorFromTestOrNull -> - testError = errorFromTestOrNull - wasExecuted = true - }.dispose() - return Pair(wasExecuted, testError) +private fun Deferred.waitForCompletion(waitConfig: WaitConfig, dispatcher: DelayController): Boolean { + var didTimeout = false + when (waitConfig) { + SingleDispatcherWaitConfig -> dispatcher.advanceUntilIdle() + else -> { + runBlocking { + val subscription = dispatcher.queueState.openSubscription() + dispatcher.advanceUntilIdle() + + var finished = false + try { + while (!finished) { + finished = select { + this@waitForCompletion.onAwait { + true + } + onTimeout(waitConfig.wait) { + didTimeout = true + true + } + subscription.onReceive { queueState -> + when (queueState) { + DelayController.QueueState.Idle -> Unit + else -> dispatcher.advanceUntilIdle() + } + false + } + } + } + } finally { + subscription.cancel() + } + } + + } + } + return didTimeout } private fun CoroutineContext.activeJobs(): Set { @@ -107,13 +227,13 @@ private fun CoroutineContext.activeJobs(): Set { */ // todo: need documentation on how this extension is supposed to be used @ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0 -public fun TestCoroutineScope.runBlockingTest(block: suspend TestCoroutineScope.() -> Unit) = runBlockingTest(coroutineContext, block) +public fun TestCoroutineScope.runBlockingTest(configuration: WaitConfig = MultiDispatcherWaitConfig, block: suspend TestCoroutineScope.() -> Unit) = runBlockingTest(coroutineContext, configuration, block) /** * Convenience method for calling [runBlockingTest] on an existing [TestCoroutineDispatcher]. */ @ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0 -public fun TestCoroutineDispatcher.runBlockingTest(block: suspend TestCoroutineScope.() -> Unit) = runBlockingTest(this, block) +public fun TestCoroutineDispatcher.runBlockingTest(configuration: WaitConfig = MultiDispatcherWaitConfig, block: suspend TestCoroutineScope.() -> Unit) = runBlockingTest(this, configuration, block) private fun CoroutineContext.checkArguments(): Pair { // TODO optimize it diff --git a/kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt b/kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt index 386fc8380d..e4f57d570d 100644 --- a/kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt +++ b/kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt @@ -4,11 +4,15 @@ package kotlinx.coroutines.test -import kotlinx.atomicfu.* +import kotlinx.atomicfu.atomic +import kotlinx.atomicfu.update import kotlinx.coroutines.* -import kotlinx.coroutines.internal.* -import kotlin.coroutines.* -import kotlin.math.* +import kotlinx.coroutines.channels.ConflatedBroadcastChannel +import kotlinx.coroutines.internal.ThreadSafeHeap +import kotlinx.coroutines.internal.ThreadSafeHeapNode +import kotlinx.coroutines.test.DelayController.QueueState.* +import kotlin.coroutines.CoroutineContext +import kotlin.math.max /** * [CoroutineDispatcher] that performs both immediate and lazy execution of coroutines in tests @@ -44,6 +48,8 @@ public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayControl // Storing time in nanoseconds internally. private val _time = atomic(0L) + override val queueState = ConflatedBroadcastChannel(Idle) + /** @suppress */ override fun dispatch(context: CoroutineContext, block: Runnable) { if (dispatchImmediately) { @@ -70,6 +76,7 @@ public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayControl return object : DisposableHandle { override fun dispose() { queue.remove(node) + updateQueueObservers() } } } @@ -79,14 +86,18 @@ public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayControl return "TestCoroutineDispatcher[currentTime=${currentTime}ms, queued=${queue.size}]" } - private fun post(block: Runnable) = + private fun post(block: Runnable) { queue.addLast(TimedRunnable(block, _counter.getAndIncrement())) + updateQueueObservers() + } - private fun postDelayed(block: Runnable, delayTime: Long) = - TimedRunnable(block, _counter.getAndIncrement(), safePlus(currentTime, delayTime)) - .also { - queue.addLast(it) - } + private fun postDelayed(block: Runnable, delayTime: Long): TimedRunnable { + return TimedRunnable(block, _counter.getAndIncrement(), safePlus(currentTime, delayTime)) + .also { + queue.addLast(it) + updateQueueObservers() + } + } private fun safePlus(currentTime: Long, delayTime: Long): Long { check(delayTime >= 0) @@ -111,6 +122,7 @@ public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayControl override fun advanceTimeBy(delayTimeMillis: Long): Long { val oldTime = currentTime advanceUntilTime(oldTime + delayTimeMillis) + updateQueueObservers() return currentTime - oldTime } @@ -122,6 +134,7 @@ public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayControl private fun advanceUntilTime(targetTime: Long) { doActionsUntil(targetTime) _time.update { currentValue -> max(currentValue, targetTime) } + updateQueueObservers() } /** @suppress */ @@ -132,11 +145,16 @@ public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayControl val next = queue.peek() ?: break advanceUntilTime(next.time) } + + updateQueueObservers() return currentTime - oldTime } /** @suppress */ - override fun runCurrent() = doActionsUntil(currentTime) + override fun runCurrent() { + doActionsUntil(currentTime) + updateQueueObservers() + } /** @suppress */ override suspend fun pauseDispatcher(block: suspend () -> Unit) { @@ -163,6 +181,7 @@ public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayControl override fun cleanupTestCoroutines() { // process any pending cancellations or completions, but don't advance time doActionsUntil(currentTime) + updateQueueObservers() // run through all pending tasks, ignore any submitted coroutines that are not active val pendingTasks = mutableListOf() @@ -181,6 +200,25 @@ public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayControl ) } } + + private fun updateQueueObservers() { + // note: this will be called from any thread, and is safe lock-free in runBlockingTest but to guard against + // third party code code updating this will use a lock + synchronized(queue) { + val next = queue.peek() + when { + next == null-> queueState.offerIfChanged(Idle) + next.time <= currentTime -> queueState.offerIfChanged(HasCurrentTask) + next.time > currentTime -> queueState.offerIfChanged(HasDelayedTask) + } + } + } + + private fun ConflatedBroadcastChannel.offerIfChanged(element: DelayController.QueueState) { + if (valueOrNull != element) { + offer(element) + } + } } /** diff --git a/kotlinx-coroutines-test/test/TestRunBlockingOrderTest.kt b/kotlinx-coroutines-test/test/TestRunBlockingOrderTest.kt index 7082396340..b1debee8cf 100644 --- a/kotlinx-coroutines-test/test/TestRunBlockingOrderTest.kt +++ b/kotlinx-coroutines-test/test/TestRunBlockingOrderTest.kt @@ -4,11 +4,12 @@ package kotlinx.coroutines.test +import junit.framework.TestCase.assertEquals import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* import org.junit.* +import java.util.concurrent.Executors import kotlin.concurrent.thread -import kotlin.coroutines.* -import kotlin.test.assertEquals class TestRunBlockingOrderTest : TestBase() { @Test @@ -77,59 +78,20 @@ class TestRunBlockingOrderTest : TestBase() { expect(2) thread { expect(3) - cont.resume(Unit) + cont.resume(Unit) { Unit } } } finish(4) } - @Test(expected = UncompletedCoroutinesError::class) - fun testWithOddlyCompletingJob_fails() { - // this test is suspect since it relies upon the exact ordering of code in runBlockingTest - // however, it needs to ensure the job finishes *after* advanceUntilIdle is called in order - // to ensure that runBlockingTest errors when presented with threading non-determinism. - - // this test is stable and will always pass unless the implementation changes. - - // If this starts failing because the call to cleanupTestCoroutines changes it will need a similarly - // implementation driven test. - - class FakeDispatcher(val delegate: TestCoroutineDispatcher): - CoroutineDispatcher(), - Delay by delegate, - DelayController by delegate { - private var cleanupCallback: (() -> Unit)? = null - - override fun dispatch(context: CoroutineContext, block: Runnable) { - delegate.dispatch(context, block) - } - - fun onCleanup(block: () -> Unit) { - cleanupCallback = block - } - - override fun cleanupTestCoroutines() { - delegate.cleanupTestCoroutines() - cleanupCallback?.invoke() - } - } - - val dispatcher = FakeDispatcher(TestCoroutineDispatcher()) - val scope = TestCoroutineScope(dispatcher) - val resumeAfterTest = CompletableDeferred() - - scope.runBlockingTest { - expect(1) - dispatcher.onCleanup { - // after advanceTimeUntilIdle, complete the launched coroutine - expect(3) - resumeAfterTest.complete(Unit) - finish(5) - } + @Test + fun testWithDelayInOtherDispatcher_passesWhenDelayIsShort() = runBlockingTest { + expect(1) + withContext(Dispatchers.IO) { + delay(1) expect(2) - resumeAfterTest.await() // this will resume just before child jobs are checked - expect(4) } + finish(3) } @Test @@ -151,7 +113,7 @@ class TestRunBlockingOrderTest : TestBase() { val uncompleted = CompletableDeferred() val result = runCatching { expect(1) - runBlockingTest { + runBlockingTest(waitConfig = SingleDispatcherWaitConfig) { expect(2) uncompleted.await() } @@ -168,4 +130,191 @@ class TestRunBlockingOrderTest : TestBase() { } finish(2) } + + @Test + fun testComplexDispatchFromOtherDispatchersOverTime_completes() { + val otherDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() + + val max = 10 + + val numbersFromOtherDispatcherWithDelays = flow { + var current = 0 + while (current < max) { + delay(1) + emit(++current) + } + }.flowOn(otherDispatcher) + + try { + runBlockingTest { + numbersFromOtherDispatcherWithDelays.collect { value -> + expect(value) + } + expect(max + 1) + } + } finally { + otherDispatcher.close() + } + finish(max + 2) + } + + @Test + fun testComplexDispatchFromOtherDispatchersOverTime_withPasuedTestDispatcher_completes() { + val otherDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() + + val max = 10 + + val numbersFromOtherDispatcherWithDelays = flow { for(x in 1..max) { emit(x) } } + .buffer(0) + .delayEach(1) + .flowOn(otherDispatcher) + + otherDispatcher.use { + runBlockingTest { + pauseDispatcher() + numbersFromOtherDispatcherWithDelays.collect { value -> + expect(value) + } + expect(max + 1) + } + } + finish(max + 2) + } + + @Test + fun testDispatchFromOtherDispatch_triggersInternalDispatch() { + val otherDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() + + val numbersFromOtherDispatcherWithDelays = flow { emit(1) } + .delayEach(1) + .buffer(0) + .flowOn(otherDispatcher) + + otherDispatcher.use { + runBlockingTest { + numbersFromOtherDispatcherWithDelays.collect { value -> + expect(value) + launch { + expect(2) + } + } + expect(3) + } + } + finish(4) + } + + @Test + fun testDispatchFromOtherDispatch_triggersInternalDispatch_withDelay() { + val otherDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() + + val max = 10 + + val numbersFromOtherDispatcherWithDelays = flow { for(x in 1..max) { emit(x)} } + .filter { it % 2 == 1 } + .delayEach(1) + .buffer(0) + .flowOn(otherDispatcher) + + otherDispatcher.use { + runBlockingTest { + numbersFromOtherDispatcherWithDelays.collect { value -> + expect(value) + delay(1) + expect (value + 1) + } + delay(1) + expect(max + 1) + } + } + finish(max + 2) + } + + object OneMillisecondWaitConfig : WaitConfig { + override val wait = 1L + + override fun toString() = "OneMillisecondWaitConfig" + } + + @Test + fun whenWaitConfig_timesOut_getExceptionWithMessage() { + expect(1) + val uncompleted = CompletableDeferred() + val result = runCatching { + runBlockingTest(waitConfig = OneMillisecondWaitConfig) { + withContext(Dispatchers.IO) { + expect(2) + uncompleted.await() + } + } + } + finish(3) + val hasDetailedError = result.exceptionOrNull()?.message?.contains("may be empty") + assertEquals(true, hasDetailedError) + uncompleted.cancel() + } + + @Test + fun whenWaitConfig_isSingleThreaded_hasDetailedErrorMessage() { + expect(1) + val uncompleted = CompletableDeferred() + val result = runCatching { + runBlockingTest(waitConfig = SingleDispatcherWaitConfig) { + launch { + expect(2) + uncompleted.await() + } + } + } + finish(3) + val hasDetailedError = result.exceptionOrNull()?.message?.contains("Please update your test to use the default value of MultiDispatcherWaitConfig") + assertEquals(true, hasDetailedError) + uncompleted.cancel() + } + + @Test + fun whenCoroutineStartedInScope_doesntLeakOnAnotherDispatcher() { + var job: Job? = null + runBlockingTest { + expect(1) + job = launch(Dispatchers.IO) { + delay(1) + expect(3) + } + expect(2) + } + assertEquals(true, job?.isCompleted) + finish(4) + } + + @Test + fun whenDispatcherPaused_runBlocking_dispatchesToTestThread() { + val thread = Thread.currentThread() + runBlockingTest { + pauseDispatcher() + withContext(Dispatchers.IO) { + expect(1) + delay(1) + expect(2) + } + assertEquals(thread, Thread.currentThread()) + finish(3) + } + } + + @Test + fun whenDispatcherResumed_runBlocking_dispatchesImmediatelyOnIO() { + var thread: Thread? = null + runBlockingTest { + resumeDispatcher() + withContext(Dispatchers.IO) { + thread = Thread.currentThread() + expect(1) + delay(1) + expect(2) + } + assertEquals(thread, Thread.currentThread()) + finish(3) + } + } } diff --git a/kotlinx-coroutines-test/test/TestRunBlockingTest.kt b/kotlinx-coroutines-test/test/TestRunBlockingTest.kt index e0c7091505..1611c1d848 100644 --- a/kotlinx-coroutines-test/test/TestRunBlockingTest.kt +++ b/kotlinx-coroutines-test/test/TestRunBlockingTest.kt @@ -5,6 +5,7 @@ package kotlinx.coroutines.test import kotlinx.coroutines.* +import java.lang.IllegalStateException import kotlin.coroutines.* import kotlin.test.* @@ -129,7 +130,6 @@ class TestRunBlockingTest { @Test fun whenUsingTimeout_inAsync_doesNotTriggerWhenNotDelayed() = runBlockingTest { - val testScope = this val deferred = async { withTimeout(SLOW) { delay(0) @@ -187,13 +187,13 @@ class TestRunBlockingTest { assertRunsFast { job.join() - throw job.getCancellationException().cause ?: assertFails { "expected exception" } + throw job.getCancellationException().cause ?: IllegalStateException("expected exception") } } @Test(expected = IllegalArgumentException::class) fun throwingException__inAsync_throws() = runBlockingTest { - val deferred = async { + val deferred : Deferred = async { delay(SLOW) throw IllegalArgumentException("Test") } @@ -274,7 +274,7 @@ class TestRunBlockingTest { } @Test(expected = UncompletedCoroutinesError::class) - fun whenACoroutineLeaks_errorIsThrown() = runBlockingTest { + fun whenACoroutineLeaks_errorIsThrown() = runBlockingTest(waitConfig = SingleDispatcherWaitConfig) { val uncompleted = CompletableDeferred() launch { uncompleted.await() @@ -342,7 +342,7 @@ class TestRunBlockingTest { fun testWithTestContextThrowingAnAssertionError() = runBlockingTest { val expectedError = IllegalAccessError("hello") - val job = launch { + launch { throw expectedError } From 6d1639ed0a54529babc02fb65aa59faa3cf694fb Mon Sep 17 00:00:00 2001 From: Sean McQuillan Date: Thu, 8 Aug 2019 21:20:42 -0700 Subject: [PATCH 3/5] Reimplement test event loop with a much smaller interface that allows for external parking via a single suspend function. --- .../src/DelayController.kt | 45 ----- kotlinx-coroutines-test/src/TestBuilders.kt | 164 ++++-------------- .../src/TestCoroutineDispatcher.kt | 59 +++---- .../test/TestRunBlockingOrderTest.kt | 53 +++--- .../test/TestRunBlockingTest.kt | 2 +- 5 files changed, 87 insertions(+), 236 deletions(-) diff --git a/kotlinx-coroutines-test/src/DelayController.kt b/kotlinx-coroutines-test/src/DelayController.kt index b4abfa097d..286e5efe20 100644 --- a/kotlinx-coroutines-test/src/DelayController.kt +++ b/kotlinx-coroutines-test/src/DelayController.kt @@ -137,51 +137,6 @@ public interface DelayController { */ @ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0 public fun resumeDispatcher() - - /** - * Represents the queue state of a DelayController. - * - * Tests do not normally need to use this API. It is exposed for advanced situations like integrating multiple - * [TestCoroutineDispatcher] instances or creating alternatives to [runBlockingtest]. - */ - public sealed class QueueState { - /** - * A [DelayController] that is idle does not currently have any tasks to perform. - * - * This may happen if all coroutines in this [DelayController] have completed, or if they are all suspended - * waiting on other dispatchers. - */ - public object Idle: QueueState() { - override fun toString() = "Idle" - } - - /** - * A [DelayController] that has a task that will execute in response to a call to [runCurrent]. - * - * There may also be delayed tasks scheduled, in which case [HasCurrentTask] takes priority since current tasks - * will execute at an earlier virtual time. - */ - public object HasCurrentTask: QueueState() { - override fun toString() = "HasCurrentTask" - } - - /** - * A [DelayController] that has delayed tasks has a task scheduled for sometime in the future. - * - * If there are also tasks at the current time, [HasCurrentTask] will take priority. - */ - public object HasDelayedTask: QueueState(){ - override fun toString() = "HasDelayedTask" - } - } - - /** - * A ConflatedBroadcastChannel that is up to date with the current [QueueState]. - * - * Tests do not normally need to use this API. It is exposed for advanced situations like integrating multiple - * [TestCoroutineDispatcher] instances or creating alternatives to [runBlockingtest]. - */ - public val queueState: ConflatedBroadcastChannel } /** diff --git a/kotlinx-coroutines-test/src/TestBuilders.kt b/kotlinx-coroutines-test/src/TestBuilders.kt index 59a2ccc9ea..93349cd686 100644 --- a/kotlinx-coroutines-test/src/TestBuilders.kt +++ b/kotlinx-coroutines-test/src/TestBuilders.kt @@ -5,87 +5,11 @@ package kotlinx.coroutines.test import kotlinx.coroutines.* +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.selects.select -import java.lang.StringBuilder import kotlin.coroutines.* -private const val DEFAULT_TEST_TIMEOUT = 30_000L - -/** - * A strategy for waiting on coroutines executed on other dispatchers inside a [runBlockingTest]. - * - * Most tests should use [MultiDispatcherWaitConfig]. As an optimization, a test that executes coroutines only on a - * [TestCoroutineDispatcher] and never interacts with other dispatchers may use [SingleDispatcherWaitConfig]. - * - * A test may subclass this to customize the wait in advanced cases. - */ -interface WaitConfig { - /** - * How long (in wall-clock time) to wait for other Dispatchers to complete coroutines during a [runBlockingTest]. - * - * This delay is not related to the virtual time of a [TestCoroutineDispatcher], but is how long a test should allow - * another dispatcher, like Dispatchers.IO, to perform a time consuming activity such as reading from a database. - */ - val wait: Long -} - -/** - * Do not wait for coroutines executing on another [Dispatcher] in [runBlockingTest]. - - * Always fails with an uncompleted coroutine when any coroutine in the test executes on any other dispatcher (including - * calls to [withContext]). It should not be used for most tests, instead use the default value of - * [MultiDispatcherWaitConfig]. - * - * This configuration should only be used as an optimization for tests that intentionally create an uncompleted - * coroutine and execute all coroutines on the [TestCoroutineDispatcher] used by [runBlockingTest]. - * - * If in doubt, prefer [MultiDispatcherWaitConfig]. - */ -object SingleDispatcherWaitConfig : WaitConfig { - /** - * This value is ignored by [runBlockingTest] on [SingleDispatcherWaitConfig] - */ - override val wait = 0L - - override fun toString() = "SingleDispatcherWaitConfig" -} - -/** - * Wait up to 30 seconds for any coroutines running on another [Dispatcher] to complete in [runBlockingTest]. - * - * This is the default value for [runBlockingTest] and the recommendation for most tests. This configuration will allow - * for coroutines to be launched on another dispatcher inside the test (e.g. calls to `withContext(Dispatchers.IO)`). - * - * This allows for code like the following to be tested correctly using [runBlockingTest]: - * - * ``` - * suspend fun delayOnDefault() = withContext(Dispatchers.Default) { - * // this delay does not use the virtual-time of runBlockingTest since it's executing on Dispatchers.Default - * delay(50) - * } - * - * runBlockingTest { - * // Note: This test takes at least 50ms (real time) to run - * - * // delayOnDefault will suspend the runBlockingTest coroutine for 50ms [real-time: 0; virtual-time: 0] - * delayOnDefault() - * // runBlockingTest resumes 50ms later (real time) [real-time: 50; virtual-time: 0] - * - * delay(10) - * //this delay will auto-progress since it's in runBlockingTest [real-time: 50; virtual-time: 10] - * } - * ``` - */ -object MultiDispatcherWaitConfig: WaitConfig { - /** - * Default wait is 30 seconds. - * - * {@inheritDoc} - */ - override val wait = DEFAULT_TEST_TIMEOUT - - override fun toString() = "MultiDispatcherWaitConfig[wait = 30s]" -} +private const val DEFAULT_WAIT_FOR_OTHER_DISPATCHERS = 30_000L /** * Executes a [testBody] inside an immediate execution dispatcher. @@ -119,14 +43,14 @@ object MultiDispatcherWaitConfig: WaitConfig { * * @param context additional context elements. If [context] contains [CoroutineDispatcher] or [CoroutineExceptionHandler], * then they must implement [DelayController] and [TestCoroutineExceptionHandler] respectively. - * @param waitConfig strategy for waiting on other dispatchers to complete during the test. [SingleDispatcherWaitConfig] - * will never wait, other values will wait for [WaitConfig.wait]ms. + * @param waitForOtherDispatchers how long to wait for other dispatchers to execute tasks asynchronously, default 30 + * seconds * @param testBody The code of the unit-test. */ @ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0 public fun runBlockingTest( context: CoroutineContext = EmptyCoroutineContext, - waitConfig: WaitConfig = MultiDispatcherWaitConfig, + waitForOtherDispatchers: Long = DEFAULT_WAIT_FOR_OTHER_DISPATCHERS, testBody: suspend TestCoroutineScope.() -> Unit ) { val (safeContext, dispatcher) = context.checkArguments() @@ -140,7 +64,7 @@ public fun runBlockingTest( localTestScope.testBody() } - val didTimeout = deferred.waitForCompletion(waitConfig, dispatcher) + val didTimeout = deferred.waitForCompletion(waitForOtherDispatchers, dispatcher, dispatcher as IdleWaiter) if (deferred.isCompleted) { deferred.getCompletionExceptionOrNull()?.let { @@ -154,66 +78,41 @@ public fun runBlockingTest( // TODO: should these be separate exceptions to allow for tests to detect difference? if (didTimeout) { val message = """ - runBlockingTest timed out after waiting ${waitConfig.wait}ms for coroutines to complete due waitConfig = $waitConfig. + runBlockingTest timed out after waiting ${waitForOtherDispatchers}ms for coroutines to complete. Active jobs after test (may be empty): $endingJobs """.trimIndent() throw UncompletedCoroutinesError(message) } else if ((endingJobs - startingJobs).isNotEmpty()) { - val message = StringBuilder("Test finished with active jobs: ") - message.append(endingJobs) - if (waitConfig == SingleDispatcherWaitConfig) { - message.append(""" - - Note: runBlockingTest did not wait for other dispatchers due to argument waitConfig = $waitConfig - - Tip: If this runBlockingTest starts any code on another dispatcher (such as Dispatchers.Default, - Dispatchers.IO, etc) in any of the functions it calls it will never pass when configured with - SingleDispatcherWaitConfig. Please update your test to use the default value of MultiDispatcherWaitConfig - like: - - runBlockingTest { } - - """.trimIndent()) - } - throw UncompletedCoroutinesError(message.toString()) + throw UncompletedCoroutinesError("Test finished with active jobs: $endingJobs ") } } -private fun Deferred.waitForCompletion(waitConfig: WaitConfig, dispatcher: DelayController): Boolean { +private fun Deferred.waitForCompletion(wait: Long, delayController: DelayController, park: IdleWaiter): Boolean { var didTimeout = false - when (waitConfig) { - SingleDispatcherWaitConfig -> dispatcher.advanceUntilIdle() - else -> { - runBlocking { - val subscription = dispatcher.queueState.openSubscription() - dispatcher.advanceUntilIdle() - - var finished = false - try { - while (!finished) { - finished = select { - this@waitForCompletion.onAwait { - true - } - onTimeout(waitConfig.wait) { - didTimeout = true - true - } - subscription.onReceive { queueState -> - when (queueState) { - DelayController.QueueState.Idle -> Unit - else -> dispatcher.advanceUntilIdle() - } - false - } - } + + runBlocking { + val unparkChannel = Channel(1) + val job = launch { + while(true) { + park.suspendUntilNextDispatch() + unparkChannel.send(Unit) + } + } + + try { + withTimeout(wait) { + while(!isCompleted) { + delayController.advanceUntilIdle() + select { + onAwait { Unit } + unparkChannel.onReceive { Unit } } - } finally { - subscription.cancel() } } - + } catch (exception: TimeoutCancellationException) { + didTimeout = true } + job.cancel() } return didTimeout } @@ -227,18 +126,19 @@ private fun CoroutineContext.activeJobs(): Set { */ // todo: need documentation on how this extension is supposed to be used @ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0 -public fun TestCoroutineScope.runBlockingTest(configuration: WaitConfig = MultiDispatcherWaitConfig, block: suspend TestCoroutineScope.() -> Unit) = runBlockingTest(coroutineContext, configuration, block) +public fun TestCoroutineScope.runBlockingTest(waitForOtherDispatchers: Long = DEFAULT_WAIT_FOR_OTHER_DISPATCHERS, block: suspend TestCoroutineScope.() -> Unit) = runBlockingTest(coroutineContext, waitForOtherDispatchers, block) /** * Convenience method for calling [runBlockingTest] on an existing [TestCoroutineDispatcher]. */ @ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0 -public fun TestCoroutineDispatcher.runBlockingTest(configuration: WaitConfig = MultiDispatcherWaitConfig, block: suspend TestCoroutineScope.() -> Unit) = runBlockingTest(this, configuration, block) +public fun TestCoroutineDispatcher.runBlockingTest(waitForOtherDispatchers: Long = DEFAULT_WAIT_FOR_OTHER_DISPATCHERS, block: suspend TestCoroutineScope.() -> Unit) = runBlockingTest(this, waitForOtherDispatchers, block) private fun CoroutineContext.checkArguments(): Pair { // TODO optimize it val dispatcher = get(ContinuationInterceptor).run { this?.let { require(this is DelayController) { "Dispatcher must implement DelayController: $this" } } + this?.let { require(this is IdleWaiter) { "Dispatcher must implement IdleWaiter" } } this ?: TestCoroutineDispatcher() } diff --git a/kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt b/kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt index e4f57d570d..3d0c86f5f2 100644 --- a/kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt +++ b/kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt @@ -7,10 +7,9 @@ package kotlinx.coroutines.test import kotlinx.atomicfu.atomic import kotlinx.atomicfu.update import kotlinx.coroutines.* -import kotlinx.coroutines.channels.ConflatedBroadcastChannel +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.internal.ThreadSafeHeap import kotlinx.coroutines.internal.ThreadSafeHeapNode -import kotlinx.coroutines.test.DelayController.QueueState.* import kotlin.coroutines.CoroutineContext import kotlin.math.max @@ -29,7 +28,7 @@ import kotlin.math.max * @see DelayController */ @ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0 -public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayController { +public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayController, IdleWaiter { private var dispatchImmediately = true set(value) { field = value @@ -48,11 +47,12 @@ public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayControl // Storing time in nanoseconds internally. private val _time = atomic(0L) - override val queueState = ConflatedBroadcastChannel(Idle) - + private val waitLock = Channel(capacity = 1) + /** @suppress */ override fun dispatch(context: CoroutineContext, block: Runnable) { if (dispatchImmediately) { + unpark() block.run() } else { post(block) @@ -76,7 +76,6 @@ public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayControl return object : DisposableHandle { override fun dispose() { queue.remove(node) - updateQueueObservers() } } } @@ -88,14 +87,14 @@ public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayControl private fun post(block: Runnable) { queue.addLast(TimedRunnable(block, _counter.getAndIncrement())) - updateQueueObservers() + unpark() } private fun postDelayed(block: Runnable, delayTime: Long): TimedRunnable { return TimedRunnable(block, _counter.getAndIncrement(), safePlus(currentTime, delayTime)) .also { queue.addLast(it) - updateQueueObservers() + unpark() } } @@ -122,7 +121,6 @@ public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayControl override fun advanceTimeBy(delayTimeMillis: Long): Long { val oldTime = currentTime advanceUntilTime(oldTime + delayTimeMillis) - updateQueueObservers() return currentTime - oldTime } @@ -134,7 +132,6 @@ public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayControl private fun advanceUntilTime(targetTime: Long) { doActionsUntil(targetTime) _time.update { currentValue -> max(currentValue, targetTime) } - updateQueueObservers() } /** @suppress */ @@ -146,14 +143,12 @@ public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayControl advanceUntilTime(next.time) } - updateQueueObservers() return currentTime - oldTime } /** @suppress */ override fun runCurrent() { doActionsUntil(currentTime) - updateQueueObservers() } /** @suppress */ @@ -179,9 +174,9 @@ public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayControl /** @suppress */ override fun cleanupTestCoroutines() { + unpark() // process any pending cancellations or completions, but don't advance time doActionsUntil(currentTime) - updateQueueObservers() // run through all pending tasks, ignore any submitted coroutines that are not active val pendingTasks = mutableListOf() @@ -201,23 +196,12 @@ public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayControl } } - private fun updateQueueObservers() { - // note: this will be called from any thread, and is safe lock-free in runBlockingTest but to guard against - // third party code code updating this will use a lock - synchronized(queue) { - val next = queue.peek() - when { - next == null-> queueState.offerIfChanged(Idle) - next.time <= currentTime -> queueState.offerIfChanged(HasCurrentTask) - next.time > currentTime -> queueState.offerIfChanged(HasDelayedTask) - } - } + override suspend fun suspendUntilNextDispatch() { + waitLock.receive() } - private fun ConflatedBroadcastChannel.offerIfChanged(element: DelayController.QueueState) { - if (valueOrNull != element) { - offer(element) - } + private fun unpark() { + waitLock.offer(Unit) } } @@ -250,4 +234,21 @@ private class TimedRunnable( } override fun toString() = "TimedRunnable(time=$time, run=$runnable)" -} \ No newline at end of file +} + +/** + * Alternative implementations of [TestCoroutineDispatcher] must implement this interface in order to be supported by + * [runBlockingTest]. + * + * This interface allows external code to suspend itself until the next dispatch is received. This is similar to park in + * a normal event loop, but doesn't require that [TestCoroutineDispatcher] block a thread while parked. + */ +interface IdleWaiter { + /** + * Attempt to suspend until the next dispatch is received. + * + * This method may resume immediately if any dispatch was received since the last time it was called. This ensures + * that dispatches won't be dropped if they happen just before calling [suspendUntilNextDispatch]. + */ + public suspend fun suspendUntilNextDispatch() +} diff --git a/kotlinx-coroutines-test/test/TestRunBlockingOrderTest.kt b/kotlinx-coroutines-test/test/TestRunBlockingOrderTest.kt index b1debee8cf..08b1665d6b 100644 --- a/kotlinx-coroutines-test/test/TestRunBlockingOrderTest.kt +++ b/kotlinx-coroutines-test/test/TestRunBlockingOrderTest.kt @@ -113,7 +113,7 @@ class TestRunBlockingOrderTest : TestBase() { val uncompleted = CompletableDeferred() val result = runCatching { expect(1) - runBlockingTest(waitConfig = SingleDispatcherWaitConfig) { + runBlockingTest(waitForOtherDispatchers = 0L) { expect(2) uncompleted.await() } @@ -230,48 +230,23 @@ class TestRunBlockingOrderTest : TestBase() { finish(max + 2) } - object OneMillisecondWaitConfig : WaitConfig { - override val wait = 1L - - override fun toString() = "OneMillisecondWaitConfig" - } - @Test fun whenWaitConfig_timesOut_getExceptionWithMessage() { expect(1) val uncompleted = CompletableDeferred() val result = runCatching { - runBlockingTest(waitConfig = OneMillisecondWaitConfig) { + runBlockingTest(waitForOtherDispatchers = 1L) { withContext(Dispatchers.IO) { - expect(2) + finish(2) uncompleted.await() } } } - finish(3) val hasDetailedError = result.exceptionOrNull()?.message?.contains("may be empty") assertEquals(true, hasDetailedError) uncompleted.cancel() } - @Test - fun whenWaitConfig_isSingleThreaded_hasDetailedErrorMessage() { - expect(1) - val uncompleted = CompletableDeferred() - val result = runCatching { - runBlockingTest(waitConfig = SingleDispatcherWaitConfig) { - launch { - expect(2) - uncompleted.await() - } - } - } - finish(3) - val hasDetailedError = result.exceptionOrNull()?.message?.contains("Please update your test to use the default value of MultiDispatcherWaitConfig") - assertEquals(true, hasDetailedError) - uncompleted.cancel() - } - @Test fun whenCoroutineStartedInScope_doesntLeakOnAnotherDispatcher() { var job: Job? = null @@ -308,13 +283,33 @@ class TestRunBlockingOrderTest : TestBase() { runBlockingTest { resumeDispatcher() withContext(Dispatchers.IO) { - thread = Thread.currentThread() expect(1) delay(1) expect(2) + thread = Thread.currentThread() } assertEquals(thread, Thread.currentThread()) finish(3) } } + + @Test + fun whenDispatcherRunning_doesntProgressDelays_inLaunchBody() { + var state = 0 + fun CoroutineScope.subject() = launch { + state = 1 + delay(1000) + state = 2 + } + + runBlockingTest { + subject() + + assertEquals(1, state) + + advanceTimeBy(1000) + + assertEquals(2, state) + } + } } diff --git a/kotlinx-coroutines-test/test/TestRunBlockingTest.kt b/kotlinx-coroutines-test/test/TestRunBlockingTest.kt index 1611c1d848..8cfec254cd 100644 --- a/kotlinx-coroutines-test/test/TestRunBlockingTest.kt +++ b/kotlinx-coroutines-test/test/TestRunBlockingTest.kt @@ -274,7 +274,7 @@ class TestRunBlockingTest { } @Test(expected = UncompletedCoroutinesError::class) - fun whenACoroutineLeaks_errorIsThrown() = runBlockingTest(waitConfig = SingleDispatcherWaitConfig) { + fun whenACoroutineLeaks_errorIsThrown() = runBlockingTest(waitForOtherDispatchers = 0L) { val uncompleted = CompletableDeferred() launch { uncompleted.await() From 403f502310e0efe7060966ad1ee25ebab5b07413 Mon Sep 17 00:00:00 2001 From: Sean McQuillan Date: Thu, 8 Aug 2019 21:47:21 -0700 Subject: [PATCH 4/5] Cleanup docs to describe the behavior as similar to [Dispatchers.Unconfined] --- kotlinx-coroutines-test/src/DelayController.kt | 13 ++++++++++--- .../src/TestCoroutineDispatcher.kt | 4 ++++ 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/kotlinx-coroutines-test/src/DelayController.kt b/kotlinx-coroutines-test/src/DelayController.kt index 286e5efe20..3f608e9e95 100644 --- a/kotlinx-coroutines-test/src/DelayController.kt +++ b/kotlinx-coroutines-test/src/DelayController.kt @@ -94,6 +94,9 @@ public interface DelayController { * * This is useful when testing functions that start a coroutine. By pausing the dispatcher assertions or * setup may be done between the time the coroutine is created and started. + * + * While in the paused block, the dispatcher will queue all dispatched coroutines and they will be resumed on + * whatever thread calls [advanceUntilIdle], [advanceTimeBy], or [runCurrent]. */ @ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0 public suspend fun pauseDispatcher(block: suspend () -> Unit) @@ -103,6 +106,9 @@ public interface DelayController { * * When paused, the dispatcher will not execute any coroutines automatically, and you must call [runCurrent] or * [advanceTimeBy], or [advanceUntilIdle] to execute coroutines. + * + * While paused, the dispatcher will queue all dispatched coroutines and they will be resumed on whatever thread + * calls [advanceUntilIdle], [advanceTimeBy], or [runCurrent]. */ @ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0 public fun pauseDispatcher() @@ -114,8 +120,9 @@ public interface DelayController { * time and execute coroutines scheduled in the future use, one of [advanceTimeBy], * or [advanceUntilIdle]. * - * When the dispatcher is resumed, all execution be immediate in the thread that triggered it. This means - * that the following code will not switch back from Dispatchers.IO after `withContext` + * When the dispatcher is resumed, all execution be immediate in the thread that triggered it similar to + * [Dispatchers.Unconfined]. This means that the following code will not switch back from Dispatchers.IO after + * `withContext` * * ``` * runBlockingTest { @@ -125,7 +132,7 @@ public interface DelayController { * ``` * * For tests that need accurate threading behavior, [pauseDispatcher] will ensure that the following test dispatches - * on the correct thread. + * on a controlled thread. * * ``` * runBlockingTest { diff --git a/kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt b/kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt index 3d0c86f5f2..8497a1d84b 100644 --- a/kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt +++ b/kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt @@ -25,6 +25,10 @@ import kotlin.math.max * not execute until a call to [DelayController.runCurrent] or the virtual clock-time has been advanced via one of the * methods on [DelayController]. * + * While in immediate mode [TestCoroutineDispatcher] behaves similar to [Dispatchers.Unconfined]. When resuming from + * another thread it will *not* switch threads. When in lazy mode, [TestCoroutineDispatcher] will enqueue all + * dispatches and whatever thread calls an [advanceUntilIdle], [advanceTimeBy], or [runCurrent] will continue execution. + * * @see DelayController */ @ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0 From 1d2a05e5b7f6ff2270c2c4e983f1fc273ae72f9b Mon Sep 17 00:00:00 2001 From: Sean McQuillan Date: Sat, 10 Aug 2019 22:01:54 -0700 Subject: [PATCH 5/5] Unpark runBlocking *after* potential background blocks have run. --- kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt b/kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt index 8497a1d84b..ea7afef1d7 100644 --- a/kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt +++ b/kotlinx-coroutines-test/src/TestCoroutineDispatcher.kt @@ -56,8 +56,8 @@ public class TestCoroutineDispatcher: CoroutineDispatcher(), Delay, DelayControl /** @suppress */ override fun dispatch(context: CoroutineContext, block: Runnable) { if (dispatchImmediately) { - unpark() block.run() + unpark() } else { post(block) }