diff --git a/kotlinx-coroutines-core/common/src/Delay.kt b/kotlinx-coroutines-core/common/src/Delay.kt index acb924020a..9f781bf2df 100644 --- a/kotlinx-coroutines-core/common/src/Delay.kt +++ b/kotlinx-coroutines-core/common/src/Delay.kt @@ -6,6 +6,8 @@ package kotlinx.coroutines import kotlinx.coroutines.selects.* import kotlin.coroutines.* +import kotlin.time.Duration +import kotlin.time.ExperimentalTime /** * This dispatcher _feature_ is implemented by [CoroutineDispatcher] implementations that natively support @@ -75,5 +77,31 @@ public suspend fun delay(timeMillis: Long) { } } +/** + * Delays coroutine for a given [duration] without blocking a thread and resumes it after the specified time. + * This suspending function is cancellable. + * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function + * immediately resumes with [CancellationException]. + * + * Note that delay can be used in [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause. + * + * Implementation note: how exactly time is tracked is an implementation detail of [CoroutineDispatcher] in the context. + */ +@ExperimentalTime +public suspend fun delay(duration: Duration) = delay(duration.toDelayMillis()) + /** Returns [Delay] implementation of the given context */ internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultDelay + +/** + * Convert this duration to its millisecond value. + * Positive durations are coerced at least `1`. + */ +@ExperimentalTime +internal fun Duration.toDelayMillis(): Long = + when { + this > Duration.ZERO -> toLongMilliseconds().coerceAtLeast(1) + this < Duration.ZERO -> -1 + // 0, -0, NaN + else -> 0 + } diff --git a/kotlinx-coroutines-core/common/src/Timeout.kt b/kotlinx-coroutines-core/common/src/Timeout.kt index 7e6f0d0e2d..b2dd38180a 100644 --- a/kotlinx-coroutines-core/common/src/Timeout.kt +++ b/kotlinx-coroutines-core/common/src/Timeout.kt @@ -10,6 +10,8 @@ import kotlinx.coroutines.selects.* import kotlin.coroutines.* import kotlin.coroutines.intrinsics.* import kotlin.jvm.* +import kotlin.time.Duration +import kotlin.time.ExperimentalTime /** * Runs a given suspending [block] of code inside a coroutine with a specified [timeout][timeMillis] and throws @@ -32,6 +34,22 @@ public suspend fun withTimeout(timeMillis: Long, block: suspend CoroutineSco } } +/** + * Runs a given suspending [block] of code inside a coroutine with the specified [timeout] and throws + * a [TimeoutCancellationException] if the timeout was exceeded. + * + * The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of + * the cancellable suspending function inside the block throws a [TimeoutCancellationException]. + * + * The sibling function that does not throw an exception on timeout is [withTimeoutOrNull]. + * Note that the timeout action can be specified for a [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause. + * + * Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher]. + */ +@ExperimentalTime +public suspend fun withTimeout(timeout: Duration, block: suspend CoroutineScope.() -> T): T = + withTimeout(timeout.toDelayMillis(), block) + /** * Runs a given suspending block of code inside a coroutine with a specified [timeout][timeMillis] and returns * `null` if this timeout was exceeded. @@ -65,6 +83,22 @@ public suspend fun withTimeoutOrNull(timeMillis: Long, block: suspend Corout } } +/** + * Runs a given suspending block of code inside a coroutine with the specified [timeout] and returns + * `null` if this timeout was exceeded. + * + * The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of + * cancellable suspending function inside the block throws a [TimeoutCancellationException]. + * + * The sibling function that throws an exception on timeout is [withTimeout]. + * Note that the timeout action can be specified for a [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause. + * + * Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher]. + */ +@ExperimentalTime +public suspend fun withTimeoutOrNull(timeout: Duration, block: suspend CoroutineScope.() -> T): T? = + withTimeoutOrNull(timeout.toDelayMillis(), block) + private fun setupTimeout( coroutine: TimeoutCoroutine, block: suspend CoroutineScope.() -> T diff --git a/kotlinx-coroutines-core/common/src/selects/Select.kt b/kotlinx-coroutines-core/common/src/selects/Select.kt index b741a1a40f..4c851ceb08 100644 --- a/kotlinx-coroutines-core/common/src/selects/Select.kt +++ b/kotlinx-coroutines-core/common/src/selects/Select.kt @@ -14,6 +14,8 @@ import kotlin.coroutines.* import kotlin.coroutines.intrinsics.* import kotlin.jvm.* import kotlin.native.concurrent.* +import kotlin.time.Duration +import kotlin.time.ExperimentalTime /** * Scope for [select] invocation. @@ -52,6 +54,18 @@ public interface SelectBuilder { public fun onTimeout(timeMillis: Long, block: suspend () -> R) } + +/** + * Clause that selects the given [block] after the specified [timeout] passes. + * If timeout is negative or zero, [block] is selected immediately. + * + * **Note: This is an experimental api.** It may be replaced with light-weight timer/timeout channels in the future. + */ +@ExperimentalCoroutinesApi +@ExperimentalTime +public fun SelectBuilder.onTimeout(timeout: Duration, block: suspend () -> R) = + onTimeout(timeout.toDelayMillis(), block) + /** * Clause for [select] expression without additional parameters that does not select any value. */ diff --git a/kotlinx-coroutines-core/common/test/DelayDurationTest.kt b/kotlinx-coroutines-core/common/test/DelayDurationTest.kt new file mode 100644 index 0000000000..1a254f84c9 --- /dev/null +++ b/kotlinx-coroutines-core/common/test/DelayDurationTest.kt @@ -0,0 +1,72 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED", "DEPRECATION") + +// KT-21913 + +package kotlinx.coroutines + +import kotlin.test.Test +import kotlin.time.* + +@ExperimentalTime +class DelayDurationTest : TestBase() { + + @Test + fun testCancellation() = runTest(expected = { it is CancellationException }) { + runAndCancel(1.seconds) + } + + @Test + fun testInfinite() = runTest(expected = { it is CancellationException }) { + runAndCancel(Duration.INFINITE) + } + + @Test + fun testRegularDelay() = runTest { + val deferred = async { + expect(2) + delay(1.seconds) + expect(4) + } + + expect(1) + yield() + expect(3) + deferred.await() + finish(5) + } + + @Test + fun testNanoDelay() = runTest { + val deferred = async { + expect(2) + delay(1.nanoseconds) + expect(4) + } + + expect(1) + yield() + expect(3) + deferred.await() + finish(5) + } + + private suspend fun runAndCancel(time: Duration) = coroutineScope { + expect(1) + val deferred = async { + expect(2) + delay(time) + expectUnreached() + } + + yield() + expect(3) + require(deferred.isActive) + deferred.cancel() + finish(4) + deferred.await() + } +} diff --git a/kotlinx-coroutines-core/common/test/WithTimeoutDurationTest.kt b/kotlinx-coroutines-core/common/test/WithTimeoutDurationTest.kt new file mode 100644 index 0000000000..ca57f8d87b --- /dev/null +++ b/kotlinx-coroutines-core/common/test/WithTimeoutDurationTest.kt @@ -0,0 +1,217 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED", "UNREACHABLE_CODE") // KT-21913 + +package kotlinx.coroutines + +import kotlin.test.* +import kotlin.time.Duration +import kotlin.time.ExperimentalTime +import kotlin.time.milliseconds +import kotlin.time.seconds + +@ExperimentalTime +class WithTimeoutDurationTest : TestBase() { + /** + * Tests a case of no timeout and no suspension inside. + */ + @Test + fun testBasicNoSuspend() = runTest { + expect(1) + val result = withTimeout(10.seconds) { + expect(2) + "OK" + } + assertEquals("OK", result) + finish(3) + } + + /** + * Tests a case of no timeout and one suspension inside. + */ + @Test + fun testBasicSuspend() = runTest { + expect(1) + val result = withTimeout(10.seconds) { + expect(2) + yield() + expect(3) + "OK" + } + assertEquals("OK", result) + finish(4) + } + + /** + * Tests proper dispatching of `withTimeout` blocks + */ + @Test + fun testDispatch() = runTest { + expect(1) + launch { + expect(4) + yield() // back to main + expect(7) + } + expect(2) + // test that it does not yield to the above job when started + val result = withTimeout(1.seconds) { + expect(3) + yield() // yield only now + expect(5) + "OK" + } + assertEquals("OK", result) + expect(6) + yield() // back to launch + finish(8) + } + + + /** + * Tests that a 100% CPU-consuming loop will react on timeout if it has yields. + */ + @Test + fun testYieldBlockingWithTimeout() = runTest( + expected = { it is CancellationException } + ) { + withTimeout(100.milliseconds) { + while (true) { + yield() + } + } + } + + /** + * Tests that [withTimeout] waits for children coroutines to complete. + */ + @Test + fun testWithTimeoutChildWait() = runTest { + expect(1) + withTimeout(100.milliseconds) { + expect(2) + // launch child with timeout + launch { + expect(4) + } + expect(3) + // now will wait for child before returning + } + finish(5) + } + + @Test + fun testBadClass() = runTest { + val bad = BadClass() + val result = withTimeout(100.milliseconds) { + bad + } + assertSame(bad, result) + } + + class BadClass { + override fun equals(other: Any?): Boolean = error("Should not be called") + override fun hashCode(): Int = error("Should not be called") + override fun toString(): String = error("Should not be called") + } + + @Test + fun testExceptionOnTimeout() = runTest { + expect(1) + try { + withTimeout(100.milliseconds) { + expect(2) + delay(1000.milliseconds) + expectUnreached() + "OK" + } + } catch (e: CancellationException) { + assertEquals("Timed out waiting for 100 ms", e.message) + finish(3) + } + } + + @Test + fun testSuppressExceptionWithResult() = runTest( + expected = { it is CancellationException } + ) { + expect(1) + withTimeout(100.milliseconds) { + expect(2) + try { + delay(1000.milliseconds) + } catch (e: CancellationException) { + finish(3) + } + "OK" + } + expectUnreached() + } + + @Test + fun testSuppressExceptionWithAnotherException() = runTest { + expect(1) + try { + withTimeout(100.milliseconds) { + expect(2) + try { + delay(1000.milliseconds) + } catch (e: CancellationException) { + expect(3) + throw TestException() + } + expectUnreached() + "OK" + } + expectUnreached() + } catch (e: TestException) { + finish(4) + } + } + + @Test + fun testNegativeTimeout() = runTest { + expect(1) + try { + withTimeout(-1.milliseconds) { + expectUnreached() + "OK" + } + } catch (e: TimeoutCancellationException) { + assertEquals("Timed out immediately", e.message) + finish(2) + } + } + + @Test + fun testExceptionFromWithinTimeout() = runTest { + expect(1) + try { + expect(2) + withTimeout(1.seconds) { + expect(3) + throw TestException() + } + expectUnreached() + } catch (e: TestException) { + finish(4) + } + } + + @Test + fun testIncompleteWithTimeoutState() = runTest { + lateinit var timeoutJob: Job + val handle = withTimeout(Duration.INFINITE) { + timeoutJob = coroutineContext[Job]!! + timeoutJob.invokeOnCompletion { } + } + + handle.dispose() + timeoutJob.join() + assertTrue(timeoutJob.isCompleted) + assertFalse(timeoutJob.isActive) + assertFalse(timeoutJob.isCancelled) + } +} diff --git a/kotlinx-coroutines-core/common/test/WithTimeoutOrNullDurationTest.kt b/kotlinx-coroutines-core/common/test/WithTimeoutOrNullDurationTest.kt new file mode 100644 index 0000000000..1b77a3fbb4 --- /dev/null +++ b/kotlinx-coroutines-core/common/test/WithTimeoutOrNullDurationTest.kt @@ -0,0 +1,246 @@ + +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED") // KT-21913 + +package kotlinx.coroutines + +import kotlinx.coroutines.channels.* +import kotlin.test.* +import kotlin.time.Duration +import kotlin.time.ExperimentalTime +import kotlin.time.milliseconds +import kotlin.time.seconds + +@ExperimentalTime +class WithTimeoutOrNullDurationTest : TestBase() { + /** + * Tests a case of no timeout and no suspension inside. + */ + @Test + fun testBasicNoSuspend() = runTest { + expect(1) + val result = withTimeoutOrNull(10.seconds) { + expect(2) + "OK" + } + assertEquals("OK", result) + finish(3) + } + + /** + * Tests a case of no timeout and one suspension inside. + */ + @Test + fun testBasicSuspend() = runTest { + expect(1) + val result = withTimeoutOrNull(10.seconds) { + expect(2) + yield() + expect(3) + "OK" + } + assertEquals("OK", result) + finish(4) + } + + /** + * Tests property dispatching of `withTimeoutOrNull` blocks + */ + @Test + fun testDispatch() = runTest { + expect(1) + launch { + expect(4) + yield() // back to main + expect(7) + } + expect(2) + // test that it does not yield to the above job when started + val result = withTimeoutOrNull(1.seconds) { + expect(3) + yield() // yield only now + expect(5) + "OK" + } + assertEquals("OK", result) + expect(6) + yield() // back to launch + finish(8) + } + + /** + * Tests that a 100% CPU-consuming loop will react on timeout if it has yields. + */ + @Test + fun testYieldBlockingWithTimeout() = runTest { + expect(1) + val result = withTimeoutOrNull(100.milliseconds) { + while (true) { + yield() + } + } + assertNull(result) + finish(2) + } + + @Test + fun testSmallTimeout() = runTest { + val channel = Channel(1) + val value = withTimeoutOrNull(1.milliseconds) { + channel.receive() + } + assertNull(value) + } + + @Test + fun testThrowException() = runTest(expected = {it is AssertionError}) { + withTimeoutOrNull(Duration.INFINITE) { + throw AssertionError() + } + } + + @Test + fun testInnerTimeout() = runTest( + expected = { it is CancellationException } + ) { + withTimeoutOrNull(1000.milliseconds) { + withTimeout(10.milliseconds) { + while (true) { + yield() + } + } + expectUnreached() // will timeout + } + expectUnreached() // will timeout + } + + @Test + fun testNestedTimeout() = runTest(expected = { it is TimeoutCancellationException }) { + withTimeoutOrNull(Duration.INFINITE) { + // Exception from this withTimeout is not suppressed by withTimeoutOrNull + withTimeout(10.milliseconds) { + delay(Duration.INFINITE) + 1 + } + } + + expectUnreached() + } + + @Test + fun testOuterTimeout() = runTest { + var counter = 0 + val result = withTimeoutOrNull(250.milliseconds) { + while (true) { + val inner = withTimeoutOrNull(100.milliseconds) { + while (true) { + yield() + } + } + assertNull(inner) + counter++ + } + } + assertNull(result) + check(counter in 1..2) {"Executed: $counter times"} + } + + @Test + fun testBadClass() = runTest { + val bad = BadClass() + val result = withTimeoutOrNull(100.milliseconds) { + bad + } + assertSame(bad, result) + } + + class BadClass { + override fun equals(other: Any?): Boolean = error("Should not be called") + override fun hashCode(): Int = error("Should not be called") + override fun toString(): String = error("Should not be called") + } + + @Test + fun testNullOnTimeout() = runTest { + expect(1) + val result = withTimeoutOrNull(100.milliseconds) { + expect(2) + delay(1000.milliseconds) + expectUnreached() + "OK" + } + assertNull(result) + finish(3) + } + + @Test + fun testSuppressExceptionWithResult() = runTest { + expect(1) + val result = withTimeoutOrNull(100.milliseconds) { + expect(2) + try { + delay(1000.milliseconds) + } catch (e: CancellationException) { + expect(3) + } + "OK" + } + assertNull(result) + finish(4) + } + + @Test + fun testSuppressExceptionWithAnotherException() = runTest { + expect(1) + try { + withTimeoutOrNull(100.milliseconds) { + expect(2) + try { + delay(1000.milliseconds) + } catch (e: CancellationException) { + expect(3) + throw TestException() + } + expectUnreached() + "OK" + } + expectUnreached() + } catch (e: TestException) { + // catches TestException + finish(4) + + } + } + + @Test + fun testNegativeTimeout() = runTest { + expect(1) + var result = withTimeoutOrNull(-1.milliseconds) { + expectUnreached() + } + assertNull(result) + result = withTimeoutOrNull(0.milliseconds) { + expectUnreached() + } + assertNull(result) + finish(2) + } + + @Test + fun testExceptionFromWithinTimeout() = runTest { + expect(1) + try { + expect(2) + withTimeoutOrNull(1000.milliseconds) { + expect(3) + throw TestException() + } + expectUnreached() + } catch (e: TestException) { + finish(4) + } + } +} diff --git a/kotlinx-coroutines-core/common/test/selects/SelectTimeoutDurationTest.kt b/kotlinx-coroutines-core/common/test/selects/SelectTimeoutDurationTest.kt new file mode 100644 index 0000000000..1a0051f716 --- /dev/null +++ b/kotlinx-coroutines-core/common/test/selects/SelectTimeoutDurationTest.kt @@ -0,0 +1,94 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.selects + +import kotlinx.coroutines.* +import kotlin.test.* +import kotlin.time.Duration +import kotlin.time.ExperimentalTime +import kotlin.time.milliseconds +import kotlin.time.seconds + +@ExperimentalTime +class SelectTimeoutDurationTest : TestBase() { + @Test + fun testBasic() = runTest { + expect(1) + val result = select { + onTimeout(1000.milliseconds) { + expectUnreached() + "FAIL" + } + onTimeout(100.milliseconds) { + expect(2) + "OK" + } + onTimeout(500.milliseconds) { + expectUnreached() + "FAIL" + } + } + assertEquals("OK", result) + finish(3) + } + + @Test + fun testZeroTimeout() = runTest { + expect(1) + val result = select { + onTimeout(1.seconds) { + expectUnreached() + "FAIL" + } + onTimeout(Duration.ZERO) { + expect(2) + "OK" + } + } + assertEquals("OK", result) + finish(3) + } + + @Test + fun testNegativeTimeout() = runTest { + expect(1) + val result = select { + onTimeout(1.seconds) { + expectUnreached() + "FAIL" + } + onTimeout(-10.milliseconds) { + expect(2) + "OK" + } + } + assertEquals("OK", result) + finish(3) + } + + @Test + fun testUnbiasedNegativeTimeout() = runTest { + val counters = intArrayOf(0, 0, 0) + val iterations =10_000 + for (i in 0..iterations) { + val result = selectUnbiased { + onTimeout(-1.seconds) { + 0 + } + onTimeout(Duration.ZERO) { + 1 + } + onTimeout(1.seconds) { + expectUnreached() + 2 + } + } + ++counters[result] + } + assertEquals(0, counters[2]) + assertTrue { counters[0] > iterations / 4 } + assertTrue { counters[1] > iterations / 4 } + } +}