Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow.collects receives items after it was cancelled on a single thread #1265

Closed
Thomas-Vos opened this issue Jun 10, 2019 · 13 comments
Closed

Comments

@Thomas-Vos
Copy link
Contributor

Could anyone explain to me why this code throws an exception (on Android)? The collect and cancel functions are both called on the main thread. I tried this with both version 1.2.1 and 1.3.0-M1

var cancelled = false
val job = GlobalScope.launch(Dispatchers.Main) {
    val flow = flow {
        while (true) {
            emit(Unit)
        }
    }
    flow.flowOn(Dispatchers.IO)
        .collect {
            if (cancelled) { // main thread
                throw IllegalStateException() // this exception is thrown
            }
        }
}

GlobalScope.launch(Dispatchers.Main) {
    delay(1000)
    job.cancel() // main thread
    cancelled = true // set to true after the Flow was cancelled.
}

Here is an Android sample project, just run the app and it will crash.

I'm not sure if this is intended behaviour or a bug. I need to make sure all Flows are cancelled in the Android onDestroy so they do not try to update the view if it is already gone. So if this is not a bug I am not sure how I would need to handle this otherwise.

There is a long discussion about this with more code examples on Slack: https://kotlinlang.slack.com/archives/C1CFAFJSK/p1559908916083600

We had some trouble trying to reproduce this in a unit test (it did not throw the exception), so that's why I attached an Android project.

@elizarov
Copy link
Contributor

Coroutines cancellation is cooperative, but your collect implementation does not cooperate with coroutine cancellation mechanics. It checks its own cancelled flag. I'm not sure why you assume that it is somehow supposed to work. You are advised to review this section of the guide on the approaches you can use to make your code cooperate with cancellation:
https://kotlinlang.org/docs/reference/coroutines/cancellation-and-timeouts.html

@nickallendev
Copy link

Hi @elizarov,

I believe the purpose of the example was to point out that collect calls its lambda without checking for cancellation first. If the example ignores cancelled and checks !isActive instead, the result is the same.

The documentation you linked states:

All the suspending functions in kotlinx.coroutines are cancellable. They check for cancellation of coroutine and throw CancellationException when cancelled.

I think it's easy for developers to gather from this that collect will check for cancellation before calling its provided lambda. This also seems like an intuitive assumption for developers to make.

Here's a use case I've dealt with where this seems especially problematic:

  • An Activity uses C libraries to share code with other platforms
  • Those C libraries are manipulated based on events from objects that live on after the Activity
  • The Activity cleans up its native resources and subscriptions in onDestroy

Note: no thread switching is involved, only different lifetimes between source and observer

Currently, this code base uses RxJava, but if it were naively switched to use Flow and collect, it could result in a corrupted heap due to collect calling a lambda while no longer active that manipulates freed native resources.

While creating a custom collectWhileActive terminal operator is trivial, it seems quite easy for other developers in a similar situation to not know they need such an operator.

@elizarov
Copy link
Contributor

But if you have an analogue of while (true) { emit(Unit) } in Rx you'll have the same problem. The real code would be cooperating with cancellation in some way, because it will be invoking some function that checks for cancellation.

@nickallendev
Copy link

I've found that Rx does not have the same problem because Rx does not have atomic cancellation (figured out that's the issue here).

Rx's ObservableObserveOn checks if it is disposed on the observe-side thread. Kotlin's ChannelFlow does no such check so it inherits Channel's atomic cancellation behavior.

This reframes the issue in my mind as: Should Flow operators like flowOn use atomic cancellation?

I'm leaning towards no, myself. A single Channel can guarantee delivery OR cancellation, but a flow may be built from multiple channels (multiple flowOns with a suspending map between, for example) which means that the benefit of atomic cancellation is something that is easily lost end-to-end.

I'm also thinking it'll help more developers avoid race-condition bugs than if the behavior is left as-is.

And if a developer really does need atomic cancellation, they can still use a Channel.

Note: I've since noticed Issue 1177 which is similar. That specific repro was fixed by changing withContext but the race condition still exists for flowOn and collect.

Here's an example that has failed consistently for me on Android in an Activity's onCreate:

    var timeToCancel = false
    GlobalScope.launch(Dispatchers.Main) {
        flow {
            while (isActive) {
                delay(100)
                if (timeToCancel) {
                    runOnUiThread { //Pretend unfortunate timing of onDestroy
                        cancel()
                    }
                }
                emit(Unit)
            }
        }.flowOn(Dispatchers.IO).collect {
            if (!isActive) {
                throw IllegalStateException("How'd I get here?!")
            }
        }
    }
    GlobalScope.launch(Dispatchers.Main) {
        delay(1000)
        timeToCancel = true
    }

@PaulWoitaschek
Copy link
Contributor

I have replaced every single collect with a safeCollect function in my project:

/**
 * Only proceed with the given action if the coroutine has not been cancelled.
 * Necessary because Flow.collect receives items even after coroutine was cancelled
 * https://github.com/Kotlin/kotlinx.coroutines/issues/1265
 */
suspend inline fun <T> Flow<T>.safeCollect(crossinline action: suspend (T) -> Unit) {
  collect {
    coroutineContext.ensureActive()
    action(it)
  }
}

It would be great if this could be implemented as a default as it's really unexpected behavior and its error prone to always have to remember to not use specific library functions.
On Android my collect functions are always right before they touch the views so they must not get invoked after the view is gone.

@ansman
Copy link
Contributor

ansman commented Dec 22, 2019

What's really strange with this example is that emit should suspend and throw when cancelled but for some reason it doesn't. Adding a yield() inside the loop makes no difference either.

So what baffles me is why the coroutine that is collecting isn't throwing a cancellation exception. When the job is cancelled that coroutine is suspended and should be cancelled atomically?

As I understand it using flowOn is creating a channel under then hood so it is essentially the same as producing items into a buffered channel and consuming it on the other side? It feels like consuming a channel on the main thread should not be able to lead to this condition if cancellation is also done on the main thread.

@ansman
Copy link
Contributor

ansman commented Dec 22, 2019

Here is another example that is really strange that behaves very differently than expected (using channels directly):

runBlocking {
    val c = Channel<Int>(Channel.BUFFERED)
    val job = launch {
        for (i in c) {
            println("isActive = $isActive")
            println("Got $i")
        }
    }

    println("Sending 1")
    c.send(1)
    launch {
        println("Sending 2")
        c.send(2)
        println("Cancelling")
        job.cancel()
        println("Sending 3")
        c.send(3)
    }
}

I would expect the snippet above to print:

Sending 1
isActive = true
Got 1
Sending 2
Cancelling
Sending 3

But it actually prints:

Sending 1
isActive = true
Got 1
Sending 2
Cancelling
Sending 3
isActive = false
Got 2
isActive = false
Got 3

@elizarov
Copy link
Contributor

@ansman There is nothing strange with your Channel example. It behaves so by design, because receiving from a channel is an atomic operation. Details of that are documented here: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/receive.html

@ansman
Copy link
Contributor

ansman commented Dec 23, 2019

This is pretty unexpected to me but explains the issue with flows. This does feel like a very dangerous behavior on Android where objects tend to be nulled out in response to lifecycle methods. This would definitely not happen with RX since like @nickallendev said RX checks for cancellation both in the producer and consumer sides

@elizarov
Copy link
Contributor

It does not have to be this way for flow. I don't believe flows should support atomic value transfer across different threads/dispatchers. I'll submit PR with a fix for flows.

@elizarov elizarov added design and removed question labels Dec 23, 2019
elizarov added a commit that referenced this issue Dec 23, 2019
This is a problematic for Android when Main dispatcher is cancelled on destroyed activity. Atomic nature of the underlying channels is designed to prevent loss of elements, which is really not an issue for a typical application, but creates problem when used with channels.

