Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add debounce operator with selector to Flow #1216

Closed
Thomas-Vos opened this issue May 22, 2019 · 2 comments · Fixed by #2336
Closed

Add debounce operator with selector to Flow #1216

Thomas-Vos opened this issue May 22, 2019 · 2 comments · Fixed by #2336
Labels

Comments

@Thomas-Vos
Copy link
Contributor

Thomas-Vos commented May 22, 2019

Could an operator like the following be added to Flow? The debounce operator is already added, but I need one with a different timeout depending on the item. It shouldn’t be too difficult to modify the current operator to add this.

fun <T> Flow<T>.debounce(timeoutMillis: (T) -> Long): Flow<T>

My use case is the following:
when the app is opened, it connects to multiple devices simultaneously. This normally takes about a second. It shows connecting... and finally connected. What I want is that the "connecting" text is only shown if it takes more than one second. For example if the device is connected in 0.5 seconds, the "connecting" message must not be shown (only the "connected" message).

I need the timeout for "connecting, because otherwise the "connecting" message will be shown very shortly if the device is connected quickly.

Here's an example:

enum class ConnectionState {
    DISCONNECTED,
    CONNECTING,
    CONNECTED
}

fun appVisible() {
    // Multiple devices are being connected simultaneously. Every time one of the
    // connection states changes, a new list is sent to this flow.
    val statesFlow: Flow<List<ConnectionState>> = observeConnectionStates()
    
    statesFlow
        .debounce { states ->
            if (states.any { it == ConnectionState.CONNECTING }) {
                1000L // still connecting, delay by one second if nothing changes.
            } else {
                0L // all devices are connected, immediately show the message
            }
        }
        .collect { TODO("update message") }
}

Previously I was using this RxJava operator (line 7294-7325), but I am migrating my code to coroutines for multiplatform support.

(related discussion on Slack: https://kotlinlang.slack.com/archives/C1CFAFJSK/p1558109098082700)

@Thomas-Vos
Copy link
Contributor Author

I'm currently using the following implementation. It is a modified version of the already implemented debounce operator:

fun <T> Flow<T>.debounce(timeoutMillis: (T) -> Long): Flow<T> = flow {
    coroutineScope {
        val values = Channel<Any?>(Channel.CONFLATED) // Actually Any, KT-30796
        // Channel is not closed deliberately as there is no close with value
        val collector = launch {
            try {
                collect { value -> values.send(value ?: NullSurrogate) }
            } catch (e: Throwable) {
                values.close(e) // Workaround for #1130
                throw e
            }
        }

        var isDone = false
        var lastValue: Any? = null
        while (!isDone) {
            select<Unit> {
                values.onReceive {
                    lastValue = it
                }

                lastValue?.let { value -> // set timeout when lastValue != null
                    val unboxed: T = NullSurrogate.unbox(value)
                    val valueTimeout = timeoutMillis(unboxed)
                    require(valueTimeout >= 0) { "Debounce timeout should be positive" }
                    onTimeout(valueTimeout) {
                        lastValue = null // Consume the value
                        emit(unboxed)
                    }
                }

                // Close with value 'idiom'
                collector.onJoin {
                    if (lastValue != null) emit(NullSurrogate.unbox(lastValue))
                    isDone = true
                }
            }
        }
    }
}

@Thomas-Vos
Copy link
Contributor Author

Added in version 1.4.0, thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants