Skip to content

Commit

Permalink
kotlin.time.Duration support (#1811)
Browse files Browse the repository at this point in the history
Fixes #1402

Co-authored-by: Francesco Vasco <fsco_v-github@yahoo.it>
  • Loading branch information
elizarov and fvasco committed Mar 13, 2020
1 parent 5cfd6c0 commit 1e7ca75
Show file tree
Hide file tree
Showing 11 changed files with 794 additions and 3 deletions.
6 changes: 6 additions & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -267,6 +267,7 @@ public final class kotlinx/coroutines/Delay$DefaultImpls {

public final class kotlinx/coroutines/DelayKt {
public static final fun delay (JLkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun delay-p9JZ4hM (DLkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/DispatchedContinuationKt {
Expand Down Expand Up @@ -527,7 +528,9 @@ public final class kotlinx/coroutines/TimeoutCancellationException : java/util/c

public final class kotlinx/coroutines/TimeoutKt {
public static final fun withTimeout (JLkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun withTimeout-lwyi7ZQ (DLkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun withTimeoutOrNull (JLkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun withTimeoutOrNull-lwyi7ZQ (DLkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class kotlinx/coroutines/YieldKt {
Expand Down Expand Up @@ -888,6 +891,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun debounce (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
public static final fun debounce-8GFy2Ro (Lkotlinx/coroutines/flow/Flow;D)Lkotlinx/coroutines/flow/Flow;
public static final fun delayEach (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
public static final fun delayFlow (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
public static final fun distinctUntilChanged (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
Expand Down Expand Up @@ -954,6 +958,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static synthetic fun retry$default (Lkotlinx/coroutines/flow/Flow;JLkotlin/jvm/functions/Function2;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun retryWhen (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
public static final fun sample (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
public static final fun sample-8GFy2Ro (Lkotlinx/coroutines/flow/Flow;D)Lkotlinx/coroutines/flow/Flow;
public static final fun scan (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun scanFold (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun scanReduce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
Expand Down Expand Up @@ -1092,6 +1097,7 @@ public abstract interface class kotlinx/coroutines/selects/SelectInstance {
}

public final class kotlinx/coroutines/selects/SelectKt {
public static final fun onTimeout-0lHKgQg (Lkotlinx/coroutines/selects/SelectBuilder;DLkotlin/jvm/functions/Function1;)V
public static final fun select (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

Expand Down
22 changes: 22 additions & 0 deletions kotlinx-coroutines-core/common/src/Delay.kt
Expand Up @@ -6,6 +6,7 @@ package kotlinx.coroutines

import kotlinx.coroutines.selects.*
import kotlin.coroutines.*
import kotlin.time.*

/**
* This dispatcher _feature_ is implemented by [CoroutineDispatcher] implementations that natively support
Expand Down Expand Up @@ -75,5 +76,26 @@ public suspend fun delay(timeMillis: Long) {
}
}

/**
* Delays coroutine for a given [duration] without blocking a thread and resumes it after the specified time.
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* immediately resumes with [CancellationException].
*
* Note that delay can be used in [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
*
* Implementation note: how exactly time is tracked is an implementation detail of [CoroutineDispatcher] in the context.
*/
@ExperimentalTime
public suspend fun delay(duration: Duration) = delay(duration.toDelayMillis())

/** Returns [Delay] implementation of the given context */
internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultDelay

/**
* Convert this duration to its millisecond value.
* Positive durations are coerced at least `1`.
*/
@ExperimentalTime
internal fun Duration.toDelayMillis(): Long =
if (this > Duration.ZERO) toLongMilliseconds().coerceAtLeast(1) else 0
33 changes: 33 additions & 0 deletions kotlinx-coroutines-core/common/src/Timeout.kt
Expand Up @@ -10,6 +10,7 @@ import kotlinx.coroutines.selects.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.jvm.*
import kotlin.time.*

/**
* Runs a given suspending [block] of code inside a coroutine with a specified [timeout][timeMillis] and throws
Expand All @@ -32,6 +33,22 @@ public suspend fun <T> withTimeout(timeMillis: Long, block: suspend CoroutineSco
}
}

/**
* Runs a given suspending [block] of code inside a coroutine with the specified [timeout] and throws
* a [TimeoutCancellationException] if the timeout was exceeded.
*
* The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of
* the cancellable suspending function inside the block throws a [TimeoutCancellationException].
*
* The sibling function that does not throw an exception on timeout is [withTimeoutOrNull].
* Note that the timeout action can be specified for a [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
*
* Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher].
*/
@ExperimentalTime
public suspend fun <T> withTimeout(timeout: Duration, block: suspend CoroutineScope.() -> T): T =
withTimeout(timeout.toDelayMillis(), block)

/**
* Runs a given suspending block of code inside a coroutine with a specified [timeout][timeMillis] and returns
* `null` if this timeout was exceeded.
Expand Down Expand Up @@ -65,6 +82,22 @@ public suspend fun <T> withTimeoutOrNull(timeMillis: Long, block: suspend Corout
}
}

/**
* Runs a given suspending block of code inside a coroutine with the specified [timeout] and returns
* `null` if this timeout was exceeded.
*
* The code that is executing inside the [block] is cancelled on timeout and the active or next invocation of
* cancellable suspending function inside the block throws a [TimeoutCancellationException].
*
* The sibling function that throws an exception on timeout is [withTimeout].
* Note that the timeout action can be specified for a [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
*
* Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher].
*/
@ExperimentalTime
public suspend fun <T> withTimeoutOrNull(timeout: Duration, block: suspend CoroutineScope.() -> T): T? =
withTimeoutOrNull(timeout.toDelayMillis(), block)

private fun <U, T: U> setupTimeout(
coroutine: TimeoutCoroutine<U, T>,
block: suspend CoroutineScope.() -> T
Expand Down
50 changes: 49 additions & 1 deletion kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
Expand Up @@ -12,7 +12,7 @@ import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.internal.*
import kotlinx.coroutines.selects.*
import kotlin.jvm.*
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
import kotlin.time.*

/**
* Returns a flow that mirrors the original flow, but filters out values
Expand Down Expand Up @@ -71,6 +71,34 @@ public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
}
}

/**
* Returns a flow that mirrors the original flow, but filters out values
* that are followed by the newer values within the given [timeout].
* The latest value is always emitted.
*
* Example:
* ```
* flow {
* emit(1)
* delay(90.milliseconds)
* emit(2)
* delay(90.milliseconds)
* emit(3)
* delay(1010.milliseconds)
* emit(4)
* delay(1010.milliseconds)
* emit(5)
* }.debounce(1000.milliseconds)
* ```
* produces `3, 4, 5`.
*
* Note that the resulting flow does not emit anything as long as the original flow emits
* items faster than every [timeout] milliseconds.
*/
@ExperimentalTime
@FlowPreview
public fun <T> Flow<T>.debounce(timeout: Duration): Flow<T> = debounce(timeout.toDelayMillis())

/**
* Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period][periodMillis].
*
Expand Down Expand Up @@ -133,3 +161,23 @@ internal fun CoroutineScope.fixedPeriodTicker(delayMillis: Long, initialDelayMil
}
}
}

/**
* Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period].
*
* Example:
* ```
* flow {
* repeat(10) {
* emit(it)
* delay(50.milliseconds)
* }
* }.sample(100.milliseconds)
* ```
* produces `1, 3, 5, 7, 9`.
*
* Note that the latest element is not emitted if it does not fit into the sampling window.
*/
@ExperimentalTime
@FlowPreview
public fun <T> Flow<T>.sample(period: Duration): Flow<T> = sample(period.toDelayMillis())
12 changes: 12 additions & 0 deletions kotlinx-coroutines-core/common/src/selects/Select.kt
Expand Up @@ -14,6 +14,7 @@ import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.jvm.*
import kotlin.native.concurrent.*
import kotlin.time.*

/**
* Scope for [select] invocation.
Expand Down Expand Up @@ -52,6 +53,17 @@ public interface SelectBuilder<in R> {
public fun onTimeout(timeMillis: Long, block: suspend () -> R)
}

/**
* Clause that selects the given [block] after the specified [timeout] passes.
* If timeout is negative or zero, [block] is selected immediately.
*
* **Note: This is an experimental api.** It may be replaced with light-weight timer/timeout channels in the future.
*/
@ExperimentalCoroutinesApi
@ExperimentalTime
public fun <R> SelectBuilder<R>.onTimeout(timeout: Duration, block: suspend () -> R) =
onTimeout(timeout.toDelayMillis(), block)

/**
* Clause for [select] expression without additional parameters that does not select any value.
*/
Expand Down
72 changes: 72 additions & 0 deletions kotlinx-coroutines-core/common/test/DelayDurationTest.kt
@@ -0,0 +1,72 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

@file:Suppress("NAMED_ARGUMENTS_NOT_ALLOWED", "DEPRECATION")

// KT-21913

package kotlinx.coroutines

import kotlin.test.*
import kotlin.time.*

@ExperimentalTime
class DelayDurationTest : TestBase() {

@Test
fun testCancellation() = runTest(expected = { it is CancellationException }) {
runAndCancel(1.seconds)
}

@Test
fun testInfinite() = runTest(expected = { it is CancellationException }) {
runAndCancel(Duration.INFINITE)
}

@Test
fun testRegularDelay() = runTest {
val deferred = async {
expect(2)
delay(1.seconds)
expect(4)
}

expect(1)
yield()
expect(3)
deferred.await()
finish(5)
}

@Test
fun testNanoDelay() = runTest {
val deferred = async {
expect(2)
delay(1.nanoseconds)
expect(4)
}

expect(1)
yield()
expect(3)
deferred.await()
finish(5)
}

private suspend fun runAndCancel(time: Duration) = coroutineScope {
expect(1)
val deferred = async {
expect(2)
delay(time)
expectUnreached()
}

yield()
expect(3)
require(deferred.isActive)
deferred.cancel()
finish(4)
deferred.await()
}
}

0 comments on commit 1e7ca75

Please sign in to comment.