Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Inconsistency between Flow combine and RxJava combineLatest #2082

Closed
mhernand40 opened this issue Jun 5, 2020 · 7 comments
Closed

Inconsistency between Flow combine and RxJava combineLatest #2082

mhernand40 opened this issue Jun 5, 2020 · 7 comments
Assignees
Labels

Comments

@mhernand40
Copy link

mhernand40 commented Jun 5, 2020

I am looking to migrate some existing RxJava-based code to Coroutines but I ran into some test failures that touched some code that uses Observable.combineLatest. My tests originally use PublishSubjects from RxJava to mock the underlying streams within the combineLatest. However, when I switch to Flow streams in the combine and then update the tests to use Channels instead of PublishSubjects, my tests fail because the expected number of emissions are not received.

I have come up with the following reduced repro (uses MutableStateFlows instead of Channels):

    @Test
    fun `combineLatest Rx`() {
        val subject1 = BehaviorSubject.createDefault(1)
        val subject2 = BehaviorSubject.createDefault("a")

        val observer = Observables.combineLatest(subject1, subject2).test()

        subject1.onNext(2)
        subject2.onNext("b")

        subject1.onNext(3)
        subject2.onNext("c")

        println(observer.values())
    }

Result:

[(1, a), (2, a), (2, b), (3, b), (3, c)]

And then:

    @Test
    fun `combine Coroutines`() = runBlockingTest {
        val subject1 = MutableStateFlow(1)
        val subject2 = MutableStateFlow("a")
        val values = mutableListOf<Pair<Int, String>>()

        val job = launch {
            combine(subject1, subject2) { intVal, strVal -> intVal to strVal }
                .collect { values += it }
        }

        subject1.value = 2
        subject2.value = "b"

        subject1.value = 3
        subject2.value = "c"

        job.cancel()

        println(values)
    }

Result:

[(1, a), (2, a), (2, b)]

Is this expected behavior? The only way I can seem to get the second test to produce the same emissions as the first test is to call yield() after every single value assignment.

@elizarov
Copy link
Contributor

elizarov commented Jun 5, 2020

Yes. It is the expected behavior. MutableStateFlow.value updates are always dispatched by default, just as coroutines are always dispatched. It is so by design, so you'll have to yield() to give them a chance to execute when you run in a single main thread. Rx uses a different default. Most Rx operators are Unconfined by default (don't have their own scheduler), but some of them do have a scheduler and you'll have to read docs to figure out which is which.

You can mimic Rx behavior with coroutines by using launch(Dispatchers.Unconfined). This way you will not have to yield.

Does it help?

@mhernand40
Copy link
Author

Thank you for your reply @elizarov. I updated the test to use Dispatchers.Unconfined like so:

    @Test
    fun `combine Coroutines`() = runBlockingTest {
        val subject1 = MutableStateFlow(1)
        val subject2 = MutableStateFlow("a")
        val values = mutableListOf<Pair<Int, String>>()

        val job = launch(Dispatchers.Unconfined) {
            combine(subject1, subject2) { intVal, strVal -> intVal to strVal }
                .collect { values += it }
        }

        subject1.value = 2
        subject2.value = "b"

        subject1.value = 3
        subject2.value = "c"

        job.cancel()

        println(values)
    }

Result:

[(1, a), (2, a), (2, b), (3, b), (3, c)]

Just as you said, now the emissions are consistent with those of the Rx-based test. Thank you very much!

@mhernand40
Copy link
Author

I guess what I still don't understand in the original example is why anything beyond (1, a) gets emitted at all? It just sort of seems like combine required some form of yield-ing halfway through.

@elizarov
Copy link
Contributor

elizarov commented Jun 8, 2020

I guess what I still don't understand in the original example is why anything beyond (1, a) gets emitted at all? It just sort of seems like combine required some form of yield-ing halfway through.