This change introduces an optional `atomicCancellation` parameter to an internal `ReceiveChannel.receiveOrClosed` method to control atomicity of this receive and tweaks `emitAll(ReceiveChannel)` implementation to abandon atomicity in favor of predictable cancellation.

Fixes #1265
elizarov added a commit that referenced this issue Dec 24, 2019
This is a problematic for Android when Main dispatcher is cancelled on destroyed activity. Atomic nature of the underlying channels is designed to prevent loss of elements, which is really not an issue for a typical application, but creates problem when used with channels.

This change introduces an optional `atomic` parameter to an internal `ReceiveChannel.receiveOrClosed` method to control cancellation atomicity of this function and tweaks `emitAll(ReceiveChannel)` implementation to abandon atomicity in favor of predictable cancellation.

Fixes #1265
elizarov added a commit that referenced this issue Dec 24, 2019
This is a problematic for Android when Main dispatcher is cancelled on destroyed activity. Atomic nature of the underlying channels is designed to prevent loss of elements, which is really not an issue for a typical application, but creates problem when used with channels.

This change introduces an optional `atomic` parameter to an internal `ReceiveChannel.receiveOrClosed` method to control cancellation atomicity of this function and tweaks `emitAll(ReceiveChannel)` implementation to abandon atomicity in favor of predictable cancellation.

Fixes #1265
elizarov added a commit that referenced this issue Dec 24, 2019
This is a problematic for Android when Main dispatcher is cancelled on destroyed activity. Atomic nature of the underlying channels is designed to prevent loss of elements, which is really not an issue for a typical application, but creates problem when used with channels.

This change introduces an optional `atomic` parameter to an internal `ReceiveChannel.receiveOrClosed` method to control cancellation atomicity of this function and tweaks `emitAll(ReceiveChannel)` implementation to abandon atomicity in favor of predictable cancellation.

Fixes #1265
elizarov added a commit that referenced this issue Jan 28, 2020
This is a problematic for Android when Main dispatcher is cancelled on destroyed activity. Atomic nature of the underlying channels is designed to prevent loss of elements, which is really not an issue for a typical application, but creates problem when used with channels.

This change introduces an optional `atomic` parameter to an internal `ReceiveChannel.receiveOrClosed` method to control cancellation atomicity of this function and tweaks `emitAll(ReceiveChannel)` implementation to abandon atomicity in favor of predictable cancellation.

Fixes #1265
elizarov added a commit that referenced this issue Feb 17, 2020
This is a problematic for Android when Main dispatcher is cancelled on destroyed activity. Atomic nature of the underlying channels is designed to prevent loss of elements, which is really not an issue for a typical application, but creates problem when used with channels.

This change introduces an optional `atomic` parameter to an internal `ReceiveChannel.receiveOrClosed` method to control cancellation atomicity of this function and tweaks `emitAll(ReceiveChannel)` implementation to abandon atomicity in favor of predictable cancellation.

Fixes #1265
elizarov added a commit that referenced this issue Apr 8, 2020
This is a problematic for Android when Main dispatcher is cancelled on destroyed activity. Atomic nature of the underlying channels is designed to prevent loss of elements, which is really not an issue for a typical application, but creates problem when used with channels.

This change introduces an optional `atomic` parameter to an internal `ReceiveChannel.receiveOrClosed` method to control cancellation atomicity of this function and tweaks `emitAll(ReceiveChannel)` implementation to abandon atomicity in favor of predictable cancellation.

Fixes #1265
elizarov added a commit that referenced this issue Apr 21, 2020
This is a problematic for Android when Main dispatcher is cancelled on destroyed activity.
Atomic nature of channels is designed to prevent loss of elements,
which is really not an issue for a typical application, but creates problem when used with channels.

* Internal suspendAtomicCancellableCoroutine -> suspendCancellableCoroutine
* Internal suspendAtomicCancellableCoroutineReusable -> suspendCancellableCoroutineReusable
* Remove atomic cancellation from docs
* Ensures that flowOn does not resume downstream after cancellation.
* MODE_ATOMIC_DEFAULT renamed into MODE_ATOMIC
* Introduced MODE_CANCELLABLE_REUSABLE to track suspendCancellableCoroutineReusable
* Better documentation for MODE_XXX constants.
* Added stress test for proper handling of MODE_CANCELLABLE_REUSABLE
  and fixed test for #1123 bug with job.join (working in MODE_CANCELLABLE) that was not
  properly failing in the absence of the proper code in CancellableContinuationImpl.getResult
* Added test for Flow.combine that should be fixed
* Support extended invokeOnCancellation contract
* Introduced internal tryResumeAtomic

Fixes #1265
Fixes #1813
Fixes #1915
elizarov added a commit that referenced this issue Apr 22, 2020
This is a problematic for Android when Main dispatcher is cancelled on destroyed activity.
Atomic nature of channels is designed to prevent loss of elements,
which is really not an issue for a typical application, but creates problem when used with channels.

* Internal suspendAtomicCancellableCoroutine -> suspendCancellableCoroutine
* Internal suspendAtomicCancellableCoroutineReusable -> suspendCancellableCoroutineReusable
* Remove atomic cancellation from docs
* Ensures that flowOn does not resume downstream after cancellation.
* MODE_ATOMIC_DEFAULT renamed into MODE_ATOMIC
* Introduced MODE_CANCELLABLE_REUSABLE to track suspendCancellableCoroutineReusable
* Better documentation for MODE_XXX constants.
* Added stress test for proper handling of MODE_CANCELLABLE_REUSABLE
  and fixed test for #1123 bug with job.join (working in MODE_CANCELLABLE) that was not
  properly failing in the absence of the proper code in CancellableContinuationImpl.getResult
* Added test for Flow.combine that should be fixed
* Support extended invokeOnCancellation contract
* Introduced internal tryResumeAtomic

Fixes #1265
Fixes #1813
Fixes #1915
elizarov added a commit that referenced this issue Apr 22, 2020
This is a problematic for Android when Main dispatcher is cancelled on destroyed activity.
Atomic nature of channels is designed to prevent loss of elements,
which is really not an issue for a typical application, but creates problem when used with channels.

* Internal suspendAtomicCancellableCoroutine -> suspendCancellableCoroutine
* Internal suspendAtomicCancellableCoroutineReusable -> suspendCancellableCoroutineReusable
* Remove atomic cancellation from docs
* Ensures that flowOn does not resume downstream after cancellation.
* MODE_ATOMIC_DEFAULT renamed into MODE_ATOMIC
* Introduced MODE_CANCELLABLE_REUSABLE to track suspendCancellableCoroutineReusable
* Better documentation for MODE_XXX constants.
* Added stress test for proper handling of MODE_CANCELLABLE_REUSABLE
  and fixed test for #1123 bug with job.join (working in MODE_CANCELLABLE) that was not
  properly failing in the absence of the proper code in CancellableContinuationImpl.getResult
* Added test for Flow.combine that should be fixed
* Support extended invokeOnCancellation contract
* Introduced internal tryResumeAtomic

Fixes #1265
Fixes #1813
Fixes #1915
elizarov added a commit that referenced this issue Apr 22, 2020
This is a problematic for Android when Main dispatcher is cancelled on destroyed activity.
Atomic nature of channels is designed to prevent loss of elements,
which is really not an issue for a typical application, but creates problem when used with channels.

* Internal suspendAtomicCancellableCoroutine -> suspendCancellableCoroutine
* Internal suspendAtomicCancellableCoroutineReusable -> suspendCancellableCoroutineReusable
* Remove atomic cancellation from docs
* Ensures that flowOn does not resume downstream after cancellation.
* MODE_ATOMIC_DEFAULT renamed into MODE_ATOMIC
* Introduced MODE_CANCELLABLE_REUSABLE to track suspendCancellableCoroutineReusable
* Better documentation for MODE_XXX constants.
* Added stress test for proper handling of MODE_CANCELLABLE_REUSABLE
  and fixed test for #1123 bug with job.join (working in MODE_CANCELLABLE) that was not
  properly failing in the absence of the proper code in CancellableContinuationImpl.getResult
* Added test for Flow.combine that should be fixed
* Support extended invokeOnCancellation contract
* Introduced internal tryResumeAtomic

Fixes #1265
Fixes #1813
Fixes #1915
elizarov added a commit that referenced this issue Jun 16, 2020
This is a problematic for Android when Main dispatcher is cancelled on destroyed activity.
Atomic nature of channels is designed to prevent loss of elements,
which is really not an issue for a typical application, but creates problem when used with channels.

* Internal suspendAtomicCancellableCoroutine -> suspendCancellableCoroutine
* Internal suspendAtomicCancellableCoroutineReusable -> suspendCancellableCoroutineReusable
* Remove atomic cancellation from docs
* Ensures that flowOn does not resume downstream after cancellation.
* MODE_ATOMIC_DEFAULT renamed into MODE_ATOMIC
* Introduced MODE_CANCELLABLE_REUSABLE to track suspendCancellableCoroutineReusable
* Better documentation for MODE_XXX constants.
* Added stress test for proper handling of MODE_CANCELLABLE_REUSABLE
  and fixed test for #1123 bug with job.join (working in MODE_CANCELLABLE) that was not
  properly failing in the absence of the proper code in CancellableContinuationImpl.getResult
* Added test for Flow.combine that should be fixed
* Support extended invokeOnCancellation contract
* Introduced internal tryResumeAtomic

Fixes #1265
Fixes #1813
Fixes #1915
elizarov added a commit that referenced this issue Sep 16, 2020
This is a problematic for Android when Main dispatcher is cancelled on destroyed activity.
Atomic nature of channels is designed to prevent loss of elements,
which is really not an issue for a typical application, but creates problem when used with channels.

* Internal suspendAtomicCancellableCoroutine -> suspendCancellableCoroutine
* Internal suspendAtomicCancellableCoroutineReusable -> suspendCancellableCoroutineReusable
* Remove atomic cancellation from docs
* Ensures that flowOn does not resume downstream after cancellation.
* MODE_ATOMIC_DEFAULT renamed into MODE_ATOMIC
* Introduced MODE_CANCELLABLE_REUSABLE to track suspendCancellableCoroutineReusable
* Better documentation for MODE_XXX constants.
* Added stress test for proper handling of MODE_CANCELLABLE_REUSABLE
  and fixed test for #1123 bug with job.join (working in MODE_CANCELLABLE) that was not
  properly failing in the absence of the proper code in CancellableContinuationImpl.getResult
* Added test for Flow.combine that should be fixed
* Support extended invokeOnCancellation contract
* Introduced internal tryResumeAtomic

Fixes #1265
Fixes #1813
Fixes #1915
recheej pushed a commit to recheej/kotlinx.coroutines that referenced this issue Dec 28, 2020
…otlin#1937)

This is a problematic for Android when Main dispatcher is cancelled on destroyed activity.
Atomic nature of channels is designed to prevent loss of elements,
which is really not an issue for a typical application, but creates problem when used with channels.

* Internal suspendAtomicCancellableCoroutine -> suspendCancellableCoroutine
* Internal suspendAtomicCancellableCoroutineReusable -> suspendCancellableCoroutineReusable
* Remove atomic cancellation from docs
* Ensures that flowOn does not resume downstream after cancellation.
* MODE_ATOMIC_DEFAULT renamed into MODE_ATOMIC
* Introduced MODE_CANCELLABLE_REUSABLE to track suspendCancellableCoroutineReusable
* Better documentation for MODE_XXX constants.
* Added stress test for proper handling of MODE_CANCELLABLE_REUSABLE
  and fixed test for Kotlin#1123 bug with job.join (working in MODE_CANCELLABLE) that was not
  properly failing in the absence of the proper code in CancellableContinuationImpl.getResult
* Added test for Flow.combine that should be fixed
* Support extended invokeOnCancellation contract
* Introduced internal tryResumeAtomic
* Channel onUnderliveredElement is introduced as a replacement.

Fixes Kotlin#1265
Fixes Kotlin#1813
Fixes Kotlin#1915
Fixes Kotlin#1936

Co-authored-by: Louis CAD <louis.cognault@gmail.com>
Co-authored-by: Vsevolod Tolstopyatov <qwwdfsad@gmail.com>
recheej pushed a commit to recheej/kotlinx.coroutines that referenced this issue Dec 28, 2020
…otlin#1937)

This is a problematic for Android when Main dispatcher is cancelled on destroyed activity.
Atomic nature of channels is designed to prevent loss of elements,
which is really not an issue for a typical application, but creates problem when used with channels.

* Internal suspendAtomicCancellableCoroutine -> suspendCancellableCoroutine
* Internal suspendAtomicCancellableCoroutineReusable -> suspendCancellableCoroutineReusable
* Remove atomic cancellation from docs
* Ensures that flowOn does not resume downstream after cancellation.
* MODE_ATOMIC_DEFAULT renamed into MODE_ATOMIC
* Introduced MODE_CANCELLABLE_REUSABLE to track suspendCancellableCoroutineReusable
* Better documentation for MODE_XXX constants.
* Added stress test for proper handling of MODE_CANCELLABLE_REUSABLE
  and fixed test for Kotlin#1123 bug with job.join (working in MODE_CANCELLABLE) that was not
  properly failing in the absence of the proper code in CancellableContinuationImpl.getResult
* Added test for Flow.combine that should be fixed
* Support extended invokeOnCancellation contract
* Introduced internal tryResumeAtomic
* Channel onUnderliveredElement is introduced as a replacement.

Fixes Kotlin#1265
Fixes Kotlin#1813
Fixes Kotlin#1915
Fixes Kotlin#1936

Co-authored-by: Louis CAD <louis.cognault@gmail.com>
Co-authored-by: Vsevolod Tolstopyatov <qwwdfsad@gmail.com>
@donkeytronkilla
Copy link

donkeytronkilla commented Feb 10, 2021

@elizarov am I mistaken about the fix merged in #1937? I am using 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2' and I am expecting that when I have a flow which is canceled then that flow should be signaling the cancellation to any ongoing cancellable work in its downstream associated collector. But what I see is that the cancellable work that is kicked off inside my collector continues to run even after the flow that kicked it off has been cancelled. This is on Android where I have a simple activity with one button:

class MainActivity : AppCompatActivity() {
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        
        val currentClick = MutableStateFlow(0)

        lifecycleScope.launch {
            currentClick.flatMapLatest { currentClickCount ->
                flow {
                    var endlessCounter = 1
                    while (true) {
                        emit(Pair(currentClickCount, endlessCounter++))
                        delay(6000)
                    }
                }
            }.collect { (currentClickCount, endlessCounter) ->
                doSomething(currentClickCount, endlessCounter)
            }
        }

        findViewById<Button>(R.id.button).setOnClickListener {
            Log.i(TAG, "Button clicked: should cancel any ongoing work in doSomething and start new stream")
            lifecycleScope.launch { currentClick.emit(currentClick.value + 1) }
        }
    }

    private suspend fun doSomething(currentClickCount: Int, endlessCounter: Int) {
        Log.i(TAG, "doSomething 1: currentClickCount: $currentClickCount, endlessCounter: $endlessCounter ")
        delay(2000)
        Log.i(TAG, "doSomething 2: currentClickCount: $currentClickCount, endlessCounter: $endlessCounter ")
        delay(2000)
        Log.i(TAG, "doSomething 3: currentClickCount: $currentClickCount, endlessCounter: $endlessCounter ")
    }
}

