Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support completing the test coroutine from outside the test thread. #1206

Closed
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
67 changes: 67 additions & 0 deletions kotlinx-coroutines-test/src/DelayController.kt
Expand Up @@ -2,6 +2,7 @@ package kotlinx.coroutines.test

import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ConflatedBroadcastChannel

/**
* Control the virtual clock time of a [CoroutineDispatcher].
Expand Down Expand Up @@ -112,9 +113,75 @@ public interface DelayController {
* Resumed dispatchers will automatically progress through all coroutines scheduled at the current time. To advance
* time and execute coroutines scheduled in the future use, one of [advanceTimeBy],
* or [advanceUntilIdle].
*
* When the dispatcher is resumed, all execution be immediate in the thread that triggered it. This means
* that the following code will not switch back from Dispatchers.IO after `withContext`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While we are here, could you please elaborate on why this behaviour is preferred over "classic" dispatching?
I am a bit concerned about how this behaviour is different from "release" builds

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the main goal of this eager behavior was to make this test function testable without having to call runCurrent():

fun launchesCoroutine() {
    scope.launch {
        // do stuff
    }
}

In a test that doesn't use multiple threads (TestCoroutineDispatcher is used for all coroutines) it provides eager entry into the launch body (during dispatch the launch body is immediately executed).

As this is both the normal structure for this code (at least on Android) it is nice to avoid extra calls to runCurrent() in this test

@Test fun callLaunchesCoroutine() = runBlockingTest {
    // assume `scope` is using this TestCoroutineDispatcher
    launchesCoroutine()
    // coroutine has already launched here
}

This thread switching behavior is an undesired side effect of that API choice.

So basically, I think there are two options here:

  1. Modify the behavior of dispatch to require a call to runCurrent() in this test. This does make common test cases require an extra call, but leads to correct threading behavior if the developer hasn't injected TestCoroutineDispatcher throughout their code under test.
  2. Keep the behavior (prefer eager entry of launch blocks) with the assumption most tests will not execute coroutines on multiple threads.

WDYT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation!

"Keep the behavior" part is, of course, more preferable as it really simplifies writing simple unit tests for end users. I've just realized that this is exactly why we thought to make Uncofined the default behaviour of TestMainDispatcher.

I think it's easier to change the docs and explain that test dispatcher acts like unconfined one (and maybe implementation should use it as well)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a pretty good way to explain it. I'll add that to the docs in this PR since it's become all about threading.

*
* ```
* runBlockingTest {
* withContext(Dispatchers.IO) { doIo() }
* // runBlockingTest is still on Dispatchers.IO here
* }
* ```
*
* For tests that need accurate threading behavior, [pauseDispatcher] will ensure that the following test dispatches
* on the correct thread.
*
* ```
* runBlockingTest {
* pauseDispatcher()
* withContext(Dispatchers.IO) { doIo() }
* // runBlockingTest has returned to it's starting thread here
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit counter-intuitive as well, pauseDispatcher does not pause the dispatcher, but changes its behaviour

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea - this one I'm stuck on. It seems required to support the pauseDispatcher { } block in runBlockingTest but it has this side effect in the presence of multi-threading.

* }
* ```
*/
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
public fun resumeDispatcher()

/**
* Represents the queue state of a DelayController.
*
* Tests do not normally need to use this API. It is exposed for advanced situations like integrating multiple
* [TestCoroutineDispatcher] instances or creating alternatives to [runBlockingtest].
objcode marked this conversation as resolved.
Show resolved Hide resolved
*/
public sealed class QueueState {
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
/**
* A [DelayController] that is idle does not currently have any tasks to perform.
*
* This may happen if all coroutines in this [DelayController] have completed, or if they are all suspended
* waiting on other dispatchers.
*/
public object Idle: QueueState() {
override fun toString() = "Idle"
}

/**
* A [DelayController] that has a task that will execute in response to a call to [runCurrent].
*
* There may also be delayed tasks scheduled, in which case [HasCurrentTask] takes priority since current tasks
* will execute at an earlier virtual time.
*/
public object HasCurrentTask: QueueState() {
override fun toString() = "HasCurrentTask"
}

/**
* A [DelayController] that has delayed tasks has a task scheduled for sometime in the future.
*
* If there are also tasks at the current time, [HasCurrentTask] will take priority.
*/
public object HasDelayedTask: QueueState(){
override fun toString() = "HasDelayedTask"
}
}

