Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add debounce selector #2314

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
177 changes: 145 additions & 32 deletions kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
Expand Up @@ -64,36 +64,58 @@ fun main() = runBlocking {
*/
@FlowPreview
public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
require(timeoutMillis > 0) { "Debounce timeout should be positive" }
return scopedFlow { downstream ->
// Actually Any, KT-30796
val values = produce<Any?>(capacity = Channel.CONFLATED) {
collect { value -> send(value ?: NULL) }
}
var lastValue: Any? = null
while (lastValue !== DONE) {
select<Unit> {
// Should be receiveOrClosed when boxing issues are fixed
values.onReceiveOrNull {
if (it == null) {
if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
lastValue = DONE
} else {
lastValue = it
}
}
require(timeoutMillis >= 0L) { "Debounce timeout should not be negative" }
if (timeoutMillis == 0L) return this
return debounceInternal { timeoutMillis }
}

lastValue?.let { value ->
// set timeout when lastValue != null
onTimeout(timeoutMillis) {
lastValue = null // Consume the value
downstream.emit(NULL.unbox(value))
}
}
}
}
/**
* Returns a flow that mirrors the original flow, but filters out values
* that are followed by the newer values within the given [timeout][timeoutMillis].
* The latest value is always emitted.
*
* A variation of [debounce] that allows specifying the timeout value dynamically.
*
* Example:
*
* ```kotlin
* flow {
* emit(1)
* delay(90)
* emit(2)
* delay(90)
* emit(3)
* delay(1010)
* emit(4)
* delay(1010)
* emit(5)
* }.debounce {
* if (it == 1) {
* 0L
* } else {
* 1000L
* }
* }
* ```
* <!--- KNIT example-delay-02.kt -->
*
* produces the following emissions
*
* ```text
* 1, 3, 4, 5
* ```
* <!--- TEST -->
*
* Note that the resulting flow does not emit anything as long as the original flow emits
* items faster than every [timeoutMillis] milliseconds.
*
* @param timeoutMillis [T] is the emitted value and the return value is timeout in milliseconds.
*/
@FlowPreview
public fun <T> Flow<T>.debounce(timeoutMillis: (T) -> Long): Flow<T> =
debounceInternal { emittedItem ->
timeoutMillis(emittedItem)
}
}

/**
* Returns a flow that mirrors the original flow, but filters out values
Expand Down Expand Up @@ -129,7 +151,98 @@ public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
*/
@ExperimentalTime
@FlowPreview
public fun <T> Flow<T>.debounce(timeout: Duration): Flow<T> = debounce(timeout.toDelayMillis())
public fun <T> Flow<T>.debounceWithDuration(timeout: Duration): Flow<T> = debounce(timeout.toDelayMillis())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not ideal that I am renaming this method from Flow<T>.debounce(timeout: Duration): Flow<T>.

The selector variation method name Flow<T>.debounce(timeout: (T) -> Duration): Flow<T> below was conflicting with Flow<T>.debounce(timeoutMillis: (T) -> Long): Flow<T>. I could just rename only the duration selector variation to e.g. Flow<T>.debounceWithDurationSelector(timeout: (T) -> Duration): Flow<T>, but I felt this would be a better grouping to keep the word duration in the method names for both instead of just one of them.

