-
Notifications
You must be signed in to change notification settings - Fork 100
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Restore timeouts using virtual time #130
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,10 +15,16 @@ | |
*/ | ||
package app.cash.turbine | ||
|
||
import kotlin.coroutines.coroutineContext | ||
import kotlinx.coroutines.CancellationException | ||
import kotlinx.coroutines.ExperimentalCoroutinesApi | ||
import kotlinx.coroutines.TimeoutCancellationException | ||
import kotlinx.coroutines.channels.ChannelResult | ||
import kotlinx.coroutines.channels.ClosedReceiveChannelException | ||
import kotlinx.coroutines.channels.ReceiveChannel | ||
import kotlinx.coroutines.delay | ||
import kotlinx.coroutines.withTimeout | ||
import kotlinx.coroutines.test.TestCoroutineScheduler | ||
|
||
/** | ||
* Returns the most recent item that has already been received. | ||
|
@@ -64,16 +70,44 @@ public fun <T> ReceiveChannel<T>.expectNoEvents() { | |
* | ||
* This function will always return a terminal event on a closed [ReceiveChannel]. | ||
*/ | ||
public suspend fun <T> ReceiveChannel<T>.awaitEvent(): Event<T> = | ||
try { | ||
Event.Item(receive()) | ||
@OptIn(ExperimentalCoroutinesApi::class) | ||
public suspend fun <T> ReceiveChannel<T>.awaitEvent(): Event<T> { | ||
val timeoutMs = contextTimeout() | ||
val testScheduler = coroutineContext[TestCoroutineScheduler] | ||
return try { | ||
withTimeout(timeoutMs) { | ||
val item = if (testScheduler == null) { | ||
// With no test scheduler, let receive() expire the timeout. This will use wallclock time. | ||
receive() | ||
} else { | ||
// *With* a test scheduler, we must poll and nudge the clock | ||
// until some kind of result is produced. | ||
Comment on lines
+83
to
+84
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure about this. Before removing timeout, I changed it to always be wallclock time and never dispatcher time. If you want to assert a timeout on the dispatcher's fake time then you can write There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Two points need to be responded to: (1) whether timeouts have a role other than bailing out hung code, and (2) whether
Here's an example of a test that fails by hanging: @Test fun logsSubmitClicked() = runTest {
val presenter = Presenter(fakeAnalytics, fakeAppService)
fakeAppService.responses += ApiResult(Success("What's new today"))
presenter.models(events).test {
assertThat(awaitItem()).isEqualTo(Loading)
events.emit(SubmiClicked())
assertThat(fakeAnalytics.events.awaitItem()).isEqualTo(SubmitClickAnalyticsEvent())
assertThat(awaitItem()).isEqualTo(Content("What's new today"))
}
} Some failure modes for this test:
Half of those will present as a hung test. Seeing exactly which one of these happened in CI is invaluable.
runTest {
flow.test {
assertFailsWith<TimeoutException> { withTimeout(1.seconds) { awaitEvent() } }
}
} ...would work with our new
The I am able to convince myself that automatic manipulation of virtual time is worth it, because
Implicitly advancing the clock is a pretty hard nut to swallow, though, and There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Where do we want to land on this? If you're not comfortable with the virtual time thing, I would like to get wall clock merged as soon as possible. Without it, I don't know how to tell people how to use 0.9. I keep on hacking around it myself. Re: details of wall clock time, I am concerned about the need to use val callingContext = currentCoroutineContext()
withContext(Dispatchers.Default) {
withTimeout(timeoutMs) {
withContext(callingContext) {
receive()
}
}
} ...having multiple threads involved makes me antsy. |
||
val value: T | ||
while (true) { | ||
val result = tryReceive() | ||
if (result.isFailure && !result.isClosed) { | ||
delay(timeoutMs / 10) | ||
} else if (result.isFailure && result.isClosed) { | ||
throw (result.exceptionOrNull() ?: ClosedReceiveChannelException(null)) | ||
} else { | ||
value = result.getOrThrow() | ||
break | ||
} | ||
} | ||
value | ||
} | ||
Event.Item(item) | ||
} | ||
} catch (e: TimeoutCancellationException) { | ||
throw AssertionError("No value produced in ${timeoutMs}ms") | ||
} catch (e: CancellationException) { | ||
throw e | ||
} catch (e: ClosedReceiveChannelException) { | ||
Event.Complete | ||
} catch (e: Exception) { | ||
Event.Error(e) | ||
} | ||
} | ||
|
||
/** | ||
* Assert that the next event received was non-null and return it. | ||
|
@@ -130,11 +164,15 @@ public fun <T> ReceiveChannel<T>.takeError(): Throwable { | |
* | ||
* @throws AssertionError if the next event was completion or an error. | ||
*/ | ||
public suspend fun <T> ReceiveChannel<T>.awaitItem(): T = | ||
public suspend fun <T> ReceiveChannel<T>.awaitItem(): T = try { | ||
when (val result = awaitEvent()) { | ||
is Event.Item -> result.value | ||
else -> unexpectedEvent(result, "item") | ||
} | ||
} catch (e: Exception) { | ||
println("Caught it! $e") | ||
throw e | ||
Comment on lines
+172
to
+174
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Revert? |
||
} | ||
|
||
/** | ||
* Assert that [count] item events were received and ignore them. | ||
|
@@ -149,7 +187,8 @@ public suspend fun <T> ReceiveChannel<T>.skipItems(count: Int) { | |
val cause = (event as? Event.Error)?.throwable | ||
throw TurbineAssertionError("Expected $count items but got $index items and $event", cause) | ||
} | ||
is Event.Item<T> -> { /* Success */ } | ||
is Event.Item<T> -> { /* Success */ | ||
} | ||
Comment on lines
+190
to
+191
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Revert. Did an autoformatter do this? We can change it to like -> {
// Success
} or something to prevent it from interfering. |
||
} | ||
} | ||
} | ||
|
@@ -173,7 +212,7 @@ public suspend fun <T> ReceiveChannel<T>.awaitComplete() { | |
* | ||
* @throws AssertionError if the next event was an item or completion. | ||
*/ | ||
public suspend fun <T> ReceiveChannel<T>.awaitError(): Throwable { | ||
public suspend fun <T> ReceiveChannel<T>.awaitError(): Throwable { | ||
val event = awaitEvent() | ||
return (event as? Event.Error)?.throwable | ||
?: unexpectedEvent(event, "error") | ||
|
@@ -187,6 +226,7 @@ internal fun <T> ChannelResult<T>.toEvent(): Event<T>? { | |
else if (isClosed) Event.Complete | ||
else null | ||
} | ||
|
||
private fun <T> ChannelResult<T>.unexpectedResult(expected: String): Nothing = unexpectedEvent(toEvent(), expected) | ||
|
||
private fun unexpectedEvent(event: Event<*>?, expected: String): Nothing { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,22 @@ | |
*/ | ||
package app.cash.turbine | ||
|
||
import kotlin.coroutines.CoroutineContext | ||
import kotlinx.coroutines.CoroutineScope | ||
import kotlinx.coroutines.currentCoroutineContext | ||
import kotlinx.coroutines.withContext | ||
|
||
public const val DEFAULT_TIMEOUT_MS: Long = 1000 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Doesn't seem like it needs to be public API. |
||
|
||
/** | ||
* Sets the timeout for all [Turbine] instances within this context. | ||
*/ | ||
public suspend fun <T> withTurbineTimeout(timeoutMs: Long, block: suspend CoroutineScope.() -> T): T { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this need to be public API? When would you not be in control of the turbine instantiation such that you could just pass the desired value as a parameter directly. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This kind of scenario is what I was thinking of: @Test fun myTest() = runTest {
val subject = Subject(this /* CoroutineScope */, fakeCollaborator)
subject.doSomething()
assertThat(fakeCollaborator.outputTurbine.awaitItem()).isEqualTo("hi there")
} Wrapping a block like this with |
||
return withContext(TurbineTimeoutElement(timeoutMs)) { | ||
block() | ||
} | ||
} | ||
|
||
/** | ||
* Invoke this method to throw an error when your method is not being called by a suspend fun. | ||
* | ||
|
@@ -44,3 +60,14 @@ internal fun assertCallingContextIsNotSuspended() { | |
error("Calling context is suspending; use a suspending method instead") | ||
} | ||
} | ||
|
||
internal class TurbineTimeoutElement( | ||
val timeout: Long, | ||
) : CoroutineContext.Element { | ||
companion object Key : CoroutineContext.Key<TurbineTimeoutElement> | ||
|
||
override val key: CoroutineContext.Key<*> = Key | ||
} | ||
internal suspend fun contextTimeout(): Long { | ||
return currentCoroutineContext()[TurbineTimeoutElement.Key]?.timeout ?: DEFAULT_TIMEOUT_MS | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,41 +41,11 @@ import kotlinx.coroutines.launch | |
* } | ||
* ``` | ||
*/ | ||
@Deprecated("Timeout parameter removed. Use runTest which has a timeout or wrap in withTimeout.", | ||
ReplaceWith("this.test(validate)"), | ||
DeprecationLevel.ERROR, | ||
) | ||
@Suppress("UNUSED_PARAMETER") | ||
public suspend fun <T> Flow<T>.test( | ||
timeoutMs: Long, | ||
timeout: Duration, | ||
validate: suspend ReceiveTurbine<T>.() -> Unit, | ||
) { | ||
test(validate) | ||
} | ||
|
||
/** | ||
* Terminal flow operator that collects events from given flow and allows the [validate] lambda to | ||
* consume and assert properties on them in order. If any exception occurs during validation the | ||
* exception is rethrown from this method. | ||
* | ||
* ```kotlin | ||
* flowOf("one", "two").test { | ||
* assertEquals("one", expectItem()) | ||
* assertEquals("two", expectItem()) | ||
* expectComplete() | ||
* } | ||
* ``` | ||
*/ | ||
@Deprecated("Timeout parameter removed. Use runTest which has a timeout or wrap in withTimeout.", | ||
ReplaceWith("this.test(validate)"), | ||
DeprecationLevel.ERROR, | ||
) | ||
@Suppress("UNUSED_PARAMETER") | ||
public suspend fun <T> Flow<T>.test( | ||
timeout: Duration = 1.seconds, | ||
validate: suspend ReceiveTurbine<T>.() -> Unit, | ||
) { | ||
test(validate) | ||
test(timeoutMs = timeout.inWholeMilliseconds, validate) | ||
} | ||
|
||
/** | ||
|
@@ -92,10 +62,11 @@ public suspend fun <T> Flow<T>.test( | |
* ``` | ||
*/ | ||
public suspend fun <T> Flow<T>.test( | ||
timeoutMs: Long? = null, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The only public API should be |
||
validate: suspend ReceiveTurbine<T>.() -> Unit, | ||
) { | ||
coroutineScope { | ||
collectTurbineIn(this).apply { | ||
collectTurbineIn(this, timeoutMs).apply { | ||
validate() | ||
cancel() | ||
ensureAllEventsConsumed() | ||
|
@@ -118,8 +89,8 @@ public suspend fun <T> Flow<T>.test( | |
* Unlike [test] which automatically cancels the flow at the end of the lambda, the returned | ||
* [ReceiveTurbine] must either consume a terminal event (complete or error) or be explicitly canceled. | ||
*/ | ||
public fun <T> Flow<T>.testIn(scope: CoroutineScope): ReceiveTurbine<T> { | ||
val turbine = collectTurbineIn(scope) | ||
public fun <T> Flow<T>.testIn(scope: CoroutineScope, timeoutMs: Long? = null): ReceiveTurbine<T> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Samesies |
||
val turbine = collectTurbineIn(scope, timeoutMs) | ||
|
||
scope.coroutineContext.job.invokeOnCompletion { exception -> | ||
if (debug) println("Scope ending ${exception ?: ""}") | ||
|
@@ -133,14 +104,22 @@ public fun <T> Flow<T>.testIn(scope: CoroutineScope): ReceiveTurbine<T> { | |
return turbine | ||
} | ||
|
||
private fun <T> Flow<T>.collectTurbineIn(scope: CoroutineScope): Turbine<T> { | ||
private fun <T> Flow<T>.collectTurbineIn(scope: CoroutineScope, timeoutMs: Long?): Turbine<T> { | ||
lateinit var outputBox: Channel<T> | ||
|
||
val job = scope.launch(start = UNDISPATCHED) { | ||
outputBox = collectIntoChannel(this) | ||
|
||
val block: suspend CoroutineScope.() -> Unit = { | ||
outputBox = collectIntoChannel(this) | ||
} | ||
if (timeoutMs != null) { | ||
withTurbineTimeout(timeoutMs, block) | ||
} else { | ||
block() | ||
} | ||
} | ||
|
||
return TurbineImpl(outputBox, job) | ||
return TurbineImpl(outputBox, job, timeoutMs = null) | ||
} | ||
|
||
internal fun <T> Flow<T>.collectIntoChannel(scope: CoroutineScope): Channel<T> { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
timeout: Duration
please! You should be able to keep safe types all the way down now, I think. There definitely was adelay
overload but I'm not sure aboutwithTimeout
.