Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add debounce selector #2314

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -941,7 +941,9 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun debounce (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
public static final fun debounce-8GFy2Ro (Lkotlinx/coroutines/flow/Flow;D)Lkotlinx/coroutines/flow/Flow;
public static final fun debounce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static final fun debounceDuration (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static final fun debounceDuration-8GFy2Ro (Lkotlinx/coroutines/flow/Flow;D)Lkotlinx/coroutines/flow/Flow;
public static final fun delayEach (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
public static final fun delayFlow (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
public static final fun distinctUntilChanged (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
Expand Down
174 changes: 142 additions & 32 deletions kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
Expand Up @@ -64,36 +64,58 @@ fun main() = runBlocking {
*/
@FlowPreview
public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
require(timeoutMillis > 0) { "Debounce timeout should be positive" }
return scopedFlow { downstream ->
// Actually Any, KT-30796
val values = produce<Any?>(capacity = Channel.CONFLATED) {
collect { value -> send(value ?: NULL) }
}
var lastValue: Any? = null
while (lastValue !== DONE) {
select<Unit> {
// Should be receiveOrClosed when boxing issues are fixed
values.onReceiveOrNull {
if (it == null) {
if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
lastValue = DONE
} else {
lastValue = it
}
}
require(timeoutMillis >= 0L) { "Debounce timeout should not be negative" }
if (timeoutMillis == 0L) return this
return debounceInternal { timeoutMillis }
}

lastValue?.let { value ->
// set timeout when lastValue != null
onTimeout(timeoutMillis) {
lastValue = null // Consume the value
downstream.emit(NULL.unbox(value))
}
}
}
}
/**
* Returns a flow that mirrors the original flow, but filters out values
* that are followed by the newer values within the given [timeout][timeoutMillis].
* The latest value is always emitted.
*
* A variation of [debounce] that allows specifying the timeout value dynamically.
*
* Example:
*
* ```kotlin
* flow {
* emit(1)
* delay(90)
* emit(2)
* delay(90)
* emit(3)
* delay(1010)
* emit(4)
* delay(1010)
* emit(5)
* }.debounce {
* if (it == 1) {
* 0L
* } else {
* 1000L
* }
* }
* ```
* <!--- KNIT example-delay-02.kt -->
*
* produces the following emissions
*
* ```text
* 1, 3, 4, 5
* ```
* <!--- TEST -->
*
* Note that the resulting flow does not emit anything as long as the original flow emits
* items faster than every [timeoutMillis] milliseconds.
*
* @param timeoutMillis [T] is the emitted value and the return value is timeout in milliseconds.
*/
@FlowPreview
public fun <T> Flow<T>.debounce(timeoutMillis: (T) -> Long): Flow<T> =
debounceInternal { emittedItem ->
timeoutMillis(emittedItem)
}
}

/**
* Returns a flow that mirrors the original flow, but filters out values
Expand Down Expand Up @@ -129,7 +151,95 @@ public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
*/
@ExperimentalTime
@FlowPreview
public fun <T> Flow<T>.debounce(timeout: Duration): Flow<T> = debounce(timeout.toDelayMillis())
public fun <T> Flow<T>.debounceDuration(timeout: Duration): Flow<T> = debounce(timeout.toDelayMillis())

/**
* Returns a flow that mirrors the original flow, but filters out values
* that are followed by the newer values within the given [timeout].
* The latest value is always emitted.
*
* A variation of [debounceDuration] that allows specifying the timeout value dynamically.
*
* Example:
*
* ```kotlin
* flow {
* emit(1)
* delay(90.milliseconds)
* emit(2)
* delay(90.milliseconds)
* emit(3)
* delay(1010.milliseconds)
* emit(4)
* delay(1010.milliseconds)
* emit(5)
* }.debounce {
* if (it == 1) {
* 0.milliseconds
* } else {
* 1000.milliseconds
* }
* }
* ```
* <!--- KNIT example-delay-duration-02.kt -->
*
* produces the following emissions
*
* ```text
* 1, 3, 4, 5
* ```
* <!--- TEST -->
*
* Note that the resulting flow does not emit anything as long as the original flow emits
* items faster than every [timeout] milliseconds.
*
* @param timeout [T] is the emitted value and the return value is timeout in [Duration].
*/
@ExperimentalTime
@FlowPreview
public fun <T> Flow<T>.debounceDuration(timeout: (T) -> Duration): Flow<T> =
debounceInternal { emittedItem ->
timeout(emittedItem).toDelayMillis()
}

private fun <T> Flow<T>.debounceInternal(timeoutMillisSelector: (T) -> Long) : Flow<T> =
scopedFlow { downstream ->
// Actually Any, KT-30796
val values = produce<Any?>(capacity = 0) {
collect { value -> send(value ?: NULL) }
}

var lastValue: Any? = null
while (lastValue !== DONE) {
val timeoutMillis = lastValue?.let { timeoutMillisSelector(NULL.unbox(it)) }
if (timeoutMillis != null) {
require(timeoutMillis >= 0L) { "Debounce timeout should not be negative" }
if (timeoutMillis == 0L) {
downstream.emit(NULL.unbox(lastValue))
lastValue = null // Consume the value
}
}
select<Unit> {
// Set timeout when lastValue exists and is not consumed yet
if (lastValue != null && timeoutMillis != null) {
onTimeout(timeoutMillis) {
downstream.emit(NULL.unbox(lastValue))
lastValue = null // Consume the value
}
}

// Should be receiveOrClosed when boxing issues are fixed
values.onReceiveOrNull {
if (it == null) {
if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
lastValue = DONE
} else {
lastValue = it
}
}
}
}
}

/**
* Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period][periodMillis].
Expand All @@ -144,15 +254,15 @@ public fun <T> Flow<T>.debounce(timeout: Duration): Flow<T> = debounce(timeout.t
* }
* }.sample(200)
* ```
* <!--- KNIT example-delay-02.kt -->
* <!--- KNIT example-delay-03.kt -->
*
* produces the following emissions
*
* ```text
* 1, 3, 5, 7, 9
* ```
* <!--- TEST -->
*
*
* Note that the latest element is not emitted if it does not fit into the sampling window.
*/
@FlowPreview
Expand Down Expand Up @@ -215,7 +325,7 @@ internal fun CoroutineScope.fixedPeriodTicker(delayMillis: Long, initialDelayMil
* }
* }.sample(200.milliseconds)
* ```
* <!--- KNIT example-delay-duration-02.kt -->
* <!--- KNIT example-delay-duration-03.kt -->
*
* produces the following emissions
*
Expand Down
108 changes: 102 additions & 6 deletions kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt
Expand Up @@ -11,7 +11,7 @@ import kotlin.time.*

class DebounceTest : TestBase() {
@Test
public fun testBasic() = withVirtualTime {
fun testBasic() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
Expand Down Expand Up @@ -159,7 +159,7 @@ class DebounceTest : TestBase() {
expect(2)
throw TestException()
}.flowOn(NamedDispatchers("source")).debounce(Long.MAX_VALUE).map {
expectUnreached()
expectUnreached()
}
assertFailsWith<TestException>(flow)
finish(3)
Expand All @@ -175,7 +175,6 @@ class DebounceTest : TestBase() {
expect(2)
yield()
throw TestException()
it
}

assertFailsWith<TestException>(flow)
Expand All @@ -193,7 +192,6 @@ class DebounceTest : TestBase() {
expect(2)
yield()
throw TestException()
it
}

assertFailsWith<TestException>(flow)
Expand All @@ -202,7 +200,7 @@ class DebounceTest : TestBase() {

@ExperimentalTime
@Test
public fun testDurationBasic() = withVirtualTime {
fun testDurationBasic() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
Expand All @@ -219,8 +217,106 @@ class DebounceTest : TestBase() {
}

expect(2)
val result = flow.debounce(1000.milliseconds).toList()
val result = flow.debounceDuration(1000.milliseconds).toList()
assertEquals(listOf("A", "D", "E"), result)
finish(5)
}

@ExperimentalTime
@Test
fun testDebounceSelectorBasic() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
emit(1)
delay(90)
emit(2)
delay(90)
emit(3)
delay(1010)
emit(4)
delay(1010)
emit(5)
expect(4)
}

expect(2)
val result = flow.debounce {
if (it == 1) {
0
} else {
1000
}
}.toList()

assertEquals(listOf(1, 3, 4, 5), result)
finish(5)
}

@Test
fun testZeroDebounceTime() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
emit("A")
emit("B")
emit("C")
expect(4)
}

expect(2)
val result = flow.debounce(0).toList()

assertEquals(listOf("A", "B", "C"), result)
finish(5)
}

@ExperimentalTime
@Test
fun testZeroDebounceTimeSelector() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
emit("A")
emit("B")
expect(4)
}

expect(2)
val result = flow.debounce { 0 }.toList()

assertEquals(listOf("A", "B"), result)
finish(5)
}

@ExperimentalTime
@Test
fun testDebounceDurationSelectorBasic() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
emit("A")
delay(1500.milliseconds)
emit("B")
delay(500.milliseconds)
emit("C")
delay(250.milliseconds)
emit("D")
delay(2000.milliseconds)
emit("E")
expect(4)
}

expect(2)
val result = flow.debounceDuration {
if (it == "C") {
0.milliseconds
} else {
1000.milliseconds
}
}.toList()

assertEquals(listOf("A", "C", "D", "E"), result)
finish(5)
}
}
19 changes: 15 additions & 4 deletions kotlinx-coroutines-core/jvm/test/examples/example-delay-02.kt
Expand Up @@ -11,9 +11,20 @@ import kotlinx.coroutines.flow.*
fun main() = runBlocking {

flow {
repeat(10) {
emit(it)
delay(110)
emit(1)
delay(90)
emit(2)
delay(90)
emit(3)
delay(1010)
emit(4)
delay(1010)
emit(5)
}.debounce {
if (it == 1) {
0L
} else {
1000L
}
}.sample(200)
}
.toList().joinToString().let { println(it) } }