Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revisit documentation #161

Merged
merged 1 commit into from Nov 3, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
293 changes: 213 additions & 80 deletions README.md
Expand Up @@ -46,20 +46,86 @@ dependencies {

## Usage

The entrypoint for the library is the `test` extension for `Flow<T>` which accepts a validation
block. Like `collect`, `test` is a suspending function that will not return until the flow is
complete or canceled.
A `Turbine` is a thin wrapper over a `Channel` with an API designed for testing.

You can call `awaitItem()` to suspend and wait for an item to be sent to the `Turbine`:

```kotlin
assertEquals("one", turbine.awaitItem())
```

...`awaitComplete()` to suspend until the `Turbine` completes without an exception:

```kotlin
turbine.awaitComplete()
```

...or `awaitError()` to suspend until the `Turbine` completes with a `Throwable`.

```kotlin
assertEquals("broken!", turbine.awaitError().message)
```

If `await*` is called and nothing happens, `Turbine` will timeout and fail instead of hanging.

When you are done with a `Turbine`, you can clean up by calling `cancel()` to terminate any backing coroutines.
Finally, you can assert that all events were consumed by calling `ensureAllEventsConsumed()`.


### Single Flow

The simplest way to create and run a `Turbine` is produce one from a `Flow`.
To test a single `Flow`, call the `test` extension:

```kotlin
someFlow.test {
// Validation code here!
}
```

### Consuming Events
`test` launches a new coroutine, calls `someFlow.collect`, and feeds the results into a `Turbine`.
Then it calls the validation block, passing in the read-only `ReceiveTurbine` interface as a receiver:

Inside the `test` block you must consume all received events from the flow. Failing to consume all
events will fail your test.
```kotlin
flowOf("one").test {
assertEquals("one", awaitItem())
assertComplete()
}
```

When the validation block is complete, `test` cancels the coroutine and calls `ensureAllEventsConsumed()`.

### Multiple Flows

To test multiple flows, assign each `Turbine` to a separate `val` by calling `testIn` instead:

```kotlin
runTest {
val turbine1 = flowOf(1).testIn(backgroundScope)
val turbine2 = flowOf(2).testIn(backgroundScope)
assertEquals(1, turbine1.awaitItem())
assertEquals(2, turbine2.awaitItem())
turbine1.awaitComplete()
turbine2.awaitComplete()
}
```

Like `test`, `testIn` produces a `ReceiveTurbine`.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReceiveTurbine as a type was not mentioned above in the test section.

That section probably just needs a quick tweak like

Then it calls the validation block, passing in the read-only interface ReceiveTurbine as the receiver:

`ensureAllEventsConsumed()` will be invoked when the calling coroutine completes.

`testIn` cannot automatically clean up its coroutine, so it is up to you to ensure that the running flow terminates.
Use `runTest`'s `backgroundScope`, and it will take care of this automatically.
Otherwise, make sure to call one of the following methods before the end of your scope:

* `cancel()`
* `awaitComplete()`
* `awaitError()`

Otherwise, your test will hang.

### Consuming All Events

Failing to consume all events before the end of a flow-based `Turbine`'s validation block will fail your test:

```kotlin
flowOf("one", "two").test {
Expand All @@ -73,16 +139,20 @@ Exception in thread "main" AssertionError:
- Complete
```

As the exception indicates, consuming the `"two"` item is not enough. The complete event must
also be consumed.
The same goes for `testIn`, but at the end of the calling coroutine:

```kotlin
flowOf("one", "two").test {
assertEquals("one", awaitItem())
assertEquals("two", awaitItem())
awaitComplete()
runTest {
val turbine = flowOf("one", "two").testIn(backgroundScope)
turbine.assertEquals("one", awaitItem())
}
```
```
Exception in thread "main" AssertionError:
Unconsumed events found:
- Item(two)
- Complete
```

Received events can be explicitly ignored, however.

Expand Down Expand Up @@ -112,10 +182,12 @@ flowOf("one", "two", "three")
}
```

### Consuming Errors

Unlike `collect`, a flow which causes an exception will still be exposed as an event that you
must consume.
### Flow Termination

Flow termination events (exceptions and completion) are exposed as events which must be consumed for validation.
So, for example, throwing a `RuntimeException` inside of your `flow` will not throw an exception in your test.
It will instead produce a Turbine error event:

```kotlin
flow { throw RuntimeException("broken!") }.test {
Expand All @@ -127,22 +199,76 @@ Failure to consume an error will result in the same unconsumed event exception a
with the exception added as the cause so that the full stacktrace is available.

```kotlin
flow { throw RuntimeException("broken!") }.test { }
flow<Nothing> { throw RuntimeException("broken!") }.test { }
```
```
java.lang.AssertionError: Unconsumed events found:
app.cash.turbine.TurbineAssertionError: Unconsumed events found:
- Error(RuntimeException)
at app.cash.turbine.ChannelBasedFlowTurbine.ensureAllEventsConsumed(FlowTurbine.kt:240)
... 53 more
at app//app.cash.turbine.ChannelTurbine.ensureAllEventsConsumed(Turbine.kt:215)
... 80 more
Caused by: java.lang.RuntimeException: broken!
at example.MainKt$main$1.invokeSuspend(Main.kt:7)
... 32 more
at example.MainKt$main$1.invokeSuspend(FlowTest.kt:652)
... 105 more
```

### Asynchronous Flows
### Standalone Turbines

Calls to `awaitItem()`, `awaitComplete()`, and `awaitError()` are suspending and will wait
for events from asynchronous flows.
In addition to `ReceiveTurbine`s created from flows, standalone `Turbine`s can be used to communicate with test code outside of a flow.
Use them everywhere, and you might never need `runCurrent()` again.
Here's an example of how to use `Turbine()` in a fake:

```kotlin
class FakeNavigator : Navigator {
val goTos = Turbine<Screen>()

override fun goTo(screen: Screen) {
goTos.add(screen)
}
}
```
```kotlin
runTest {
val navigator = FakeNavigator()
val events: Flow<UiEvent> =
MutableSharedFlow<UiEvent>(extraBufferCapacity = 50)
val models: Flow<UiModel> =
makePresenter(navigator).present(events)
models.test {
assertEquals(UiModel(title = "Hi there"), awaitItem())
events.emit(UiEvent.Close)
assertEquals(Screens.Back, navigator.goTos.awaitItem())
}
}
```

### Standalone Turbine Compat APIs

To support codebases with a mix of coroutines and non-coroutines code, standalone `Turbine` includes non-suspending compat APIs.
All the `await` methods have equivalent `take` methods that are non-suspending:

```kotlin
val navigator = FakeNavigator()
val events: PublishRelay<UiEvent> = PublishRelay.create()

val models: Observable<UiModel> =
makePresenter(navigator).present(events)
val testObserver = models.test()
testObserver.assertValue(UiModel(title = "Hi there"))
events.accept(UiEvent.Close)
assertEquals(Screens.Back, navigator.goTos.takeItem())
```

Use `takeItem()` and friends, and `Turbine` behaves like simple queue; use `awaitItem()` and friends, and it's a `Turbine`.

These methods should only be used from a non-suspending context.
On JVM platforms, they will throw when used from a suspending context.

### Asynchronicity and Turbine

Flows are asynchronous by default. Your flow is collected concurrently by Turbine alongside your test code.

Handling this asynchronicity works the same way with Turbine as it does in production coroutines code:
instead of using tools like `runCurrent()` to "push" an asynchronous flow along, `Turbine`'s `awaitItem()`, `awaitComplete()`, and `awaitError()` "pull" them along by parking until a new event is ready.

```kotlin
channelFlow {
Expand All @@ -156,8 +282,8 @@ channelFlow {
}
```

Asynchronous flows can be canceled at any time so long as you have consumed all emitted events.
Allowing the `test` lambda to complete will implicitly cancel the flow.
Your validation code may run concurrently with the flow under test, but Turbine puts it in the driver's seat as much as possible:
`test` will end when your validation block is done executing, implicitly cancelling the flow under test.

```kotlin
channelFlow {
Expand Down Expand Up @@ -194,10 +320,33 @@ channelFlow {
}
```

### Hot Flows
### Names

Turbines can be named to improve error feedback.
Pass in a `name` to `test`, `testIn`, or `Turbine()`, and it will be included in any errors that are thrown:

```kotlin
runTest {
val turbine1 = flowOf(1).testIn(backgroundScope, name = "turbine 1")
val turbine2 = flowOf(2).testIn(backgroundScope, name = "turbine 2")
turbine1.awaitComplete()
turbine2.awaitComplete()
}
```
```
Expected complete for turbine 1 but found Item(1)
app.cash.turbine.TurbineAssertionError: Expected complete for turbine 1 but found Item(1)
at app//app.cash.turbine.ChannelKt.unexpectedEvent(channel.kt:258)
at app//app.cash.turbine.ChannelKt.awaitComplete(channel.kt:226)
at app//app.cash.turbine.ChannelKt$awaitComplete$1.invokeSuspend(channel.kt)
at app//kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
...
```

### Order of Execution & Shared Flows

Emissions to hot flows that don't have active consumers are dropped. Call `test` on a flow _before_
emitting or the item will be missed.
Shared flows are sensitive to order of execution.
Calling `emit` before calling `collect` will drop the emitted value:

```kotlin
val mutableSharedFlow = MutableSharedFlow<Int>(replay = 0)
Expand All @@ -207,15 +356,16 @@ mutableSharedFlow.test {
}
```
```
kotlinx.coroutines.test.UncompletedCoroutinesError: After waiting for 60000 ms, the test coroutine is not completing, there were active child jobs: [ScopeCoroutine{Completing}@478db956]
at app//kotlinx.coroutines.test.TestBuildersKt__TestBuildersKt$runTestCoroutine$3$3.invokeSuspend(TestBuilders.kt:304)
at ???(Coroutine boundary.?(?)
at kotlinx.coroutines.test.TestBuildersKt__TestBuildersKt.runTestCoroutine(TestBuilders.kt:288)
at kotlinx.coroutines.test.TestBuildersKt__TestBuildersKt$runTest$1$1.invokeSuspend(TestBuilders.kt:167)
at kotlinx.coroutines.test.TestBuildersJvmKt$createTestResult$1.invokeSuspend(TestBuildersJvm.kt:13)
No value produced in 1s
java.lang.AssertionError: No value produced in 1s
at app.cash.turbine.ChannelKt.awaitEvent(channel.kt:90)
at app.cash.turbine.ChannelKt$awaitEvent$1.invokeSuspend(channel.kt)
(Coroutine boundary)
at kotlinx.coroutines.test.TestBuildersKt__TestBuildersKt$runTestCoroutine$2.invokeSuspend(TestBuilders.kt:212)
```

Proper usage of Turbine with hot flows looks like the following.
Turbine's `test` and `testIn` methods guarantee that the flow under test will run up to the first suspension point before proceeding.
So calling `test` on a shared flow _before_ emitting will not drop:

```kotlin
val mutableSharedFlow = MutableSharedFlow<Int>(replay = 0)
Expand All @@ -225,70 +375,53 @@ mutableSharedFlow.test {
}
```

The hot flow types Kotlin currently provides are:
If your code collects on shared flows, ensure that it does so promptly to have a lovely experience.

The shared flow types Kotlin currently provides are:
* `MutableStateFlow`
* `StateFlow`
* `MutableSharedFlow`
* `SharedFlow`
* Channels converted to flow with `Channel.consumeAsFlow`

### Multiple turbines
### Timeouts

Turbine applies a timeout whenever it waits for an event.
This is a wall clock time timeout that ignores `runTest`'s virtual clock time.

Multiple flows can be tested at once using the `testIn` function which returns the turbine test
object which would otherwise be used as a lambda receiver in the `test` function.
The default timeout length is one second. This can be overridden by passing a timeout duration to `test`:

```kotlin
runTest {
val turbine1 = flowOf(1).testIn(this)
val turbine2 = flowOf(2).testIn(this)
assertEquals(1, turbine1.awaitItem())
assertEquals(2, turbine2.awaitItem())
turbine1.awaitComplete()
turbine2.awaitComplete()
flowOf("one", "two").test(timeout = 10.milliseconds) {
...
}
```

Unconsumed events will throw an exception when the scope ends.
This timeout will be used for all Turbine-related calls inside the validation block.

You can also override the timeout for Turbines created with `testIn` and `Turbine()`:

```kotlin
runTest {
val turbine1 = flowOf(1).testIn(this)
val turbine2 = flowOf(2).testIn(this)
assertEquals(1, turbine1.awaitItem())
assertEquals(2, turbine2.awaitItem())
turbine1.awaitComplete()
// turbine2.awaitComplete() <-- NEWLY COMMENTED OUT
}
```
```
kotlinx.coroutines.CompletionHandlerException: Exception in completion handler InvokeOnCompletion@6d167f58[job@3403e2ac] for TestScope[test started]
at app//kotlinx.coroutines.JobSupport.completeStateFinalization(JobSupport.kt:320)
at app//kotlinx.coroutines.JobSupport.tryFinalizeSimpleState(JobSupport.kt:295)
... 70 more
Caused by: app.cash.turbine.AssertionError: Unconsumed events found:
- Complete
at app//app.cash.turbine.ChannelBasedFlowTurbine.ensureAllEventsConsumed(FlowTurbine.kt:333)
at app//app.cash.turbine.FlowTurbineKt$testIn$1.invoke(FlowTurbine.kt:115)
at app//app.cash.turbine.FlowTurbineKt$testIn$1.invoke(FlowTurbine.kt:112)
at app//kotlinx.coroutines.InvokeOnCompletion.invoke(JobSupport.kt:1391)
at app//kotlinx.coroutines.JobSupport.completeStateFinalization(JobSupport.kt:318)
... 72 more
val standalone = Turbine<String>(timeout = 10.milliseconds)
val flow = flowOf("one").testIn(
scope = backgroundScope,
timeout = 10.milliseconds,
)
```

Unlike the `test` lambda, flows are not automatically canceled. Long-running asynchronous or
infinite flows must be explicitly canceled.
These timeout overrides only apply to the `Turbine` on which they were applied.

Finally, you can also change the timeout for a whole block of code using `withTurbineTimeout`:

```kotlin
runTest {
val state = MutableStateFlow(1)
val turbine = state.testIn(this)
assertEquals(1, turbine.awaitItem())
state.emit(2)
assertEquals(2, turbine.awaitItem())
turbine.cancel()
withTurbineTimeout(10.milliseconds) {
...
}
```

### Channel Extensions

Most of Turbine's APIs are implemented as extensions on `Channel`.
The more limited API surface of `Turbine` is usually preferable, but these extensions are also available as public APIs if you need them.

# License

Expand Down