From b9ccd28ea214d104bb92e1539e5325268d41a1d6 Mon Sep 17 00:00:00 2001 From: Bill Phillips Date: Tue, 11 Oct 2022 10:25:05 -0700 Subject: [PATCH] Add name API (#158) Add name API Co-authored-by: Jake Wharton --- .../kotlin/app/cash/turbine/Turbine.kt | 32 ++++++---- .../kotlin/app/cash/turbine/channel.kt | 62 ++++++++++-------- .../kotlin/app/cash/turbine/flow.kt | 15 +++-- .../kotlin/app/cash/turbine/ChannelTest.kt | 63 ++++++++++++++++++ .../app/cash/turbine/FlowInScopeTest.kt | 42 ++++++++++++ .../kotlin/app/cash/turbine/FlowTest.kt | 54 ++++++++++++++++ .../kotlin/app/cash/turbine/TurbineTest.kt | 64 +++++++++++++++++++ 7 files changed, 287 insertions(+), 45 deletions(-) diff --git a/src/commonMain/kotlin/app/cash/turbine/Turbine.kt b/src/commonMain/kotlin/app/cash/turbine/Turbine.kt index e70e43d7..6342652b 100644 --- a/src/commonMain/kotlin/app/cash/turbine/Turbine.kt +++ b/src/commonMain/kotlin/app/cash/turbine/Turbine.kt @@ -91,14 +91,19 @@ public operator fun Turbine.plusAssign(value: T) { add(value) } * * @param timeout If non-null, overrides the current Turbine timeout for this [Turbine]. See also: * [withTurbineTimeout]. + * @param name If non-null, name is added to any exceptions thrown to help identify which [Turbine] failed. */ @Suppress("FunctionName") // Interface constructor pattern. -public fun Turbine(timeout: Duration? = null): Turbine = ChannelTurbine(timeout = timeout) +public fun Turbine( + timeout: Duration? = null, + name: String? = null, +): Turbine = ChannelTurbine(timeout = timeout, name = name) internal class ChannelTurbine( channel: Channel = Channel(UNLIMITED), private val job: Job? = null, private val timeout: Duration?, + private val name: String?, ) : Turbine { private suspend fun withTurbineTimeout(block: suspend () -> T): T { return if (timeout != null) { @@ -145,13 +150,13 @@ internal class ChannelTurbine( job?.cancel() } - override fun takeEvent(): Event = channel.takeEvent() + override fun takeEvent(): Event = channel.takeEvent(name = name) - override fun takeItem(): T = channel.takeItem() + override fun takeItem(): T = channel.takeItem(name = name) - override fun takeComplete() = channel.takeComplete() + override fun takeComplete() = channel.takeComplete(name = name) - override fun takeError(): Throwable = channel.takeError() + override fun takeError(): Throwable = channel.takeError(name = name) private var ignoreTerminalEvents = false private var ignoreRemainingEvents = false @@ -176,20 +181,20 @@ internal class ChannelTurbine( } override fun expectNoEvents() { - channel.expectNoEvents() + channel.expectNoEvents(name = name) } - override fun expectMostRecentItem(): T = channel.expectMostRecentItem() + override fun expectMostRecentItem(): T = channel.expectMostRecentItem(name = name) - override suspend fun awaitEvent(): Event = withTurbineTimeout { channel.awaitEvent() } + override suspend fun awaitEvent(): Event = withTurbineTimeout { channel.awaitEvent(name = name) } - override suspend fun awaitItem(): T = withTurbineTimeout { channel.awaitItem() } + override suspend fun awaitItem(): T = withTurbineTimeout { channel.awaitItem(name = name) } - override suspend fun skipItems(count: Int) = withTurbineTimeout { channel.skipItems(count) } + override suspend fun skipItems(count: Int) = withTurbineTimeout { channel.skipItems(count, name) } - override suspend fun awaitComplete() = withTurbineTimeout { channel.awaitComplete() } + override suspend fun awaitComplete() = withTurbineTimeout { channel.awaitComplete(name = name) } - override suspend fun awaitError(): Throwable = withTurbineTimeout { channel.awaitError() } + override suspend fun awaitError(): Throwable = withTurbineTimeout { channel.awaitError(name = name) } override fun ensureAllEventsConsumed() { if (ignoreRemainingEvents) return @@ -209,7 +214,8 @@ internal class ChannelTurbine( if (unconsumed.isNotEmpty()) { throw TurbineAssertionError( buildString { - append("Unconsumed events found:") + append("Unconsumed events found".qualifiedBy(name)) + append(":") for (event in unconsumed) { append("\n - $event") } diff --git a/src/commonMain/kotlin/app/cash/turbine/channel.kt b/src/commonMain/kotlin/app/cash/turbine/channel.kt index b5fdc7cc..b0776035 100644 --- a/src/commonMain/kotlin/app/cash/turbine/channel.kt +++ b/src/commonMain/kotlin/app/cash/turbine/channel.kt @@ -42,7 +42,7 @@ import kotlinx.coroutines.withTimeout * * @throws AssertionError if no item was emitted. */ -public fun ReceiveChannel.expectMostRecentItem(): T { +public fun ReceiveChannel.expectMostRecentItem(name: String? = null): T { var previous: ChannelResult? = null while (true) { val current = tryReceive() @@ -55,7 +55,7 @@ public fun ReceiveChannel.expectMostRecentItem(): T { if (previous?.isSuccess == true) return previous.getOrThrow() - throw AssertionError("No item was found") + throw AssertionError("No item was found".qualifiedBy(name)) } /** @@ -66,9 +66,9 @@ public fun ReceiveChannel.expectMostRecentItem(): T { * * @throws AssertionError if unconsumed events are found. */ -public fun ReceiveChannel.expectNoEvents() { +public fun ReceiveChannel.expectNoEvents(name: String? = null) { val result = tryReceive() - if (!result.isFailure) result.unexpectedResult("no events") + if (!result.isFailure) result.unexpectedResult(name, "no events") } /** @@ -77,7 +77,7 @@ public fun ReceiveChannel.expectNoEvents() { * * This function will always return a terminal event on a closed [ReceiveChannel]. */ -public suspend fun ReceiveChannel.awaitEvent(): Event { +public suspend fun ReceiveChannel.awaitEvent(name: String? = null): Event { val timeout = contextTimeout() return try { withAppropriateTimeout(timeout) { @@ -85,9 +85,9 @@ public suspend fun ReceiveChannel.awaitEvent(): Event { Event.Item(item) } } catch (e: TimeoutCancellationException) { - throw AssertionError("No value produced in $timeout") + throw AssertionError("No ${"value produced".qualifiedBy(name)} in $timeout") } catch (e: TurbineTimeoutCancellationException) { - throw AssertionError("No value produced in $timeout") + throw AssertionError("No ${"value produced".qualifiedBy(name)} in $timeout") } catch (e: CancellationException) { throw e } catch (e: ClosedReceiveChannelException) { @@ -139,10 +139,10 @@ internal class TurbineTimeoutCancellationException internal constructor( * * @throws AssertionError if the next event was completion or an error. */ -public fun ReceiveChannel.takeEvent(): Event { +public fun ReceiveChannel.takeEvent(name: String? = null): Event { assertCallingContextIsNotSuspended() return takeEventUnsafe() - ?: unexpectedEvent(null, "an event") + ?: unexpectedEvent(name, null, "an event") } internal fun ReceiveChannel.takeEventUnsafe(): Event? { @@ -155,9 +155,9 @@ internal fun ReceiveChannel.takeEventUnsafe(): Event? { * * @throws AssertionError if the next event was completion or an error, or no event. */ -public fun ReceiveChannel.takeItem(): T { +public fun ReceiveChannel.takeItem(name: String? = null): T { val event = takeEvent() - return (event as? Event.Item)?.value ?: unexpectedEvent(event, "item") + return (event as? Event.Item)?.value ?: unexpectedEvent(name, event, "item") } /** @@ -166,9 +166,9 @@ public fun ReceiveChannel.takeItem(): T { * * @throws AssertionError if the next event was completion or an error. */ -public fun ReceiveChannel.takeComplete() { +public fun ReceiveChannel.takeComplete(name: String? = null) { val event = takeEvent() - if (event !is Event.Complete) unexpectedEvent(event, "complete") + if (event !is Event.Complete) unexpectedEvent(name, event, "complete") } /** @@ -177,9 +177,9 @@ public fun ReceiveChannel.takeComplete() { * * @throws AssertionError if the next event was completion or an error. */ -public fun ReceiveChannel.takeError(): Throwable { +public fun ReceiveChannel.takeError(name: String? = null): Throwable { val event = takeEvent() - return (event as? Event.Error)?.throwable ?: unexpectedEvent(event, "error") + return (event as? Event.Error)?.throwable ?: unexpectedEvent(name, event, "error") } /** @@ -188,10 +188,10 @@ public fun ReceiveChannel.takeError(): Throwable { * * @throws AssertionError if the next event was completion or an error. */ -public suspend fun ReceiveChannel.awaitItem(): T = - when (val result = awaitEvent()) { +public suspend fun ReceiveChannel.awaitItem(name: String? = null): T = + when (val result = awaitEvent(name = name)) { is Event.Item -> result.value - else -> unexpectedEvent(result, "item") + else -> unexpectedEvent(name, result, "item") } /** @@ -200,12 +200,12 @@ public suspend fun ReceiveChannel.awaitItem(): T = * * @throws AssertionError if one of the events was completion or an error. */ -public suspend fun ReceiveChannel.skipItems(count: Int) { +public suspend fun ReceiveChannel.skipItems(count: Int, name: String? = null) { repeat(count) { index -> when (val event = awaitEvent()) { Event.Complete, is Event.Error -> { val cause = (event as? Event.Error)?.throwable - throw TurbineAssertionError("Expected $count items but got $index items and $event", cause) + throw TurbineAssertionError("Expected $count ${"items".qualifiedBy(name)} but got $index items and $event", cause) } is Event.Item -> { // Success @@ -220,10 +220,10 @@ public suspend fun ReceiveChannel.skipItems(count: Int) { * * @throws AssertionError if the next event was an item or an error. */ -public suspend fun ReceiveChannel.awaitComplete() { +public suspend fun ReceiveChannel.awaitComplete(name: String? = null) { val event = awaitEvent() if (event != Event.Complete) { - unexpectedEvent(event, "complete") + unexpectedEvent(name, event, "complete") } } @@ -233,10 +233,10 @@ public suspend fun ReceiveChannel.awaitComplete() { * * @throws AssertionError if the next event was an item or completion. */ -public suspend fun ReceiveChannel.awaitError(): Throwable { +public suspend fun ReceiveChannel.awaitError(name: String? = null): Throwable { val event = awaitEvent() return (event as? Event.Error)?.throwable - ?: unexpectedEvent(event, "error") + ?: unexpectedEvent(name, event, "error") } internal fun ChannelResult.toEvent(): Event? { @@ -249,10 +249,18 @@ internal fun ChannelResult.toEvent(): Event? { } } -private fun ChannelResult.unexpectedResult(expected: String): Nothing = unexpectedEvent(toEvent(), expected) +private fun ChannelResult.unexpectedResult(name: String?, expected: String): Nothing = + unexpectedEvent(name, toEvent(), expected) -private fun unexpectedEvent(event: Event<*>?, expected: String): Nothing { +private fun unexpectedEvent(name: String?, event: Event<*>?, expected: String): Nothing { val cause = (event as? Event.Error)?.throwable val eventAsString = event?.toString() ?: "no items" - throw TurbineAssertionError("Expected $expected but found $eventAsString", cause) + throw TurbineAssertionError("Expected ${expected.qualifiedBy(name)} but found $eventAsString", cause) } + +internal fun String.qualifiedBy(name: String?) = + if (name == null) { + this + } else { + "$this for $name" + } diff --git a/src/commonMain/kotlin/app/cash/turbine/flow.kt b/src/commonMain/kotlin/app/cash/turbine/flow.kt index db2a8355..b2f82847 100644 --- a/src/commonMain/kotlin/app/cash/turbine/flow.kt +++ b/src/commonMain/kotlin/app/cash/turbine/flow.kt @@ -48,10 +48,11 @@ import kotlinx.coroutines.test.UnconfinedTestDispatcher */ public suspend fun Flow.test( timeout: Duration? = null, + name: String? = null, validate: suspend ReceiveTurbine.() -> Unit, ) { coroutineScope { - collectTurbineIn(this, null).apply { + collectTurbineIn(this, null, name).apply { if (timeout != null) { withTurbineTimeout(timeout) { validate() @@ -83,13 +84,17 @@ public suspend fun Flow.test( * @param timeout If non-null, overrides the current Turbine timeout for this [Turbine]. See also: * [withTurbineTimeout]. */ -public fun Flow.testIn(scope: CoroutineScope, timeout: Duration? = null): ReceiveTurbine { +public fun Flow.testIn( + scope: CoroutineScope, + timeout: Duration? = null, + name: String? = null, +): ReceiveTurbine { if (timeout != null) { // Eager check to throw early rather than in a subsequent 'await' call. checkTimeout(timeout) } - val turbine = collectTurbineIn(scope, timeout) + val turbine = collectTurbineIn(scope, timeout, name) scope.coroutineContext.job.invokeOnCompletion { exception -> if (debug) println("Scope ending ${exception ?: ""}") @@ -104,7 +109,7 @@ public fun Flow.testIn(scope: CoroutineScope, timeout: Duration? = null): } @OptIn(ExperimentalCoroutinesApi::class) // New kotlinx.coroutines test APIs are not stable 😬 -private fun Flow.collectTurbineIn(scope: CoroutineScope, timeout: Duration?): Turbine { +private fun Flow.collectTurbineIn(scope: CoroutineScope, timeout: Duration?, name: String?): Turbine { lateinit var channel: Channel // Use test-specific unconfined if test scheduler is in use to inherit its virtual time. @@ -116,7 +121,7 @@ private fun Flow.collectTurbineIn(scope: CoroutineScope, timeout: Duratio channel = collectIntoChannel(this) } - return ChannelTurbine(channel, job, timeout) + return ChannelTurbine(channel, job, timeout, name) } internal fun Flow.collectIntoChannel(scope: CoroutineScope): Channel { diff --git a/src/commonTest/kotlin/app/cash/turbine/ChannelTest.kt b/src/commonTest/kotlin/app/cash/turbine/ChannelTest.kt index 233ee5f4..545b1c28 100644 --- a/src/commonTest/kotlin/app/cash/turbine/ChannelTest.kt +++ b/src/commonTest/kotlin/app/cash/turbine/ChannelTest.kt @@ -108,6 +108,14 @@ class ChannelTest { assertEquals(3, channel.awaitItem()) } + @Test fun skipItemsThrowsOnComplete() = runTest { + val channel = flowOf(1, 2).collectIntoChannel(this) + val message = assertFailsWith { + channel.skipItems(3) + }.message + assertEquals("Expected 3 items but got 2 items and Complete", message) + } + @Test fun expectErrorOnCompletionBeforeAllItemsWereSkipped() = runTest { val channel = flowOf(1).collectIntoChannel(this) assertFailsWith { @@ -286,6 +294,61 @@ class ChannelTest { assertSame(error, actual.cause) } + @Test + fun expectMostRecentItemButNoItemWasFoundThrowsWithName() = runTest { + val actual = assertFailsWith { + val channel = emptyFlow().collectIntoChannel(this) + channel.expectMostRecentItem(name = "empty flow") + } + assertEquals("No item was found for empty flow", actual.message) + } + + @Test fun awaitItemButWasCloseThrowsWithName() = runTest { + val actual = assertFailsWith { + emptyFlow().collectIntoChannel(this).awaitItem(name = "closed flow") + } + assertEquals("Expected item for closed flow but found Complete", actual.message) + } + + @Test fun awaitCompleteButWasItemThrowsWithName() = runTest { + val actual = assertFailsWith { + flowOf("item!").collectIntoChannel(this) + .awaitComplete(name = "item flow") + } + assertEquals("Expected complete for item flow but found Item(item!)", actual.message) + } + + @Test fun awaitErrorButWasItemThrowsWithName() = runTest { + val actual = assertFailsWith { + flowOf("item!").collectIntoChannel(this).awaitError(name = "item flow") + } + assertEquals("Expected error for item flow but found Item(item!)", actual.message) + } + + @Test fun awaitHonorsCoroutineContextTimeoutTimeoutWithName() = runTest { + val actual = assertFailsWith { + withTurbineTimeout(10.milliseconds) { + neverFlow().collectIntoChannel(this).awaitItem(name = "never flow") + } + } + assertEquals("No value produced for never flow in 10ms", actual.message) + } + + @Test fun takeItemButWasCloseThrowsWithName() = withTestScope { + val actual = assertFailsWith { + emptyFlow().collectIntoChannel(this).takeItem(name = "empty flow") + } + assertEquals("Expected item for empty flow but found Complete", actual.message) + } + + @Test fun skipItemsThrowsOnCompleteWithName() = runTest { + val channel = flowOf(1, 2).collectIntoChannel(this) + val message = assertFailsWith { + channel.skipItems(3, name = "two item channel") + }.message + assertEquals("Expected 3 items for two item channel but got 2 items and Complete", message) + } + /** * Used to run test code with a [TestScope], but still outside a suspending context. */ diff --git a/src/commonTest/kotlin/app/cash/turbine/FlowInScopeTest.kt b/src/commonTest/kotlin/app/cash/turbine/FlowInScopeTest.kt index 5c22ef39..18890d7c 100644 --- a/src/commonTest/kotlin/app/cash/turbine/FlowInScopeTest.kt +++ b/src/commonTest/kotlin/app/cash/turbine/FlowInScopeTest.kt @@ -200,4 +200,46 @@ class FlowInScopeTest { } assertEquals("Turbine timeout must be greater than 0: 0s", actual.message) } + + @Test fun expectItemButWasErrorThrowsWithName() = runTest { + val error = CustomRuntimeException("hi") + val actual = assertFailsWith { + flow { throw error }.testIn(this, name = "unit flow") + .awaitItem() + } + assertEquals("Expected item for unit flow but found Error(CustomRuntimeException)", actual.message) + assertSame(error, actual.cause) + } + + @Test fun timeoutThrowsWithName() = runTest { + val turbine = neverFlow().testIn(this, timeout = 10.milliseconds, name = "never flow") + val actual = assertFailsWith { + turbine.awaitItem() + } + assertEquals("No value produced for never flow in 10ms", actual.message) + turbine.cancel() + } + + @Test fun unconsumedItemThrowsWithName() = runTest { + // We have to use an exception handler rather than assertFailsWith because runTest also uses + // one which defers throwing until its block completes. + val exceptionHandler = RecordingExceptionHandler() + withContext(exceptionHandler) { + flow { + emit("item!") + emitAll(neverFlow()) // Avoid emitting complete + }.testIn(this, name = "item flow").cancel() + } + val exception = exceptionHandler.exceptions.removeFirst() + assertTrue(exception is CompletionHandlerException) + val cause = exception.cause + assertTrue(cause is AssertionError) + assertEquals( + """ + |Unconsumed events found for item flow: + | - Item(item!) + """.trimMargin(), + cause.message, + ) + } } diff --git a/src/commonTest/kotlin/app/cash/turbine/FlowTest.kt b/src/commonTest/kotlin/app/cash/turbine/FlowTest.kt index ae7861fa..3aabf162 100644 --- a/src/commonTest/kotlin/app/cash/turbine/FlowTest.kt +++ b/src/commonTest/kotlin/app/cash/turbine/FlowTest.kt @@ -526,6 +526,15 @@ class FlowTest { } } + @Test fun skipItemsThrowsOnComplete() = runTest { + flowOf(1, 2).test { + val message = assertFailsWith { + skipItems(3) + }.message + assertEquals("Expected 3 items but got 2 items and Complete", message) + } + } + @Test fun expectErrorOnCompletionBeforeAllItemsWereSkipped() = runTest { flowOf(1).test { assertFailsWith { @@ -602,4 +611,49 @@ class FlowTest { } assertEquals("Turbine timeout must be greater than 0: 0s", actual.message) } + + @Test fun expectItemButWasErrorThrowsWithName() = runTest { + val error = CustomRuntimeException("hi") + val actual = assertFailsWith { + flow { throw error }.test(name = "unit flow") { + awaitItem() + } + } + assertEquals("Expected item for unit flow but found Error(CustomRuntimeException)", actual.message) + assertSame(error, actual.cause) + } + + @Test fun timeoutThrowsWithName() = runTest { + neverFlow().test(timeout = 10.milliseconds, name = "never flow") { + val actual = assertFailsWith { + awaitItem() + } + assertEquals("No value produced for never flow in 10ms", actual.message) + } + } + + @Test fun unconsumedItemThrowsWithName() = runTest { + val actual = assertFailsWith { + flow { + emit("item!") + emitAll(neverFlow()) // Avoid emitting complete + }.test(name = "item flow") { } + } + assertEquals( + """ + |Unconsumed events found for item flow: + | - Item(item!) + """.trimMargin(), + actual.message + ) + } + + @Test fun skipItemsThrowsOnCompleteWithName() = runTest { + flowOf(1, 2).test(name = "two item channel") { + val message = assertFailsWith { + skipItems(3) + }.message + assertEquals("Expected 3 items for two item channel but got 2 items and Complete", message) + } + } } diff --git a/src/commonTest/kotlin/app/cash/turbine/TurbineTest.kt b/src/commonTest/kotlin/app/cash/turbine/TurbineTest.kt index 1a1a284c..0f9cca7d 100644 --- a/src/commonTest/kotlin/app/cash/turbine/TurbineTest.kt +++ b/src/commonTest/kotlin/app/cash/turbine/TurbineTest.kt @@ -279,6 +279,70 @@ class TurbineTest { assertSame(error, actual.cause) } + @Test + fun expectMostRecentItemButNoItemWasFoundThrowsWithName() = runTest { + val actual = assertFailsWith { + Turbine(name = "empty turbine").expectMostRecentItem() + } + assertEquals("No item was found for empty turbine", actual.message) + } + + @Test + fun awaitItemButWasCloseThrowsWithName() = runTest { + val actual = assertFailsWith { + val channel = Turbine(name = "closed turbine") + channel.close() + channel.awaitItem() + } + assertEquals("Expected item for closed turbine but found Complete", actual.message) + } + + @Test + fun awaitCompleteButWasItemThrowsWithName() = runTest { + val actual = assertFailsWith { + val channel = Turbine(name = "item turbine") + channel.add("item!") + channel.awaitComplete() + } + assertEquals("Expected complete for item turbine but found Item(item!)", actual.message) + } + + @Test + fun awaitErrorButWasItemThrowsWithName() = runTest { + val actual = assertFailsWith { + val channel = Turbine(name = "item turbine") + channel.add("item!") + channel.awaitError() + } + assertEquals("Expected error for item turbine but found Item(item!)", actual.message) + } + + @Test + fun takeItemButWasCloseThrowsWithName() = withTestScope { + val actual = assertFailsWith { + val channel = Turbine(name = "closed turbine") + // JS + CoroutineScope(Dispatchers.Default).launch(start = CoroutineStart.UNDISPATCHED) { + channel.close() + } + + channel.takeItem() + } + assertEquals("Expected item for closed turbine but found Complete", actual.message) + } + + @Test fun skipItemsThrowsOnCompleteWithName() = runTest { + val channel = Turbine(name = "two item channel") + channel.add(1) + channel.add(2) + channel.close() + val message = assertFailsWith { + channel.skipItems(3) + }.message + + assertEquals("Expected 3 items for two item channel but got 2 items and Complete", message) + } + /** * Used to run test code with a [TestScope], but still outside a suspending context. */