Flow.combine hangs when the flow in the argument throws with TestCoroutineDispatcher #2405

Closed
@kokeroulis

Description

Hello,

While writing unit tests for our project, we came across some odd behaviour.
If you convert an Rx.Single to an Rx.Observable and then to a Flow, and use that Flow with Flow.combine,
the error is never emitted when you are using a TestCoroutineDispatcher and the Flow produced
from RxJava is the second one in the stream.

With TestCoroutineDispatcher the stream just hangs (onError, catch and collect don't do anything).

We found that any one of the following small changes makes the code work:

  • Move the Flow produced from RxJava to the first position in the chain
  • Replace the dispatcher with Dispatchers.Unconfined
  • Use Flow.zip instead of Flow.combine
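
The zip variant can be sketched without RxJava. This is only an illustration under assumptions: `failingStrings` and `zippedWithFallback` are hypothetical names, the throwing flow stands in for the Rx-backed one, and the sketch runs under plain `runBlocking` rather than TestCoroutineDispatcher so that it needs nothing beyond kotlinx-coroutines-core.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking
import java.io.IOException

// Hypothetical stand-in for the Rx-backed flow: it fails before emitting anything.
fun failingStrings(): Flow<String> = flow { throw IOException("boom") }

// zip in place of combine; catch replaces the failure with a fallback pair.
fun zippedWithFallback(): Flow<Pair<Int, String>> =
    flowOf(1, 2)
        .zip(failingStrings()) { int, string -> Pair(int, string) }
        .catch { emit(Pair(0, "")) }

fun main() = runBlocking {
    // The second flow never emits, so only the fallback pair should be collected.
    println(zippedWithFallback().toList())
}
```

The throwing flow stays in the second position here, which is the arrangement that hangs with combine in the report.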

Full example below:

class RxJavaSingleIssue {

    // It works fine with this dispatcher
    private val dispatcher = Dispatchers.Unconfined

    // With this dispatcher the stream gets stuck: it never emits the error value
    //private val dispatcher = TestCoroutineDispatcher()

    @Test
    fun `test something when repository throws`() {
        val usecase = SomeUseCase()

        var hasError = false
        var didItCatch = false
        var result: Pair<Int, String>? = null
        GlobalScope.launch(dispatcher) {
            usecase.getSomeData()
                .onError {
                    hasError = true
                }
                .catch {
                    didItCatch = true
                    emit(Pair(0, ""))
                }
                .collect {
                    result = it
                    println("a $it")
                }
        }

        Thread.sleep(3000)
        assert(hasError)
        assert(didItCatch)
        assert(result != null)
    }
}

class SomeUseCase(
    private val someRxRepository: SomeRxRepository = SomeRxRepository(),
    private val someFlowRepository: SomeFlowRepository = SomeFlowRepository()
) {

    fun getSomeData(): Flow<Pair<Int, String>> {
        // If we reverse the order of the two flows, then it always works fine
        return someFlowRepository.getFlowObject()
            .combine(
                someRxRepository.getSomething().toObservable().asFlow()
                    .onError { println("I will print that there is an error") }
            ) { int, string -> Pair(int, string) }
    }
}

class SomeRxRepository {

    fun getSomething(): Single<String> {
        return Single.error(IOException())
    }
}

class SomeFlowRepository {

    fun getFlowObject(): Flow<Int> {
        return flow {
            emit(1)
            emit(2)
        }
    }
}

Activity

dkhalanskyjb (Collaborator) commented on May 18, 2021
Simplifying the reproducer, I get the following:

    @Test
    fun reproducer2405() = runBlocking {
        val dispatcher = TestCoroutineDispatcher()
        var collectedError = false
        withContext(dispatcher) {
            flow { emit(1) }
                .combine(
                    flow<String> { throw IOException() }
                ) { int, string -> int.toString() + string }
                .catch { emit("error") }
                .collect {
                    assertEquals("error", it)
                    collectedError = true
                }
        }
        assertTrue(collectedError)
    }

Looks like this isn't related to RxSingle at all: any throwing flow in the argument to combine causes this to fail with TestCoroutineDispatcher. A similar issue was reported here: #1742
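
For contrast, here is a sketch of the same simplified flow collected under Dispatchers.Unconfined, which the original report says works. The helper name `collectCombined` is hypothetical, and the sketch uses only kotlinx-coroutines-core, so it does not exercise TestCoroutineDispatcher itself.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.io.IOException

// Hypothetical helper: collects the combined flow and returns what was emitted.
fun collectCombined(): List<String> = runBlocking {
    val collected = mutableListOf<String>()
    withContext(Dispatchers.Unconfined) {
        flowOf(1)
            .combine(
                flow<String> { throw IOException() } // throws before emitting
            ) { int, string -> int.toString() + string }
            .catch { emit("error") } // turn the failure into a value
            .collect { collected.add(it) }
    }
    collected
}

fun main() {
    // The throwing flow never emits, so combine produces no value of its own;
    // only the fallback from catch should be collected.
    println(collectCombined())
}
```

Swapping Dispatchers.Unconfined for the test dispatcher is the only change needed to get back to the failing reproducer above.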

changed the title from "Flow.Combine doesn't emit an error value when RxSingle is throwing" to "Flow.combine hangs when the flow in the argument throws with TestCoroutineDispatcher" on May 18, 2021
self-assigned this on Oct 14, 2021
added a commit that references this issue on Nov 1, 2021
776250b
added 4 commits that reference this issue on Nov 17, 2021
6cefa2f
8798846
5c7d034
2e25bae
added a commit that references this issue on Jan 28, 2022
26af8b1
added a commit that references this issue on Sep 14, 2022
fb871b9
Participants: @kokeroulis, @qwwdfsad, @dkhalanskyjb

Issue #2405 · Kotlin/kotlinx.coroutines