Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flowOn seems not consistent over time #2462

Closed
twittemb opened this issue Dec 22, 2020 · 3 comments
Closed

flowOn seems not consistent over time #2462

twittemb opened this issue Dec 22, 2020 · 3 comments

Comments

@twittemb
Copy link

twittemb commented Dec 22, 2020

Hi every one,

It'd like to submit an issue we've had in our team regarding the design of a feedback loop architecture.
We've implemented a test that was supposed to ensure that our reducer function was always called on the expected Thread (in our unit test, abstracted by a TestCoroutineDispatcher).
In our test, we have 2 feedbacks that perform side effects on their respective and dedicated dispatchers (backed by SingleThreadContext). Every time the loop iterates, we check that the side effects are run on the expected Threads and the reducer is also executed in the expected Thread.

When executed once, the test works fine. When executed several times then we start to see that the reducing can take place in a feedback dedicated Thread. It seems to be completely random. I'd say that on 200 executions of the test, 30 can fail because of that.

We try to ensure the reducer is executed on the expected dispatcher thanks to a flowOn() statement. We've also tried the execute it inside a withContext() statement, but we noticed the same behaviour.

At that point, we do not know if our implementation has an issue, or if the Flow api has an issue. Any help would be appreciated 👍.

Coroutine version is 1.4.1.
Here is the code of the Unit Test:

@FlowPreview
@ExperimentalCoroutinesApi
class FeedbackLoopUnitTest {

    private sealed class MockState : State {
        data class StateA(val round: Int) : MockState()
        data class StateB(val round: Int) : MockState()
        data class StateC(val round: Int) : MockState()
        data class StateD(val round: Int) : MockState()
    }

    private sealed class MockEvent : Event {
        object Next : MockEvent()
    }

    private val testDispatcher = TestCoroutineDispatcher()

    @AfterEach
    fun resetTestDispatcher() {
        testDispatcher.cleanupTestCoroutines()
    }

    @ObsoleteCoroutinesApi
    @RepeatedTest(value=200)
    @DisplayName("Assemble makes a stream that respects the schedulers used in each feedback and in the reducer")
    fun assemble_make_a_stream_that_respect_schedulers() = testDispatcher.runBlockingTest {
        val expectedFeedbackAThread = "FEEDBACKA-${UUID.randomUUID()}"
        val expectedFeedbackBThread = "FEEDBACKB-${UUID.randomUUID()}"
        var expectedReducerThreadId = -1L

        val receivedFeedbackAThreads = mutableListOf<String>()
        val receivedFeedbackBThreads = mutableListOf<String>()
        val receivedReducerThreads = mutableListOf<Long>()

        val sideEffectDispatcherA = newSingleThreadContext(expectedFeedbackAThread)
        val sideEffectDispatcherB = newSingleThreadContext(expectedFeedbackBThread)
        val loopDispatcher = TestCoroutineDispatcher()

        // Getting the Thread id for the loopDispatcher, so we can compare it with the
        // one retrieved in the reducer
        loopDispatcher.invoke {
            expectedReducerThreadId = Thread.currentThread().id
        }

        val feedbackA: (Flow<MockState>) -> Flow<MockEvent> = {
            it.flatMapConcat {
                flowOf("")
                    .onEach { receivedFeedbackAThreads.add(Thread.currentThread().name) }
                    .map { MockEvent.Next }
                    .flowOn(sideEffectDispatcherA)
            }
        }

        val feedbackB: (Flow<MockState>) -> Flow<MockEvent> = {
            it.flatMapConcat {
                flowOf("")
                    .onEach { receivedFeedbackBThreads.add(Thread.currentThread().name) }
                    .map { MockEvent.Next }
                    .flowOn(sideEffectDispatcherB)
            }
        }

        // Given: a reducer that records its execution queue
        val reducer: suspend (MockState, MockEvent) -> MockState = { state, _ ->
            receivedReducerThreads.add(Thread.currentThread().id)
            when (state) {
                is MockState.StateA -> MockState.StateB(state.round + 1)
                is MockState.StateB -> MockState.StateA(state.round + 1)
                else -> state
            }
        }

        // here is the feedback loop definition
        val stateSharedFlow = MutableSharedFlow<MockState>(1)

        val sut = merge(feedbackA(stateSharedFlow), feedbackB(stateSharedFlow))
            .catch {
                Timber.e(it)
                emptyFlow<MockEvent>()
            }
            .scan(MockState.StateA(1), reducer)
            .onEach { stateSharedFlow.emit(it) }
            .flowOn(loopDispatcher)

        // When: executing the loop on a specific scheduler
        val job = launch(loopDispatcher) {
            sut.take(3).collect()
        }

        job.cancel()

        // Then: the feedbacks and reducer happen on the expected schedulers
        Truth.assertThat(receivedFeedbackAThreads).isNotEmpty()
        Truth.assertThat(receivedFeedbackBThreads).isNotEmpty()
        Truth.assertThat(receivedReducerThreads).isNotEmpty()

        receivedFeedbackAThreads.forEach { Truth.assertThat(it).startsWith(expectedFeedbackAThread) }
        receivedFeedbackBThreads.forEach { Truth.assertThat(it).startsWith(expectedFeedbackBThread) }
        receivedReducerThreads.forEach { Truth.assertThat(it).isEqualTo(expectedReducerThreadId) }
    }
}
@qwwdfsad
Copy link
Member