Ugh... That's not easy to explain. There's a lot of moving pieces involved in this machinery. Internally combine launches two coroutines to get fresh values from both flows and the corresponding code is written so that is deterministic with Unconfined dispatcher by being "fair". This fairness is achieved by calling yield(). which is implemented differently by the TestCoroutineDipsatcher. Let's leave this issue open. It might be possible to fix it so that it works with TestCoroutineDipsatcher just as well as it works with Unconfined dispatcher.

@mhernand40
Copy link
Author

Found another issue when testing combine(…).flowOn(testDispatcher) where the test is using runBlockingTest and testDispatcher is a TestCoroutineDispatcher:

    @Test
    fun test() = runBlockingTest {
        val testDispatcher = TestCoroutineDispatcher()
        val channel1 = Channel<Int>()
        val channel2 = Channel<String>()
        val flow1 = channel1.consumeAsFlow()
        val flow2 = channel2.consumeAsFlow()
        val values = mutableListOf<Pair<Int, String>>()

        launch {
            combine(flow1, flow2) { a: Int, b: String -> a to b }
                .flowOn(testDispatcher)
                .collect { values += it }
        }
        channel1.send(1)
        channel2.send("a")
        channel1.send(2)
        channel2.send("b")
        channel1.close()
        channel2.close()

        assertThat(values).containsExactly(
            1 to "a",
            2 to "a",
            2 to "b"
        )
    }

The test ends up failing with the following exception:

java.lang.IllegalStateException: This job has not completed yet
	at kotlinx.coroutines.JobSupport.getCompletionExceptionOrNull(JobSupport.kt:1189)
	at kotlinx.coroutines.test.TestBuildersKt.runBlockingTest(TestBuilders.kt:53)
	at kotlinx.coroutines.test.TestBuildersKt.runBlockingTest$default(TestBuilders.kt:45)

This only occurs when flowOn() is applied on the combine(). If I remove the flowOn() on the combine() but apply flowOn() to flow1, and flow2, the test proceeds to run the assertion at the bottom.

@BenTilbrook
Copy link

It might be possible to fix it so that it works with TestCoroutineDipsatcher just as well as it works with Unconfined dispatcher.

@elizarov It'd be really great to have this.

Our codebase uses both TestCoroutineDispatcher APIs like advanceTimeBy, and the combine function in the code under test (Unconfined dispatcher is used to avoid unexpected test failures). We currently have to choose which one. Making TestCoroutineDispatcher support combine without yield would let us have both, and potentially save future developers the trouble of debugging this issue.

@dkhalanskyjb
Copy link
Collaborator

With the latest release, the test from the initial submission outputs just

[(1, a)]

I didn't bother to check what exactly changed since then.

In any case, diving into the code, the reason for not having to yield when using Dispatchers.Unconfined is that subjectN.value = M resumes the coroutine inside combine that awaits the next value. With Dispatchers.Unconfined, the resumed coroutine is immediately executed inside the call to resume, whereas the test dispatcher just puts the code to execute in a queue and, in this case, only empties it at the end.

So, I see three possible solutions here:

  • Change the test dispatcher so that, in its "unpaused" state, it's properly unconfined and each call to dispatch leads to immediately running the code.
  • Change the tests by inserting calls to runCurrent() in the places where you want the background coroutines to proceed.
  • In some cases, it can be useful to stop and think about what the test does. There's no bug here: the behavior of the provided test is highly reliant on a particular dispatching strategy. Unless one is using Dispatchers.Unconfined, there's no guarantee that the StateFlow detects the change by the time x.value = y finishes. Running the test with just runBlocking returns [], for example. Launching combine in a dispatcher backed by some thread other that the main one (for example, by using Dispatchers.Default) leads to data races and inconsistent results.

In my opinion, a properly unconfined dispatcher does have its merits, as it allows one to test only the functionality, forgetting about the parallelism, but it should probably be marked as such to avoid the false sense of security when parallelism does matter.

dkhalanskyjb added a commit that referenced this issue Nov 1, 2021
Defines two test dispatchers:
* StandardTestDispatcher, which, combined with runTest,
  gives an illusion of an event loop;
* UnconfinedTestDispatcher, which is like
  Dispatchers.Unconfined, but skips delays.

By default, StandardTestDispatcher is used due to the somewhat
chaotic execution order of Dispatchers.Unconfined.
TestCoroutineDispatcher is deprecated.

Fixes #1626
Fixes #1742
Fixes #2082
Fixes #2102
Fixes #2405
Fixes #2462
dkhalanskyjb added a commit that referenced this issue Nov 17, 2021
Defines two test dispatchers:
* StandardTestDispatcher, which, combined with runTest,
  gives an illusion of an event loop;
* UnconfinedTestDispatcher, which is like
  Dispatchers.Unconfined, but skips delays.

By default, StandardTestDispatcher is used due to the somewhat
chaotic execution order of Dispatchers.Unconfined.
TestCoroutineDispatcher is deprecated.

Fixes #1626
Fixes #1742
Fixes #2082
Fixes #2102
Fixes #2405
Fixes #2462
dkhalanskyjb added a commit that referenced this issue Nov 17, 2021
Defines two test dispatchers:
* StandardTestDispatcher, which, combined with runTest,
  gives an illusion of an event loop;
* UnconfinedTestDispatcher, which is like
  Dispatchers.Unconfined, but skips delays.

By default, StandardTestDispatcher is used due to the somewhat
chaotic execution order of Dispatchers.Unconfined.
TestCoroutineDispatcher is deprecated.

Fixes #1626
Fixes #1742
Fixes #2082
Fixes #2102
Fixes #2405
Fixes #2462
dkhalanskyjb added a commit that referenced this issue Nov 19, 2021
Defines two test dispatchers:
* StandardTestDispatcher, which, combined with runTest,
  gives an illusion of an event loop;
* UnconfinedTestDispatcher, which is like
  Dispatchers.Unconfined, but skips delays.

By default, StandardTestDispatcher is used due to the somewhat
chaotic execution order of Dispatchers.Unconfined.
TestCoroutineDispatcher is deprecated.

Fixes #1626
Fixes #1742
Fixes #2082
Fixes #2102
Fixes #2405
Fixes #2462
yorickhenning pushed a commit to yorickhenning/kotlinx.coroutines that referenced this issue Jan 28, 2022
This commit introduces the new version of the test module.
Please see README.md and MIGRATION.md for a thorough
discussion of the changes.

Fixes Kotlin#1203
Fixes Kotlin#1609
Fixes Kotlin#2379
Fixes Kotlin#1749
Fixes Kotlin#1204
Fixes Kotlin#1390
Fixes Kotlin#1222
Fixes Kotlin#1395
Fixes Kotlin#1881
Fixes Kotlin#1910
Fixes Kotlin#1772
Fixes Kotlin#1626
Fixes Kotlin#1742
Fixes Kotlin#2082
Fixes Kotlin#2102
Fixes Kotlin#2405
Fixes Kotlin#2462

Co-authored-by: Vsevolod Tolstopyatov <qwwdfsad@gmail.com>
pablobaxter pushed a commit to pablobaxter/kotlinx.coroutines that referenced this issue Sep 14, 2022
This commit introduces the new version of the test module.
Please see README.md and MIGRATION.md for a thorough
discussion of the changes.

Fixes Kotlin#1203
Fixes Kotlin#1609
Fixes Kotlin#2379
Fixes Kotlin#1749
Fixes Kotlin#1204
Fixes Kotlin#1390
Fixes Kotlin#1222
Fixes Kotlin#1395
Fixes Kotlin#1881
Fixes Kotlin#1910
Fixes Kotlin#1772
Fixes Kotlin#1626
Fixes Kotlin#1742
Fixes Kotlin#2082
Fixes Kotlin#2102
Fixes Kotlin#2405
Fixes Kotlin#2462

Co-authored-by: Vsevolod Tolstopyatov <qwwdfsad@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

5 participants