Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a scope for launching background work in tests #3348

Merged
merged 13 commits into from Jul 12, 2022
6 changes: 6 additions & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -381,6 +381,12 @@ public final class kotlinx/coroutines/Job$DefaultImpls {
public final class kotlinx/coroutines/Job$Key : kotlin/coroutines/CoroutineContext$Key {
}

public class kotlinx/coroutines/JobImpl : kotlinx/coroutines/JobSupport, kotlinx/coroutines/CompletableJob {
public fun <init> (Lkotlinx/coroutines/Job;)V
public fun complete ()Z
public fun completeExceptionally (Ljava/lang/Throwable;)Z
}

public final class kotlinx/coroutines/JobKt {
public static final fun Job (Lkotlinx/coroutines/Job;)Lkotlinx/coroutines/CompletableJob;
public static final synthetic fun Job (Lkotlinx/coroutines/Job;)Lkotlinx/coroutines/Job;
Expand Down
1 change: 1 addition & 0 deletions kotlinx-coroutines-core/common/src/JobSupport.kt
Expand Up @@ -1312,6 +1312,7 @@ private class Empty(override val isActive: Boolean) : Incomplete {
override fun toString(): String = "Empty{${if (isActive) "Active" else "New" }}"
}

@PublishedApi // for a custom job in the test module
internal open class JobImpl(parent: Job?) : JobSupport(true), CompletableJob {
init { initParentJob(parent) }
override val onCancelComplete get() = true
Expand Down
Expand Up @@ -40,7 +40,7 @@ internal expect suspend inline fun recoverAndThrow(exception: Throwable): Nothin
* The opposite of [recoverStackTrace].
* It is guaranteed that `unwrap(recoverStackTrace(e)) === e`
*/
@PublishedApi // only published for the multiplatform tests in our own code
@PublishedApi // published for the multiplatform implementation of kotlinx-coroutines-test
internal expect fun <E: Throwable> unwrap(exception: E): E

internal expect class StackTraceElement
Expand Down
10 changes: 10 additions & 0 deletions kotlinx-coroutines-core/common/src/internal/ThreadSafeHeap.kt
Expand Up @@ -37,6 +37,16 @@ public open class ThreadSafeHeap<T> : SynchronizedObject() where T: ThreadSafeHe
_size.value = 0
}

public fun find(
predicate: (value: T) -> Boolean
): T? = synchronized(this) block@{
for (i in 0 until size) {
val value = a?.get(i)!!
if (predicate(value)) return@block value
}
null
}

public fun peek(): T? = synchronized(this) { firstImpl() }

public fun removeFirstOrNull(): T? = synchronized(this) {
Expand Down
1 change: 1 addition & 0 deletions kotlinx-coroutines-test/api/kotlinx-coroutines-test.api
Expand Up @@ -105,6 +105,7 @@ public final class kotlinx/coroutines/test/TestDispatchers {
}

public abstract interface class kotlinx/coroutines/test/TestScope : kotlinx/coroutines/CoroutineScope {
public abstract fun getBackgroundScope ()Lkotlinx/coroutines/CoroutineScope;
public abstract fun getTestScheduler ()Lkotlinx/coroutines/test/TestCoroutineScheduler;
}

Expand Down
37 changes: 26 additions & 11 deletions kotlinx-coroutines-test/common/src/TestBuilders.kt
Expand Up @@ -164,15 +164,19 @@ public fun TestScope.runTest(
): TestResult = asSpecificImplementation().let {
it.enter()
createTestResult {
runTestCoroutine(it, dispatchTimeoutMs, TestScopeImpl::tryGetCompletionCause, testBody) { it.leave() }
runTestCoroutine(it, dispatchTimeoutMs, TestScopeImpl::tryGetCompletionCause, testBody) {
backgroundScope.cancel()
testScheduler.advanceUntilIdleOr { false }
it.leave()
}
}
}

/**
* Runs [testProcedure], creating a [TestResult].
*/
@Suppress("NO_ACTUAL_FOR_EXPECT") // actually suppresses `TestResult`
internal expect fun createTestResult(testProcedure: suspend () -> Unit): TestResult
internal expect fun createTestResult(testProcedure: suspend CoroutineScope.() -> Unit): TestResult

/** A coroutine context element indicating that the coroutine is running inside `runTest`. */
internal object RunningInRunTest : CoroutineContext.Key<RunningInRunTest>, CoroutineContext.Element {
Expand All @@ -195,7 +199,7 @@ internal const val DEFAULT_DISPATCH_TIMEOUT_MS = 60_000L
* The [cleanup] procedure may either throw [UncompletedCoroutinesError] to denote that child coroutines were leaked, or
* return a list of uncaught exceptions that should be reported at the end of the test.
*/
internal suspend fun <T: AbstractCoroutine<Unit>> runTestCoroutine(
internal suspend fun <T: AbstractCoroutine<Unit>> CoroutineScope.runTestCoroutine(
coroutine: T,
dispatchTimeoutMs: Long,
tryGetCompletionCause: T.() -> Throwable?,
Expand All @@ -216,16 +220,27 @@ internal suspend fun <T: AbstractCoroutine<Unit>> runTestCoroutine(
completed = true
continue
}
select<Unit> {
coroutine.onJoin {
completed = true
}
scheduler.onDispatchEvent {
// we received knowledge that `scheduler` observed a dispatch event, so we reset the timeout
// in case progress depends on some background work, we need to keep spinning it.
val backgroundWorkRunner = launch(CoroutineName("background work runner")) {
while (true) {
scheduler.tryRunNextTaskUnless { !isActive }
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
yield()
}
onTimeout(dispatchTimeoutMs) {
handleTimeout(coroutine, dispatchTimeoutMs, tryGetCompletionCause, cleanup)
}
try {
select<Unit> {
coroutine.onJoin {
completed = true
}
scheduler.onDispatchEvent {
// we received knowledge that `scheduler` observed a dispatch event, so we reset the timeout
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also worth noting that it doesn't receive events from background jobs

}
onTimeout(dispatchTimeoutMs) {
handleTimeout(coroutine, dispatchTimeoutMs, tryGetCompletionCause, cleanup)
}
}
} finally {
backgroundWorkRunner.cancelAndJoin()
}
}
coroutine.getCompletionExceptionOrNull()?.let { exception ->
Expand Down
Expand Up @@ -96,7 +96,7 @@ private class UnconfinedTestDispatcherImpl(
@Suppress("INVISIBLE_MEMBER")
override fun dispatch(context: CoroutineContext, block: Runnable) {
checkSchedulerInContext(scheduler, context)
scheduler.sendDispatchEvent()
scheduler.sendDispatchEvent(context)

/** copy-pasted from [kotlinx.coroutines.Unconfined.dispatch] */
/** It can only be called by the [yield] function. See also code of [yield] function. */
Expand Down Expand Up @@ -151,8 +151,7 @@ private class StandardTestDispatcherImpl(
) : TestDispatcher() {

override fun dispatch(context: CoroutineContext, block: Runnable) {
checkSchedulerInContext(scheduler, context)
scheduler.registerEvent(this, 0, block) { false }
scheduler.registerEvent(this, 0, block, context) { false }
}

override fun toString(): String = "${name ?: "StandardTestDispatcher"}[scheduler=$scheduler]"
Expand Down
62 changes: 42 additions & 20 deletions kotlinx-coroutines-test/common/src/TestCoroutineScheduler.kt
Expand Up @@ -62,17 +62,20 @@ public class TestCoroutineScheduler : AbstractCoroutineContextElement(TestCorout
dispatcher: TestDispatcher,
timeDeltaMillis: Long,
marker: T,
context: CoroutineContext,
isCancelled: (T) -> Boolean
): DisposableHandle {
require(timeDeltaMillis >= 0) { "Attempted scheduling an event earlier in time (with the time delta $timeDeltaMillis)" }
checkSchedulerInContext(this, context)
val count = count.getAndIncrement()
val isForeground = context[BackgroundWork] === null
return synchronized(lock) {
val time = addClamping(currentTime, timeDeltaMillis)
val event = TestDispatchEvent(dispatcher, count, time, marker as Any) { isCancelled(marker) }
val event = TestDispatchEvent(dispatcher, count, time, marker as Any, isForeground) { isCancelled(marker) }
events.addLast(event)
/** can't be moved above: otherwise, [onDispatchEvent] could consume the token sent here before there's
* actually anything in the event queue. */
sendDispatchEvent()
sendDispatchEvent(context)
DisposableHandle {
synchronized(lock) {
events.remove(event)
Expand All @@ -82,10 +85,12 @@ public class TestCoroutineScheduler : AbstractCoroutineContextElement(TestCorout
}

/**
* Runs the next enqueued task, advancing the virtual time to the time of its scheduled awakening.
* Runs the next enqueued task, advancing the virtual time to the time of its scheduled awakening,
* unless [condition] holds.
*/
private fun tryRunNextTask(): Boolean {
internal fun tryRunNextTaskUnless(condition: () -> Boolean): Boolean {
val event = synchronized(lock) {
if (condition()) return false
val event = events.removeFirstOrNull() ?: return false
if (currentTime > event.time)
currentTimeAheadOfEvents()
Expand All @@ -105,9 +110,15 @@ public class TestCoroutineScheduler : AbstractCoroutineContextElement(TestCorout
* functionality, query [currentTime] before and after the execution to achieve the same result.
*/
@ExperimentalCoroutinesApi
public fun advanceUntilIdle() {
while (!synchronized(lock) { events.isEmpty }) {
tryRunNextTask()
public fun advanceUntilIdle(): Unit = advanceUntilIdleOr { events.none(TestDispatchEvent<*>::isForeground) }

/**
* [condition]: guaranteed to be invoked under the lock.
*/
internal fun advanceUntilIdleOr(condition: () -> Boolean) {
while (true) {
if (!tryRunNextTaskUnless(condition))
return
}
}

Expand Down Expand Up @@ -169,24 +180,19 @@ public class TestCoroutineScheduler : AbstractCoroutineContextElement(TestCorout
/**
* Checks that the only tasks remaining in the scheduler are cancelled.
*/
internal fun isIdle(strict: Boolean = true): Boolean {
internal fun isIdle(strict: Boolean = true): Boolean =
synchronized(lock) {
if (strict)
return events.isEmpty
// TODO: also completely empties the queue, as there's no nondestructive way to iterate over [ThreadSafeHeap]
val presentEvents = mutableListOf<TestDispatchEvent<*>>()
while (true) {
presentEvents += events.removeFirstOrNull() ?: break
}
return presentEvents.all { it.isCancelled() }
if (strict) events.isEmpty else events.none { !it.isCancelled() }
}
}

/**
* Notifies this scheduler about a dispatch event.
*
* [context] is the context in which the task will be dispatched.
*/
internal fun sendDispatchEvent() {
dispatchEvents.trySend(Unit)
internal fun sendDispatchEvent(context: CoroutineContext) {
if (context[BackgroundWork] !== BackgroundWork)
dispatchEvents.trySend(Unit)
}

/**
Expand Down Expand Up @@ -216,6 +222,8 @@ private class TestDispatchEvent<T>(
private val count: Long,
@JvmField val time: Long,
@JvmField val marker: T,
@JvmField val isForeground: Boolean,
// TODO: remove once the deprecated API is gone
@JvmField val isCancelled: () -> Boolean
) : Comparable<TestDispatchEvent<*>>, ThreadSafeHeapNode {
override var heap: ThreadSafeHeap<*>? = null
Expand All @@ -224,7 +232,7 @@ private class TestDispatchEvent<T>(
override fun compareTo(other: TestDispatchEvent<*>) =
compareValuesBy(this, other, TestDispatchEvent<*>::time, TestDispatchEvent<*>::count)

override fun toString() = "TestDispatchEvent(time=$time, dispatcher=$dispatcher)"
override fun toString() = "TestDispatchEvent(time=$time, dispatcher=$dispatcher${if (isForeground) "" else ", background"})"
}

// works with positive `a`, `b`
Expand All @@ -238,3 +246,17 @@ internal fun checkSchedulerInContext(scheduler: TestCoroutineScheduler, context:
}
}
}

/**
* A coroutine context key denoting that the work is to be executed in the background.
* @see [TestScope.backgroundScope]
*/
internal object BackgroundWork : CoroutineContext.Key<BackgroundWork>, CoroutineContext.Element {
override val key: CoroutineContext.Key<*>
get() = this

override fun toString(): String = "BackgroundWork"
}

private fun<T> ThreadSafeHeap<T>.none(predicate: (T) -> Boolean) where T: ThreadSafeHeapNode, T: Comparable<T> =
find(predicate) == null
13 changes: 5 additions & 8 deletions kotlinx-coroutines-test/common/src/TestDispatcher.kt
Expand Up @@ -10,14 +10,14 @@ import kotlin.jvm.*

/**
* A test dispatcher that can interface with a [TestCoroutineScheduler].
*
*
* The available implementations are:
* * [StandardTestDispatcher] is a dispatcher that places new tasks into a queue.
* * [UnconfinedTestDispatcher] is a dispatcher that behaves like [Dispatchers.Unconfined] while allowing to control
* the virtual time.
*/
@ExperimentalCoroutinesApi
public abstract class TestDispatcher internal constructor(): CoroutineDispatcher(), Delay {
public abstract class TestDispatcher internal constructor() : CoroutineDispatcher(), Delay {
/** The scheduler that this dispatcher is linked to. */
@ExperimentalCoroutinesApi
public abstract val scheduler: TestCoroutineScheduler
Expand All @@ -30,16 +30,13 @@ public abstract class TestDispatcher internal constructor(): CoroutineDispatcher

/** @suppress */
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
checkSchedulerInContext(scheduler, continuation.context)
val timedRunnable = CancellableContinuationRunnable(continuation, this)
scheduler.registerEvent(this, timeMillis, timedRunnable, ::cancellableRunnableIsCancelled)
scheduler.registerEvent(this, timeMillis, timedRunnable, continuation.context, ::cancellableRunnableIsCancelled)
}

/** @suppress */
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
checkSchedulerInContext(scheduler, context)
return scheduler.registerEvent(this, timeMillis, block) { false }
}
override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
scheduler.registerEvent(this, timeMillis, block, context) { false }
}

/**
Expand Down