Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HELP] A way of knowing whether a flow is being collected (analogue of LiveData's onActive()/onInactive()) #2194

Closed
psteiger opened this issue Aug 11, 2020 · 3 comments

Comments

@psteiger
Copy link

psteiger commented Aug 11, 2020

Hello Kotlin community,

Ever since I migrated fully from LiveData to Flow/StateFlow, I miss a way for knowing whether a StateFlow is being collected in order to start or stop the upstream flow collection, which can be done with LiveData's onActive()/onInactive() callbacks.

Example:

private val scope = CoroutineScope(Dispatchers.IO)

private val upstreamPeer: Flow<Resource<User>> = 
    combine(barePeer, position, comments, portfolio, ::transform)
        .onStart { emit(Resource.Loading()) }
        .buffer(Channel.CONFLATED)
        .flowOn(scope)

private val _peer = MutableStateFlow<Resource<User>>(Resource.Loading())

override val peer: StateFlow<Resource<User>> = _peer

fun startUpstreamCollection() {
    upstreamPeer
        .onEach { _peer.value = it }
        .launchIn(scope)
}

What I need is a way of knowing whether peer is being collected, so I startUpstreamCollection(). Also the symmetric: stop collection when there are no peer collectors.

Could someone shed some light into this issue? Am I missing something?

@psteiger psteiger changed the title [HELP] A way for knowing whether a flow is being collected (analogue of LiveData's onActive()/onInactive()) [HELP] A way of knowing whether a flow is being collected (analogue of LiveData's onActive()/onInactive()) Aug 12, 2020
@twyatt
Copy link
Contributor

twyatt commented Aug 22, 2020

@psteiger per #2034 (comment):

Shared flow provides subscriptionCount as an analogy to onActive/onInactive

It's not available yet, but per #2069 (comment), they:

tentatively plan to merge and release it shortly after Kotlin 1.4 is released as a part of kotlinx.coroutines version 1.4.0.

I believe shareIn (#2047) will give you the functionality you're looking for out-of-the-box (without having to manually call startUpstreamCollection()).

@elizarov
Copy link
Contributor

This is going to be available in the upcoming shared flow. See its design as explained in #2034.

@psteiger
Copy link
Author

psteiger commented Nov 16, 2020

A heads-up on this issue:

Shared flow provides subscriptionCount as an analogy to onActive/onInactive

subscriptionCount is not really an analogue of LiveData's onActive()/onInactive() because it accounts for the total of existing collectors without considering paused collectors, such as collectors launched in a coroutine launched with LifecycleCoroutineScope.launchWhenStarted.

What I ended up doing is, instead of relying on pausable coroutines, using coroutines that get canceled on onStop() and recreated on onStart() to leverage SharedFlow.subscriptionCount (or SharedFlows launched with shareIn(CoroutineScope, SharingStarted.WhileSubscribed(), replay)):

class Observer<T>(
    lifecycleOwner: LifecycleOwner,
    private val flow: Flow<T>,
    private val collector: suspend (T) -> Unit
) : DefaultLifecycleObserver {

    var job: Job? = null

    override fun onStart(owner: LifecycleOwner) {
        job = owner.lifecycleScope.launch {
            flow.collect {
                collector(it)
            }
        }
    }

    override fun onStop(owner: LifecycleOwner) {
        job?.cancel()
        job = null
    }

    init {
        lifecycleOwner.lifecycle.addObserver(this)
    }
}

inline fun <reified T> Flow<T>.observe(
    lifecycleOwner: LifecycleOwner,
    noinline collector: suspend (T) -> Unit
) = Observer(lifecycleOwner, this, collector)

inline fun <reified T> Flow<T>.observeIn(
    lifecycleOwner: LifecycleOwner
) = Observer(lifecycleOwner, this, {})

Then I launch such coroutine with:

sharedFlow.observe(lifecycleOwner) {
    // ...
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants