From 37c87022cc96215bbafa96a88edaf28811b1f524 Mon Sep 17 00:00:00 2001 From: Roman Elizarov Date: Wed, 17 Jul 2019 09:55:14 -0700 Subject: [PATCH] StateFlow implementation StateFlow is a Flow analogue to ConflatedBroadcastChannel. Since Flow API is simpler than channels APIs, the implementation of StateFlow is simpler. It consumes and allocates less memory, while still providing full deadlock-freedom (even though it is not lock-free internally). Fixes #1973 Fixes #395 Fixes #1816 --- .../api/kotlinx-coroutines-core.api | 45 +- .../common/src/flow/StateFlow.kt | 410 ++++++++++++++++++ .../common/src/flow/internal/ChannelFlow.kt | 29 +- .../common/src/flow/operators/Context.kt | 10 +- .../common/src/flow/operators/State.kt | 66 +++ .../common/test/flow/StateFlowTest.kt | 166 +++++++ .../common/test/flow/StateInTest.kt | 79 ++++ .../jvm/test/flow/StateFlowStressTest.kt | 80 ++++ 8 files changed, 871 insertions(+), 14 deletions(-) create mode 100644 kotlinx-coroutines-core/common/src/flow/StateFlow.kt create mode 100644 kotlinx-coroutines-core/common/src/flow/operators/State.kt create mode 100644 kotlinx-coroutines-core/common/test/flow/StateFlowTest.kt create mode 100644 kotlinx-coroutines-core/common/test/flow/StateInTest.kt create mode 100644 kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 0368cf1cc0..2d333552c9 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -979,6 +979,7 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun skip (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow; public static final fun startWith (Lkotlinx/coroutines/flow/Flow;Ljava/lang/Object;)Lkotlinx/coroutines/flow/Flow; public static final fun startWith (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow; + public static final fun stateIn (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;)V public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)V public static final fun subscribe (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function2;)V @@ -998,7 +999,38 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun zip (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; } -public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/coroutines/flow/Flow { +public abstract interface class kotlinx/coroutines/flow/MutableStateFlow : kotlinx/coroutines/flow/StateFlow { + public abstract fun close (Ljava/lang/Throwable;)Z + public abstract fun getValue ()Ljava/lang/Object; + public abstract fun setValue (Ljava/lang/Object;)V +} + +public final class kotlinx/coroutines/flow/MutableStateFlow$DefaultImpls { + public static synthetic fun close$default (Lkotlinx/coroutines/flow/MutableStateFlow;Ljava/lang/Throwable;ILjava/lang/Object;)Z +} + +public abstract interface class kotlinx/coroutines/flow/StateFlow : kotlinx/coroutines/flow/Flow { + public abstract fun getValue ()Ljava/lang/Object; + public abstract fun isClosed ()Z +} + +public abstract interface class kotlinx/coroutines/flow/StateFlowJob : kotlinx/coroutines/Job, kotlinx/coroutines/flow/StateFlow { +} + +public final class kotlinx/coroutines/flow/StateFlowJob$DefaultImpls { + public static synthetic fun cancel (Lkotlinx/coroutines/flow/StateFlowJob;)V + public static fun fold (Lkotlinx/coroutines/flow/StateFlowJob;Ljava/lang/Object;Lkotlin/jvm/functions/Function2;)Ljava/lang/Object; + public static fun get (Lkotlinx/coroutines/flow/StateFlowJob;Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext$Element; + public static fun minusKey (Lkotlinx/coroutines/flow/StateFlowJob;Lkotlin/coroutines/CoroutineContext$Key;)Lkotlin/coroutines/CoroutineContext; + public static fun plus (Lkotlinx/coroutines/flow/StateFlowJob;Lkotlin/coroutines/CoroutineContext;)Lkotlin/coroutines/CoroutineContext; + public static fun plus (Lkotlinx/coroutines/flow/StateFlowJob;Lkotlinx/coroutines/Job;)Lkotlinx/coroutines/Job; +} + +public final class kotlinx/coroutines/flow/StateFlowKt { + public static final fun StateFlow (Ljava/lang/Object;)Lkotlinx/coroutines/flow/MutableStateFlow; +} + +public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/coroutines/flow/internal/FusibleFlow { public final field capacity I public final field context Lkotlin/coroutines/CoroutineContext; public fun (Lkotlin/coroutines/CoroutineContext;I)V @@ -1007,10 +1039,9 @@ public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/cor public fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; protected abstract fun collectTo (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; protected abstract fun create (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow; + public fun fuse (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/FusibleFlow; public fun produceImpl (Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel; public fun toString ()Ljava/lang/String; - public final fun update (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow; - public static synthetic fun update$default (Lkotlinx/coroutines/flow/internal/ChannelFlow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/internal/ChannelFlow; } public final class kotlinx/coroutines/flow/internal/CombineKt { @@ -1021,6 +1052,14 @@ public final class kotlinx/coroutines/flow/internal/FlowExceptions_commonKt { public static final fun checkIndexOverflow (I)I } +public abstract interface class kotlinx/coroutines/flow/internal/FusibleFlow : kotlinx/coroutines/flow/Flow { + public abstract fun fuse (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/FusibleFlow; +} + +public final class kotlinx/coroutines/flow/internal/FusibleFlow$DefaultImpls { + public static synthetic fun fuse$default (Lkotlinx/coroutines/flow/internal/FusibleFlow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/internal/FusibleFlow; +} + public final class kotlinx/coroutines/flow/internal/SafeCollector_commonKt { public static final fun unsafeFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; } diff --git a/kotlinx-coroutines-core/common/src/flow/StateFlow.kt b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt new file mode 100644 index 0000000000..f1e12c2bb3 --- /dev/null +++ b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt @@ -0,0 +1,410 @@ + /* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.atomicfu.* +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.internal.* +import kotlinx.coroutines.internal.* +import kotlin.coroutines.* +import kotlin.jvm.* +import kotlin.native.concurrent.* + +/** + * A [Flow] that represents a read-only state with a single updatable data [value] that emits updates + * to the value to its collectors. The current value can be retrieved via [value] property. + * The flow of future updates to the value can be observed by collecting values from this flow. + * + * A [mutable state flow][MutableStateFlow] is created using `StateFlow(value)` constructor function with + * the initial value. The value of mutable state flow can be updated by setting its [value] property. + * Updates to the [value] are always [conflated][Flow.conflate]. So a slow collector skips fast updates, + * but always collects the most recently emitted value. + * + * [StateFlow] is useful as a data-model class to represented any kind of state. + * Derived values can be defined using various operators on the flows, with [combine] operator being especially + * useful to combine values from multiple state flows using arbitrary functions. + * + * For example, the following class encapsulates an integer state and increments its value on each call to `inc`: + * + * ``` + * class CounterModel { + * private val _counter = StateFlow(0) // private mutable state flow + * val counter: StateFlow get() = _counter // publicly exposed as read-only state flow + * + * fun inc() { + * _counter.value++ + * } + * } + * ``` + * + * Having two instances of the above `CounterModel` class one can define the sum of their counters like this: + * + * ``` + * val aModel = CounterModel() + * val bModel = CounterModel() + * val sumFlow: Flow = aModel.counter.combine(bModel.counter) { a, b -> a + b } + * ``` + * + * ### Closing state flow + * + * A state flow can be optionally [closed][MutableStateFlow.close]. + * When state flow is closed all its collectors complete normally or with the specified exception. + * Closing a state flow transitions it to the terminal state. Once the state flow is closed its value cannot be changed. + * The most recent value of the closed state flow is still available via [value] property. + * Closing a state is appropriate in situations where the source of the state updates is permanently destroyed. + * + * **Preview feature**: The design of state flow closing is tentative and could change. + * + * ### Turning Flow into a StateFlow + * + * A regular [Flow] is cold. It does not have the concept of the last value and it only becomes active when collected. + * It can be made hot by turning it into a [StateFlow] using [stateIn] operator. For example: + * + * ``` + * coroutineScope { + * val sumSate: StateFlow = sumFlow.stateIn(this) // start computing sumFlow in this scope + * println(sumState.value) // the current value of the sum + * sumState.cancel() // stop computing sumFlow + * } + * ``` + * + * **Preview feature**: The design of `stateIn` operator is tentative and could change. + * + * ### Strong equality-based conflation + * + * Values in state flow are conflated using [Any.equals] comparison. It is used to conflate incoming updates + * to [value][MutableStateFlow.value] in [MutableStateFlow] and to suppress emission of the values to collectors + * when new value is equal to the previously emitted one. State flow behavior with classes that violate + * the contract for [Any.equals] is unspecified. + * + * Note, that this conflation logic is stronger that the conflation that is used by [Flow.conflate] operator, + * which is based on object identity. For example, consider this flow of strings that are emitted + * every 100ms: + * + * ``` + * val flow = flowOf(0, 1, 0).map { it.toString() }.onEach { delay(100) } + * ``` + * + * Conflating this flow before a slow collector (that takes 250 ms on each value), produces + * `[0, 0]`. The second zero string is not conflated, because it is a different [String] object with different + * identity: + * + * ``` + * flow.conflate().onEach { delay(250) }.toList().let { println(it) } + * ``` + * + * Converting the same flow to `StateFlow` and then feeding to slow collector, produces just `[0]`. + * The second zero string is conflated because it is equal to the previous zero: + * + * ``` + * coroutineScope { // scope for stateIn + * flow.stateIn(this).onEach { delay(250) }.toList().let { println(it) } + * } + * ``` + * + * ### StateFlow vs ConflatedBroadcastChannel + * + * Conceptually state flow is similar to + * [ConflatedBroadcastChannel][kotlinx.coroutines.channels.ConflatedBroadcastChannel] + * and is designed to completely replace `ConflatedBroadcastChannel` in the future. + * It has the following important improvements: + * + * * `StateFlow` is simpler, because it does not have to implement all the [Channel] APIs, which allows + * for faster, garbage-free implementation, unlike `ConflatedBroadcastChannel` implementation that + * allocates objects on each emitted value. + * * `StateFlow` always has a value which can be safely read at any time via [value] property. + * Unlike `ConflatedBroadcastChannel`, there is no way to create a state flow without a value. + * * `StateFlow` has a clear separation into a read-only `StateFlow` interface and a [MutableStateFlow]. + * * `StateFlow` conflation works similarly to [Flow.conflate] operator in that the last emitted value + * before close is received by collectors and is always available afterwards, unlike + * `ConflatedBroadcastChannel` that does not keep the last value when closed. + * * `StateFlow` conflation is based on equality, unlike conflation in `ConflatedBroadcastChannel` + * that is based on reference identity. + * + * `StateFlow` is designed to better cover typical use-cases of keeping track of state changes in time, taking + * more pragmatic design choices for the sake of convenience. + * + * ### Concurrency + * + * All methods of data flow are **thread-safe** and can be safely invoked from concurrent coroutines without + * external synchronization. + * + * ### Operator fusion + * + * Application of [flowOn], [conflate], and + * [buffer] with [CONFLATED][Channel.CONFLATED] or [RENDEZVOUS][Channel.RENDEZVOUS] capacity + * has no effect on the state flow. + * + * ### Implementation notes + * + * State flow implementation is optimized for memory consumption and allocation-freedom. It uses a lock to ensure + * thread-safety, but suspending collector coroutines are resumed outside of this lock to avoid dead-locks when + * using unconfined coroutines. Adding new collectors has `O(1)` amortized cost, but updating a [value] has `O(N)` + * cost, where `N` is the number of active collectors. + */ +@ExperimentalCoroutinesApi +public interface StateFlow : Flow { + /** + * The current value of this state flow. + */ + public val value: T + + /** + * Returns `true` if this state flow is closed. A [value] of the closed state flow does not change. + * A closed state flow emits a single [value] to all its collectors and completes. + * See [MutableStateFlow.close] for details. + * + * **Preview feature**: The design of state flow closing is tentative and could change. + */ + @FlowPreview + public val isClosed: Boolean +} + +/** + * A mutable [StateFlow] that provides a setter for [value] and a method to [close] the flow. + * + * See [StateFlow] documentation for details. + */ +@ExperimentalCoroutinesApi +public interface MutableStateFlow : StateFlow { + /** + * The current value of this state flow. + * + * Setting a value that is [equal][Any.equals] to the previous one does nothing. + * Setting a value to a [closed][isClosed] state flow produces [IllegalStateException]. + */ + public override var value: T + + /** + * Closes this state flow. + * This is an idempotent operation -- subsequent invocations of this function have no effect and return `false`. + * All active and future collectors from this [Flow] complete either normally (when [cause] is null) or + * with the corresponding exception that was specified as a [cause]. + * + * **Preview feature**: The design of state flow closing is tentative and could change. + */ + @FlowPreview + public fun close(cause: Throwable? = null): Boolean +} + +/** + * Creates a [MutableStateFlow] with the given initial [value]. + */ +@Suppress("FunctionName") +@ExperimentalCoroutinesApi +public fun StateFlow(value: T): MutableStateFlow = StateFlowImpl(value ?: NULL) + +// ------------------------------------ Implementation ------------------------------------ + +@SharedImmutable +private val NONE = Symbol("NONE") + +@SharedImmutable +private val PENDING = Symbol("PENDING") + +private const val INITIAL_SIZE = 2 // optimized for just a few collectors + +// StateFlow slots are allocated for its collectors +private class StateFlowSlot { + /** + * Each slot can have one of the following states: + * + * * `null` -- it is not used right now. Can [allocate] to new collector. + * * `NONE` -- used by a collector, but neither suspended nor has pending value. + * * `PENDING` -- pending to process new value. + * * `CancellableContinuationImpl` -- suspended waiting for new value. + * + * It is important that default `null` value is used, because there can be a race between allocation + * of a new slot and trying to do [makePending] on this slot. + */ + private val _state = atomic(null) + + fun allocate(): Boolean { + // No need for atomic check & update here, since allocated happens under StateFlow lock + if (_state.value != null) return false // not free + _state.value = NONE // allocated + return true + } + + fun free() { + _state.value = null // free now + } + + @Suppress("UNCHECKED_CAST") + fun makePending() { + _state.loop { state -> + when { + state == null -> return // this slot is free - skip it + state === PENDING -> return // already pending, nothing to do + state === NONE -> { // mark as pending + if (_state.compareAndSet(state, PENDING)) return + } + else -> { // must be a suspend continuation state + // we must still use CAS here since continuation may get cancelled and free the slot at any time + if (_state.compareAndSet(state, NONE)) { + (state as CancellableContinuationImpl).resume(Unit) + return + } + } + } + } + } + + fun takePending(): Boolean = _state.getAndSet(NONE)!!.let { state -> + assert { state !is CancellableContinuationImpl<*> } + return state === PENDING + } + + @Suppress("UNCHECKED_CAST") + suspend fun awaitPending(): Unit = suspendCancellableCoroutine sc@ { cont -> + assert { _state.value !is CancellableContinuationImpl<*> } // can be NONE or PENDING + if (_state.compareAndSet(NONE, cont)) return@sc // installed continuation, waiting for pending + // CAS failed -- the only possible reason is that it is already in pending state now + assert { _state.value === PENDING } + cont.resume(Unit) + } +} + +private class StateFlowImpl(initialValue: Any) : SynchronizedObject(), MutableStateFlow, FusibleFlow { + private val _state = atomic(initialValue) // T | NULL | Closed + private var sequence = 0 // serializes updates, value update is in process when sequence is odd + private var slots = arrayOfNulls(INITIAL_SIZE) + private var nSlots = 0 // number of allocated (!free) slots + private var nextIndex = 0 // oracle for the next free slot index + + @Suppress("UNCHECKED_CAST") + public override var value: T + get() { + val state = _state.value + val value = (state as? Closed)?.value ?: state + return NULL.unbox(value) + } + set(value) { + check(update(value ?: NULL)) { "StateFlow is closed" } + } + + override val isClosed: Boolean + get() = _state.value is Closed + + // Update returns false when the state flow was already closed + private fun update(newState: Any): Boolean { + var curSequence = 0 + var curSlots: Array = this.slots // benign race, we will not use it + synchronized(this) { + val oldState = _state.value + if (oldState is Closed) return false // already closed + if (oldState == newState) return true // Don't do anything if value is not changing + // When closing, capture the previous value into the closed state + val updatedState = if (newState is Closed) Closed(oldState, newState.cause) else newState + _state.value = updatedState + curSequence = sequence + if (curSequence and 1 == 0) { // even sequence means quiescent state flow (no ongoing update) + curSequence++ // make it odd + sequence = curSequence + } else { + // update is already in process, notify it, and return + sequence = curSequence + 2 // change sequence to notify, keep it odd + return true + } + curSlots = slots // read current reference to collectors under lock + } + /* + Fire value updates outside of the lock to avoid deadlocks with unconfined coroutines + Loop until we're done firing all the changes. This is sort of simple flat combining that + ensures sequential firing of concurrent updates and avoids the storm of collector resumes + when updates happen concurrently from many threads. + */ + while (true) { + // Benign race on element read from array + for (col in curSlots) { + col?.makePending() + } + // check if the value was updated again while we were updating the old one + synchronized(this) { + if (sequence == curSequence) { // nothing changed, we are done + sequence = curSequence + 1 // make sequence even again + return true // done + } + // reread everything for the next loop under the lock + curSequence = sequence + curSlots = slots + } + } + } + + override fun close(cause: Throwable?): Boolean = + update(Closed(null, cause)) // we don't know the value yet, update will get it under lock + + @Suppress("UNCHECKED_CAST") + override suspend fun collect(collector: FlowCollector) { + val slot = allocateSlot() + var prevValue: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet) + try { + // The loop is arranged so that it starts delivering current value without waiting first + while (true) { + // Here the coroutine could have waited for a while to be dispatched, + // so we use the most recent state here to ensure the best possible conflation of stale values + val state = _state.value + // Conflate value emissions using equality + // This also avoids double-emission of the last value on close, + // but ensures that the most recent value is emitted before close + val newValue = if (state is Closed) state.value else state + if (prevValue == null || newValue != prevValue) { + collector.emit(NULL.unbox(newValue)) + prevValue = newValue + } + // Close if needed + if (state is Closed) { + state.cause?.let { + throw recoverStackTrace(it) // produce a better stack-trace for debugging + } + break + } + // Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot + if (!slot.takePending()) { // try fast-path without suspending first + slot.awaitPending() // only suspend for new values when needed + } + } + } finally { + freeSlot(slot) + } + } + + private fun allocateSlot(): StateFlowSlot = synchronized(this) { + val size = slots.size + if (nSlots >= size) slots = slots.copyOf(2 * size) + var index = nextIndex + var slot: StateFlowSlot + while (true) { + slot = slots[index] ?: StateFlowSlot().also { slots[index] = it } + index++ + if (index >= slots.size) index = 0 + if (slot.allocate()) break // break when found and allocated free slot + } + nextIndex = index + nSlots++ + slot + } + + private fun freeSlot(slot: StateFlowSlot): Unit = synchronized(this) { + slot.free() + nSlots-- + } + + override fun fuse(context: CoroutineContext, capacity: Int): FusibleFlow { + // context is irrelevant for state flow and it is always conflated + // so it should not do anything unless buffering is requested + return when (capacity) { + Channel.CONFLATED, Channel.RENDEZVOUS -> this + else -> ChannelFlowOperatorImpl(this, context, capacity) + } + } + + private class Closed( + @JvmField val value: Any?, // T | NULL | null (transiently, never null is stored state) + @JvmField val cause: Throwable? + ) +} diff --git a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt index f2f748b2ff..994d38074e 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt @@ -16,7 +16,27 @@ internal fun Flow.asChannelFlow(): ChannelFlow = this as? ChannelFlow ?: ChannelFlowOperatorImpl(this) /** - * Operators that use channels extend this ChannelFlow and are always fused with each other. + * Operators that can fuse with [buffer] and [flowOn] operators implement this interface. + * + * @suppress **This an internal API and should not be used from general code.** + */ +@InternalCoroutinesApi +public interface FusibleFlow : Flow { + /** + * This function is called by [flowOn] (with context) and [buffer] (with capacity) operators + * that are applied to this flow. + */ + public fun fuse( + context: CoroutineContext = EmptyCoroutineContext, + capacity: Int = Channel.OPTIONAL_CHANNEL + ): FusibleFlow +} + +/** + * Operators that use channels extend this `ChannelFlow` and are always fused with each other. + * This class servers as a skeleton implementation of [FusibleFlow] and provides other cross-cutting + * methods like ability to [produceIn] and [broadcastIn] the corresponding flow, thus making it + * possible to directly use the backing channel if it exists (hence the `ChannelFlow` name). * * @suppress **This an internal API and should not be used from general code.** */ @@ -26,7 +46,7 @@ public abstract class ChannelFlow( @JvmField public val context: CoroutineContext, // buffer capacity between upstream and downstream context @JvmField public val capacity: Int -) : Flow { +) : FusibleFlow { // shared code to create a suspend lambda from collectTo function in one place internal val collectToFun: suspend (ProducerScope) -> Unit @@ -35,10 +55,7 @@ public abstract class ChannelFlow( private val produceCapacity: Int get() = if (capacity == Channel.OPTIONAL_CHANNEL) Channel.BUFFERED else capacity - public fun update( - context: CoroutineContext = EmptyCoroutineContext, - capacity: Int = Channel.OPTIONAL_CHANNEL - ): ChannelFlow { + public override fun fuse(context: CoroutineContext, capacity: Int): FusibleFlow { // note: previous upstream context (specified before) takes precedence val newContext = context + this.context val newCapacity = when { diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt index 5f184e8feb..f5869760e3 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt @@ -108,10 +108,10 @@ public fun Flow.buffer(capacity: Int = BUFFERED): Flow { require(capacity >= 0 || capacity == BUFFERED || capacity == CONFLATED) { "Buffer size should be non-negative, BUFFERED, or CONFLATED, but was $capacity" } - return if (this is ChannelFlow) - update(capacity = capacity) - else - ChannelFlowOperatorImpl(this, capacity = capacity) + return when (this) { + is FusibleFlow -> fuse(capacity = capacity) + else -> ChannelFlowOperatorImpl(this, capacity = capacity) + } } /** @@ -196,7 +196,7 @@ public fun Flow.flowOn(context: CoroutineContext): Flow { checkFlowContext(context) return when { context == EmptyCoroutineContext -> this - this is ChannelFlow -> update(context = context) + this is FusibleFlow -> fuse(context = context) else -> ChannelFlowOperatorImpl(this, context = context) } } diff --git a/kotlinx-coroutines-core/common/src/flow/operators/State.kt b/kotlinx-coroutines-core/common/src/flow/operators/State.kt new file mode 100644 index 0000000000..95302ee0f7 --- /dev/null +++ b/kotlinx-coroutines-core/common/src/flow/operators/State.kt @@ -0,0 +1,66 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +@file:JvmMultifileClass +@file:JvmName("FlowKt") + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlin.jvm.* + +/** + * A job that is created by [stateIn] operator. + * + * **Preview feature**: The design of `stateIn` operator is tentative and could change. + */ +@FlowPreview +public interface StateFlowJob : StateFlow, Job + +/** + * Launches a coroutine that collects this [Flow] and emits all the collected values as a resulting [StateFlow]. + * The result of this function is both a [StateFlow] and a [Job]. This call effectively turns a cold [Flow] into a + * hot, active [Flow], making the most recently emitted value available for consumption at any time via + * [StateFlow.value]. This call suspends until the source flow the emits first value and + * throws [NoSuchElementException] if the flow was empty. + * + * The resulting coroutine can be cancelled by [cancelling][CoroutineScope.cancel] the [scope] in which it is + * launched or by [cancelling][Job.cancel] the resulting [Job]. + * + * Errors in the source flow are not propagated to the [scope] but [close][MutableStateFlow.close] the resulting + * [StateFlow] or are rethrown to the caller of `stateIn` if they happen before emission of the first value. + * + * **Preview feature**: The design of `stateIn` operator is tentative and could change. + */ +@FlowPreview +public suspend fun Flow.stateIn(scope: CoroutineScope): StateFlowJob { + val result = CompletableDeferred>() + scope.launch { + var state: MutableStateFlow? = null + var exception: Throwable? = null + try { + collect { value -> + // Update state flow if initialized + state?.let { it.value = value } ?: run { + // Or create state on the first value if state was not created yet (first value) + state = StateFlow(value).also { + // resume stateIn call with initialized StateFlow and current job + result.complete(StateFlowJobImpl(it, coroutineContext[Job]!!)) + } + } + } + } catch (e: Throwable) { + // Ignore cancellation exception -- it is a normal way to stop the flow + if (e !is CancellationException) exception = e + } + // Close the state flow with exception if initialized + state?.apply { close(exception) } ?: run { + // Or complete the deferred exceptionally if the state was not create yet) + result.completeExceptionally(exception ?: NoSuchElementException("Expected at least one element")) + } + } + return result.await() // tail call +} + +private class StateFlowJobImpl(state: StateFlow, job: Job) : StateFlowJob, StateFlow by state, Job by job \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/test/flow/StateFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/StateFlowTest.kt new file mode 100644 index 0000000000..9aafcf03d8 --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/StateFlowTest.kt @@ -0,0 +1,166 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlin.test.* + +class StateFlowTest : TestBase() { + @Test + fun testNullAndNormalClose() = runTest { + expect(1) + val state = StateFlow(0) + assertFalse(state.isClosed) + launch(start = CoroutineStart.UNDISPATCHED) { + expect(2) + state.collect { value -> + when (value) { + 0 -> expect(3) + 1 -> expect(5) + null -> expect(8) + 2 -> expect(10) + else -> expectUnreached() + } + } + expect(12) + } + expect(4) // collector is waiting + state.value = 1 // fire in the hole! + assertFalse(state.isClosed) + assertEquals(1, state.value) + yield() + expect(6) + state.value = 1 // same value, nothing happens + yield() + expect(7) + state.value = null // null value + assertFalse(state.isClosed) + assertNull(state.value) + yield() + expect(9) + state.value = 2 // another value + assertFalse(state.isClosed) + assertEquals(2, state.value) + yield() + expect(11) + state.close() + assertTrue(state.isClosed) + assertEquals(2, state.value) // the last value is still there + yield() + finish(13) + } + + @Test + fun testCloseWithException() = runTest { + expect(1) + val state = StateFlow("A") + assertFalse(state.isClosed) + assertEquals("A", state.value) + launch(start = CoroutineStart.UNDISPATCHED) { + expect(2) + try { + state.collect { value -> + when (value) { + "A" -> expect(3) + "B" -> expect(5) + else -> expectUnreached() + } + } + } catch (e: TestException) { + expect(7) + return@launch + } + expectUnreached() + } + expect(4) + state.value = "B" + assertFalse(state.isClosed) + assertEquals("B", state.value) + yield() + expect(6) + state.close(TestException("OK")) + assertTrue(state.isClosed) + assertEquals("B", state.value) // the last value is still there + yield() + finish(8) + } + + @Test + fun testCollectClosed() = runTest { + expect(1) + val state = StateFlow(0) + launch(start = CoroutineStart.UNDISPATCHED) { + expect(2) + assertEquals(listOf(0, 42), state.toList()) // collects initial value and update + expect(3) + } + state.value = 42 // update + state.close() // and immediately close + yield() + assertEquals(listOf(42), state.toList()) // last value only when collecting from closed state flow + finish(4) + } + + @Test + fun testEqualsConflation() = runTest { + expect(1) + val state = StateFlow(Data(0)) + launch(start = CoroutineStart.UNDISPATCHED) { + expect(2) + state.collect { value -> + when(value.i) { + 0 -> expect(3) // initial value + 2 -> expect(5) + 4 -> expect(7) + else -> error("Unexpected $value") + } + } + expect(9) + } + state.value = Data(1) // conflated + state.value = Data(0) // equals to last emitted + yield() // no repeat zero + state.value = Data(3) // conflated + state.value = Data(2) // delivered + expect(4) + yield() + state.value = Data(2) // equals to last one, dropped + yield() + state.value = Data(5) // conflated + state.value = Data(4) // delivered + expect(6) + yield() + state.close() // last value will not repeat as a side-effect of close! + expect(8) + yield() + finish(10) + } + + data class Data(val i: Int) + + @Test + fun testDataModel() = runTest { + val s = CounterModel() + launch { + val sum = s.counter.take(11).sum() + assertEquals(55, sum) + } + repeat(10) { + yield() + s.inc() + } + } + + class CounterModel { + // private data flow + private val _counter = StateFlow(0) + // publicly exposed as a flow + val counter: StateFlow get() = _counter + + fun inc() { + _counter.value++ + } + } +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/common/test/flow/StateInTest.kt b/kotlinx-coroutines-core/common/test/flow/StateInTest.kt new file mode 100644 index 0000000000..8cf7cf8680 --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/StateInTest.kt @@ -0,0 +1,79 @@ +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlin.test.* + +class StateInTest : TestBase() { + @Test + fun testEmptyFlow() = runTest { + assertFailsWith { + emptyFlow().stateIn(this) + } + } + + @Test + fun testFailingFlow() = runTest { + assertFailsWith { + flow { throw TestException() }.stateIn(this) + } + } + + @Test + fun testOneElementFlow() = runTest { + val state = flowOf("OK").onCompletion { yield() }.stateIn(this) + assertEquals("OK", state.value) + assertFalse(state.isClosed) + yield() + assertEquals("OK", state.value) + assertTrue(state.isClosed) + } + + @Test + fun testStateFlowJobCancellation() = runTest { + val flow = flow { + repeat(10) { + emit(it) + yield() + } + } + val state = flow.stateIn(this) + assertEquals(0, state.value) + yield() + assertEquals(1, state.value) + state.cancel() // cancel this job + yield() + assertEquals(1, state.value) + assertTrue(state.isClosed) + // check that state is closed normally with value one + assertEquals(listOf(1), state.toList()) + } + + @Test + fun testStateFlowFailure() = runTest { + expect(1) + val flow = flow { + expect(2) + emit("OK") + yield() + expect(4) + throw TestException() + } + val state = flow.stateIn(this) + expect(3) + assertEquals("OK", state.value) + assertFalse(state.isClosed) + yield() + expect(5) + assertEquals("OK", state.value) + assertTrue(state.isClosed) + // try to collect fails after producing the value + assertFailsWith { + expect(6) + state.collect { + assertEquals("OK", it) + expect(7) + } + } + finish(8) + } +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt b/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt new file mode 100644 index 0000000000..7e97c41a33 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt @@ -0,0 +1,80 @@ +/* + * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import org.junit.* +import kotlin.random.* + +class StateFlowStressTest : TestBase() { + private val nSeconds = 3 * stressTestMultiplier + private val state = StateFlow(0) + private lateinit var pool: ExecutorCoroutineDispatcher + + @After + fun tearDown() { + pool.close() + } + + fun stress(nEmitters: Int, nCollectors: Int) = runTest { + pool = newFixedThreadPoolContext(nEmitters + nCollectors, "StateFlowStressTest") + val collected = Array(nCollectors) { LongArray(nEmitters) } + val collectors = launch { + repeat(nCollectors) { collector -> + launch(pool) { + val c = collected[collector] + // collect, but abort and collect again after every 1000 values to stress allocation/deallocation + do { + val batchSize = Random.nextInt(1..1000) + var index = 0 + val cnt = state.onEach { value -> + val emitter = (value % nEmitters).toInt() + val current = value / nEmitters + // the first value in batch is allowed to repeat, but cannot go back + val ok = if (index == 0) current >= c[emitter] else current > c[emitter] + check(ok) { + "Values must be monotonic, but $current is not, " + + "was ${c[emitter]} in collector #$collector from emitter #$emitter" + } + c[emitter] = current + }.take(batchSize).map { 1 }.sum() + } while (cnt == batchSize) + } + } + } + val emitted = LongArray(nEmitters) + val emitters = launch { + repeat(nEmitters) { emitter -> + launch(pool) { + var current = 1L + while (true) { + state.value = current * nEmitters + emitter + emitted[emitter] = current + current++ + if (current % 1000 == 0L) yield() // make it cancellable + } + } + } + } + for (second in 1..nSeconds) { + delay(1000) + val cs = collected.map { it.sum() } + println("$second: emitted=${emitted.sum()}, collected=${cs.min()}..${cs.max()}") + } + emitters.cancelAndJoin() + state.close() + collectors.join() + // make sure nothing hanged up + require(collected.all { c -> + c.withIndex().all { (emitter, current) -> current > emitted[emitter] / 2 } + }) + } + + @Test + fun testSingleEmitterAndCollector() = stress(1, 1) + + @Test + fun testTenEmittersAndCollectors() = stress(10, 10) +} \ No newline at end of file