/**
* A ConflatedBroadcastChannel that is up to date with the current [QueueState].
*
* Tests do not normally need to use this API. It is exposed for advanced situations like integrating multiple
* [TestCoroutineDispatcher] instances or creating alternatives to [runBlockingtest].
*/
public val queueState: ConflatedBroadcastChannel<QueueState>
objcode marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand Down
184 changes: 171 additions & 13 deletions kotlinx-coroutines-test/src/TestBuilders.kt
Expand Up @@ -5,8 +5,88 @@
package kotlinx.coroutines.test

import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select
import java.lang.StringBuilder
import kotlin.coroutines.*

private const val DEFAULT_TEST_TIMEOUT = 30_000L

/**
* A strategy for waiting on coroutines executed on other dispatchers inside a [runBlockingTest].
*
* Most tests should use [MultiDispatcherWaitConfig]. As an optimization, a test that executes coroutines only on a
* [TestCoroutineDispatcher] and never interacts with other dispatchers may use [SingleDispatcherWaitConfig].
*
* A test may subclass this to customize the wait in advanced cases.
*/
interface WaitConfig {
objcode marked this conversation as resolved.
Show resolved Hide resolved
/**
* How long (in wall-clock time) to wait for other Dispatchers to complete coroutines during a [runBlockingTest].
*
* This delay is not related to the virtual time of a [TestCoroutineDispatcher], but is how long a test should allow
* another dispatcher, like Dispatchers.IO, to perform a time consuming activity such as reading from a database.
*/
val wait: Long
}

/**
* Do not wait for coroutines executing on another [Dispatcher] in [runBlockingTest].

* Always fails with an uncompleted coroutine when any coroutine in the test executes on any other dispatcher (including
* calls to [withContext]). It should not be used for most tests, instead use the default value of
* [MultiDispatcherWaitConfig].
*
* This configuration should only be used as an optimization for tests that intentionally create an uncompleted
* coroutine and execute all coroutines on the [TestCoroutineDispatcher] used by [runBlockingTest].
*
* If in doubt, prefer [MultiDispatcherWaitConfig].
*/
object SingleDispatcherWaitConfig : WaitConfig {
/**
* This value is ignored by [runBlockingTest] on [SingleDispatcherWaitConfig]
*/
override val wait = 0L

override fun toString() = "SingleDispatcherWaitConfig"
}

/**
* Wait up to 30 seconds for any coroutines running on another [Dispatcher] to complete in [runBlockingTest].
*
* This is the default value for [runBlockingTest] and the recommendation for most tests. This configuration will allow
* for coroutines to be launched on another dispatcher inside the test (e.g. calls to `withContext(Dispatchers.IO)`).
*
* This allows for code like the following to be tested correctly using [runBlockingTest]:
*
* ```
* suspend fun delayOnDefault() = withContext(Dispatchers.Default) {
* // this delay does not use the virtual-time of runBlockingTest since it's executing on Dispatchers.Default
* delay(50)
* }
*
* runBlockingTest {
* // Note: This test takes at least 50ms (real time) to run
*
* // delayOnDefault will suspend the runBlockingTest coroutine for 50ms [real-time: 0; virtual-time: 0]
* delayOnDefault()
* // runBlockingTest resumes 50ms later (real time) [real-time: 50; virtual-time: 0]
*
* delay(10)
* //this delay will auto-progress since it's in runBlockingTest [real-time: 50; virtual-time: 10]
* }
* ```
*/
object MultiDispatcherWaitConfig: WaitConfig {
/**
* Default wait is 30 seconds.
*
* {@inheritDoc}
*/
override val wait = DEFAULT_TEST_TIMEOUT

override fun toString() = "MultiDispatcherWaitConfig[wait = 30s]"
}

