Testing flows combined from other flows is difficult #246

Open
cbruegg opened this issue Jun 22, 2023 · 4 comments
Labels
feature New feature or request

Comments

@cbruegg

cbruegg commented Jun 22, 2023

Turbine offers a nice API for awaiting individual items of flows, which then allows for running assertions on them. This requires the test code to have knowledge about the sequence of emissions from the flow.

Unfortunately, this does not work well for flows combined from other flows. As an example, consider the following flows:

val flowA = MutableStateFlow("a1")
val flowB = MutableStateFlow("b1")
val flowAB = combine(flowA, flowB) { a, b -> Pair(a, b) }

flowAB emits an updated pair whenever flowA or flowB emits a new value. Consider this function, which updates both flowA and flowB:

fun update() {
  flowA.value = "a2"
  flowB.value = "b2"
}

Calling this function causes two emissions on flowAB. In theory, a test for the emissions of flowAB when calling update() could anticipate this, ignore the first emission, and run its assertions on the second. However, in my opinion, tests should not be aware of such implementation details. What if we swap the order of the value assignments in update()? What if we need to add another flow to the combined flow? In both cases, we would need to update the test as well.

One way around this issue is to await items of the flow until one satisfies the assertions. This way we don't care about the precise sequence of emissions and instead wait for a valid emission to appear. The downside is that such a function needs a timeout mechanism for the case where a valid element is never emitted.

I propose an alternative overload of awaitItem that implements this:

suspend fun awaitItem(assertions: suspend (T) -> Unit): T // assertions could also be non-suspending instead

It would run val item = awaitItem() in a loop, and run assertions(item) on each. If assertions(...) completes normally, the item is immediately returned. If it throws an AssertionError, that gets rethrown if the Turbine timeout has been reached. If it has not been reached, the next item is awaited and the cycle repeats.

An alternative to accepting an assertions lambda would be to accept a predicate lambda instead, but this does not work well for validating multiple conditions, since a predicate lambda would not return a message describing the failure. It would also encourage long, unreadable && chains.

With the assertions lambda, test code would look like this:

someCombinedFlow.test {
  awaitItem { item ->
    assertEquals("item.first should be a2", "a2", item.first)
    assertEquals("item.second should be b2", "b2", item.second)
  }
}
@JakeWharton
Member

I do not like the idea of an assertions lambda. Conflating exception usage for logic and test failure within a single test function is not something that I think is intuitive or has precedent in other libraries. A predicate lambda has equivalents in Flow operators, other stream libraries, and the standard library's collection operators. A predicate-based function would also allow you to build an exception-based one, if you so desire.
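
A minimal sketch of what such a predicate-based function could look like, and how an assertion-based variant could be layered on top of it. The names awaitItemMatching and awaitItemPassing are hypothetical and not part of Turbine; the only Turbine API assumed here is ReceiveTurbine.awaitItem(), whose own timeout still applies to every individual await.

```kotlin
import app.cash.turbine.ReceiveTurbine

// Hypothetical helper (not part of Turbine): skips items until one satisfies the predicate.
// Each awaitItem() call is still subject to Turbine's own timeout and fails
// if no further item arrives in time.
suspend fun <T> ReceiveTurbine<T>.awaitItemMatching(predicate: (T) -> Boolean): T {
    while (true) {
        val item = awaitItem()
        if (predicate(item)) return item
    }
}

// Hypothetical assertion-based variant built on the predicate-based one:
// an AssertionError from the lambda is treated as "no match yet".
suspend fun <T> ReceiveTurbine<T>.awaitItemPassing(assertions: (T) -> Unit): T =
    awaitItemMatching { item ->
        try {
            assertions(item)
            true
        } catch (e: AssertionError) {
            false
        }
    }
```

In a test this might be used as someCombinedFlow.test { awaitItemMatching { it == Pair("a2", "b2") } }. Note that in this sketch the failure reported on timeout comes from awaitItem() itself, so the message of the last failed assertion is lost, and the total wait is not bounded if non-matching items keep arriving.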

@JakeWharton added the feature (New feature or request) label Jun 22, 2023
@cbruegg
Author

cbruegg commented Jun 22, 2023

Valid concerns and good workaround - thanks! A predicate-based function would actually work well then.

@NemanjaBozovic-TomTom

NemanjaBozovic-TomTom commented Jun 30, 2023

I had a different use case, but similarly I wanted to wait for the "right" value to show up in the flow, so I wrote this extension:

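import app.cash.turbine.ReceiveTurbine
import kotlin.time.Duration
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.withTimeout

// DEFAULT_TURBINE_TIMEOUT is assumed to be a Duration constant defined elsewhere in the test code.
suspend fun <T> ReceiveTurbine<T>.awaitItemUntilAssertionPasses(
    timeout: Duration = DEFAULT_TURBINE_TIMEOUT,
    assertion: (T) -> Unit
): T {
    // Remembers the most recent assertion failure so it can be reported on timeout.
    var error: AssertionError? = null
    try {
        return withTimeout(timeout) {
            var t: T
            while (true) {
                t = awaitItem()
                try {
                    assertion(t)
                    break // The assertion passed, so stop awaiting further items.
                } catch (e: AssertionError) {
                    error = e
                }
            }
            return@withTimeout t
        }
    } catch (e: TimeoutCancellationException) {
        // Attach the last assertion failure to the timeout for easier debugging.
        error?.let { e.addSuppressed(it) }
        throw e
    }
}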

(There's a bug in the Kotlin compiler, so the var t looks a bit odd, but it has to be written that way.)
This might help.

@matejdro

What we do to get around this is call runCurrent() first (to make sure all async coroutine code gets executed) and then call expectMostRecentItem() to get the item instead of awaitItem().
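
A minimal sketch of this workaround, assuming the test runs inside runTest from kotlinx-coroutines-test and reuses the flows from the original example. The function name latestPairAfterUpdate is hypothetical, and check is used in place of a test framework's assertEquals to keep the sketch self-contained.

```kotlin
import app.cash.turbine.test
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest

@OptIn(ExperimentalCoroutinesApi::class)
fun latestPairAfterUpdate() = runTest {
    val flowA = MutableStateFlow("a1")
    val flowB = MutableStateFlow("b1")
    val flowAB = combine(flowA, flowB) { a, b -> Pair(a, b) }

    flowAB.test {
        // Same updates as update() in the original example.
        flowA.value = "a2"
        flowB.value = "b2"

        // Run everything currently scheduled on the test scheduler.
        runCurrent()

        // Discards intermediate emissions and returns only the latest one.
        val latest = expectMostRecentItem()
        check(latest == Pair("a2", "b2")) { "Unexpected latest item: $latest" }
    }
}
```

The test no longer depends on how many intermediate pairs are emitted or in which order the two values are assigned; it only inspects the state once pending work has run.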
