Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add debounce selector #2336

Merged
merged 5 commits into from Oct 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -941,7 +941,9 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun debounce (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
public static final fun debounce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static final fun debounce-8GFy2Ro (Lkotlinx/coroutines/flow/Flow;D)Lkotlinx/coroutines/flow/Flow;
public static final fun debounceDuration (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static final fun delayEach (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
public static final fun delayFlow (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
public static final fun distinctUntilChanged (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
Expand Down
185 changes: 152 additions & 33 deletions kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
Expand Up @@ -64,37 +64,59 @@ fun main() = runBlocking {
*/
@FlowPreview
public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
elizarov marked this conversation as resolved.
Show resolved Hide resolved
require(timeoutMillis > 0) { "Debounce timeout should be positive" }
return scopedFlow { downstream ->
// Actually Any, KT-30796
val values = produce<Any?>(capacity = Channel.CONFLATED) {
collect { value -> send(value ?: NULL) }
}
var lastValue: Any? = null
while (lastValue !== DONE) {
select<Unit> {
// Should be receiveOrClosed when boxing issues are fixed
values.onReceiveOrNull {
if (it == null) {
if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
lastValue = DONE
} else {
lastValue = it
}
}

lastValue?.let { value ->
// set timeout when lastValue != null
onTimeout(timeoutMillis) {
lastValue = null // Consume the value
downstream.emit(NULL.unbox(value))
}
}
}
}
}
require(timeoutMillis >= 0L) { "Debounce timeout should not be negative" }
if (timeoutMillis == 0L) return this
return debounceInternal { timeoutMillis }
}

/**
* Returns a flow that mirrors the original flow, but filters out values
* that are followed by the newer values within the given [timeout][timeoutMillis].
* The latest value is always emitted.
*
* A variation of [debounce] that allows specifying the timeout value dynamically.
*
* Example:
*
* ```kotlin
* flow {
* emit(1)
* delay(90)
* emit(2)
* delay(90)
* emit(3)
* delay(1010)
* emit(4)
* delay(1010)
* emit(5)
* }.debounce {
* if (it == 1) {
* 0L
* } else {
* 1000L
* }
* }
* ```
* <!--- KNIT example-delay-02.kt -->
*
* produces the following emissions
*
* ```text
* 1, 3, 4, 5
* ```
* <!--- TEST -->
*
* Note that the resulting flow does not emit anything as long as the original flow emits
* items faster than every [timeoutMillis] milliseconds.
*
* @param timeoutMillis [T] is the emitted value and the return value is timeout in milliseconds.
*/
@FlowPreview
@OptIn(kotlin.experimental.ExperimentalTypeInference::class)
@OverloadResolutionByLambdaReturnType
public fun <T> Flow<T>.debounce(timeoutMillis: (T) -> Long): Flow<T> =
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
debounceInternal(timeoutMillis)

/**
* Returns a flow that mirrors the original flow, but filters out values
* that are followed by the newer values within the given [timeout].
Expand Down Expand Up @@ -129,7 +151,104 @@ public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
*/
@ExperimentalTime
@FlowPreview
public fun <T> Flow<T>.debounce(timeout: Duration): Flow<T> = debounce(timeout.toDelayMillis())
public fun <T> Flow<T>.debounce(timeout: Duration): Flow<T> =
debounce(timeout.toDelayMillis())

/**
* Returns a flow that mirrors the original flow, but filters out values
* that are followed by the newer values within the given [timeout].
* The latest value is always emitted.
*
* A variation of [debounce] that allows specifying the timeout value dynamically.
*
* Example:
*
* ```kotlin
* flow {
* emit(1)
* delay(90.milliseconds)
* emit(2)
* delay(90.milliseconds)
* emit(3)
* delay(1010.milliseconds)
* emit(4)
* delay(1010.milliseconds)
* emit(5)
* }.debounce {
* if (it == 1) {
* 0.milliseconds
* } else {
* 1000.milliseconds
* }
* }
* ```
* <!--- KNIT example-delay-duration-02.kt -->
*
* produces the following emissions
*
* ```text
* 1, 3, 4, 5
* ```
* <!--- TEST -->
*
* Note that the resulting flow does not emit anything as long as the original flow emits
* items faster than every [timeout] unit.
*
* @param timeout [T] is the emitted value and the return value is timeout in [Duration].
elizarov marked this conversation as resolved.
Show resolved Hide resolved
*/
@ExperimentalTime
@FlowPreview
@JvmName("debounceDuration")
@OptIn(kotlin.experimental.ExperimentalTypeInference::class)
@OverloadResolutionByLambdaReturnType
public fun <T> Flow<T>.debounce(timeout: (T) -> Duration): Flow<T> =
debounceInternal { emittedItem ->
timeout(emittedItem).toDelayMillis()
}

private fun <T> Flow<T>.debounceInternal(timeoutMillisSelector: (T) -> Long) : Flow<T> =
scopedFlow { downstream ->
// Produce the values using the default (rendezvous) channel
// Note: the actual type is Any, KT-30796
val values = produce<Any?> {
elizarov marked this conversation as resolved.
Show resolved Hide resolved
collect { value -> send(value ?: NULL) }
}
// Now consume the values
var lastValue: Any? = null
while (lastValue !== DONE) {
var timeoutMillis = 0L // will be always computed when lastValue != null
// Compute timeout for this value
if (lastValue != null) {
timeoutMillis = timeoutMillisSelector(NULL.unbox(lastValue))
require(timeoutMillis >= 0L) { "Debounce timeout should not be negative" }
if (timeoutMillis == 0L) {
downstream.emit(NULL.unbox(lastValue))
lastValue = null // Consume the value
}
}
// assert invariant: lastValue != null implies timeoutMillis > 0
assert { lastValue == null || timeoutMillis > 0 }
// wait for the next value with timeout
select<Unit> {
// Set timeout when lastValue exists and is not consumed yet
if (lastValue != null) {
onTimeout(timeoutMillis) {
downstream.emit(NULL.unbox(lastValue))
lastValue = null // Consume the value
}
}
// Should be receiveOrClosed when boxing issues are fixed
values.onReceiveOrNull { value ->
if (value == null) {
elizarov marked this conversation as resolved.
Show resolved Hide resolved
if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
lastValue = DONE
} else {
lastValue = value
}
}
}
}
}

/**
* Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period][periodMillis].
Expand All @@ -144,15 +263,15 @@ public fun <T> Flow<T>.debounce(timeout: Duration): Flow<T> = debounce(timeout.t
* }
* }.sample(200)
* ```
* <!--- KNIT example-delay-02.kt -->
* <!--- KNIT example-delay-03.kt -->
*
* produces the following emissions
*
* ```text
* 1, 3, 5, 7, 9
* ```
* <!--- TEST -->
*
*
* Note that the latest element is not emitted if it does not fit into the sampling window.
*/
@FlowPreview
Expand Down Expand Up @@ -215,7 +334,7 @@ internal fun CoroutineScope.fixedPeriodTicker(delayMillis: Long, initialDelayMil
* }
* }.sample(200.milliseconds)
* ```
* <!--- KNIT example-delay-duration-02.kt -->
* <!--- KNIT example-delay-duration-03.kt -->
*
* produces the following emissions
*
Expand Down
106 changes: 101 additions & 5 deletions kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt
Expand Up @@ -11,7 +11,7 @@ import kotlin.time.*

class DebounceTest : TestBase() {
@Test
public fun testBasic() = withVirtualTime {
fun testBasic() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
Expand Down Expand Up @@ -159,7 +159,7 @@ class DebounceTest : TestBase() {
expect(2)
throw TestException()
}.flowOn(NamedDispatchers("source")).debounce(Long.MAX_VALUE).map {
expectUnreached()
expectUnreached()
}
assertFailsWith<TestException>(flow)
finish(3)
Expand All @@ -175,7 +175,6 @@ class DebounceTest : TestBase() {
expect(2)
yield()
throw TestException()
it
}

assertFailsWith<TestException>(flow)
Expand All @@ -193,7 +192,6 @@ class DebounceTest : TestBase() {
expect(2)
yield()
throw TestException()
it
}

assertFailsWith<TestException>(flow)
Expand All @@ -202,7 +200,7 @@ class DebounceTest : TestBase() {

@ExperimentalTime
@Test
public fun testDurationBasic() = withVirtualTime {
fun testDurationBasic() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
Expand All @@ -223,4 +221,102 @@ class DebounceTest : TestBase() {
assertEquals(listOf("A", "D", "E"), result)
finish(5)
}

@ExperimentalTime
@Test
fun testDebounceSelectorBasic() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
emit(1)
delay(90)
emit(2)
delay(90)
emit(3)
delay(1010)
emit(4)
delay(1010)
emit(5)
expect(4)
}

expect(2)
val result = flow.debounce {
if (it == 1) {
0
} else {
1000
}
}.toList()

assertEquals(listOf(1, 3, 4, 5), result)
finish(5)
}

@Test
fun testZeroDebounceTime() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
emit("A")
emit("B")
emit("C")
expect(4)
}

expect(2)
val result = flow.debounce(0).toList()

assertEquals(listOf("A", "B", "C"), result)
finish(5)
}

@ExperimentalTime
@Test
fun testZeroDebounceTimeSelector() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
emit("A")
emit("B")
expect(4)
}

expect(2)
val result = flow.debounce { 0 }.toList()

assertEquals(listOf("A", "B"), result)
finish(5)
}

@ExperimentalTime
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
@Test
fun testDebounceDurationSelectorBasic() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
emit("A")
delay(1500.milliseconds)
emit("B")
delay(500.milliseconds)
emit("C")
delay(250.milliseconds)
emit("D")
delay(2000.milliseconds)
emit("E")
expect(4)
}

expect(2)
val result = flow.debounce {
if (it == "C") {
0.milliseconds
} else {
1000.milliseconds
}
}.toList()

assertEquals(listOf("A", "C", "D", "E"), result)
finish(5)
}
}