Skip to content

Commit

Permalink
~ Add non-suspend stateIn operator with the initial value
Browse files Browse the repository at this point in the history
* Move StateInTest
* Add `out T` projection to StateFlow type
  • Loading branch information
elizarov committed Apr 30, 2020
1 parent 7612c4a commit bd9a339
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 26 deletions.
1 change: 1 addition & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -977,6 +977,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun skip (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
public static final fun startWith (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun startWith (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun stateIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Ljava/lang/Object;)Lkotlinx/coroutines/flow/StateFlowJob;
public static final fun stateIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;)V
public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)V
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/flow/StateFlow.kt
Expand Up @@ -147,7 +147,7 @@ import kotlin.native.concurrent.*
* cost, where `N` is the number of active collectors.
*/
@ExperimentalCoroutinesApi
public interface StateFlow<T> : Flow<T> {
public interface StateFlow<out T> : Flow<T> {
/**
* The current value of this state flow.
*/
Expand Down
80 changes: 55 additions & 25 deletions kotlinx-coroutines-core/common/src/flow/operators/State.kt
Expand Up @@ -16,14 +16,38 @@ import kotlin.jvm.*
* **Preview feature**: The design of `stateIn` operator is tentative and could change.
*/
@FlowPreview
public interface StateFlowJob<T> : StateFlow<T>, Job
public interface StateFlowJob<out T> : StateFlow<T>, Job

/**
* Launches a coroutine that collects this [Flow] and emits all the collected values as a resulting [StateFlow].
* Launches a coroutine that collects this [Flow] and emits all the collected values as the resulting [StateFlow].
* The result of this function is both a [StateFlow] and a [Job]. This call effectively turns a cold [Flow] into a
* hot, active [Flow], making the most recently emitted value available for consumption at any time via
* [StateFlow.value]. This function returns immediately. The state flow it creates is initially set
* to the specified default [value]. As an alternative a version of `stateIn` without a default value can be used,
* which suspend until the source flow emits a first value.
*
* The resulting coroutine can be cancelled by [cancelling][CoroutineScope.cancel] the [scope] in which it is
* launched or by [cancelling][Job.cancel] the resulting [Job].
*
* Errors in the source flow are not propagated to the [scope] but [close][MutableStateFlow.close] the resulting
* [StateFlow] or are rethrown to the caller of `stateIn` if they happen before emission of the first value.
*
* **Preview feature**: The design of `stateIn` operator is tentative and could change.
*/
@FlowPreview
public fun <T> Flow<T>.stateIn(scope: CoroutineScope, value: T): StateFlowJob<T> {
val state = StateFlow(value)
val job = scope.launchStateJob(this, null, state)
return StateFlowJobImpl(state, job)
}

/**
* Launches a coroutine that collects this [Flow] and emits all the collected values as the resulting [StateFlow].
* The result of this function is both a [StateFlow] and a [Job]. This call effectively turns a cold [Flow] into a
* hot, active [Flow], making the most recently emitted value available for consumption at any time via
* [StateFlow.value]. This call suspends until the source flow the emits first value and
* throws [NoSuchElementException] if the flow was empty.
* throws [NoSuchElementException] if the flow was empty. As an alternative, a version of `stateIn` with a default
* value can be used, which does not suspend.
*
* The resulting coroutine can be cancelled by [cancelling][CoroutineScope.cancel] the [scope] in which it is
* launched or by [cancelling][Job.cancel] the resulting [Job].
Expand All @@ -35,32 +59,38 @@ public interface StateFlowJob<T> : StateFlow<T>, Job
*/
@FlowPreview
public suspend fun <T> Flow<T>.stateIn(scope: CoroutineScope): StateFlowJob<T> {
val result = CompletableDeferred<StateFlowJob<T>>()
scope.launch {
var state: MutableStateFlow<T>? = null
var exception: Throwable? = null
try {
collect { value ->
// Update state flow if initialized
state?.let { it.value = value } ?: run {
// Or create state on the first value if state was not created yet (first value)
state = StateFlow(value).also {
// resume stateIn call with initialized StateFlow and current job
result.complete(StateFlowJobImpl(it, coroutineContext[Job]!!))
}
val deferredResult = CompletableDeferred<StateFlowJob<T>>()
scope.launchStateJob(this, deferredResult, null)
return deferredResult.await() // tail call
}

private fun <T> CoroutineScope.launchStateJob(
flow: Flow<T>,
deferredResult: CompletableDeferred<StateFlowJob<T>>?,
initialState: MutableStateFlow<T>?
) = launch {
var state: MutableStateFlow<T>? = initialState
var exception: Throwable? = null
try {
flow.collect { value ->
// Update state flow if initialized
state?.let { it.value = value } ?: run {
// Or create state on the first value if state was not created yet (first value)
state = StateFlow(value).also {
// resume stateIn call with initialized StateFlow and current job
deferredResult?.complete(StateFlowJobImpl(it, coroutineContext[Job]!!))
}
}
} catch (e: Throwable) {
// Ignore cancellation exception -- it is a normal way to stop the flow
if (e !is CancellationException) exception = e
}
// Close the state flow with exception if initialized
state?.apply { close(exception) } ?: run {
// Or complete the deferred exceptionally if the state was not create yet)
result.completeExceptionally(exception ?: NoSuchElementException("Expected at least one element"))
}
} catch (e: Throwable) {
// Ignore cancellation exception -- it is a normal way to stop the flow
if (e !is CancellationException) exception = e
}
// Close the state flow with exception if initialized
state?.apply { close(exception) } ?: run {
// Or complete the deferred exceptionally if the state was not create yet)
deferredResult?.completeExceptionally(exception ?: NoSuchElementException("Expected at least one element"))
}
return result.await() // tail call
}

private class StateFlowJobImpl<T>(state: StateFlow<T>, job: Job) : StateFlowJob<T>, StateFlow<T> by state, Job by job
Expand Up @@ -11,13 +11,52 @@ class StateInTest : TestBase() {
}
}

@Test
fun testEmptyFlowNullValue() = runTest {
val state = emptyFlow<Int>().stateIn(this, null)
assertEquals(null, state.value)
assertFalse(state.isClosed)
yield()
assertEquals(null, state.value)
assertTrue(state.isClosed)
}

@Test
fun testEmptyFlowZeroValue() = runTest {
val state = emptyFlow<Int>().stateIn(this, 0)
assertEquals(0, state.value)
assertFalse(state.isClosed)
yield()
assertEquals(0, state.value)
assertTrue(state.isClosed)
}

@Test
fun testFailingFlow() = runTest {
assertFailsWith<TestException> {
flow<Int> { throw TestException() }.stateIn(this)
}
}

@Test
fun testFailingFlowValue() = runTest {
expect(1)
val state = flow<Int> { throw TestException() }.stateIn(this, 42)
assertEquals(42, state.value)
assertFalse(state.isClosed)
yield()
assertEquals(42, state.value)
assertTrue(state.isClosed)
assertFailsWith<TestException> {
expect(2)
state.collect { value ->
assertEquals(42, value)
expect(3)
}
}
finish(4)
}

@Test
fun testOneElementFlow() = runTest {
val state = flowOf("OK").onCompletion { yield() }.stateIn(this)
Expand All @@ -28,6 +67,16 @@ class StateInTest : TestBase() {
assertTrue(state.isClosed)
}

@Test
fun testOneElementFlowValue() = runTest {
val state = flowOf("OK").stateIn(this, "INIT")
assertEquals("INIT", state.value)
assertFalse(state.isClosed)
yield()
assertEquals("OK", state.value)
assertTrue(state.isClosed)
}

@Test
fun testStateFlowJobCancellation() = runTest {
val flow = flow<Int> {
Expand Down

0 comments on commit bd9a339

Please sign in to comment.