diff --git a/docs/shared-mutable-state-and-concurrency.md b/docs/shared-mutable-state-and-concurrency.md index 316d56e5bc..8b83ad0b20 100644 --- a/docs/shared-mutable-state-and-concurrency.md +++ b/docs/shared-mutable-state-and-concurrency.md @@ -24,7 +24,7 @@ but others are unique. ### The problem -Let us launch a hundred coroutines all doing the same action thousand times. +Let us launch a hundred coroutines all doing the same action a thousand times. We'll also measure their completion time for further comparisons:
@@ -102,7 +102,7 @@ increment the `counter` concurrently from multiple threads without any synchroni ### Volatiles are of no help -There is common misconception that making a variable `volatile` solves concurrency problem. Let us try it: +There is a common misconception that making a variable `volatile` solves concurrency problem. Let us try it: @@ -158,7 +158,7 @@ do not provide atomicity of larger actions (increment in our case). ### Thread-safe data structures The general solution that works both for threads and for coroutines is to use a thread-safe (aka synchronized, -linearizable, or atomic) data structure that provides all the necessarily synchronization for the corresponding +linearizable, or atomic) data structure that provides all the necessary synchronization for the corresponding operations that needs to be performed on a shared state. In the case of a simple counter we can use `AtomicInteger` class which has atomic `incrementAndGet` operations: diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt index aa55fea721..0c40691640 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt @@ -64,36 +64,58 @@ fun main() = runBlocking { */ @FlowPreview public fun Flow.debounce(timeoutMillis: Long): Flow { - require(timeoutMillis > 0) { "Debounce timeout should be positive" } - return scopedFlow { downstream -> - // Actually Any, KT-30796 - val values = produce(capacity = Channel.CONFLATED) { - collect { value -> send(value ?: NULL) } - } - var lastValue: Any? = null - while (lastValue !== DONE) { - select { - // Should be receiveOrClosed when boxing issues are fixed - values.onReceiveOrNull { - if (it == null) { - if (lastValue != null) downstream.emit(NULL.unbox(lastValue)) - lastValue = DONE - } else { - lastValue = it - } - } + require(timeoutMillis >= 0L) { "Debounce timeout should not be negative" } + if (timeoutMillis == 0L) return this + return debounceInternal { timeoutMillis } +} - lastValue?.let { value -> - // set timeout when lastValue != null - onTimeout(timeoutMillis) { - lastValue = null // Consume the value - downstream.emit(NULL.unbox(value)) - } - } - } - } +/** + * Returns a flow that mirrors the original flow, but filters out values + * that are followed by the newer values within the given [timeout][timeoutMillis]. + * The latest value is always emitted. + * + * A variation of [debounce] that allows specifying the timeout value dynamically. + * + * Example: + * + * ```kotlin + * flow { + * emit(1) + * delay(90) + * emit(2) + * delay(90) + * emit(3) + * delay(1010) + * emit(4) + * delay(1010) + * emit(5) + * }.debounce { + * if (it == 1) { + * 0L + * } else { + * 1000L + * } + * } + * ``` + * + * + * produces the following emissions + * + * ```text + * 1, 3, 4, 5 + * ``` + * + * + * Note that the resulting flow does not emit anything as long as the original flow emits + * items faster than every [timeoutMillis] milliseconds. + * + * @param timeoutMillis [T] is the emitted value and the return value is timeout in milliseconds. + */ +@FlowPreview +public fun Flow.debounce(timeoutMillis: (T) -> Long): Flow = + debounceInternal { emittedItem -> + timeoutMillis(emittedItem) } -} /** * Returns a flow that mirrors the original flow, but filters out values @@ -129,7 +151,98 @@ public fun Flow.debounce(timeoutMillis: Long): Flow { */ @ExperimentalTime @FlowPreview -public fun Flow.debounce(timeout: Duration): Flow = debounce(timeout.toDelayMillis()) +public fun Flow.debounceWithDuration(timeout: Duration): Flow = debounce(timeout.toDelayMillis()) + +/** + * Returns a flow that mirrors the original flow, but filters out values + * that are followed by the newer values within the given [timeout]. + * The latest value is always emitted. + * + * A variation of [debounceWithDuration] that allows specifying the timeout value dynamically. + * + * Example: + * + * ```kotlin + * flow { + * emit(1) + * delay(90.milliseconds) + * emit(2) + * delay(90.milliseconds) + * emit(3) + * delay(1010.milliseconds) + * emit(4) + * delay(1010.milliseconds) + * emit(5) + * }.debounce { + * if (it == 1) { + * 0.milliseconds + * } else { + * 1000.milliseconds + * } + * } + * ``` + * + * + * produces the following emissions + * + * ```text + * 1, 3, 4, 5 + * ``` + * + * + * Note that the resulting flow does not emit anything as long as the original flow emits + * items faster than every [timeout] milliseconds. + * + * @param timeout [T] is the emitted value and the return value is timeout in [Duration]. + */ +@ExperimentalTime +@FlowPreview +public fun Flow.debounceWithDuration(timeout: (T) -> Duration): Flow = + debounceInternal { emittedItem -> + timeout(emittedItem).toDelayMillis() + } + +private fun Flow.debounceInternal(timeoutMillisSelector: (T) -> Long) : Flow = + scopedFlow { downstream -> + // Actually Any, KT-30796 + val values = produce(capacity = 0) { + collect { value -> send(value ?: NULL) } + } + var lastValue: Any? = null + while (lastValue !== DONE) { + select { + // Give a chance to consume lastValue first before onReceiveOrNull receives a new value + lastValue?.let { value -> + val unboxedValue: T = NULL.unbox(value) + val timeoutMillis = timeoutMillisSelector(unboxedValue) + require(timeoutMillis >= 0L) { "Debounce timeout should not be negative" } + + if (timeoutMillis == 0L) { + lastValue = null + runBlocking { + launch { downstream.emit(unboxedValue) } + } + } else { + // Set timeout when lastValue != null + onTimeout(timeoutMillis) { + lastValue = null // Consume the value + downstream.emit(unboxedValue) + } + } + } + + // Should be receiveOrClosed when boxing issues are fixed + values.onReceiveOrNull { + if (it == null) { + if (lastValue != null) downstream.emit(NULL.unbox(lastValue)) + lastValue = DONE + } else { + lastValue = it + } + } + } + } + } /** * Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period][periodMillis]. @@ -144,7 +257,7 @@ public fun Flow.debounce(timeout: Duration): Flow = debounce(timeout.t * } * }.sample(200) * ``` - * + * * * produces the following emissions * @@ -215,7 +328,7 @@ internal fun CoroutineScope.fixedPeriodTicker(delayMillis: Long, initialDelayMil * } * }.sample(200.milliseconds) * ``` - * + * * * produces the following emissions * diff --git a/kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt index 4065671e3d..cd30ec54b8 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt @@ -11,7 +11,7 @@ import kotlin.time.* class DebounceTest : TestBase() { @Test - public fun testBasic() = withVirtualTime { + fun testBasic() = withVirtualTime { expect(1) val flow = flow { expect(3) @@ -159,7 +159,7 @@ class DebounceTest : TestBase() { expect(2) throw TestException() }.flowOn(NamedDispatchers("source")).debounce(Long.MAX_VALUE).map { - expectUnreached() + expectUnreached() } assertFailsWith(flow) finish(3) @@ -175,7 +175,6 @@ class DebounceTest : TestBase() { expect(2) yield() throw TestException() - it } assertFailsWith(flow) @@ -193,7 +192,6 @@ class DebounceTest : TestBase() { expect(2) yield() throw TestException() - it } assertFailsWith(flow) @@ -202,7 +200,7 @@ class DebounceTest : TestBase() { @ExperimentalTime @Test - public fun testDurationBasic() = withVirtualTime { + fun testDurationBasic() = withVirtualTime { expect(1) val flow = flow { expect(3) @@ -219,8 +217,106 @@ class DebounceTest : TestBase() { } expect(2) - val result = flow.debounce(1000.milliseconds).toList() + val result = flow.debounceWithDuration(1000.milliseconds).toList() assertEquals(listOf("A", "D", "E"), result) finish(5) } + + @ExperimentalTime + @Test + fun testDebounceSelectorBasic() = withVirtualTime { + expect(1) + val flow = flow { + expect(3) + emit(1) + delay(90) + emit(2) + delay(90) + emit(3) + delay(1010) + emit(4) + delay(1010) + emit(5) + expect(4) + } + + expect(2) + val result = flow.debounce { + if (it == 1) { + 0 + } else { + 1000 + } + }.toList() + + assertEquals(listOf(1, 3, 4, 5), result) + finish(5) + } + + @Test + fun testZeroDebounceTime() = withVirtualTime { + expect(1) + val flow = flow { + expect(3) + emit("A") + emit("B") + emit("C") + expect(4) + } + + expect(2) + val result = flow.debounce(0).toList() + + assertEquals(listOf("A", "B", "C"), result) + finish(5) + } + + @ExperimentalTime + @Test + fun testZeroDebounceTimeSelector() = withVirtualTime { + expect(1) + val flow = flow { + expect(3) + emit("A") + emit("B") + expect(4) + } + + expect(2) + val result = flow.debounce { 0 }.toList() + + assertEquals(listOf("A", "B"), result) + finish(5) + } + + @ExperimentalTime + @Test + fun testDebounceDurationSelectorBasic() = withVirtualTime { + expect(1) + val flow = flow { + expect(3) + emit("A") + delay(1500.milliseconds) + emit("B") + delay(500.milliseconds) + emit("C") + delay(250.milliseconds) + emit("D") + delay(2000.milliseconds) + emit("E") + expect(4) + } + + expect(2) + val result = flow.debounceWithDuration { + if (it == "C") { + 0.milliseconds + } else { + 1000.milliseconds + } + }.toList() + + assertEquals(listOf("A", "C", "D", "E"), result) + finish(5) + } }