Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add debounce selector #2148

Closed
wants to merge 9 commits into from
99 changes: 71 additions & 28 deletions kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
Expand Up @@ -16,7 +16,7 @@ import kotlin.time.*

/**
* Returns a flow that mirrors the original flow, but filters out values
* that are followed by the newer values within the given [timeout][timeoutMillis].
* that are followed by the newer values within the given [timeoutMillis].
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, leave this part of the docs as before (it is consistent with how sample is documented, too)

* The latest value is always emitted.
*
* Example:
Expand All @@ -37,39 +37,50 @@ import kotlin.time.*
*
* Note that the resulting flow does not emit anything as long as the original flow emits
* items faster than every [timeoutMillis] milliseconds.
* @param timeoutMillis must be positive
*/
@FlowPreview
public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
require(timeoutMillis > 0) { "Debounce timeout should be positive" }
mkano9 marked this conversation as resolved.
Show resolved Hide resolved
return scopedFlow { downstream ->
// Actually Any, KT-30796
val values = produce<Any?>(capacity = Channel.CONFLATED) {
collect { value -> send(value ?: NULL) }
}
var lastValue: Any? = null
while (lastValue !== DONE) {
select<Unit> {
// Should be receiveOrClosed when boxing issues are fixed
values.onReceiveOrNull {
if (it == null) {
if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
lastValue = DONE
} else {
lastValue = it
}
}
return debounceInternal { timeoutMillis }
}

lastValue?.let { value ->
// set timeout when lastValue != null
onTimeout(timeoutMillis) {
lastValue = null // Consume the value
downstream.emit(NULL.unbox(value))
}
}
}
}
/**
* A variation of [debounce] that allows specifying the timeout value dynamically.
*
* Example:
* ```
* flow {
* emit(1)
* delay(100)
* emit(2)
* delay(100)
* emit(3)
* delay(1000)
* emit(4)
* delay(100)
* emit(5)
* delay(100)
* emit(6)
* }.debounce {
* if (it == 4) {
* 0L
* } else {
* 300L
* }
* }
* ```
* produces `3, 4, 6`.
*
* @param timeoutMillisSelector [T] is the emitted value and the return value is timeout in milliseconds. 0ms timeout is allowed.
*/
@FlowPreview
public fun <T> Flow<T>.debounce(timeoutMillisSelector: (T) -> Long): Flow<T> =
debounceInternal { emittedItem ->
val timeoutMillis = timeoutMillisSelector(emittedItem)
require(timeoutMillis >= 0) { "Debounce timeout should not be negative" }
mkano9 marked this conversation as resolved.
Show resolved Hide resolved
timeoutMillis
}
}

/**
* Returns a flow that mirrors the original flow, but filters out values
Expand Down Expand Up @@ -99,6 +110,38 @@ public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
@FlowPreview
public fun <T> Flow<T>.debounce(timeout: Duration): Flow<T> = debounce(timeout.toDelayMillis())

private fun <T> Flow<T>.debounceInternal(timeoutMillisSelector: (T) -> Long) : Flow<T> =
scopedFlow { downstream ->
// Actually Any, KT-30796
val values = produce<Any?>(capacity = Channel.CONFLATED) {
collect { value -> send(value ?: NULL) }
}
var lastValue: Any? = null
while (lastValue !== DONE) {
select<Unit> {
// Should be receiveOrClosed when boxing issues are fixed
values.onReceiveOrNull {
if (it == null) {
if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
lastValue = DONE
} else {
lastValue = it
}
}

lastValue?.let { value ->
val unboxedValue: T = NULL.unbox(value)
val timeoutMillis = timeoutMillisSelector(unboxedValue)
// set timeout when lastValue != null
onTimeout(timeoutMillis) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A case with timeoutMillis == 0 should be explicitly optimized here -- emit it immediately instead of installing onTimeout case.

lastValue = null // Consume the value
downstream.emit(unboxedValue)
}
}
}
}
}

/**
* Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period][periodMillis].
*
Expand Down
55 changes: 51 additions & 4 deletions kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt
Expand Up @@ -11,7 +11,7 @@ import kotlin.time.*

class DebounceTest : TestBase() {
@Test
public fun testBasic() = withVirtualTime {
fun testBasic() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
Expand Down Expand Up @@ -175,7 +175,6 @@ class DebounceTest : TestBase() {
expect(2)
yield()
throw TestException()
it
}

assertFailsWith<TestException>(flow)
Expand All @@ -193,7 +192,6 @@ class DebounceTest : TestBase() {
expect(2)
yield()
throw TestException()
it
}

assertFailsWith<TestException>(flow)
Expand All @@ -202,7 +200,7 @@ class DebounceTest : TestBase() {

@ExperimentalTime
@Test
public fun testDurationBasic() = withVirtualTime {
fun testDurationBasic() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
Expand All @@ -223,4 +221,53 @@ class DebounceTest : TestBase() {
assertEquals(listOf("A", "D", "E"), result)
finish(5)
}

@Test
fun testDebounceSelector() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
emit("A")
delay(1500)
emit("B")
delay(500)
emit("C")
delay(1)
emit("D")
delay(800)
emit("E")
expect(4)
}

expect(2)
val result = flow.debounce {
if (it == "C") {
0
} else {
1000
}
}.toList()

assertEquals(listOf("A", "C", "E"), result)
finish(5)
}

@Test
fun testZeroDebounceTime() = runTest {
assertFailsWith<IllegalArgumentException> {
flowOf<Int>().debounce(0)
}
}

@Test
fun testNegativeDebounceTimeSelector() = withVirtualTime {
expect(1)
val flow = flow {
expect(2)
emit("A")
delay(1)
mkano9 marked this conversation as resolved.
Show resolved Hide resolved
}
assertFailsWith<IllegalArgumentException>(flow.debounce { -1 })
finish(3)
}
}