I was expecting that if I click this button and emit a new value on my currentClick stateflow then when flatMapLatest cancels the previous internal endless flow and kicks of another one, then the collector for the canceled flow should be signaled and this cancellation should propagate down to my doSomething suspend method in which I have delay() calls that cooperate with cancellation. Instead what I see if I click my button to emit a new value on my currentClick stateflow which triggers the flatMapLatest is the previous collector invocation of doSomething continues until all the delay() calls have completed, even if I cancel the source flow right after the first delay() inside doSomething.

Here is the logs I see in this scenario:

2021-02-10 15:50:05.587 24901-24901/com.example.launchdarrker I/GeoMain1: doSomething 1: currentClickCount: 0, endlessCounter: 1 
2021-02-10 15:50:07.593 24901-24901/com.example.launchdarrker I/GeoMain1: doSomething 2: currentClickCount: 0, endlessCounter: 1 
2021-02-10 15:50:09.599 24901-24901/com.example.launchdarrker I/GeoMain1: doSomething 3: currentClickCount: 0, endlessCounter: 1 
2021-02-10 15:50:11.588 24901-24901/com.example.launchdarrker I/GeoMain1: doSomething 1: currentClickCount: 0, endlessCounter: 2 
2021-02-10 15:50:13.597 24901-24901/com.example.launchdarrker I/GeoMain1: doSomething 2: currentClickCount: 0, endlessCounter: 2 
2021-02-10 15:50:15.604 24901-24901/com.example.launchdarrker I/GeoMain1: doSomething 3: currentClickCount: 0, endlessCounter: 2 
2021-02-10 15:50:17.595 24901-24901/com.example.launchdarrker I/GeoMain1: doSomething 1: currentClickCount: 0, endlessCounter: 3 
2021-02-10 15:50:18.571 24901-24901/com.example.launchdarrker I/GeoMain1: Button clicked: should cancel any ongoing work in doSomething and start new stream
2021-02-10 15:50:19.598 24901-24901/com.example.launchdarrker I/GeoMain1: doSomething 2: currentClickCount: 0, endlessCounter: 3 
2021-02-10 15:50:21.607 24901-24901/com.example.launchdarrker I/GeoMain1: doSomething 3: currentClickCount: 0, endlessCounter: 3 
2021-02-10 15:50:21.607 24901-24901/com.example.launchdarrker I/GeoMain1: doSomething 1: currentClickCount: 1, endlessCounter: 1 
2021-02-10 15:50:23.610 24901-24901/com.example.launchdarrker I/GeoMain1: doSomething 2: currentClickCount: 1, endlessCounter: 1 
2021-02-10 15:50:25.614 24901-24901/com.example.launchdarrker I/GeoMain1: doSomething 3: currentClickCount: 1, endlessCounter: 1 

@nickallendev
Copy link

@gpartida

#1937 stops the lambda passed to collect from being invoked if lifecycleScope has already been cancelled.

Flow implementations are required to emit items with the same CoroutineContext as was used to call collect(called context preservation). This means the behavior you were expecting is actually impossible with any conforming implementation. Cancelling the work downstream is only possible by cancelling the entire collection.

To get your desired behavior, you can call collectLatest and perform all the work you want to be cancelled when a new event arrives within the provided lambda.

Note: instead of .flatMapLatest { flow { ... } } consider .transformLatest { ... }

@donkeytronkilla
Copy link

@nickallendev thanks! collectLatest is exactly what I was missing. As for flatMapLatest vs transformLatest I had just come up with a toy example of my actual code to explain my question. In my actual code I take the emission of one flow and for each one I call another method which itself returns a Flow. I'm wondering if there is still any reason to prefer transformLatest with maybe emitAll(flowReturningFunc()) over flatMapLatest in this scenario?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

6 participants