Flow.last() operator #2246

Closed
gildor opened this issue Sep 14, 2020 · 10 comments
@gildor
Contributor

gildor commented Sep 14, 2020

I often find myself writing flow.toList().last(), usually in code that works with a Flow representing progress, where I want to receive the final item. A last() operator would complement first()/single().

It can also be replaced with something like:

flow
   .filterIsInstance<SomeTerminalItemType>()
   .single()

but it's not always that simple. For example, a sealed hierarchy may have several terminal types for an operation.

Reactive libraries have such an operator: http://reactivex.io/documentation/operators/last.html

@fvasco
Contributor

fvasco commented Sep 14, 2020

Flow which represents progress

Have you considered a StateFlow?

@gildor
Contributor Author

gildor commented Sep 14, 2020

@fvasco How would StateFlow help with this?

I need the final value of the flow, not the current value.

@elizarov
Contributor

Interesting. Can you elaborate, please, on the kind of case where you ended up needing it? What kind of flow is that?

@gildor
Contributor Author

gildor commented Sep 14, 2020

Any kind of progress where the final item of the flow is a result item.

The most recent case was a Flow representing CI job states: in queue, executing, finished, and error. In some cases I use it like runCiJob().last(). Here there are two terminal states, finished and error, so I cannot use filterIsInstance().
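
To make the two-terminal-state problem concrete, here is a minimal sketch. The state types, the `succeed` parameter, and the stand-in body of runCiJob() are all hypothetical, invented only for illustration; the real API is not shown in this thread.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.single
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical state hierarchy; not taken from the original code base.
sealed interface CiJobState
object InQueue : CiJobState
object Executing : CiJobState
data class Finished(val artifact: String) : CiJobState
data class Failed(val reason: String) : CiJobState

// Stand-in for runCiJob(): emits progress states and ends with one of two terminal states.
fun runCiJob(succeed: Boolean): Flow<CiJobState> =
    if (succeed) flowOf(InQueue, Executing, Finished("build.zip"))
    else flowOf(InQueue, Executing, Failed("tests failed"))

// Works for either terminal state, at the cost of buffering every emission in a list.
fun finalState(succeed: Boolean): CiJobState = runBlocking {
    runCiJob(succeed).toList().last()
}

// Covers only one terminal type: when the job ends with Failed, the filtered flow
// is empty and single() throws NoSuchElementException, mapped to null here.
fun finishedOrNull(succeed: Boolean): Finished? = runBlocking {
    try {
        runCiJob(succeed).filterIsInstance<Finished>().single()
    } catch (e: NoSuchElementException) {
        null
    }
}
```

With this sketch, finalState(false) yields the Failed state, while the filterIsInstance() variant has nothing to return for the same run.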

I have also had similar cases with a Flow representing file processing (we have a lot of them). It emits progress, and its final item is the resulting processed file, so the consumer often just wants to start the flow and receive the final result.

@fvasco
Contributor

fvasco commented Sep 14, 2020

Hi, @gildor,
you need a Flow with state. Strictly speaking, for your case you should prefer a Deferred (or a CompletableStateFlow, which looks very ugly).

I am not suggesting that you use a Deferred instead of a Flow; my point is that a deferred task should put its result in a Deferred.

@gildor
Contributor Author

gildor commented Sep 15, 2020

@fvasco I really don't understand your suggestion. Yes, I can use Deferred, but that only works if I care solely about the final result, which is not the case: I may still need intermediate states for side effects such as logging, showing progress, updating status, etc.

I am considering that a deferred task should put its result in a Deferred

Even this doesn't make sense to me; I don't see how Deferred makes it better.
It's just about convenience, not that something cannot be implemented without last().

Currently, instead of exposing one Flow in an API, I usually introduce two methods: one returning a flow with all intermediate states and one suspend function returning the final result. This works fine for frequently used APIs, but sometimes I just want to receive the final result of an existing Flow API.
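
A rough sketch of that two-method pattern, assuming a hypothetical file-processing API. The names processFile, processFileAndAwait, Progress, and Done are made up for illustration and do not come from any real code in this thread.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.runBlocking

// Hypothetical event types for a file-processing API.
sealed interface ProcessingEvent
data class Progress(val percent: Int) : ProcessingEvent
data class Done(val outputPath: String) : ProcessingEvent

// Method 1: the full flow, for callers that want intermediate states.
fun processFile(input: String): Flow<ProcessingEvent> = flow {
    emit(Progress(0))
    emit(Progress(50))
    emit(Progress(100))
    emit(Done("$input.processed"))
}

// Method 2: a suspend function for callers that only want the final result.
// It keeps only the most recent event while collecting, so nothing is buffered.
suspend fun processFileAndAwait(input: String): Done {
    val last = processFile(input)
        .fold<ProcessingEvent, ProcessingEvent?>(null) { _, event -> event }
    return last as? Done ?: error("Flow completed without a Done event")
}

// Blocking wrapper, only so the sketch can be called from non-suspending code.
fun processReportDemo(): Done = runBlocking { processFileAndAwait("report.csv") }
```

The second method is the boilerplate a built-in last() would remove: every Flow-returning API needs its own hand-written twin.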

@fvasco
Contributor

fvasco commented Sep 15, 2020

Hi, @gildor,
sorry for my wording, let me try to rephrase.

Yes, indeed I can use Deferred, but it works if I care only about the final result, but it's not the case

Yes, indeed.

Let me explain my concerns; they are not strictly related to the last() operator.
I am considering your use case because it looks like a common one.

In your example, you wrote: runCiJob().last().

A first API specification could be fun runCiJob(): Job, but it allows only three states and it is not possible to get updates.

A second proposal could be fun runCiJob(): Flow<State>, which may return a hot Flow. If this Flow does not hold a current state, each invocation has to await the next emission.
In a corner case, if a coroutine calls runCiJob() and resumes after the task has completed, the last event has already been sent and the Flow is closed (the last event is no longer in the Flow).

If instead the Flow has a current state (third proposal), the API could be fun runCiJob(): StateFlow<State>. However, a StateFlow cannot be closed, so last() cannot work.

In any case, you defined a custom protocol (in queue, executing, finished and error), so for your specific use case a custom function should solve the issue. You have already proposed that.

suspend fun Flow<State>.await() = first { it.terminal }
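
As a sketch of how this could play out with the StateFlow variant: the State enum and its terminal flag below are assumptions made for illustration, since the thread does not show the real type. Because first { } stops collecting at the first match, it returns even though a StateFlow never completes.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical protocol states; `terminal` marks the two end states.
enum class State(val terminal: Boolean) {
    IN_QUEUE(false), EXECUTING(false), FINISHED(true), ERROR(true)
}

// The custom function from the comment above, with an explicit return type.
suspend fun Flow<State>.await(): State = first { it.terminal }

// Simulates a job that publishes its state through a StateFlow.
fun awaitDemo(): State = runBlocking {
    val state = MutableStateFlow(State.IN_QUEUE)
    launch {
        delay(10)
        state.value = State.EXECUTING
        delay(10)
        state.value = State.FINISHED
    }
    // Suspends until a terminal state is observed, then cancels collection.
    state.await()
}

fun main() {
    println(awaitDemo()) // FINISHED
}
```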

@fvasco
Contributor

fvasco commented Dec 1, 2020

We have the same issue: we process a sequence of events and have to get the last one.

Unfortunately, we need a suspend fun inside a filter { }, so we have to switch to Flow, but last is missing.

Here is our patch:

public suspend fun <T> Flow<T>.last(): T? = reduce { _, value -> value }
public suspend fun <T> Flow<T>.lastOrNull(): T? = fold<T?, T?>(null) { _, value -> value }
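
For reference, a small sketch of how the two helpers behave on empty and non-empty flows. They are lightly adapted copies under different names (lastSketch, lastOrNullSketch), so they cannot clash with any operator the library itself may provide. One deliberate difference: lastSketch returns T rather than T?, because reduce throws on an empty flow and never produces null on its own.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.flow.reduce
import kotlinx.coroutines.runBlocking

// Keeps only the most recent value; reduce throws NoSuchElementException on an empty flow.
suspend fun <T> Flow<T>.lastSketch(): T = reduce { _, value -> value }

// Same idea, but starts from null so an empty flow yields null instead of throwing.
suspend fun <T> Flow<T>.lastOrNullSketch(): T? = fold<T, T?>(null) { _, value -> value }

fun lastOfThree(): Int = runBlocking { flowOf(1, 2, 3).lastSketch() }

fun lastOrNullOfEmpty(): Int? = runBlocking { emptyFlow<Int>().lastOrNullSketch() }

fun lastOfEmptyThrows(): Boolean = runBlocking {
    try {
        emptyFlow<Int>().lastSketch()
        false
    } catch (e: NoSuchElementException) {
        true
    }
}
```

Both helpers have to collect the flow to completion, so they only return for flows that actually finish.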

@lmj0011

lmj0011 commented Jan 30, 2021

I also have a case where Flow<T>.last() would be quite useful. I have a helper class in my Android project that produces unique numbers with the help of DataStore. Currently, I'm using flow.conflate().first() in an attempt to get the latest value without providing an action block, which appears to work fine. For some reason, fvasco's patch didn't work for me.

Here's a snippet for brevity:

//[...]

fun getNextRuntimeUniqueInt(): Flow<Int?> {
    return dataStore.data.mapLatest { prefs ->
        prefs[Keys.NEXT_RUNTIME_UNIQUE_INT]
    }
}

//[...]

val cnt = dataStoreHelper.getNextRuntimeUniqueInt().conflate().first()

full context:

DataStoreHelper.kt

class DataStoreHelper(val context: Context) {
    private val dataStore: DataStore<Preferences> = context.createDataStore(
        name = "preferences"
    )

    fun getNextRuntimeUniqueInt(): Flow<Int?> {
        return dataStore.data.mapLatest { prefs ->
            prefs[Keys.NEXT_RUNTIME_UNIQUE_INT]
        }
    }
    suspend fun setNextRuntimeUniqueInt(count: Int) {
        dataStore.edit { prefs ->
            prefs[Keys.NEXT_RUNTIME_UNIQUE_INT] = count
        }
    }

    fun getNextRuntimeUniqueLong(): Flow<Long?> {
        return dataStore.data.mapLatest { prefs ->
            prefs[Keys.NEXT_RUNTIME_UNIQUE_LONG]
        }
    }
    suspend fun setNextRuntimeUniqueLong(count: Long) {
        dataStore.edit { prefs ->
            prefs[Keys.NEXT_RUNTIME_UNIQUE_LONG] = count
        }
    }
}

UniqueRuntimeNumberHelper.kt

class UniqueRuntimeNumberHelper(val context: Context)  {
    companion object {
        const val INITIAL_NEXT_INT = 1
        const val INITIAL_NEXT_LONG = 1L
    }

    private val dataStoreHelper: DataStoreHelper = (context.applicationContext as App).kodein.instance()

    /**
     * Get a unique Integer, should only be used for runtime uniqueness
     */
    suspend fun nextInt(): Int {
        val cnt = dataStoreHelper.getNextRuntimeUniqueInt().conflate().first()
        val intCounter = if (cnt != null) AtomicInteger(cnt) else AtomicInteger(INITIAL_NEXT_INT)

        val value = intCounter.incrementAndGet()
        return when {
            (value < Int.MAX_VALUE) -> {
                dataStoreHelper.setNextRuntimeUniqueInt(value)
                value
            }
            else -> {
                dataStoreHelper.setNextRuntimeUniqueInt(INITIAL_NEXT_INT)
                INITIAL_NEXT_INT
            }
        }
    }

    /**
     * Get a unique Long, should only be used for runtime uniqueness
     */
    suspend fun nextLong(): Long {
        val cnt = dataStoreHelper.getNextRuntimeUniqueLong().conflate().first()
        val longCounter = if (cnt != null) AtomicLong(cnt) else AtomicLong(INITIAL_NEXT_LONG)

        val value = longCounter.incrementAndGet()
        return when {
            (value < Long.MAX_VALUE) -> {
                dataStoreHelper.setNextRuntimeUniqueLong(value)
                value
            }
            else -> {
                dataStoreHelper.setNextRuntimeUniqueLong(INITIAL_NEXT_LONG)
                INITIAL_NEXT_LONG
            }
        }
    }
}

@fvasco
Contributor

fvasco commented Jan 30, 2021

Hi @lmj0011,
the last operator is supposed to return the last emitted value of a Flow.
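
A sketch of why that distinction matters, under the assumption (not confirmed in this thread) that the source flow stays open and never completes, as a flow observing stored preferences typically would. The openEnded() flow and the lastOrNullByFold() helper are hypothetical stand-ins.

```kotlin
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Fold-based "last" helper: it can only return once the flow completes.
suspend fun <T> Flow<T>.lastOrNullByFold(): T? = fold<T, T?>(null) { _, value -> value }

// Stand-in for a source that stays open: emits two values, then waits forever.
fun openEnded(): Flow<Int> = flow {
    emit(41)
    emit(42)
    awaitCancellation()
}

// first() returns as soon as one value arrives and cancels the rest of the flow.
fun firstValue(): Int = runBlocking { openEnded().first() }

// The fold-based helper never sees completion, so the timeout fires and yields null.
fun lastWithTimeout(): Int? = runBlocking {
    withTimeoutOrNull(200) { openEnded().lastOrNullByFold() }
}
```

If the flow in question behaves like openEnded(), a last-style operator would suspend indefinitely, which may be why the patch appeared not to work there, whereas first() fits a "read the current value once" use case.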

Similar to: https://kotlinlang.org/api/latest/jvm/stdlib/kotlin.sequences/last.html