/**
* Executes a [testBody] inside an immediate execution dispatcher.
*
Expand Down Expand Up @@ -38,26 +118,104 @@ import kotlin.coroutines.*
* (including coroutines suspended on join/await).
*
* @param context additional context elements. If [context] contains [CoroutineDispatcher] or [CoroutineExceptionHandler],
* then they must implement [DelayController] and [TestCoroutineExceptionHandler] respectively.
* then they must implement [DelayController] and [TestCoroutineExceptionHandler] respectively.
* @param waitConfig strategy for waiting on other dispatchers to complete during the test. [SingleDispatcherWaitConfig]
* will never wait, other values will wait for [WaitConfig.wait]ms.
* @param testBody The code of the unit-test.
*/
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
public fun runBlockingTest(context: CoroutineContext = EmptyCoroutineContext, testBody: suspend TestCoroutineScope.() -> Unit) {
public fun runBlockingTest(
context: CoroutineContext = EmptyCoroutineContext,
waitConfig: WaitConfig = MultiDispatcherWaitConfig,
testBody: suspend TestCoroutineScope.() -> Unit
) {
val (safeContext, dispatcher) = context.checkArguments()
val startingJobs = safeContext.activeJobs()
val scope = TestCoroutineScope(safeContext)
val deferred = scope.async {
scope.testBody()

var testScope: TestCoroutineScope? = null

val deferred = CoroutineScope(safeContext).async {
val localTestScope = TestCoroutineScope(coroutineContext)
testScope = localTestScope
localTestScope.testBody()
}
dispatcher.advanceUntilIdle()
deferred.getCompletionExceptionOrNull()?.let {
throw it

val didTimeout = deferred.waitForCompletion(waitConfig, dispatcher)

if (deferred.isCompleted) {
deferred.getCompletionExceptionOrNull()?.let {
throw it
}
}
scope.cleanupTestCoroutines()

testScope!!.cleanupTestCoroutines()
val endingJobs = safeContext.activeJobs()
if ((endingJobs - startingJobs).isNotEmpty()) {
throw UncompletedCoroutinesError("Test finished with active jobs: $endingJobs")

// TODO: should these be separate exceptions to allow for tests to detect difference?
if (didTimeout) {
val message = """
runBlockingTest timed out after waiting ${waitConfig.wait}ms for coroutines to complete due waitConfig = $waitConfig.
Active jobs after test (may be empty): $endingJobs
""".trimIndent()
throw UncompletedCoroutinesError(message)
} else if ((endingJobs - startingJobs).isNotEmpty()) {
val message = StringBuilder("Test finished with active jobs: ")
message.append(endingJobs)
if (waitConfig == SingleDispatcherWaitConfig) {
message.append("""

Note: runBlockingTest did not wait for other dispatchers due to argument waitConfig = $waitConfig

Tip: If this runBlockingTest starts any code on another dispatcher (such as Dispatchers.Default,
Dispatchers.IO, etc) in any of the functions it calls it will never pass when configured with
SingleDispatcherWaitConfig. Please update your test to use the default value of MultiDispatcherWaitConfig
like:

runBlockingTest { }

""".trimIndent())
}
throw UncompletedCoroutinesError(message.toString())
}
}

private fun Deferred<Unit>.waitForCompletion(waitConfig: WaitConfig, dispatcher: DelayController): Boolean {
var didTimeout = false
when (waitConfig) {
SingleDispatcherWaitConfig -> dispatcher.advanceUntilIdle()
else -> {
runBlocking {
val subscription = dispatcher.queueState.openSubscription()
dispatcher.advanceUntilIdle()

var finished = false
try {
while (!finished) {
finished = select {
this@waitForCompletion.onAwait {
true
}
onTimeout(waitConfig.wait) {
didTimeout = true
true
}
subscription.onReceive { queueState ->
when (queueState) {
DelayController.QueueState.Idle -> Unit
else -> dispatcher.advanceUntilIdle()
}
false
}
}
}
} finally {
subscription.cancel()
}
}

}
}
return didTimeout
}

private fun CoroutineContext.activeJobs(): Set<Job> {
Expand All @@ -69,13 +227,13 @@ private fun CoroutineContext.activeJobs(): Set<Job> {
*/
// todo: need documentation on how this extension is supposed to be used
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
public fun TestCoroutineScope.runBlockingTest(block: suspend TestCoroutineScope.() -> Unit) = runBlockingTest(coroutineContext, block)
public fun TestCoroutineScope.runBlockingTest(configuration: WaitConfig = MultiDispatcherWaitConfig, block: suspend TestCoroutineScope.() -> Unit) = runBlockingTest(coroutineContext, configuration, block)

/**
* Convenience method for calling [runBlockingTest] on an existing [TestCoroutineDispatcher].
*/
@ExperimentalCoroutinesApi // Since 1.2.1, tentatively till 1.3.0
public fun TestCoroutineDispatcher.runBlockingTest(block: suspend TestCoroutineScope.() -> Unit) = runBlockingTest(this, block)
public fun TestCoroutineDispatcher.runBlockingTest(configuration: WaitConfig = MultiDispatcherWaitConfig, block: suspend TestCoroutineScope.() -> Unit) = runBlockingTest(this, configuration, block)

private fun CoroutineContext.checkArguments(): Pair<CoroutineContext, DelayController> {
// TODO optimize it
Expand Down