diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index a0ef0740f8..db821a1e05 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -267,6 +267,7 @@ public final class kotlinx/coroutines/Delay$DefaultImpls { public final class kotlinx/coroutines/DelayKt { public static final fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun delay-p9JZ4hM (DLkotlin/coroutines/Continuation;)Ljava/lang/Object; } public final class kotlinx/coroutines/DispatchedContinuationKt { @@ -527,7 +528,9 @@ public final class kotlinx/coroutines/TimeoutCancellationException : java/util/c public final class kotlinx/coroutines/TimeoutKt { public static final fun withTimeout (JLkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun withTimeout-lwyi7ZQ (DLkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun withTimeoutOrNull (JLkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun withTimeoutOrNull-lwyi7ZQ (DLkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } public final class kotlinx/coroutines/YieldKt { @@ -888,6 +891,7 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun debounce (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow; + public static final fun debounce-8GFy2Ro (Lkotlinx/coroutines/flow/Flow;D)Lkotlinx/coroutines/flow/Flow; public static final fun delayEach (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow; public static final fun delayFlow (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow; public static final fun distinctUntilChanged (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; @@ -954,6 +958,7 @@ public final class kotlinx/coroutines/flow/FlowKt { public static synthetic fun retry$default (Lkotlinx/coroutines/flow/Flow;JLkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow; public static final fun retryWhen (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow; public static final fun sample (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow; + public static final fun sample-8GFy2Ro (Lkotlinx/coroutines/flow/Flow;D)Lkotlinx/coroutines/flow/Flow; public static final fun scan (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; public static final fun scanFold (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; public static final fun scanReduce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; @@ -1092,6 +1097,7 @@ public abstract interface class kotlinx/coroutines/selects/SelectInstance { } public final class kotlinx/coroutines/selects/SelectKt { + public static final fun onTimeout-0lHKgQg (Lkotlinx/coroutines/selects/SelectBuilder;DLkotlin/jvm/functions/Function1;)V public static final fun select (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } diff --git a/kotlinx-coroutines-core/common/src/Delay.kt b/kotlinx-coroutines-core/common/src/Delay.kt index 3404f63611..cc205ecaf0 100644 --- a/kotlinx-coroutines-core/common/src/Delay.kt +++ b/kotlinx-coroutines-core/common/src/Delay.kt @@ -6,6 +6,7 @@ package kotlinx.coroutines import kotlinx.coroutines.selects.* import kotlin.coroutines.* +import kotlin.time.* /** * This dispatcher _feature_ is implemented by [CoroutineDispatcher] implementations that natively support @@ -75,5 +76,26 @@ public suspend fun delay(timeMillis: Long) { } } +/** + * Delays coroutine for a given [duration] without blocking a thread and resumes it after the specified time. + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function + * immediately resumes with [CancellationException]. + * + * Note that delay can be used in [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause. + * + * Implementation note: how exactly time is tracked is an implementation detail of [CoroutineDispatcher] in the context. + */ +@ExperimentalTime +public suspend fun delay(duration: Duration) = delay(duration.toDelayMillis()) + /** Returns [Delay] implementation of the given context */ internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultDelay + +/** + * Convert this duration to its millisecond value. + * Positive durations are coerced at least `1`. + */ +@ExperimentalTime +internal fun Duration.toDelayMillis(): Long = + if (this > Duration.ZERO) toLongMilliseconds().coerceAtLeast(1) else 0 diff --git a/kotlinx-coroutines-core/common/src/Timeout.kt b/kotlinx-coroutines-core/common/src/Timeout.kt index 8aedba30a1..87fe733773 100644 --- a/kotlinx-coroutines-core/common/src/Timeout.kt +++ b/kotlinx-coroutines-core/common/src/Timeout.kt @@ -10,6 +10,7 @@ import kotlinx.coroutines.selects.* import kotlin.coroutines.* import kotlin.coroutines.intrinsics.* import kotlin.jvm.* +import kotlin.time.* /** * Runs a given suspending [block] of code inside a coroutine with a specified [timeout][timeMillis] and throws @@ -32,6 +33,22 @@ public suspend fun withTimeout(timeMillis: Long, block: suspend CoroutineSco } } +/** + * Runs a given suspending [block] of code inside a coroutine with the specified [timeout] and throws + * a [TimeoutCancellationException] if the timeout was exceeded. + * + * The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of + * the cancellable suspending function inside the block throws a [TimeoutCancellationException]. + * + * The sibling function that does not throw an exception on timeout is [withTimeoutOrNull]. + * Note that the timeout action can be specified for a [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause. + * + * Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher]. + */ +@ExperimentalTime +public suspend fun withTimeout(timeout: Duration, block: suspend CoroutineScope.() -> T): T = + withTimeout(timeout.toDelayMillis(), block) + /** * Runs a given suspending block of code inside a coroutine with a specified [timeout][timeMillis] and returns * `null` if this timeout was exceeded. @@ -65,6 +82,22 @@ public suspend fun withTimeoutOrNull(timeMillis: Long, block: suspend Corout } } +/** + * Runs a given suspending block of code inside a coroutine with the specified [timeout] and returns + * `null` if this timeout was exceeded. + * + * The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of + * cancellable suspending function inside the block throws a [TimeoutCancellationException]. + * + * The sibling function that throws an exception on timeout is [withTimeout]. + * Note that the timeout action can be specified for a [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause. + * + * Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher]. + */ +@ExperimentalTime +public suspend fun withTimeoutOrNull(timeout: Duration, block: suspend CoroutineScope.() -> T): T? = + withTimeoutOrNull(timeout.toDelayMillis(), block) + private fun setupTimeout( coroutine: TimeoutCoroutine, block: suspend CoroutineScope.() -> T diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt index e473561e3d..5f3c900c1b 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt @@ -12,7 +12,7 @@ import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.internal.* import kotlinx.coroutines.selects.* import kotlin.jvm.* -import kotlinx.coroutines.flow.internal.unsafeFlow as flow +import kotlin.time.* /** * Returns a flow that mirrors the original flow, but filters out values @@ -71,6 +71,34 @@ public fun Flow.debounce(timeoutMillis: Long): Flow { } } +/** + * Returns a flow that mirrors the original flow, but filters out values + * that are followed by the newer values within the given [timeout]. + * The latest value is always emitted. + * + * Example: + * ``` + * flow { + * emit(1) + * delay(90.milliseconds) + * emit(2) + * delay(90.milliseconds) + * emit(3) + * delay(1010.milliseconds) + * emit(4) + * delay(1010.milliseconds) + * emit(5) + * }.debounce(1000.milliseconds) + * ``` + * produces `3, 4, 5`. + * + * Note that the resulting flow does not emit anything as long as the original flow emits + * items faster than every [timeout] milliseconds. + */ +@ExperimentalTime +@FlowPreview +public fun Flow.debounce(timeout: Duration): Flow = debounce(timeout.toDelayMillis()) + /** * Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period][periodMillis]. * @@ -133,3 +161,23 @@ internal fun CoroutineScope.fixedPeriodTicker(delayMillis: Long, initialDelayMil } } } + +/** + * Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period]. + * + * Example: + * ``` + * flow { + * repeat(10) { + * emit(it) + * delay(50.milliseconds) + * } + * }.sample(100.milliseconds) + * ``` + * produces `1, 3, 5, 7, 9`. + * + * Note that the latest element is not emitted if it does not fit into the sampling window. + */ +@ExperimentalTime +@FlowPreview +public fun Flow.sample(period: Duration): Flow = sample(period.toDelayMillis()) diff --git a/kotlinx-coroutines-core/common/src/selects/Select.kt b/kotlinx-coroutines-core/common/src/selects/Select.kt index 3ba1b06215..1df5c62bf5 100644 --- a/kotlinx-coroutines-core/common/src/selects/Select.kt +++ b/kotlinx-coroutines-core/common/src/selects/Select.kt @@ -14,6 +14,7 @@ import kotlin.coroutines.* import kotlin.coroutines.intrinsics.* import kotlin.jvm.* import kotlin.native.concurrent.* +import kotlin.time.* /** * Scope for [select] invocation. @@ -52,6 +53,17 @@ public interface SelectBuilder { public fun onTimeout(timeMillis: Long, block: suspend () -> R) } +/** + * Clause that selects the given [block] after the specified [timeout] passes. + * If timeout is negative or zero, [block] is selected immediately. + * + * **Note: This is an experimental api.** It may be replaced with light-weight timer/timeout channels in the future. + */ +@ExperimentalCoroutinesApi +@ExperimentalTime +public fun SelectBuilder.onTimeout(timeout: Duration, block: suspend () -> R) = + onTimeout(timeout.toDelayMillis(), block) + /** * Clause for [select] expression without additional parameters that does not select any value. */ diff --git a/kotlinx-coroutines-core/common/test/DelayDurationTest.kt b/kotlinx-coroutines-core/common/test/DelayDurationTest.kt new file mode 100644 index 0000000000..3dd55bde84 --- /dev/null +++ b/kotlinx-coroutines-core/common/test/DelayDurationTest.kt @@ -0,0 +1,72 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED", "DEPRECATION") + +// KT-21913 + +package kotlinx.coroutines + +import kotlin.test.* +import kotlin.time.* + +@ExperimentalTime +class DelayDurationTest : TestBase() { + + @Test + fun testCancellation() = runTest(expected = { it is CancellationException }) { + runAndCancel(1.seconds) + } + + @Test + fun testInfinite() = runTest(expected = { it is CancellationException }) { + runAndCancel(Duration.INFINITE) + } + + @Test + fun testRegularDelay() = runTest { + val deferred = async { + expect(2) + delay(1.seconds) + expect(4) + } + + expect(1) + yield() + expect(3) + deferred.await() + finish(5) + } + + @Test + fun testNanoDelay() = runTest { + val deferred = async { + expect(2) + delay(1.nanoseconds) + expect(4) + } + + expect(1) + yield() + expect(3) + deferred.await() + finish(5) + } + + private suspend fun runAndCancel(time: Duration) = coroutineScope { + expect(1) + val deferred = async { + expect(2) + delay(time) + expectUnreached() + } + + yield() + expect(3) + require(deferred.isActive) + deferred.cancel() + finish(4) + deferred.await() + } +} diff --git a/kotlinx-coroutines-core/common/test/WithTimeoutDurationTest.kt b/kotlinx-coroutines-core/common/test/WithTimeoutDurationTest.kt new file mode 100644 index 0000000000..b91a87f0bd --- /dev/null +++ b/kotlinx-coroutines-core/common/test/WithTimeoutDurationTest.kt @@ -0,0 +1,214 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED", "UNREACHABLE_CODE") // KT-21913 + +package kotlinx.coroutines + +import kotlin.test.* +import kotlin.time.* + +@ExperimentalTime +class WithTimeoutDurationTest : TestBase() { + /** + * Tests a case of no timeout and no suspension inside. + */ + @Test + fun testBasicNoSuspend() = runTest { + expect(1) + val result = withTimeout(10.seconds) { + expect(2) + "OK" + } + assertEquals("OK", result) + finish(3) + } + + /** + * Tests a case of no timeout and one suspension inside. + */ + @Test + fun testBasicSuspend() = runTest { + expect(1) + val result = withTimeout(10.seconds) { + expect(2) + yield() + expect(3) + "OK" + } + assertEquals("OK", result) + finish(4) + } + + /** + * Tests proper dispatching of `withTimeout` blocks + */ + @Test + fun testDispatch() = runTest { + expect(1) + launch { + expect(4) + yield() // back to main + expect(7) + } + expect(2) + // test that it does not yield to the above job when started + val result = withTimeout(1.seconds) { + expect(3) + yield() // yield only now + expect(5) + "OK" + } + assertEquals("OK", result) + expect(6) + yield() // back to launch + finish(8) + } + + + /** + * Tests that a 100% CPU-consuming loop will react on timeout if it has yields. + */ + @Test + fun testYieldBlockingWithTimeout() = runTest( + expected = { it is CancellationException } + ) { + withTimeout(100.milliseconds) { + while (true) { + yield() + } + } + } + + /** + * Tests that [withTimeout] waits for children coroutines to complete. + */ + @Test + fun testWithTimeoutChildWait() = runTest { + expect(1) + withTimeout(100.milliseconds) { + expect(2) + // launch child with timeout + launch { + expect(4) + } + expect(3) + // now will wait for child before returning + } + finish(5) + } + + @Test + fun testBadClass() = runTest { + val bad = BadClass() + val result = withTimeout(100.milliseconds) { + bad + } + assertSame(bad, result) + } + + class BadClass { + override fun equals(other: Any?): Boolean = error("Should not be called") + override fun hashCode(): Int = error("Should not be called") + override fun toString(): String = error("Should not be called") + } + + @Test + fun testExceptionOnTimeout() = runTest { + expect(1) + try { + withTimeout(100.milliseconds) { + expect(2) + delay(1000.milliseconds) + expectUnreached() + "OK" + } + } catch (e: CancellationException) { + assertEquals("Timed out waiting for 100 ms", e.message) + finish(3) + } + } + + @Test + fun testSuppressExceptionWithResult() = runTest( + expected = { it is CancellationException } + ) { + expect(1) + withTimeout(100.milliseconds) { + expect(2) + try { + delay(1000.milliseconds) + } catch (e: CancellationException) { + finish(3) + } + "OK" + } + expectUnreached() + } + + @Test + fun testSuppressExceptionWithAnotherException() = runTest { + expect(1) + try { + withTimeout(100.milliseconds) { + expect(2) + try { + delay(1000.milliseconds) + } catch (e: CancellationException) { + expect(3) + throw TestException() + } + expectUnreached() + "OK" + } + expectUnreached() + } catch (e: TestException) { + finish(4) + } + } + + @Test + fun testNegativeTimeout() = runTest { + expect(1) + try { + withTimeout(-1.milliseconds) { + expectUnreached() + "OK" + } + } catch (e: TimeoutCancellationException) { + assertEquals("Timed out immediately", e.message) + finish(2) + } + } + + @Test + fun testExceptionFromWithinTimeout() = runTest { + expect(1) + try { + expect(2) + withTimeout(1.seconds) { + expect(3) + throw TestException() + } + expectUnreached() + } catch (e: TestException) { + finish(4) + } + } + + @Test + fun testIncompleteWithTimeoutState() = runTest { + lateinit var timeoutJob: Job + val handle = withTimeout(Duration.INFINITE) { + timeoutJob = coroutineContext[Job]!! + timeoutJob.invokeOnCompletion { } + } + + handle.dispose() + timeoutJob.join() + assertTrue(timeoutJob.isCompleted) + assertFalse(timeoutJob.isActive) + assertFalse(timeoutJob.isCancelled) + } +} diff --git a/kotlinx-coroutines-core/common/test/WithTimeoutOrNullDurationTest.kt b/kotlinx-coroutines-core/common/test/WithTimeoutOrNullDurationTest.kt new file mode 100644 index 0000000000..f72bb7d99d --- /dev/null +++ b/kotlinx-coroutines-core/common/test/WithTimeoutOrNullDurationTest.kt @@ -0,0 +1,243 @@ + +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913 + +package kotlinx.coroutines + +import kotlinx.coroutines.channels.* +import kotlin.test.* +import kotlin.time.* + +@ExperimentalTime +class WithTimeoutOrNullDurationTest : TestBase() { + /** + * Tests a case of no timeout and no suspension inside. + */ + @Test + fun testBasicNoSuspend() = runTest { + expect(1) + val result = withTimeoutOrNull(10.seconds) { + expect(2) + "OK" + } + assertEquals("OK", result) + finish(3) + } + + /** + * Tests a case of no timeout and one suspension inside. + */ + @Test + fun testBasicSuspend() = runTest { + expect(1) + val result = withTimeoutOrNull(10.seconds) { + expect(2) + yield() + expect(3) + "OK" + } + assertEquals("OK", result) + finish(4) + } + + /** + * Tests property dispatching of `withTimeoutOrNull` blocks + */ + @Test + fun testDispatch() = runTest { + expect(1) + launch { + expect(4) + yield() // back to main + expect(7) + } + expect(2) + // test that it does not yield to the above job when started + val result = withTimeoutOrNull(1.seconds) { + expect(3) + yield() // yield only now + expect(5) + "OK" + } + assertEquals("OK", result) + expect(6) + yield() // back to launch + finish(8) + } + + /** + * Tests that a 100% CPU-consuming loop will react on timeout if it has yields. + */ + @Test + fun testYieldBlockingWithTimeout() = runTest { + expect(1) + val result = withTimeoutOrNull(100.milliseconds) { + while (true) { + yield() + } + } + assertNull(result) + finish(2) + } + + @Test + fun testSmallTimeout() = runTest { + val channel = Channel(1) + val value = withTimeoutOrNull(1.milliseconds) { + channel.receive() + } + assertNull(value) + } + + @Test + fun testThrowException() = runTest(expected = {it is AssertionError}) { + withTimeoutOrNull(Duration.INFINITE) { + throw AssertionError() + } + } + + @Test + fun testInnerTimeout() = runTest( + expected = { it is CancellationException } + ) { + withTimeoutOrNull(1000.milliseconds) { + withTimeout(10.milliseconds) { + while (true) { + yield() + } + } + expectUnreached() // will timeout + } + expectUnreached() // will timeout + } + + @Test + fun testNestedTimeout() = runTest(expected = { it is TimeoutCancellationException }) { + withTimeoutOrNull(Duration.INFINITE) { + // Exception from this withTimeout is not suppressed by withTimeoutOrNull + withTimeout(10.milliseconds) { + delay(Duration.INFINITE) + 1 + } + } + + expectUnreached() + } + + @Test + fun testOuterTimeout() = runTest { + var counter = 0 + val result = withTimeoutOrNull(250.milliseconds) { + while (true) { + val inner = withTimeoutOrNull(100.milliseconds) { + while (true) { + yield() + } + } + assertNull(inner) + counter++ + } + } + assertNull(result) + check(counter in 1..2) {"Executed: $counter times"} + } + + @Test + fun testBadClass() = runTest { + val bad = BadClass() + val result = withTimeoutOrNull(100.milliseconds) { + bad + } + assertSame(bad, result) + } + + class BadClass { + override fun equals(other: Any?): Boolean = error("Should not be called") + override fun hashCode(): Int = error("Should not be called") + override fun toString(): String = error("Should not be called") + } + + @Test + fun testNullOnTimeout() = runTest { + expect(1) + val result = withTimeoutOrNull(100.milliseconds) { + expect(2) + delay(1000.milliseconds) + expectUnreached() + "OK" + } + assertNull(result) + finish(3) + } + + @Test + fun testSuppressExceptionWithResult() = runTest { + expect(1) + val result = withTimeoutOrNull(100.milliseconds) { + expect(2) + try { + delay(1000.milliseconds) + } catch (e: CancellationException) { + expect(3) + } + "OK" + } + assertNull(result) + finish(4) + } + + @Test + fun testSuppressExceptionWithAnotherException() = runTest { + expect(1) + try { + withTimeoutOrNull(100.milliseconds) { + expect(2) + try { + delay(1000.milliseconds) + } catch (e: CancellationException) { + expect(3) + throw TestException() + } + expectUnreached() + "OK" + } + expectUnreached() + } catch (e: TestException) { + // catches TestException + finish(4) + + } + } + + @Test + fun testNegativeTimeout() = runTest { + expect(1) + var result = withTimeoutOrNull(-1.milliseconds) { + expectUnreached() + } + assertNull(result) + result = withTimeoutOrNull(0.milliseconds) { + expectUnreached() + } + assertNull(result) + finish(2) + } + + @Test + fun testExceptionFromWithinTimeout() = runTest { + expect(1) + try { + expect(2) + withTimeoutOrNull(1000.milliseconds) { + expect(3) + throw TestException() + } + expectUnreached() + } catch (e: TestException) { + finish(4) + } + } +} diff --git a/kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt index 2a6e9c1238..4065671e3d 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/DebounceTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.flow @@ -7,6 +7,7 @@ package kotlinx.coroutines.flow import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlin.test.* +import kotlin.time.* class DebounceTest : TestBase() { @Test @@ -198,4 +199,28 @@ class DebounceTest : TestBase() { assertFailsWith(flow) finish(4) } + + @ExperimentalTime + @Test + public fun testDurationBasic() = withVirtualTime { + expect(1) + val flow = flow { + expect(3) + emit("A") + delay(1500.milliseconds) + emit("B") + delay(500.milliseconds) + emit("C") + delay(250.milliseconds) + emit("D") + delay(2000.milliseconds) + emit("E") + expect(4) + } + + expect(2) + val result = flow.debounce(1000.milliseconds).toList() + assertEquals(listOf("A", "D", "E"), result) + finish(5) + } } diff --git a/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt index 9c96352df2..22a0d4a39a 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. */ package kotlinx.coroutines.flow.operators @@ -8,6 +8,7 @@ import kotlinx.coroutines.* import kotlinx.coroutines.channels.* import kotlinx.coroutines.flow.* import kotlin.test.* +import kotlin.time.* class SampleTest : TestBase() { @Test @@ -273,4 +274,28 @@ class SampleTest : TestBase() { assertFailsWith(flow) finish(4) } + + @ExperimentalTime + @Test + public fun testDurationBasic() = withVirtualTime { + expect(1) + val flow = flow { + expect(3) + emit("A") + delay(1500.milliseconds) + emit("B") + delay(500.milliseconds) + emit("C") + delay(250.milliseconds) + emit("D") + delay(2000.milliseconds) + emit("E") + expect(4) + } + + expect(2) + val result = flow.sample(1000.milliseconds).toList() + assertEquals(listOf("A", "B", "D"), result) + finish(5) + } } diff --git a/kotlinx-coroutines-core/common/test/selects/SelectTimeoutDurationTest.kt b/kotlinx-coroutines-core/common/test/selects/SelectTimeoutDurationTest.kt new file mode 100644 index 0000000000..66cb72a535 --- /dev/null +++ b/kotlinx-coroutines-core/common/test/selects/SelectTimeoutDurationTest.kt @@ -0,0 +1,91 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.selects + +import kotlinx.coroutines.* +import kotlin.test.* +import kotlin.time.* + +@ExperimentalTime +class SelectTimeoutDurationTest : TestBase() { + @Test + fun testBasic() = runTest { + expect(1) + val result = select { + onTimeout(1000.milliseconds) { + expectUnreached() + "FAIL" + } + onTimeout(100.milliseconds) { + expect(2) + "OK" + } + onTimeout(500.milliseconds) { + expectUnreached() + "FAIL" + } + } + assertEquals("OK", result) + finish(3) + } + + @Test + fun testZeroTimeout() = runTest { + expect(1) + val result = select { + onTimeout(1.seconds) { + expectUnreached() + "FAIL" + } + onTimeout(Duration.ZERO) { + expect(2) + "OK" + } + } + assertEquals("OK", result) + finish(3) + } + + @Test + fun testNegativeTimeout() = runTest { + expect(1) + val result = select { + onTimeout(1.seconds) { + expectUnreached() + "FAIL" + } + onTimeout(-10.milliseconds) { + expect(2) + "OK" + } + } + assertEquals("OK", result) + finish(3) + } + + @Test + fun testUnbiasedNegativeTimeout() = runTest { + val counters = intArrayOf(0, 0, 0) + val iterations =10_000 + for (i in 0..iterations) { + val result = selectUnbiased { + onTimeout(-1.seconds) { + 0 + } + onTimeout(Duration.ZERO) { + 1 + } + onTimeout(1.seconds) { + expectUnreached() + 2 + } + } + ++counters[result] + } + assertEquals(0, counters[2]) + assertTrue { counters[0] > iterations / 4 } + assertTrue { counters[1] > iterations / 4 } + } +}