Skip to content

Commit

Permalink
Introduce SharedFlow collect overload and override that return Nothing (
Browse files Browse the repository at this point in the history
#2803)

* Introduce SharedFlow collect overload and override that return Nothing
* Override will ensure the proper implementation of the interface
* collect extension is added as the very basic lint helper

Fixes #2789
Fixes #2502

Co-authored-by: dkhalanskyjb <52952525+dkhalanskyjb@users.noreply.github.com>
  • Loading branch information
qwwdfsad and dkhalanskyjb committed Oct 20, 2021
1 parent 7755edb commit e60685f
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 2 deletions.
2 changes: 2 additions & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -902,6 +902,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun channelFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun collect (Lkotlinx/coroutines/flow/SharedFlow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun collectIndexed (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun collectLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun combine (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
Expand Down Expand Up @@ -1063,6 +1064,7 @@ public abstract interface class kotlinx/coroutines/flow/MutableStateFlow : kotli
}

public abstract interface class kotlinx/coroutines/flow/SharedFlow : kotlinx/coroutines/flow/Flow {
public abstract fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun getReplayCache ()Ljava/util/List;
}

Expand Down
15 changes: 14 additions & 1 deletion kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
Expand Up @@ -129,6 +129,19 @@ public interface SharedFlow<out T> : Flow<T> {
* A snapshot of the replay cache.
*/
public val replayCache: List<T>

/**
* Accepts the given [collector] and [emits][FlowCollector.emit] values into it.
* This method should never be used directly. To emit values from a shared flow into a specific collector, either `collector.emitAll(flow)` or `collect { ... }` extension
* should be used.
*
* **A shared flow never completes**. A call to [Flow.collect] or any other terminal operator
* on a shared flow never completes normally.
*
* @see [Flow.collect]
*/
@InternalCoroutinesApi
override suspend fun collect(collector: FlowCollector<T>): Nothing
}

/**
Expand Down Expand Up @@ -344,7 +357,7 @@ internal open class SharedFlowImpl<T>(
get() = buffer!!.getBufferAt(replayIndex + replaySize - 1) as T

@Suppress("UNCHECKED_CAST")
override suspend fun collect(collector: FlowCollector<T>) {
override suspend fun collect(collector: FlowCollector<T>): Nothing {
val slot = allocateSlot()
try {
if (collector is SubscribedFlowCollector) collector.onSubscription()
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/flow/StateFlow.kt
Expand Up @@ -380,7 +380,7 @@ private class StateFlowImpl<T>(
throw UnsupportedOperationException("MutableStateFlow.resetReplayCache is not supported")
}

override suspend fun collect(collector: FlowCollector<T>) {
override suspend fun collect(collector: FlowCollector<T>): Nothing {
val slot = allocateSlot()
try {
if (collector is SubscribedFlowCollector) collector.onSubscription()
Expand Down
13 changes: 13 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt
Expand Up @@ -73,6 +73,19 @@ public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value
override suspend fun emit(value: T) = action(value)
})

/**
* Terminal flow operator that collects the given [SharedFlow] with the provided [action].
* If any exception occurs during `collect` or in the provided flow, this exception is rethrown from this method.
*
* This is a counterpart of a regular [Flow.collect] extension, only different in the return type
* so that any code below `collect` produces a compilation warning.
*/
public suspend inline fun <T> SharedFlow<T>.collect(crossinline action: suspend (value: T) -> Unit): Nothing {
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})
}

/**
* Terminal flow operator that collects the given flow with a provided [action] that takes the index of an element (zero-based) and the element.
* If any exception occurs during collect or in the provided flow, this exception is rethrown from this method.
Expand Down

0 comments on commit e60685f

Please sign in to comment.