Thanks for the self-contained report!
This is one of the problems of TestCoroutineDispatcher -- it is not confined to any particular thread and always execute coroutines on the current thread.

In your test, you have (after watering down all the machinery) three coroutines: feedbackA and feedbackB that execute the flow on its own thread and send results to the intermediate channel ("buffer") and launch(loopDispatcher) that collects the resulting merged flow (-> receives elements from a channel).

Depending on whether launch(loopDispatcher) suspends during receive from a channel or not, its reducer may be executed either on the current thread (that has expectedReducerThreadId) or on any feedback thread.

@twittemb
Copy link
Author

Hi @qwwdfsad

thanks a lot for your nice explanation. Is there a way to not use TestCoroutineDispatcher and force the execution of the reducer to an expected Thread ? I’ve tried to do so, but the whole loop doesn’t run anymore. It seems mandatory to use a TestCoroutineDispatcher at some point. The point of my test here is to ensure that the reducing takes place on the dispatcher used in the ´flowOn’ statement.

Instead of testing the Thread name/id, do you know a way to identify the currently used dispatcher ? It could avoid the fact that the thread used by the TestCoroutineDispatcher is not reliable in my case ?

thanks for your help.

@qwwdfsad
Copy link
Member

Instead of testing the Thread name/id, do you know a way to identify the currently used dispatcher ?

You can use coroutineContext[CoroutineDispatcher] for that

@dkhalanskyjb dkhalanskyjb self-assigned this Oct 14, 2021
dkhalanskyjb added a commit that referenced this issue Nov 1, 2021
Defines two test dispatchers:
* StandardTestDispatcher, which, combined with runTest,
  gives an illusion of an event loop;
* UnconfinedTestDispatcher, which is like
  Dispatchers.Unconfined, but skips delays.

By default, StandardTestDispatcher is used due to the somewhat
chaotic execution order of Dispatchers.Unconfined.
TestCoroutineDispatcher is deprecated.

Fixes #1626
Fixes #1742
Fixes #2082
Fixes #2102
Fixes #2405
Fixes #2462
dkhalanskyjb added a commit that referenced this issue Nov 17, 2021
Defines two test dispatchers:
* StandardTestDispatcher, which, combined with runTest,
  gives an illusion of an event loop;
* UnconfinedTestDispatcher, which is like
  Dispatchers.Unconfined, but skips delays.

By default, StandardTestDispatcher is used due to the somewhat
chaotic execution order of Dispatchers.Unconfined.
TestCoroutineDispatcher is deprecated.

Fixes #1626
Fixes #1742
Fixes #2082
Fixes #2102
Fixes #2405
Fixes #2462
dkhalanskyjb added a commit that referenced this issue Nov 17, 2021
Defines two test dispatchers:
* StandardTestDispatcher, which, combined with runTest,
  gives an illusion of an event loop;
* UnconfinedTestDispatcher, which is like
  Dispatchers.Unconfined, but skips delays.

By default, StandardTestDispatcher is used due to the somewhat
chaotic execution order of Dispatchers.Unconfined.
TestCoroutineDispatcher is deprecated.

Fixes #1626
Fixes #1742
Fixes #2082
Fixes #2102
Fixes #2405
Fixes #2462
dkhalanskyjb added a commit that referenced this issue Nov 19, 2021
Defines two test dispatchers:
* StandardTestDispatcher, which, combined with runTest,
  gives an illusion of an event loop;
* UnconfinedTestDispatcher, which is like
  Dispatchers.Unconfined, but skips delays.

By default, StandardTestDispatcher is used due to the somewhat
chaotic execution order of Dispatchers.Unconfined.
TestCoroutineDispatcher is deprecated.

Fixes #1626
Fixes #1742
Fixes #2082
Fixes #2102
Fixes #2405
Fixes #2462
yorickhenning pushed a commit to yorickhenning/kotlinx.coroutines that referenced this issue Jan 28, 2022
This commit introduces the new version of the test module.
Please see README.md and MIGRATION.md for a thorough
discussion of the changes.

Fixes Kotlin#1203
Fixes Kotlin#1609
Fixes Kotlin#2379
Fixes Kotlin#1749
Fixes Kotlin#1204
Fixes Kotlin#1390
Fixes Kotlin#1222
Fixes Kotlin#1395
Fixes Kotlin#1881
Fixes Kotlin#1910
Fixes Kotlin#1772
Fixes Kotlin#1626
Fixes Kotlin#1742
Fixes Kotlin#2082
Fixes Kotlin#2102
Fixes Kotlin#2405
Fixes Kotlin#2462

Co-authored-by: Vsevolod Tolstopyatov <qwwdfsad@gmail.com>
pablobaxter pushed a commit to pablobaxter/kotlinx.coroutines that referenced this issue Sep 14, 2022
This commit introduces the new version of the test module.
Please see README.md and MIGRATION.md for a thorough
discussion of the changes.

Fixes Kotlin#1203
Fixes Kotlin#1609
Fixes Kotlin#2379
Fixes Kotlin#1749
Fixes Kotlin#1204
Fixes Kotlin#1390
Fixes Kotlin#1222
Fixes Kotlin#1395
Fixes Kotlin#1881
Fixes Kotlin#1910
Fixes Kotlin#1772
Fixes Kotlin#1626
Fixes Kotlin#1742
Fixes Kotlin#2082
Fixes Kotlin#2102
Fixes Kotlin#2405
Fixes Kotlin#2462

Co-authored-by: Vsevolod Tolstopyatov <qwwdfsad@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants