Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add debounce selector #2148

Closed
wants to merge 9 commits into from
101 changes: 72 additions & 29 deletions kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
Expand Up @@ -16,7 +16,7 @@ import kotlin.time.*

/**
* Returns a flow that mirrors the original flow, but filters out values
* that are followed by the newer values within the given [timeout][timeoutMillis].
* that are followed by the newer values within the given [timeoutMillis].
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, leave this part of the docs as before (it is consistent with how sample is documented, too)

* The latest value is always emitted.
*
* Example:
Expand All @@ -40,36 +40,46 @@ import kotlin.time.*
*/
@FlowPreview
public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
require(timeoutMillis > 0) { "Debounce timeout should be positive" }
return scopedFlow { downstream ->
// Actually Any, KT-30796
val values = produce<Any?>(capacity = Channel.CONFLATED) {
collect { value -> send(value ?: NULL) }
}
var lastValue: Any? = null
while (lastValue !== DONE) {
select<Unit> {
// Should be receiveOrClosed when boxing issues are fixed
values.onReceiveOrNull {
if (it == null) {
if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
lastValue = DONE
} else {
lastValue = it
}
}
require(timeoutMillis >= 0L) { "Debounce timeout should not be negative" }
if (timeoutMillis == 0L) return this
return debounceInternal { timeoutMillis }
}

lastValue?.let { value ->
// set timeout when lastValue != null
onTimeout(timeoutMillis) {
lastValue = null // Consume the value
downstream.emit(NULL.unbox(value))
}
}
}
}
/**
* A variation of [debounce] that allows specifying the timeout value dynamically.
*
* Example:
* ```
* flow {
* emit(1)
* delay(100)
* emit(2)
* delay(100)
* emit(3)
* delay(1000)
* emit(4)
* delay(100)
* emit(5)
* delay(100)
* emit(6)
* }.debounce {
* if (it == 4) {
* 0L
* } else {
* 300L
* }
* }
* ```
* produces `3, 4, 6`.
*
* @param timeoutMillisSelector [T] is the emitted value and the return value is timeout in milliseconds.
*/
@ExperimentalTime
Copy link
Contributor

@elizarov elizarov Oct 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one should not have @ExperimentalTime annotation, but there should be another version with (T) -> Duration selector that has one.

@FlowPreview
public fun <T> Flow<T>.debounce(timeoutMillisSelector: (T) -> Long): Flow<T> =
debounceInternal { emittedItem ->
timeoutMillisSelector(emittedItem)
}
}

/**
* Returns a flow that mirrors the original flow, but filters out values
Expand Down Expand Up @@ -99,6 +109,39 @@ public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
@FlowPreview
public fun <T> Flow<T>.debounce(timeout: Duration): Flow<T> = debounce(timeout.toDelayMillis())

private fun <T> Flow<T>.debounceInternal(timeoutMillisSelector: (T) -> Long) : Flow<T> =
scopedFlow { downstream ->
// Actually Any, KT-30796
val values = produce<Any?>(capacity = 0) {
collect { value -> send(value ?: NULL) }
}
var lastValue: Any? = null
while (lastValue !== DONE) {
select<Unit> {
// Should be receiveOrClosed when boxing issues are fixed
values.onReceiveOrNull {
if (it == null) {
if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
lastValue = DONE
} else {
lastValue = it
}
}

lastValue?.let { value ->
val unboxedValue: T = NULL.unbox(value)
val timeoutMillis = timeoutMillisSelector(unboxedValue)
require(timeoutMillis >= 0L) { "Debounce timeout should not be negative" }
// set timeout when lastValue != null
onTimeout(timeoutMillis) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A case with timeoutMillis == 0 should be explicitly optimized here -- emit it immediately instead of installing onTimeout case.

lastValue = null // Consume the value
downstream.emit(unboxedValue)
}
}
}
}
}

/**
* Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period][periodMillis].
*
Expand Down
74 changes: 69 additions & 5 deletions kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt
Expand Up @@ -11,7 +11,7 @@ import kotlin.time.*

class DebounceTest : TestBase() {
@Test
public fun testBasic() = withVirtualTime {
fun testBasic() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
Expand Down Expand Up @@ -159,7 +159,7 @@ class DebounceTest : TestBase() {
expect(2)
throw TestException()
}.flowOn(NamedDispatchers("source")).debounce(Long.MAX_VALUE).map {
expectUnreached()
expectUnreached()
}
assertFailsWith<TestException>(flow)
finish(3)
Expand All @@ -175,7 +175,6 @@ class DebounceTest : TestBase() {
expect(2)
yield()
throw TestException()
it
}

assertFailsWith<TestException>(flow)
Expand All @@ -193,7 +192,6 @@ class DebounceTest : TestBase() {
expect(2)
yield()
throw TestException()
it
}

assertFailsWith<TestException>(flow)
Expand All @@ -202,7 +200,7 @@ class DebounceTest : TestBase() {

@ExperimentalTime
@Test
public fun testDurationBasic() = withVirtualTime {
fun testDurationBasic() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
Expand All @@ -223,4 +221,70 @@ class DebounceTest : TestBase() {
assertEquals(listOf("A", "D", "E"), result)
finish(5)
}

@ExperimentalTime
@Test
fun testDebounceSelectorBasic() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
emit("A")
delay(1500)
emit("B")
delay(500)
emit("C")
delay(1)
emit("D")
delay(800)
emit("E")
expect(4)
}

expect(2)
val result = flow.debounce {
if (it == "C") {
0
} else {
1000
}
}.toList()

assertEquals(listOf("A", "C", "E"), result)
finish(5)
}

@Test
fun testZeroDebounceTime() = withVirtualTime {
expect(1)
val flow = flow {
expect(2)
emit("A")
emit("B")
emit("C")
expect(3)
}

val result = flow.debounce(0).toList()

assertEquals(listOf("A", "B", "C"), result)
finish(4)
}

@ExperimentalTime
@Test
fun testZeroDebounceTimeSelector() = withVirtualTime {
expect(1)
val flow = flow {
expect(2)
emit("A")
delay(1)
mkano9 marked this conversation as resolved.
Show resolved Hide resolved
emit("B")
expect(3)
}

val result = flow.debounce { 0 }.toList()

assertEquals(listOf("A", "B"), result)
finish(4)
}
}