From 53669a8393e20c89d5537647a8f1f11f4ba01b43 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Thu, 1 Jul 2021 16:15:45 +0300 Subject: [PATCH 1/8] Introduce SharedFlow collect overload and override that return Nothing * Override will ensure the proper implementation of the interface * collect extension is added as the very basic lint helper Fixes #2789 Fixes #2502 --- .../api/kotlinx-coroutines-core.api | 2 ++ .../common/src/flow/SharedFlow.kt | 15 ++++++++++++++- .../common/src/flow/StateFlow.kt | 2 +- .../common/src/flow/terminal/Collect.kt | 13 +++++++++++++ 4 files changed, 30 insertions(+), 2 deletions(-) diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 50bfb60d62..5f7607c581 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -900,6 +900,7 @@ public final class kotlinx/coroutines/flow/FlowKt { public static final fun channelFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow; public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public static final fun collect (Lkotlinx/coroutines/flow/SharedFlow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun collectIndexed (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun collectLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public static final fun combine (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow; @@ -1061,6 +1062,7 @@ public abstract interface class kotlinx/coroutines/flow/MutableStateFlow : kotli } public abstract interface class kotlinx/coroutines/flow/SharedFlow : kotlinx/coroutines/flow/Flow { + public abstract fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public abstract fun getReplayCache ()Ljava/util/List; } diff --git a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt index 9bcf088e95..bf1c90bd48 100644 --- a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt @@ -129,6 +129,19 @@ public interface SharedFlow : Flow { * A snapshot of the replay cache. */ public val replayCache: List + + /** + * Accepts the given [collector] and [emits][FlowCollector.emit] values into it. + * This method should never be implemented or used directly. To collect shared flow into a specific collector, either `collector.emitAll(flow)` or `collect { ... }` extension + * should be used. + * + * **Shared flow never completes**. A call to [Flow.collect] or any other terminal operator + * on a shared flow never complete normally. + * + * @see [Flow.collect] + */ + @InternalCoroutinesApi + override suspend fun collect(collector: FlowCollector): Nothing } /** @@ -335,7 +348,7 @@ private class SharedFlowImpl( } @Suppress("UNCHECKED_CAST") - override suspend fun collect(collector: FlowCollector) { + override suspend fun collect(collector: FlowCollector): Nothing { val slot = allocateSlot() try { if (collector is SubscribedFlowCollector) collector.onSubscription() diff --git a/kotlinx-coroutines-core/common/src/flow/StateFlow.kt b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt index 9e82e78771..bb79363230 100644 --- a/kotlinx-coroutines-core/common/src/flow/StateFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt @@ -380,7 +380,7 @@ private class StateFlowImpl( throw UnsupportedOperationException("MutableStateFlow.resetReplayCache is not supported") } - override suspend fun collect(collector: FlowCollector) { + override suspend fun collect(collector: FlowCollector): Nothing { val slot = allocateSlot() try { if (collector is SubscribedFlowCollector) collector.onSubscription() diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt index d26839f9ea..91a94eacf5 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt @@ -73,6 +73,19 @@ public suspend inline fun Flow.collect(crossinline action: suspend (value override suspend fun emit(value: T) = action(value) }) +/** + * Terminal flow operator that collects the given [SharedFlow] with a provided [action]. + * If any exception occurs during collect or in the provided flow, this exception is rethrown from this method. + * + * This is a counterpart of a regular [Flow.collect] extension with the only difference in return type, + * so any code below `collect` will produce compilation warning. + */ +public suspend inline fun SharedFlow.collect(crossinline action: suspend (value: T) -> Unit): Nothing { + collect(object : FlowCollector { + override suspend fun emit(value: T) = action(value) + }) +} + /** * Terminal flow operator that collects the given flow with a provided [action] that takes the index of an element (zero-based) and the element. * If any exception occurs during collect or in the provided flow, this exception is rethrown from this method. From 3490f25319d7759523c31d9e2f7aa451151269fd Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Thu, 1 Jul 2021 16:17:52 +0300 Subject: [PATCH 2/8] ~nit --- kotlinx-coroutines-core/common/src/flow/SharedFlow.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt index bf1c90bd48..db7ffc39be 100644 --- a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt @@ -132,7 +132,7 @@ public interface SharedFlow : Flow { /** * Accepts the given [collector] and [emits][FlowCollector.emit] values into it. - * This method should never be implemented or used directly. To collect shared flow into a specific collector, either `collector.emitAll(flow)` or `collect { ... }` extension + * This method should never be used directly. To collect shared flow into a specific collector, either `collector.emitAll(flow)` or `collect { ... }` extension * should be used. * * **Shared flow never completes**. A call to [Flow.collect] or any other terminal operator From 9bef6dde82e9fd7b1ba41af8ca10927044c8a768 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 20 Oct 2021 12:03:07 +0300 Subject: [PATCH 3/8] Update kotlinx-coroutines-core/common/src/flow/SharedFlow.kt Co-authored-by: dkhalanskyjb <52952525+dkhalanskyjb@users.noreply.github.com> --- kotlinx-coroutines-core/common/src/flow/SharedFlow.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt index db7ffc39be..7cc3383f6d 100644 --- a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt @@ -135,7 +135,7 @@ public interface SharedFlow : Flow { * This method should never be used directly. To collect shared flow into a specific collector, either `collector.emitAll(flow)` or `collect { ... }` extension * should be used. * - * **Shared flow never completes**. A call to [Flow.collect] or any other terminal operator + * **A shared flow never completes**. A call to [Flow.collect] or any other terminal operator * on a shared flow never complete normally. * * @see [Flow.collect] From aa4d2cd5ee1c722764462394b4cd971c3cd8c879 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 20 Oct 2021 12:03:15 +0300 Subject: [PATCH 4/8] Update kotlinx-coroutines-core/common/src/flow/SharedFlow.kt Co-authored-by: dkhalanskyjb <52952525+dkhalanskyjb@users.noreply.github.com> --- kotlinx-coroutines-core/common/src/flow/SharedFlow.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt index 7cc3383f6d..2fa7d496d7 100644 --- a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt @@ -136,7 +136,7 @@ public interface SharedFlow : Flow { * should be used. * * **A shared flow never completes**. A call to [Flow.collect] or any other terminal operator - * on a shared flow never complete normally. + * on a shared flow never completes normally. * * @see [Flow.collect] */ From 3be840702f49432758aed59fc4da90d03d6904ee Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 20 Oct 2021 12:03:30 +0300 Subject: [PATCH 5/8] Update kotlinx-coroutines-core/common/src/flow/SharedFlow.kt Co-authored-by: dkhalanskyjb <52952525+dkhalanskyjb@users.noreply.github.com> --- kotlinx-coroutines-core/common/src/flow/SharedFlow.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt index 2fa7d496d7..9037c4d1ff 100644 --- a/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/SharedFlow.kt @@ -132,7 +132,7 @@ public interface SharedFlow : Flow { /** * Accepts the given [collector] and [emits][FlowCollector.emit] values into it. - * This method should never be used directly. To collect shared flow into a specific collector, either `collector.emitAll(flow)` or `collect { ... }` extension + * This method should never be used directly. To emit values from a shared flow into a specific collector, either `collector.emitAll(flow)` or `collect { ... }` extension * should be used. * * **A shared flow never completes**. A call to [Flow.collect] or any other terminal operator From 4f0652808fbe2a3358895eabcfd07c092ba9cf96 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 20 Oct 2021 12:03:38 +0300 Subject: [PATCH 6/8] Update kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt Co-authored-by: dkhalanskyjb <52952525+dkhalanskyjb@users.noreply.github.com> --- kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt index 91a94eacf5..095a7f2d76 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt @@ -74,7 +74,7 @@ public suspend inline fun Flow.collect(crossinline action: suspend (value }) /** - * Terminal flow operator that collects the given [SharedFlow] with a provided [action]. + * Terminal flow operator that collects the given [SharedFlow] with the provided [action]. * If any exception occurs during collect or in the provided flow, this exception is rethrown from this method. * * This is a counterpart of a regular [Flow.collect] extension with the only difference in return type, From 0fd32cff0914656ff63775cd4d189c0fc00dd958 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 20 Oct 2021 12:03:43 +0300 Subject: [PATCH 7/8] Update kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt Co-authored-by: dkhalanskyjb <52952525+dkhalanskyjb@users.noreply.github.com> --- kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt index 095a7f2d76..ce88a35b7a 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt @@ -77,8 +77,8 @@ public suspend inline fun Flow.collect(crossinline action: suspend (value * Terminal flow operator that collects the given [SharedFlow] with the provided [action]. * If any exception occurs during collect or in the provided flow, this exception is rethrown from this method. * - * This is a counterpart of a regular [Flow.collect] extension with the only difference in return type, - * so any code below `collect` will produce compilation warning. + * This is a counterpart of a regular [Flow.collect] extension, only different in the return type + * so that any code below `collect` produces a compilation warning. */ public suspend inline fun SharedFlow.collect(crossinline action: suspend (value: T) -> Unit): Nothing { collect(object : FlowCollector { From 93e34e31686a86d839b795be4c0e51f27cbe1bb3 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Wed, 20 Oct 2021 12:04:12 +0300 Subject: [PATCH 8/8] Update kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt Co-authored-by: dkhalanskyjb <52952525+dkhalanskyjb@users.noreply.github.com> --- kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt index ce88a35b7a..f28382f72b 100644 --- a/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt +++ b/kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt @@ -75,7 +75,7 @@ public suspend inline fun Flow.collect(crossinline action: suspend (value /** * Terminal flow operator that collects the given [SharedFlow] with the provided [action]. - * If any exception occurs during collect or in the provided flow, this exception is rethrown from this method. + * If any exception occurs during `collect` or in the provided flow, this exception is rethrown from this method. * * This is a counterpart of a regular [Flow.collect] extension, only different in the return type * so that any code below `collect` produces a compilation warning.