diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 0368cf1cc0..c91d329465 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -998,7 +998,26 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun zip (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; } -public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/coroutines/flow/Flow { +public final class kotlinx/coroutines/flow/LintKt { + public static final fun conflate (Lkotlinx/coroutines/flow/StateFlow;)Lkotlinx/coroutines/flow/Flow; + public static final fun distinctUntilChanged (Lkotlinx/coroutines/flow/StateFlow;)Lkotlinx/coroutines/flow/Flow; + public static final fun flowOn (Lkotlinx/coroutines/flow/StateFlow;Lkotlin/coroutines/CoroutineContext;)Lkotlinx/coroutines/flow/Flow; +} + +public abstract interface class kotlinx/coroutines/flow/MutableStateFlow : kotlinx/coroutines/flow/StateFlow { + public abstract fun getValue ()Ljava/lang/Object; + public abstract fun setValue (Ljava/lang/Object;)V +} + +public abstract interface class kotlinx/coroutines/flow/StateFlow : kotlinx/coroutines/flow/Flow { + public abstract fun getValue ()Ljava/lang/Object; +} + +public final class kotlinx/coroutines/flow/StateFlowKt { + public static final fun MutableStateFlow (Ljava/lang/Object;)Lkotlinx/coroutines/flow/MutableStateFlow; +} + +public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/coroutines/flow/internal/FusibleFlow { public final field capacity I public final field context Lkotlin/coroutines/CoroutineContext; public fun (Lkotlin/coroutines/CoroutineContext;I)V @@ -1007,10 +1026,9 @@ public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/cor public fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; protected abstract fun collectTo (Lkotlinx/coroutines/channels/ProducerScope;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; protected abstract fun create (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow; + public fun fuse (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/FusibleFlow; public fun produceImpl (Lkotlinx/coroutines/CoroutineScope;)Lkotlinx/coroutines/channels/ReceiveChannel; public fun toString ()Ljava/lang/String; - public final fun update (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/ChannelFlow; - public static synthetic fun update$default (Lkotlinx/coroutines/flow/internal/ChannelFlow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/internal/ChannelFlow; } public final class kotlinx/coroutines/flow/internal/CombineKt { @@ -1021,6 +1039,14 @@ public final class kotlinx/coroutines/flow/internal/FlowExceptions_commonKt { public static final fun checkIndexOverflow (I)I } +public abstract interface class kotlinx/coroutines/flow/internal/FusibleFlow : kotlinx/coroutines/flow/Flow { + public abstract fun fuse (Lkotlin/coroutines/CoroutineContext;I)Lkotlinx/coroutines/flow/internal/FusibleFlow; +} + +public final class kotlinx/coroutines/flow/internal/FusibleFlow$DefaultImpls { + public static synthetic fun fuse$default (Lkotlinx/coroutines/flow/internal/FusibleFlow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/internal/FusibleFlow; +} + public final class kotlinx/coroutines/flow/internal/SafeCollector_commonKt { public static final fun unsafeFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; } diff --git a/kotlinx-coroutines-core/common/src/flow/Flow.kt b/kotlinx-coroutines-core/common/src/flow/Flow.kt index 378b9f148e..b7e2518694 100644 --- a/kotlinx-coroutines-core/common/src/flow/Flow.kt +++ b/kotlinx-coroutines-core/common/src/flow/Flow.kt @@ -156,6 +156,12 @@ import kotlin.coroutines.* * * Flow is [Reactive Streams](http://www.reactive-streams.org/) compliant, you can safely interop it with * reactive streams using [Flow.asPublisher] and [Publisher.asFlow] from `kotlinx-coroutines-reactive` module. + * + * ### Not stable for inheritance + * + * **`Flow` interface is not stable for inheritance in 3rd party libraries**, as new methods + * might be added to this interface in the future, but is stable for use. + * Use `flow { ... }` builder function to create an implementation. */ public interface Flow { /** diff --git a/kotlinx-coroutines-core/common/src/flow/Migration.kt b/kotlinx-coroutines-core/common/src/flow/Migration.kt index bdc205e082..fc34f85b4c 100644 --- a/kotlinx-coroutines-core/common/src/flow/Migration.kt +++ b/kotlinx-coroutines-core/common/src/flow/Migration.kt @@ -22,7 +22,7 @@ import kotlin.jvm.* * Deprecated functions also are moved here when they renamed. The difference is that they have * a body with their implementation while pure stubs have [noImpl]. */ -private fun noImpl(): Nothing = +internal fun noImpl(): Nothing = throw UnsupportedOperationException("Not implemented, should not be called") /** diff --git a/kotlinx-coroutines-core/common/src/flow/StateFlow.kt b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt new file mode 100644 index 0000000000..f14a1ac45e --- /dev/null +++ b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt @@ -0,0 +1,316 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.atomicfu.* +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.flow.internal.* +import kotlinx.coroutines.internal.* +import kotlin.coroutines.* +import kotlin.native.concurrent.* + +/** + * A [Flow] that represents a read-only state with a single updatable data [value] that emits updates + * to the value to its collectors. The current value can be retrieved via [value] property. + * The flow of future updates to the value can be observed by collecting values from this flow. + * + * A [mutable state flow][MutableStateFlow] is created using `MutableStateFlow(value)` constructor function with + * the initial value. The value of mutable state flow can be updated by setting its [value] property. + * Updates to the [value] are always [conflated][Flow.conflate]. So a slow collector skips fast updates, + * but always collects the most recently emitted value. + * + * [StateFlow] is useful as a data-model class to represent any kind of state. + * Derived values can be defined using various operators on the flows, with [combine] operator being especially + * useful to combine values from multiple state flows using arbitrary functions. + * + * For example, the following class encapsulates an integer state and increments its value on each call to `inc`: + * + * ``` + * class CounterModel { + * private val _counter = MutableStateFlow(0) // private mutable state flow + * val counter: StateFlow get() = _counter // publicly exposed as read-only state flow + * + * fun inc() { + * _counter.value++ + * } + * } + * ``` + * + * Having two instances of the above `CounterModel` class one can define the sum of their counters like this: + * + * ``` + * val aModel = CounterModel() + * val bModel = CounterModel() + * val sumFlow: Flow = aModel.counter.combine(bModel.counter) { a, b -> a + b } + * ``` + * + * ### Strong equality-based conflation + * + * Values in state flow are conflated using [Any.equals] comparison in a similar way to + * [distinctUntilChanged] operator. It is used to conflate incoming updates + * to [value][MutableStateFlow.value] in [MutableStateFlow] and to suppress emission of the values to collectors + * when new value is equal to the previously emitted one. State flow behavior with classes that violate + * the contract for [Any.equals] is unspecified. + * + * ### StateFlow vs ConflatedBroadcastChannel + * + * Conceptually state flow is similar to + * [ConflatedBroadcastChannel][kotlinx.coroutines.channels.ConflatedBroadcastChannel] + * and is designed to completely replace `ConflatedBroadcastChannel` in the future. + * It has the following important difference: + * + * * `StateFlow` is simpler, because it does not have to implement all the [Channel] APIs, which allows + * for faster, garbage-free implementation, unlike `ConflatedBroadcastChannel` implementation that + * allocates objects on each emitted value. + * * `StateFlow` always has a value which can be safely read at any time via [value] property. + * Unlike `ConflatedBroadcastChannel`, there is no way to create a state flow without a value. + * * `StateFlow` has a clear separation into a read-only `StateFlow` interface and a [MutableStateFlow]. + * * `StateFlow` conflation is based on equality like [distinctUntilChanged] operator, + * unlike conflation in `ConflatedBroadcastChannel` that is based on reference identity. + * * `StateFlow` cannot be currently closed like `ConflatedBroadcastChannel` and can never represent a failure. + * This feature might be added in the future if enough compelling use-cases are found. + * + * `StateFlow` is designed to better cover typical use-cases of keeping track of state changes in time, taking + * more pragmatic design choices for the sake of convenience. + * + * ### Concurrency + * + * All methods of data flow are **thread-safe** and can be safely invoked from concurrent coroutines without + * external synchronization. + * + * ### Operator fusion + * + * Application of [flowOn][Flow.flowOn], [conflate][Flow.conflate], + * [buffer] with [CONFLATED][Channel.CONFLATED] or [RENDEZVOUS][Channel.RENDEZVOUS] capacity, + * or a [distinctUntilChanged][Flow.distinctUntilChanged] operator has no effect on the state flow. + * + * ### Implementation notes + * + * State flow implementation is optimized for memory consumption and allocation-freedom. It uses a lock to ensure + * thread-safety, but suspending collector coroutines are resumed outside of this lock to avoid dead-locks when + * using unconfined coroutines. Adding new collectors has `O(1)` amortized cost, but updating a [value] has `O(N)` + * cost, where `N` is the number of active collectors. + * + * ### Not stable for inheritance + * + * **`StateFlow` interface is not stable for inheritance in 3rd party libraries**, as new methods + * might be added to this interface in the future, but is stable for use. + * Use `MutableStateFlow()` constructor function to create an implementation. + */ +@ExperimentalCoroutinesApi +public interface StateFlow : Flow { + /** + * The current value of this state flow. + */ + public val value: T +} + +/** + * A mutable [StateFlow] that provides a setter for [value] and a method to [close] the flow. + * + * See [StateFlow] documentation for details. + * + * ### Not stable for inheritance + * + * **`MutableStateFlow` interface is not stable for inheritance in 3rd party libraries**, as new methods + * might be added to this interface in the future, but is stable for use. + * Use `MutableStateFlow()` constructor function to create an implementation. + */ +@ExperimentalCoroutinesApi +public interface MutableStateFlow : StateFlow { + /** + * The current value of this state flow. + * + * Setting a value that is [equal][Any.equals] to the previous one does nothing. + */ + public override var value: T +} + +/** + * Creates a [MutableStateFlow] with the given initial [value]. + */ +@Suppress("FunctionName") +@ExperimentalCoroutinesApi +public fun MutableStateFlow(value: T): MutableStateFlow = StateFlowImpl(value ?: NULL) + +// ------------------------------------ Implementation ------------------------------------ + +@SharedImmutable +private val NONE = Symbol("NONE") + +@SharedImmutable +private val PENDING = Symbol("PENDING") + +private const val INITIAL_SIZE = 2 // optimized for just a few collectors + +// StateFlow slots are allocated for its collectors +private class StateFlowSlot { + /** + * Each slot can have one of the following states: + * + * * `null` -- it is not used right now. Can [allocate] to new collector. + * * `NONE` -- used by a collector, but neither suspended nor has pending value. + * * `PENDING` -- pending to process new value. + * * `CancellableContinuationImpl` -- suspended waiting for new value. + * + * It is important that default `null` value is used, because there can be a race between allocation + * of a new slot and trying to do [makePending] on this slot. + */ + private val _state = atomic(null) + + fun allocate(): Boolean { + // No need for atomic check & update here, since allocated happens under StateFlow lock + if (_state.value != null) return false // not free + _state.value = NONE // allocated + return true + } + + fun free() { + _state.value = null // free now + } + + @Suppress("UNCHECKED_CAST") + fun makePending() { + _state.loop { state -> + when { + state == null -> return // this slot is free - skip it + state === PENDING -> return // already pending, nothing to do + state === NONE -> { // mark as pending + if (_state.compareAndSet(state, PENDING)) return + } + else -> { // must be a suspend continuation state + // we must still use CAS here since continuation may get cancelled and free the slot at any time + if (_state.compareAndSet(state, NONE)) { + (state as CancellableContinuationImpl).resume(Unit) + return + } + } + } + } + } + + fun takePending(): Boolean = _state.getAndSet(NONE)!!.let { state -> + assert { state !is CancellableContinuationImpl<*> } + return state === PENDING + } + + @Suppress("UNCHECKED_CAST") + suspend fun awaitPending(): Unit = suspendCancellableCoroutine sc@ { cont -> + assert { _state.value !is CancellableContinuationImpl<*> } // can be NONE or PENDING + if (_state.compareAndSet(NONE, cont)) return@sc // installed continuation, waiting for pending + // CAS failed -- the only possible reason is that it is already in pending state now + assert { _state.value === PENDING } + cont.resume(Unit) + } +} + +private class StateFlowImpl(initialValue: Any) : SynchronizedObject(), MutableStateFlow, FusibleFlow { + private val _state = atomic(initialValue) // T | NULL + private var sequence = 0 // serializes updates, value update is in process when sequence is odd + private var slots = arrayOfNulls(INITIAL_SIZE) + private var nSlots = 0 // number of allocated (!free) slots + private var nextIndex = 0 // oracle for the next free slot index + + @Suppress("UNCHECKED_CAST") + public override var value: T + get() = NULL.unbox(_state.value) + set(value) { + var curSequence = 0 + var curSlots: Array = this.slots // benign race, we will not use it + val newState = value ?: NULL + synchronized(this) { + val oldState = _state.value + if (oldState == newState) return // Don't do anything if value is not changing + _state.value = newState + curSequence = sequence + if (curSequence and 1 == 0) { // even sequence means quiescent state flow (no ongoing update) + curSequence++ // make it odd + sequence = curSequence + } else { + // update is already in process, notify it, and return + sequence = curSequence + 2 // change sequence to notify, keep it odd + return + } + curSlots = slots // read current reference to collectors under lock + } + /* + Fire value updates outside of the lock to avoid deadlocks with unconfined coroutines + Loop until we're done firing all the changes. This is sort of simple flat combining that + ensures sequential firing of concurrent updates and avoids the storm of collector resumes + when updates happen concurrently from many threads. + */ + while (true) { + // Benign race on element read from array + for (col in curSlots) { + col?.makePending() + } + // check if the value was updated again while we were updating the old one + synchronized(this) { + if (sequence == curSequence) { // nothing changed, we are done + sequence = curSequence + 1 // make sequence even again + return // done + } + // reread everything for the next loop under the lock + curSequence = sequence + curSlots = slots + } + } + } + + override suspend fun collect(collector: FlowCollector) { + val slot = allocateSlot() + var prevState: Any? = null // previously emitted T!! | NULL (null -- nothing emitted yet) + try { + // The loop is arranged so that it starts delivering current value without waiting first + while (true) { + // Here the coroutine could have waited for a while to be dispatched, + // so we use the most recent state here to ensure the best possible conflation of stale values + val newState = _state.value + // Conflate value emissions using equality + if (prevState == null || newState != prevState) { + collector.emit(NULL.unbox(newState)) + prevState = newState + } + // Note: if awaitPending is cancelled, then it bails out of this loop and calls freeSlot + if (!slot.takePending()) { // try fast-path without suspending first + slot.awaitPending() // only suspend for new values when needed + } + } + } finally { + freeSlot(slot) + } + } + + private fun allocateSlot(): StateFlowSlot = synchronized(this) { + val size = slots.size + if (nSlots >= size) slots = slots.copyOf(2 * size) + var index = nextIndex + var slot: StateFlowSlot + while (true) { + slot = slots[index] ?: StateFlowSlot().also { slots[index] = it } + index++ + if (index >= slots.size) index = 0 + if (slot.allocate()) break // break when found and allocated free slot + } + nextIndex = index + nSlots++ + slot + } + + private fun freeSlot(slot: StateFlowSlot): Unit = synchronized(this) { + slot.free() + nSlots-- + } + + override fun fuse(context: CoroutineContext, capacity: Int): FusibleFlow { + // context is irrelevant for state flow and it is always conflated + // so it should not do anything unless buffering is requested + return when (capacity) { + Channel.CONFLATED, Channel.RENDEZVOUS -> this + else -> ChannelFlowOperatorImpl(this, context, capacity) + } + } +} diff --git a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt index f2f748b2ff..994d38074e 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/ChannelFlow.kt @@ -16,7 +16,27 @@ internal fun Flow.asChannelFlow(): ChannelFlow = this as? ChannelFlow ?: ChannelFlowOperatorImpl(this) /** - * Operators that use channels extend this ChannelFlow and are always fused with each other. + * Operators that can fuse with [buffer] and [flowOn] operators implement this interface. + * + * @suppress **This an internal API and should not be used from general code.** + */ +@InternalCoroutinesApi +public interface FusibleFlow : Flow { + /** + * This function is called by [flowOn] (with context) and [buffer] (with capacity) operators + * that are applied to this flow. + */ + public fun fuse( + context: CoroutineContext = EmptyCoroutineContext, + capacity: Int = Channel.OPTIONAL_CHANNEL + ): FusibleFlow +} + +/** + * Operators that use channels extend this `ChannelFlow` and are always fused with each other. + * This class servers as a skeleton implementation of [FusibleFlow] and provides other cross-cutting + * methods like ability to [produceIn] and [broadcastIn] the corresponding flow, thus making it + * possible to directly use the backing channel if it exists (hence the `ChannelFlow` name). * * @suppress **This an internal API and should not be used from general code.** */ @@ -26,7 +46,7 @@ public abstract class ChannelFlow( @JvmField public val context: CoroutineContext, // buffer capacity between upstream and downstream context @JvmField public val capacity: Int -) : Flow { +) : FusibleFlow { // shared code to create a suspend lambda from collectTo function in one place internal val collectToFun: suspend (ProducerScope) -> Unit @@ -35,10 +55,7 @@ public abstract class ChannelFlow( private val produceCapacity: Int get() = if (capacity == Channel.OPTIONAL_CHANNEL) Channel.BUFFERED else capacity - public fun update( - context: CoroutineContext = EmptyCoroutineContext, - capacity: Int = Channel.OPTIONAL_CHANNEL - ): ChannelFlow { + public override fun fuse(context: CoroutineContext, capacity: Int): FusibleFlow { // note: previous upstream context (specified before) takes precedence val newContext = context + this.context val newCapacity = when { diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt index 5f184e8feb..4cb17428cc 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Context.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Context.kt @@ -108,10 +108,10 @@ public fun Flow.buffer(capacity: Int = BUFFERED): Flow { require(capacity >= 0 || capacity == BUFFERED || capacity == CONFLATED) { "Buffer size should be non-negative, BUFFERED, or CONFLATED, but was $capacity" } - return if (this is ChannelFlow) - update(capacity = capacity) - else - ChannelFlowOperatorImpl(this, capacity = capacity) + return when (this) { + is FusibleFlow -> fuse(capacity = capacity) + else -> ChannelFlowOperatorImpl(this, capacity = capacity) + } } /** @@ -145,6 +145,10 @@ public fun Flow.buffer(capacity: Int = BUFFERED): Flow { * Adjacent applications of `conflate`/[buffer], [channelFlow], [flowOn], [produceIn], and [broadcastIn] are * always fused so that only one properly configured channel is used for execution. * **Conflation takes precedence over `buffer()` calls with any other capacity.** + * + * Note that any instance of [StateFlow] already behaves as if `conflate` operator is + * applied to it, so applying `conflate` to a `StateFlow` has not effect. + * See [StateFlow] documentation on Operator Fusion. */ public fun Flow.conflate(): Flow = buffer(CONFLATED) @@ -190,13 +194,16 @@ public fun Flow.conflate(): Flow = buffer(CONFLATED) * .flowOn(Dispatchers.Default) * ``` * + * Note that an instance of [StateFlow] does not have an execution context by itself, + * so applying `flowOn` to a `StateFlow` has not effect. See [StateFlow] documentation on Operator Fusion. + * * @throws [IllegalArgumentException] if provided context contains [Job] instance. */ public fun Flow.flowOn(context: CoroutineContext): Flow { checkFlowContext(context) return when { context == EmptyCoroutineContext -> this - this is ChannelFlow -> update(context = context) + this is FusibleFlow -> fuse(context = context) else -> ChannelFlowOperatorImpl(this, context = context) } } diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Distinct.kt b/kotlinx-coroutines-core/common/src/flow/operators/Distinct.kt index 6b8c61c1af..35048484d2 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Distinct.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Distinct.kt @@ -7,15 +7,22 @@ package kotlinx.coroutines.flow -import kotlinx.coroutines.* import kotlinx.coroutines.flow.internal.* import kotlin.jvm.* import kotlinx.coroutines.flow.internal.unsafeFlow as flow /** * Returns flow where all subsequent repetitions of the same value are filtered out. + * + * Note that any instance of [StateFlow] already behaves as if `distinctUtilChanged` operator is + * applied to it, so applying `distinctUntilChanged` to a `StateFlow` has no effect. + * See [StateFlow] documentation on Operator Fusion. */ -public fun Flow.distinctUntilChanged(): Flow = distinctUntilChangedBy { it } +public fun Flow.distinctUntilChanged(): Flow = + when (this) { + is StateFlow<*> -> this + else -> distinctUntilChangedBy { it } + } /** * Returns flow where all subsequent repetitions of the same value are filtered out, when compared diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Lint.kt b/kotlinx-coroutines-core/common/src/flow/operators/Lint.kt new file mode 100644 index 0000000000..23bc1f84cb --- /dev/null +++ b/kotlinx-coroutines-core/common/src/flow/operators/Lint.kt @@ -0,0 +1,43 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlin.coroutines.* + +/** + * Returns this. + * Applying [flowOn][Flow.flowOn] operator to [StateFlow] has no effect. + * See [StateFlow] documentation on Operator Fusion. + */ +@Deprecated( + level = DeprecationLevel.ERROR, + message = "Applying flowOn operator to StateFlow has no effect. See StateFlow documentation on Operator Fusion.", + replaceWith = ReplaceWith("this") +) +public fun StateFlow.flowOn(context: CoroutineContext): Flow = noImpl() + +/** + * Returns this. + * Applying [conflate][Flow.conflate] operator to [StateFlow] has no effect. + * See [StateFlow] documentation on Operator Fusion. + */ +@Deprecated( + level = DeprecationLevel.ERROR, + message = "Applying conflate operator to StateFlow has no effect. See StateFlow documentation on Operator Fusion.", + replaceWith = ReplaceWith("this") +) +public fun StateFlow.conflate(): Flow = noImpl() + +/** + * Returns this. + * Applying [distinctUntilChanged][Flow.distinctUntilChanged] operator to [StateFlow] has no effect. + * See [StateFlow] documentation on Operator Fusion. + */ +@Deprecated( + level = DeprecationLevel.ERROR, + message = "Applying distinctUntilChanged operator to StateFlow has no effect. See StateFlow documentation on Operator Fusion.", + replaceWith = ReplaceWith("this") +) +public fun StateFlow.distinctUntilChanged(): Flow = noImpl() diff --git a/kotlinx-coroutines-core/common/test/flow/StateFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/StateFlowTest.kt new file mode 100644 index 0000000000..a6be97eb97 --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/StateFlowTest.kt @@ -0,0 +1,113 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlin.test.* + +class StateFlowTest : TestBase() { + @Test + fun testNormalAndNull() = runTest { + expect(1) + val state = MutableStateFlow(0) + val job = launch(start = CoroutineStart.UNDISPATCHED) { + expect(2) + assertFailsWith { + state.collect { value -> + when (value) { + 0 -> expect(3) + 1 -> expect(5) + null -> expect(8) + 2 -> expect(10) + else -> expectUnreached() + } + } + } + expect(12) + } + expect(4) // collector is waiting + state.value = 1 // fire in the hole! + assertEquals(1, state.value) + yield() + expect(6) + state.value = 1 // same value, nothing happens + yield() + expect(7) + state.value = null // null value + assertNull(state.value) + yield() + expect(9) + state.value = 2 // another value + assertEquals(2, state.value) + yield() + expect(11) + job.cancel() + yield() + finish(13) + } + + @Test + fun testEqualsConflation() = runTest { + expect(1) + val state = MutableStateFlow(Data(0)) + val job = launch(start = CoroutineStart.UNDISPATCHED) { + expect(2) + assertFailsWith { + state.collect { value -> + when(value.i) { + 0 -> expect(3) // initial value + 2 -> expect(5) + 4 -> expect(7) + else -> error("Unexpected $value") + } + } + } + expect(9) + } + state.value = Data(1) // conflated + state.value = Data(0) // equals to last emitted + yield() // no repeat zero + state.value = Data(3) // conflated + state.value = Data(2) // delivered + expect(4) + yield() + state.value = Data(2) // equals to last one, dropped + yield() + state.value = Data(5) // conflated + state.value = Data(4) // delivered + expect(6) + yield() + expect(8) + job.cancel() + yield() + finish(10) + } + + data class Data(val i: Int) + + @Test + fun testDataModel() = runTest { + val s = CounterModel() + launch { + val sum = s.counter.take(11).sum() + assertEquals(55, sum) + } + repeat(10) { + yield() + s.inc() + } + } + + class CounterModel { + // private data flow + private val _counter = MutableStateFlow(0) + // publicly exposed as a flow + val counter: StateFlow get() = _counter + + fun inc() { + _counter.value++ + } + } +} \ No newline at end of file diff --git a/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt b/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt new file mode 100644 index 0000000000..dc3cd43c93 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/flow/StateFlowStressTest.kt @@ -0,0 +1,80 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import org.junit.* +import kotlin.random.* + +class StateFlowStressTest : TestBase() { + private val nSeconds = 3 * stressTestMultiplier + private val state = MutableStateFlow(0) + private lateinit var pool: ExecutorCoroutineDispatcher + + @After + fun tearDown() { + pool.close() + } + + fun stress(nEmitters: Int, nCollectors: Int) = runTest { + pool = newFixedThreadPoolContext(nEmitters + nCollectors, "StateFlowStressTest") + val collected = Array(nCollectors) { LongArray(nEmitters) } + val collectors = launch { + repeat(nCollectors) { collector -> + launch(pool) { + val c = collected[collector] + // collect, but abort and collect again after every 1000 values to stress allocation/deallocation + do { + val batchSize = Random.nextInt(1..1000) + var index = 0 + val cnt = state.onEach { value -> + val emitter = (value % nEmitters).toInt() + val current = value / nEmitters + // the first value in batch is allowed to repeat, but cannot go back + val ok = if (index++ == 0) current >= c[emitter] else current > c[emitter] + check(ok) { + "Values must be monotonic, but $current is not, " + + "was ${c[emitter]} in collector #$collector from emitter #$emitter" + } + c[emitter] = current + + }.take(batchSize).map { 1 }.sum() + } while (cnt == batchSize) + } + } + } + val emitted = LongArray(nEmitters) + val emitters = launch { + repeat(nEmitters) { emitter -> + launch(pool) { + var current = 1L + while (true) { + state.value = current * nEmitters + emitter + emitted[emitter] = current + current++ + if (current % 1000 == 0L) yield() // make it cancellable + } + } + } + } + for (second in 1..nSeconds) { + delay(1000) + val cs = collected.map { it.sum() } + println("$second: emitted=${emitted.sum()}, collected=${cs.min()}..${cs.max()}") + } + emitters.cancelAndJoin() + collectors.cancelAndJoin() + // make sure nothing hanged up + require(collected.all { c -> + c.withIndex().all { (emitter, current) -> current > emitted[emitter] / 2 } + }) + } + + @Test + fun testSingleEmitterAndCollector() = stress(1, 1) + + @Test + fun testTenEmittersAndCollectors() = stress(10, 10) +} \ No newline at end of file