Skip to content

Commit

Permalink
Cleanup API, update knit
Browse files Browse the repository at this point in the history
  • Loading branch information
pablobaxter committed Sep 14, 2022
1 parent dc73c64 commit ce0c919
Show file tree
Hide file tree
Showing 8 changed files with 22 additions and 121 deletions.
60 changes: 7 additions & 53 deletions kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import kotlin.time.*
<!--- TEST_NAME FlowDelayTest -->
<!--- PREFIX .*-duration-.*
----- INCLUDE .*-duration-.*
import kotlin.time.*
----- INCLUDE .*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
Expand Down Expand Up @@ -349,7 +348,7 @@ internal fun CoroutineScope.fixedPeriodTicker(delayMillis: Long, initialDelayMil
public fun <T> Flow<T>.sample(period: Duration): Flow<T> = sample(period.toDelayMillis())

/**
* Returns a flow that will timeout if the upstream takes too long to emit.
* Returns a flow that will emit a [FlowTimeoutException] if the upstream doesn't emit an item within the given time.
*
* Example:
*
Expand All @@ -362,7 +361,7 @@ public fun <T> Flow<T>.sample(period: Duration): Flow<T> = sample(period.toDelay
* emit(3)
* delay(1000)
* emit(4)
* }.timeout(100.milliseconds) {
* }.timeout(100.milliseconds).catch {
* emit(-1) // Item to emit on timeout
* }.onEach {
* delay(300) // This will not cause a timeout
Expand All @@ -380,59 +379,16 @@ public fun <T> Flow<T>.sample(period: Duration): Flow<T> = sample(period.toDelay
* Note that delaying on the downstream doesn't trigger the timeout.
*
* @param timeout Timeout period
* @param action Action to invoke on timeout. Default is to throw [FlowTimeoutException]
*/
@FlowPreview
public fun <T> Flow<T>.timeout(
timeout: Duration,
@BuilderInference action: suspend FlowCollector<T>.() -> Unit = { throw FlowTimeoutException(timeout.toDelayMillis()) }
): Flow<T> = timeout(timeout.toDelayMillis(), action)

/**
* Returns a flow that will timeout if the upstream takes too long to emit.
*
* Example:
*
* ```kotlin
* flow {
* emit(1)
* delay(100)
* emit(2)
* delay(100)
* emit(3)
* delay(1000)
* emit(4)
* }.timeout(100) {
* emit(-1) // Item to emit on timeout
* }.onEach {
* delay(300) // This will not cause a timeout
* }
* ```
* <!--- KNIT example-timeout-duration-02.kt -->
*
* produces the following emissions
*
* ```text
* 1, 2, 3, -1
* ```
* <!--- TEST -->
*
* Note that delaying on the downstream doesn't trigger the timeout.
*
* @param timeoutMillis Timeout period in millis
* @param action Action to invoke on timeout. Default is to throw [FlowTimeoutException]
*/
@FlowPreview
public fun <T> Flow<T>.timeout(
timeoutMillis: Long,
@BuilderInference action: suspend FlowCollector<T>.() -> Unit = { throw FlowTimeoutException(timeoutMillis) }
): Flow<T> = timeoutInternal(timeoutMillis, action)
timeout: Duration
): Flow<T> = timeoutInternal(timeout.toDelayMillis())

private fun <T> Flow<T>.timeoutInternal(
timeoutMillis: Long,
action: suspend FlowCollector<T>.() -> Unit
timeoutMillis: Long
): Flow<T> = scopedFlow { downStream ->
require(timeoutMillis >= 0L) { "Timeout should not be negative" }
if (timeoutMillis <= 0L) throw FlowTimeoutException("Timed out immediately")

// Produce the values using the default (rendezvous) channel
// Similar to [debounceInternal]
Expand Down Expand Up @@ -465,9 +421,7 @@ private fun <T> Flow<T>.timeoutInternal(
values.onReceive { value ->
if (value !== DONE) {
if (value === TIMEOUT) {
downStream.action()
values.cancel(ChildCancelledException())
return@onReceive false // Just end the loop here. Nothing more to be done.
throw FlowTimeoutException(timeoutMillis)
}
downStream.emit(NULL.unbox(value))
return@onReceive true
Expand Down
40 changes: 13 additions & 27 deletions kotlinx-coroutines-core/common/test/flow/operators/TimeoutTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ package kotlinx.coroutines.flow.operators
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.test.*
import kotlin.time.*
import kotlin.time.Duration.Companion.milliseconds

class TimeoutTest : TestBase() {
@ExperimentalTime
@Test
fun testBasic() = withVirtualTime {
expect(1)
Expand All @@ -28,12 +27,11 @@ class TimeoutTest : TestBase() {

expect(2)
val list = mutableListOf<String>()
assertFailsWith<FlowTimeoutException>(flow.timeout(300).onEach { list.add(it) })
assertFailsWith<FlowTimeoutException>(flow.timeout(300.milliseconds).onEach { list.add(it) })
assertEquals(listOf("A", "B", "C"), list)
finish(5)
}

@ExperimentalTime
@Test
fun testBasicCustomAction() = withVirtualTime {
expect(1)
Expand All @@ -51,12 +49,11 @@ class TimeoutTest : TestBase() {

expect(2)
val list = mutableListOf<String>()
flow.timeout(300) { emit("-1") }.collect { list.add(it) }
flow.timeout(300.milliseconds).catch { if (it is FlowTimeoutException) emit("-1") }.collect { list.add(it) }
assertEquals(listOf("A", "B", "C", "-1"), list)
finish(5)
}

@ExperimentalTime
@Test
fun testDelayedFirst() = withVirtualTime {
expect(1)
Expand All @@ -65,53 +62,47 @@ class TimeoutTest : TestBase() {
delay(100)
emit(1)
expect(4)
}.timeout(250)
}.timeout(250.milliseconds)
expect(2)
assertEquals(1, flow.singleOrNull())
finish(5)
}

@ExperimentalTime
@Test
fun testEmpty() = runTest {
val flow = emptyFlow<Any?>().timeout(1)
val flow = emptyFlow<Any?>().timeout(1.milliseconds)
assertNull(flow.singleOrNull())
}

@ExperimentalTime
@Test
fun testScalar() = runTest {
val flow = flowOf(1, 2, 3).timeout(1)
val flow = flowOf(1, 2, 3).timeout(1.milliseconds)
assertEquals(listOf(1, 2, 3), flow.toList())
}

@ExperimentalTime
@Test
fun testUpstreamError() = testUpstreamError(TestException())

@ExperimentalTime
@Test
fun testUpstreamErrorTimeoutException() = testUpstreamError(FlowTimeoutException(0))

@ExperimentalTime
private inline fun <reified T: Throwable> testUpstreamError(cause: T) = runTest {
val flow = flow {
emit(1)
throw cause
}.timeout(1)
}.timeout(1.milliseconds)

assertFailsWith<T>(flow)
}

@ExperimentalTime
@Test
fun testDownstreamError() = runTest {
val flow = flow {
expect(1)
emit(1)
hang { expect(3) }
expectUnreached()
}.timeout(100).map {
}.timeout(100.milliseconds).map {
expect(2)
yield()
throw TestException()
Expand All @@ -121,7 +112,6 @@ class TimeoutTest : TestBase() {
finish(4)
}

@ExperimentalTime
@Test
fun testUpstreamTimeoutIsolatedContext() = runTest {
val flow = flow {
Expand All @@ -131,13 +121,12 @@ class TimeoutTest : TestBase() {
expect(2)
delay(300)
expectUnreached()
}.flowOn(NamedDispatchers("upstream")).timeout(100)
}.flowOn(NamedDispatchers("upstream")).timeout(100.milliseconds)

assertFailsWith<FlowTimeoutException>(flow)
finish(3)
}

@ExperimentalTime
@Test
fun testUpstreamTimeoutActionIsolatedContext() = runTest {
val flow = flow {
Expand All @@ -147,7 +136,7 @@ class TimeoutTest : TestBase() {
expect(2)
delay(300)
expectUnreached()
}.flowOn(NamedDispatchers("upstream")).timeout(100) {
}.flowOn(NamedDispatchers("upstream")).timeout(100.milliseconds).catch {
expect(3)
emit(2)
}
Expand All @@ -156,7 +145,6 @@ class TimeoutTest : TestBase() {
finish(4)
}

@ExperimentalTime
@Test
fun testUpstreamNoTimeoutIsolatedContext() = runTest {
val flow = flow {
Expand All @@ -165,19 +153,17 @@ class TimeoutTest : TestBase() {
emit(1)
expect(2)
delay(10)
}.flowOn(NamedDispatchers("upstream")).timeout(100)
}.flowOn(NamedDispatchers("upstream")).timeout(100.milliseconds)

assertEquals(listOf(1), flow.toList())
finish(3)
}

@ExperimentalTime
@Test
fun testSharedFlowTimeout() = runTest {
assertFailsWith<FlowTimeoutException>(MutableSharedFlow<Int>().asSharedFlow().timeout(100))
assertFailsWith<FlowTimeoutException>(MutableSharedFlow<Int>().asSharedFlow().timeout(100.milliseconds))
}

@ExperimentalTime
@Test
fun testSharedFlowCancelledNoTimeout() = runTest {
val mutableSharedFlow = MutableSharedFlow<Int>()
Expand All @@ -186,7 +172,7 @@ class TimeoutTest : TestBase() {
expect(1)
val consumerJob = launch {
expect(3)
mutableSharedFlow.asSharedFlow().timeout(100).collect { list.add(it) }
mutableSharedFlow.asSharedFlow().timeout(100.milliseconds).collect { list.add(it) }
expectUnreached()
}
val producerJob = launch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
// This file was automatically generated from Delay.kt by Knit tool. Do not edit.
package kotlinx.coroutines.examples.exampleDelayDuration01

import kotlin.time.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.time.Duration.Companion.milliseconds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
// This file was automatically generated from Delay.kt by Knit tool. Do not edit.
package kotlinx.coroutines.examples.exampleDelayDuration02

import kotlin.time.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.time.Duration.Companion.milliseconds
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
// This file was automatically generated from Delay.kt by Knit tool. Do not edit.
package kotlinx.coroutines.examples.exampleDelayDuration03

import kotlin.time.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.time.Duration.Companion.milliseconds
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
@file:OptIn(ExperimentalTime::class)
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

// This file was automatically generated from Delay.kt by Knit tool. Do not edit.
package kotlinx.coroutines.examples.exampleTimeoutDuration01

import kotlin.time.*
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.time.Duration.Companion.milliseconds

fun main() = runBlocking {

Expand All @@ -20,7 +19,7 @@ flow {
emit(3)
delay(1000)
emit(4)
}.timeout(100.milliseconds) {
}.timeout(100.milliseconds).catch {
emit(-1) // Item to emit on timeout
}.onEach {
delay(300) // This will not cause a timeout
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,4 @@ class FlowDelayTest {
"1, 2, 3, -1"
)
}

@Test
fun testExampleTimeoutDuration02() {
test("ExampleTimeoutDuration02") { kotlinx.coroutines.examples.exampleTimeoutDuration02.main() }.verifyLines(
"1, 2, 3, -1"
)
}
}

0 comments on commit ce0c919

Please sign in to comment.