Please let me know if renaming an existing method concerns.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The right way to do it is this:

  • Keep the Kotlin name debounce
  • Add @JvmName("debounceDuration") to avoid platform declaration clash (we don't use With or other prepositions in these kind of names).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@JvmName does not seem to work when a function argument is used. I see the issue mentioned in https://youtrack.jetbrains.com/issue/KT-22119. I pushed with debounceDuration(...).


/**
* Returns a flow that mirrors the original flow, but filters out values
* that are followed by the newer values within the given [timeout].
* The latest value is always emitted.
*
* A variation of [debounceWithDuration] that allows specifying the timeout value dynamically.
*
* Example:
*
* ```kotlin
* flow {
* emit(1)
* delay(90.milliseconds)
* emit(2)
* delay(90.milliseconds)
* emit(3)
* delay(1010.milliseconds)
* emit(4)
* delay(1010.milliseconds)
* emit(5)
* }.debounce {
* if (it == 1) {
* 0.milliseconds
* } else {
* 1000.milliseconds
* }
* }
* ```
* <!--- KNIT example-delay-duration-02.kt -->
*
* produces the following emissions
*
* ```text
* 1, 3, 4, 5
* ```
* <!--- TEST -->
*
* Note that the resulting flow does not emit anything as long as the original flow emits
* items faster than every [timeout] milliseconds.
*
* @param timeout [T] is the emitted value and the return value is timeout in [Duration].
*/
@ExperimentalTime
@FlowPreview
public fun <T> Flow<T>.debounceWithDuration(timeout: (T) -> Duration): Flow<T> =
debounceInternal { emittedItem ->
timeout(emittedItem).toDelayMillis()
}

private fun <T> Flow<T>.debounceInternal(timeoutMillisSelector: (T) -> Long) : Flow<T> =
scopedFlow { downstream ->
// Actually Any, KT-30796
val values = produce<Any?>(capacity = 0) {
collect { value -> send(value ?: NULL) }
}
var lastValue: Any? = null
while (lastValue !== DONE) {
select<Unit> {
// Give a chance to consume lastValue first before onReceiveOrNull receives a new value
lastValue?.let { value ->
Copy link
Contributor Author

@mkano9 mkano9 Oct 18, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reversed the order of values.onReceiveOrNull and lastValue?.let because when the flow emits "A", "B" at once with 0 timeout like the testZeroDebounceTimeSelector test, values.onReceiveOrNull kept picking it up first and get selected by select<Unit>; therefore, skipping the first value "A".

val unboxedValue: T = NULL.unbox(value)
val timeoutMillis = timeoutMillisSelector(unboxedValue)
require(timeoutMillis >= 0L) { "Debounce timeout should not be negative" }

if (timeoutMillis == 0L) {
lastValue = null
runBlocking {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the purpose of runBlocking/launch here? It should be just downstream.emit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get the Suspension functions can be called only within coroutine body error. Any suggestions? I used runBlocking to make sure it gets executed before moving on.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's exactly what emit is doing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I meant to say that the select {} block does not let me call a suspend function downstream.emit(unboxedValue).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should move this code outside of select {} block, that is, you should check conditions and emit if needed first, then do select.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I pushed a fix for this.

launch { downstream.emit(unboxedValue) }
}
} else {
// Set timeout when lastValue != null
onTimeout(timeoutMillis) {
lastValue = null // Consume the value
downstream.emit(unboxedValue)
}
}
}

// Should be receiveOrClosed when boxing issues are fixed
values.onReceiveOrNull {
if (it == null) {
if (lastValue != null) downstream.emit(NULL.unbox(lastValue))
lastValue = DONE
} else {
lastValue = it
}
}
}
}
}

/**
* Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period][periodMillis].
Expand All @@ -144,15 +257,15 @@ public fun <T> Flow<T>.debounce(timeout: Duration): Flow<T> = debounce(timeout.t
* }
* }.sample(200)
* ```
* <!--- KNIT example-delay-02.kt -->
* <!--- KNIT example-delay-03.kt -->
*
* produces the following emissions
*
* ```text
* 1, 3, 5, 7, 9
* ```
* <!--- TEST -->
*
*
* Note that the latest element is not emitted if it does not fit into the sampling window.
*/
@FlowPreview
Expand Down Expand Up @@ -215,7 +328,7 @@ internal fun CoroutineScope.fixedPeriodTicker(delayMillis: Long, initialDelayMil
* }
* }.sample(200.milliseconds)
* ```
* <!--- KNIT example-delay-duration-02.kt -->
* <!--- KNIT example-delay-duration-03.kt -->
*
* produces the following emissions
*
Expand Down
108 changes: 102 additions & 6 deletions kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt
Expand Up @@ -11,7 +11,7 @@ import kotlin.time.*

class DebounceTest : TestBase() {
@Test
public fun testBasic() = withVirtualTime {
fun testBasic() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
Expand Down Expand Up @@ -159,7 +159,7 @@ class DebounceTest : TestBase() {
expect(2)
throw TestException()
}.flowOn(NamedDispatchers("source")).debounce(Long.MAX_VALUE).map {
expectUnreached()
expectUnreached()
}
assertFailsWith<TestException>(flow)
finish(3)
Expand All @@ -175,7 +175,6 @@ class DebounceTest : TestBase() {
expect(2)
yield()
throw TestException()
it
}

assertFailsWith<TestException>(flow)
Expand All @@ -193,7 +192,6 @@ class DebounceTest : TestBase() {
expect(2)
yield()
throw TestException()
it
}

assertFailsWith<TestException>(flow)
Expand All @@ -202,7 +200,7 @@ class DebounceTest : TestBase() {

@ExperimentalTime
@Test
public fun testDurationBasic() = withVirtualTime {
fun testDurationBasic() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
Expand All @@ -219,8 +217,106 @@ class DebounceTest : TestBase() {
}

expect(2)
val result = flow.debounce(1000.milliseconds).toList()
val result = flow.debounceWithDuration(1000.milliseconds).toList()
assertEquals(listOf("A", "D", "E"), result)
finish(5)
}

@ExperimentalTime
@Test
fun testDebounceSelectorBasic() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
emit(1)
delay(90)
emit(2)
delay(90)
emit(3)
delay(1010)
emit(4)
delay(1010)
emit(5)
expect(4)
}

expect(2)
val result = flow.debounce {
if (it == 1) {
0
} else {
1000
}
}.toList()

assertEquals(listOf(1, 3, 4, 5), result)
finish(5)
}

@Test
fun testZeroDebounceTime() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
emit("A")
emit("B")
emit("C")
expect(4)
}

expect(2)
val result = flow.debounce(0).toList()

assertEquals(listOf("A", "B", "C"), result)
finish(5)
}

@ExperimentalTime
@Test
fun testZeroDebounceTimeSelector() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
emit("A")
emit("B")
expect(4)
}

expect(2)
val result = flow.debounce { 0 }.toList()

assertEquals(listOf("A", "B"), result)
finish(5)
}

@ExperimentalTime
@Test
fun testDebounceDurationSelectorBasic() = withVirtualTime {
expect(1)
val flow = flow {
expect(3)
emit("A")
delay(1500.milliseconds)
emit("B")
delay(500.milliseconds)
emit("C")
delay(250.milliseconds)
emit("D")
delay(2000.milliseconds)
emit("E")
expect(4)
}

expect(2)
val result = flow.debounceWithDuration {
if (it == "C") {
0.milliseconds
} else {
1000.milliseconds
}
}.toList()

assertEquals(listOf("A", "C", "D", "E"), result)
finish(5)
}
}