Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce SharedFlow collect overload and override that return Nothing #2803

Merged
merged 8 commits into from Oct 20, 2021
2 changes: 2 additions & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -900,6 +900,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun channelFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun collect (Lkotlinx/coroutines/flow/SharedFlow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun collectIndexed (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun collectLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun combine (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
Expand Down Expand Up @@ -1061,6 +1062,7 @@ public abstract interface class kotlinx/coroutines/flow/MutableStateFlow : kotli
}

public abstract interface class kotlinx/coroutines/flow/SharedFlow : kotlinx/coroutines/flow/Flow {
public abstract fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun getReplayCache ()Ljava/util/List;
}

Expand Down
15 changes: 14 additions & 1 deletion kotlinx-coroutines-core/common/src/flow/SharedFlow.kt
Expand Up @@ -129,6 +129,19 @@ public interface SharedFlow<out T> : Flow<T> {
* A snapshot of the replay cache.
*/
public val replayCache: List<T>

/**
* Accepts the given [collector] and [emits][FlowCollector.emit] values into it.
* This method should never be used directly. To collect shared flow into a specific collector, either `collector.emitAll(flow)` or `collect { ... }` extension
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
* should be used.
*
* **Shared flow never completes**. A call to [Flow.collect] or any other terminal operator
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
* on a shared flow never complete normally.
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
*
* @see [Flow.collect]
*/
@InternalCoroutinesApi
override suspend fun collect(collector: FlowCollector<T>): Nothing
}

/**
Expand Down Expand Up @@ -335,7 +348,7 @@ private class SharedFlowImpl<T>(
}

@Suppress("UNCHECKED_CAST")
override suspend fun collect(collector: FlowCollector<T>) {
override suspend fun collect(collector: FlowCollector<T>): Nothing {
val slot = allocateSlot()
try {
if (collector is SubscribedFlowCollector) collector.onSubscription()
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/flow/StateFlow.kt
Expand Up @@ -380,7 +380,7 @@ private class StateFlowImpl<T>(
throw UnsupportedOperationException("MutableStateFlow.resetReplayCache is not supported")
}

override suspend fun collect(collector: FlowCollector<T>) {
override suspend fun collect(collector: FlowCollector<T>): Nothing {
val slot = allocateSlot()
try {
if (collector is SubscribedFlowCollector) collector.onSubscription()
Expand Down
13 changes: 13 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt
Expand Up @@ -73,6 +73,19 @@ public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value
override suspend fun emit(value: T) = action(value)
})

/**
* Terminal flow operator that collects the given [SharedFlow] with a provided [action].
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
* If any exception occurs during collect or in the provided flow, this exception is rethrown from this method.
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
*
* This is a counterpart of a regular [Flow.collect] extension with the only difference in return type,
* so any code below `collect` will produce compilation warning.
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved
*/
public suspend inline fun <T> SharedFlow<T>.collect(crossinline action: suspend (value: T) -> Unit): Nothing {
collect(object : FlowCollector<T> {
override suspend fun emit(value: T) = action(value)
})
}

/**
* Terminal flow operator that collects the given flow with a provided [action] that takes the index of an element (zero-based) and the element.
* If any exception occurs during collect or in the provided flow, this exception is rethrown from this method.
Expand Down