Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restore timeouts using virtual time #130

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ kotlin {
commonMain {
dependencies {
api "org.jetbrains.kotlinx:kotlinx-coroutines-core:${versions.coroutines}"
implementation 'org.jetbrains.kotlin:kotlin-test'
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:${versions.coroutines}"
}
}
commonTest {
Expand Down
25 changes: 18 additions & 7 deletions src/commonMain/kotlin/app/cash/turbine/Turbine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
*/
package app.cash.turbine

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.channels.ChannelResult
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.withContext

internal const val debug = false

Expand Down Expand Up @@ -88,18 +91,26 @@ public operator fun <T> Turbine<T>.plusAssign(value: T) { add(value) }
/**
* Construct a standalone [Turbine].
*/
public fun <T> Turbine(): Turbine<T> = TurbineImpl()
public fun <T> Turbine(timeoutMs: Long? = null): Turbine<T> = TurbineImpl(timeoutMs = timeoutMs)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timeout: Duration please! You should be able to keep safe types all the way down now, I think. There definitely was a delay overload but I'm not sure about withTimeout.


internal class TurbineImpl<T>(
channel: Channel<T> = Channel(UNLIMITED),
private val job: Job? = null,
private val timeoutMs: Long? = null,
) : Turbine<T> {
private suspend fun <T> withTurbineTimeout(block: suspend CoroutineScope.() -> T): T {
return if (timeoutMs != null) {
withTurbineTimeout(timeoutMs, block)
} else coroutineScope {
block()
}
}

private val channel = object : Channel<T> by channel {
override fun tryReceive(): ChannelResult<T> {
val result = channel.tryReceive()
val event = result.toEvent()
if (event is Event.Error || event is Event.Item) ignoreRemainingEvents = true
if (event is Event.Error || event is Event.Complete) ignoreRemainingEvents = true

return result
}
Expand Down Expand Up @@ -169,15 +180,15 @@ internal class TurbineImpl<T>(

override fun expectMostRecentItem(): T = channel.expectMostRecentItem()

override suspend fun awaitEvent(): Event<T> = channel.awaitEvent()
override suspend fun awaitEvent(): Event<T> = withTurbineTimeout { channel.awaitEvent() }

override suspend fun awaitItem(): T = channel.awaitItem()
override suspend fun awaitItem(): T = withTurbineTimeout { channel.awaitItem() }

override suspend fun skipItems(count: Int) = channel.skipItems(count)
override suspend fun skipItems(count: Int) = withTurbineTimeout { channel.skipItems(count) }

override suspend fun awaitComplete() = channel.awaitComplete()
override suspend fun awaitComplete() = withTurbineTimeout { channel.awaitComplete() }

override suspend fun awaitError(): Throwable = channel.awaitError()
override suspend fun awaitError(): Throwable = withTurbineTimeout { channel.awaitError() }

override fun ensureAllEventsConsumed() {
if (ignoreRemainingEvents) return
Expand Down
52 changes: 46 additions & 6 deletions src/commonMain/kotlin/app/cash/turbine/channel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,16 @@
*/
package app.cash.turbine

import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.channels.ChannelResult
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.delay
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.test.TestCoroutineScheduler

/**
* Returns the most recent item that has already been received.
Expand Down Expand Up @@ -64,16 +70,44 @@ public fun <T> ReceiveChannel<T>.expectNoEvents() {
*
* This function will always return a terminal event on a closed [ReceiveChannel].
*/
public suspend fun <T> ReceiveChannel<T>.awaitEvent(): Event<T> =
try {
Event.Item(receive())
@OptIn(ExperimentalCoroutinesApi::class)
public suspend fun <T> ReceiveChannel<T>.awaitEvent(): Event<T> {
val timeoutMs = contextTimeout()
val testScheduler = coroutineContext[TestCoroutineScheduler]
return try {
withTimeout(timeoutMs) {
val item = if (testScheduler == null) {
// With no test scheduler, let receive() expire the timeout. This will use wallclock time.
receive()
} else {
// *With* a test scheduler, we must poll and nudge the clock
// until some kind of result is produced.
Comment on lines +83 to +84
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about this. Before removing timeout, I changed it to always be wallclock time and never dispatcher time. If you want to assert a timeout on the dispatcher's fake time then you can write assertFailsWith<TimeoutException> { withTimeout(1.seconds) { awaitEvent() } }. Turbine's timeout was always meant to be a hang detector. I don't like that this makes the behavior subtle and reliant on factors which are hard to document and hard to discern when glancing at a test.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two points need to be responded to: (1) whether timeouts have a role other than bailing out hung code, and (2) whether withTimeout(1.seconds) is sufficient as an implementation.

  1. Hangs are garden variety error conditions in CSP-style concurrent code; timeouts at receive sites provide actionable feedback on why these hangs occur.

Here's an example of a test that fails by hanging:

@Test fun logsSubmitClicked() = runTest {
  val presenter = Presenter(fakeAnalytics, fakeAppService)
  fakeAppService.responses += ApiResult(Success("What's new today"))
  presenter.models(events).test {
    assertThat(awaitItem()).isEqualTo(Loading)
    events.emit(SubmiClicked())
    assertThat(fakeAnalytics.events.awaitItem()).isEqualTo(SubmitClickAnalyticsEvent())
    assertThat(awaitItem()).isEqualTo(Content("What's new today"))
  }
}

Some failure modes for this test:

  • Loading isn't emitted first, but skips straight to Content
  • Incorrect Content
  • Analytics event is never emitted
  • Content is never emitted

Half of those will present as a hung test. Seeing exactly which one of these happened in CI is invaluable.

  1. I had hoped that this:
runTest {
  flow.test {
    assertFailsWith<TimeoutException> { withTimeout(1.seconds) { awaitEvent() } }
  }
}

...would work with our new runTest. I initially wrote it that way, and unfortunately it does not: withTimeout(1.seconds) requires a concurrent coroutine to tick the virtual clock in order to advance. This makes it useless as a testing API.

I don't like that this makes the behavior subtle and reliant on factors which are hard to document and hard to discern when glancing at a test.

The delay(timeoutMs / 10) is the real bugaboo that needs answering.

I am able to convince myself that automatic manipulation of virtual time is worth it, because

  1. It speeds iteration on failing tests that timeout
  2. it makes it possible to implement awaitNoEvents(). awaitNoEvents() is something colleagues have asked for that solves some real testing problems, and it can't realistically be done on wall clock time. (it'd have to do something hacky like repeat(10) { yield() }; expectNoEvents()) My answer thus far has been, "Tough, you can't logically assert that nothing happens in response to a stimulus," but it sure would be nice to actually solve the problem instead being a difficult theory nerd.

Implicitly advancing the clock is a pretty hard nut to swallow, though, and delay(timeout / 10) even more so. delay(timeout / 10) can probably be rewritten to be delay(10) and solve the problem without being too confusing, but if you think that implicitly advancing virtual time will cause too much confusion to be worth the advantages above, then this approach should be ditched in favor of wall clock time.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where do we want to land on this? If you're not comfortable with the virtual time thing, I would like to get wall clock merged as soon as possible. Without it, I don't know how to tell people how to use 0.9. I keep on hacking around it myself.

Re: details of wall clock time, I am concerned about the need to use Dispatchers.Default. It doesn't feel safe. Even if we did something like this:

  val callingContext = currentCoroutineContext()
  withContext(Dispatchers.Default) {
    withTimeout(timeoutMs) {
      withContext(callingContext) {
        receive()
      }
    }
  }

...having multiple threads involved makes me antsy.

val value: T
while (true) {
val result = tryReceive()
if (result.isFailure && !result.isClosed) {
delay(timeoutMs / 10)
} else if (result.isFailure && result.isClosed) {
throw (result.exceptionOrNull() ?: ClosedReceiveChannelException(null))
} else {
value = result.getOrThrow()
break
}
}
value
}
Event.Item(item)
}
} catch (e: TimeoutCancellationException) {
throw AssertionError("No value produced in ${timeoutMs}ms")
} catch (e: CancellationException) {
throw e
} catch (e: ClosedReceiveChannelException) {
Event.Complete
} catch (e: Exception) {
Event.Error(e)
}
}

/**
* Assert that the next event received was non-null and return it.
Expand Down Expand Up @@ -130,11 +164,15 @@ public fun <T> ReceiveChannel<T>.takeError(): Throwable {
*
* @throws AssertionError if the next event was completion or an error.
*/
public suspend fun <T> ReceiveChannel<T>.awaitItem(): T =
public suspend fun <T> ReceiveChannel<T>.awaitItem(): T = try {
when (val result = awaitEvent()) {
is Event.Item -> result.value
else -> unexpectedEvent(result, "item")
}
} catch (e: Exception) {
println("Caught it! $e")
throw e
Comment on lines +172 to +174
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert?

}

/**
* Assert that [count] item events were received and ignore them.
Expand All @@ -149,7 +187,8 @@ public suspend fun <T> ReceiveChannel<T>.skipItems(count: Int) {
val cause = (event as? Event.Error)?.throwable
throw TurbineAssertionError("Expected $count items but got $index items and $event", cause)
}
is Event.Item<T> -> { /* Success */ }
is Event.Item<T> -> { /* Success */
}
Comment on lines +190 to +191
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Revert. Did an autoformatter do this? We can change it to like

-> {
  // Success
}

or something to prevent it from interfering.

}
}
}
Expand All @@ -173,7 +212,7 @@ public suspend fun <T> ReceiveChannel<T>.awaitComplete() {
*
* @throws AssertionError if the next event was an item or completion.
*/
public suspend fun <T> ReceiveChannel<T>.awaitError(): Throwable {
public suspend fun <T> ReceiveChannel<T>.awaitError(): Throwable {
val event = awaitEvent()
return (event as? Event.Error)?.throwable
?: unexpectedEvent(event, "error")
Expand All @@ -187,6 +226,7 @@ internal fun <T> ChannelResult<T>.toEvent(): Event<T>? {
else if (isClosed) Event.Complete
else null
}

private fun <T> ChannelResult<T>.unexpectedResult(expected: String): Nothing = unexpectedEvent(toEvent(), expected)

private fun unexpectedEvent(event: Event<*>?, expected: String): Nothing {
Expand Down
27 changes: 27 additions & 0 deletions src/commonMain/kotlin/app/cash/turbine/coroutines.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,22 @@
*/
package app.cash.turbine

import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.withContext

public const val DEFAULT_TIMEOUT_MS: Long = 1000
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't seem like it needs to be public API.


/**
* Sets the timeout for all [Turbine] instances within this context.
*/
public suspend fun <T> withTurbineTimeout(timeoutMs: Long, block: suspend CoroutineScope.() -> T): T {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be public API? When would you not be in control of the turbine instantiation such that you could just pass the desired value as a parameter directly.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This kind of scenario is what I was thinking of:

@Test fun myTest() = runTest {
  val subject = Subject(this /* CoroutineScope */, fakeCollaborator)

  subject.doSomething()

  assertThat(fakeCollaborator.outputTurbine.awaitItem()).isEqualTo("hi there")
}

Wrapping a block like this with withTurbineTimeout makes it possible to assign delays to any fake interactions, even if .test { ... } isn't being used.

return withContext(TurbineTimeoutElement(timeoutMs)) {
block()
}
}

/**
* Invoke this method to throw an error when your method is not being called by a suspend fun.
*
Expand Down Expand Up @@ -44,3 +60,14 @@ internal fun assertCallingContextIsNotSuspended() {
error("Calling context is suspending; use a suspending method instead")
}
}

internal class TurbineTimeoutElement(
val timeout: Long,
) : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<TurbineTimeoutElement>

override val key: CoroutineContext.Key<*> = Key
}
internal suspend fun contextTimeout(): Long {
return currentCoroutineContext()[TurbineTimeoutElement.Key]?.timeout ?: DEFAULT_TIMEOUT_MS
}
55 changes: 17 additions & 38 deletions src/commonMain/kotlin/app/cash/turbine/flow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -41,41 +41,11 @@ import kotlinx.coroutines.launch
* }
* ```
*/
@Deprecated("Timeout parameter removed. Use runTest which has a timeout or wrap in withTimeout.",
ReplaceWith("this.test(validate)"),
DeprecationLevel.ERROR,
)
@Suppress("UNUSED_PARAMETER")
public suspend fun <T> Flow<T>.test(
timeoutMs: Long,
timeout: Duration,
validate: suspend ReceiveTurbine<T>.() -> Unit,
) {
test(validate)
}

/**
* Terminal flow operator that collects events from given flow and allows the [validate] lambda to
* consume and assert properties on them in order. If any exception occurs during validation the
* exception is rethrown from this method.
*
* ```kotlin
* flowOf("one", "two").test {
* assertEquals("one", expectItem())
* assertEquals("two", expectItem())
* expectComplete()
* }
* ```
*/
@Deprecated("Timeout parameter removed. Use runTest which has a timeout or wrap in withTimeout.",
ReplaceWith("this.test(validate)"),
DeprecationLevel.ERROR,
)
@Suppress("UNUSED_PARAMETER")
public suspend fun <T> Flow<T>.test(
timeout: Duration = 1.seconds,
validate: suspend ReceiveTurbine<T>.() -> Unit,
) {
test(validate)
test(timeoutMs = timeout.inWholeMilliseconds, validate)
}

/**
Expand All @@ -92,10 +62,11 @@ public suspend fun <T> Flow<T>.test(
* ```
*/
public suspend fun <T> Flow<T>.test(
timeoutMs: Long? = null,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only public API should be Duration. We can delete the deprecated Long overloads completely now that 0.8 is shipped.

validate: suspend ReceiveTurbine<T>.() -> Unit,
) {
coroutineScope {
collectTurbineIn(this).apply {
collectTurbineIn(this, timeoutMs).apply {
validate()
cancel()
ensureAllEventsConsumed()
Expand All @@ -118,8 +89,8 @@ public suspend fun <T> Flow<T>.test(
* Unlike [test] which automatically cancels the flow at the end of the lambda, the returned
* [ReceiveTurbine] must either consume a terminal event (complete or error) or be explicitly canceled.
*/
public fun <T> Flow<T>.testIn(scope: CoroutineScope): ReceiveTurbine<T> {
val turbine = collectTurbineIn(scope)
public fun <T> Flow<T>.testIn(scope: CoroutineScope, timeoutMs: Long? = null): ReceiveTurbine<T> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Samesies

val turbine = collectTurbineIn(scope, timeoutMs)

scope.coroutineContext.job.invokeOnCompletion { exception ->
if (debug) println("Scope ending ${exception ?: ""}")
Expand All @@ -133,14 +104,22 @@ public fun <T> Flow<T>.testIn(scope: CoroutineScope): ReceiveTurbine<T> {
return turbine
}

private fun <T> Flow<T>.collectTurbineIn(scope: CoroutineScope): Turbine<T> {
private fun <T> Flow<T>.collectTurbineIn(scope: CoroutineScope, timeoutMs: Long?): Turbine<T> {
lateinit var outputBox: Channel<T>

val job = scope.launch(start = UNDISPATCHED) {
outputBox = collectIntoChannel(this)

val block: suspend CoroutineScope.() -> Unit = {
outputBox = collectIntoChannel(this)
}
if (timeoutMs != null) {
withTurbineTimeout(timeoutMs, block)
} else {
block()
}
}

return TurbineImpl(outputBox, job)
return TurbineImpl(outputBox, job, timeoutMs = null)
}

internal fun <T> Flow<T>.collectIntoChannel(scope: CoroutineScope): Channel<T> {
Expand Down
37 changes: 37 additions & 0 deletions src/commonTest/kotlin/app/cash/turbine/ChannelTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,15 @@ import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flow
Expand All @@ -42,6 +47,7 @@ import kotlinx.coroutines.test.TestCoroutineScheduler
import kotlinx.coroutines.test.TestDispatcher
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withTimeoutOrNull

class ChannelTest {
@Test
Expand Down Expand Up @@ -227,6 +233,37 @@ class ChannelTest {
assertEquals("Expected error but found Complete", actual.message)
}

@Test fun failsOnDefaultTimeout() = runTest {
assertFailsWith<AssertionError> {
coroutineScope {
neverFlow().collectIntoChannel(this).awaitItem()
}
}
}

@Test fun awaitHonorsCoroutineContextTimeoutNoTimeout() = runTest {
withTurbineTimeout(5000) {
val job = launch {
neverFlow().collectIntoChannel(this).awaitItem()
}

delay(3000)
job.cancel()
}
}

@Test fun awaitHonorsCoroutineContextTimeoutTimeout() = runTest {
assertFailsWith<AssertionError> {
withTurbineTimeout(5000) {
launch {
neverFlow().collectIntoChannel(this).awaitItem()
}

delay(5000)
}
}
}

@Test fun takeItem() = withTestScope {
val item = Any()
val channel = flowOf(item).collectIntoChannel(this)
Expand Down
6 changes: 4 additions & 2 deletions src/commonTest/kotlin/app/cash/turbine/FlowTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,10 @@ class FlowTest {
}

@Test fun expectCompleteButWasErrorThrows() = runTest {
emptyFlow<Nothing>().test {
awaitComplete()
assertFailsWith<AssertionError> {
flow<Nothing>{ throw RuntimeException() }.test {
awaitComplete()
}
}
}

Expand Down