From 1901a16e0b1a4ba64fdadfba5ae8784664884b3f Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Thu, 12 Aug 2021 14:54:40 +0300 Subject: [PATCH 1/2] Initial prototype for public SafeCollector with Throwable.isFromDownstream (JVM-only yet) with explicit guarantees on context/exception transparency enforcement Pre-design for #2860 --- .../api/kotlinx-coroutines-core.api | 12 +++++++- .../common/src/flow/Builders.kt | 6 ++-- .../common/src/flow/Flow.kt | 4 +-- .../common/src/flow/FlowCollector.kt | 10 +++++++ .../src/flow/internal/SafeCollector.common.kt | 12 ++++++-- .../common/src/flow/operators/Emitters.kt | 6 ++-- .../common/src/flow/operators/Share.kt | 2 +- .../common/test/flow/FlowInvariantsTest.kt | 2 +- ...{SafeCollector.kt => SafeCollectorImpl.kt} | 13 +++++++-- ...{SafeCollector.kt => SafeCollectorImpl.kt} | 16 ++++++----- .../test/flow/ExceptionTransparencyTest.kt | 28 +++++++++++++++++++ ...{SafeCollector.kt => SafeCollectorImpl.kt} | 14 ++++++++-- 12 files changed, 100 insertions(+), 25 deletions(-) rename kotlinx-coroutines-core/js/src/flow/internal/{SafeCollector.kt => SafeCollectorImpl.kt} (79%) rename kotlinx-coroutines-core/jvm/src/flow/internal/{SafeCollector.kt => SafeCollectorImpl.kt} (93%) rename kotlinx-coroutines-core/native/src/flow/internal/{SafeCollector.kt => SafeCollectorImpl.kt} (79%) diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 50bfb60d62..451145a43a 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -861,7 +861,12 @@ public final class kotlinx/coroutines/debug/internal/DebuggerInfo : java/io/Seri public abstract class kotlinx/coroutines/flow/AbstractFlow : kotlinx/coroutines/flow/CancellableFlow, kotlinx/coroutines/flow/Flow { public fun ()V public final fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; - public abstract fun collectSafely (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; + public abstract fun collectSafely (Lkotlinx/coroutines/flow/SafeCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; +} + +public abstract interface class kotlinx/coroutines/flow/ExceptionAwareFlowCollector : kotlinx/coroutines/flow/FlowCollector { + public abstract synthetic fun doNotImplementMe ()V + public abstract fun isFromDownstream (Ljava/lang/Throwable;)Z } public abstract interface class kotlinx/coroutines/flow/Flow { @@ -1060,6 +1065,11 @@ public abstract interface class kotlinx/coroutines/flow/MutableStateFlow : kotli public abstract fun setValue (Ljava/lang/Object;)V } +public abstract interface class kotlinx/coroutines/flow/SafeCollector : kotlinx/coroutines/flow/FlowCollector { + public abstract synthetic fun doNotImplementMe ()V + public abstract fun isFromDownstream (Ljava/lang/Throwable;)Z +} + public abstract interface class kotlinx/coroutines/flow/SharedFlow : kotlinx/coroutines/flow/Flow { public abstract fun getReplayCache ()Ljava/util/List; } diff --git a/kotlinx-coroutines-core/common/src/flow/Builders.kt b/kotlinx-coroutines-core/common/src/flow/Builders.kt index 66b55a90c0..cb02eedb2e 100644 --- a/kotlinx-coroutines-core/common/src/flow/Builders.kt +++ b/kotlinx-coroutines-core/common/src/flow/Builders.kt @@ -53,11 +53,11 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow * * If you want to switch the context of execution of a flow, use the [flowOn] operator. */ -public fun flow(@BuilderInference block: suspend FlowCollector.() -> Unit): Flow = SafeFlow(block) +public fun flow(@BuilderInference block: suspend SafeCollector.() -> Unit): Flow = SafeFlow(block) // Named anonymous object -private class SafeFlow(private val block: suspend FlowCollector.() -> Unit) : AbstractFlow() { - override suspend fun collectSafely(collector: FlowCollector) { +private class SafeFlow(private val block: suspend SafeCollector.() -> Unit) : AbstractFlow() { + override suspend fun collectSafely(collector: SafeCollector) { collector.block() } } diff --git a/kotlinx-coroutines-core/common/src/flow/Flow.kt b/kotlinx-coroutines-core/common/src/flow/Flow.kt index 0ccd343ead..be3eb86800 100644 --- a/kotlinx-coroutines-core/common/src/flow/Flow.kt +++ b/kotlinx-coroutines-core/common/src/flow/Flow.kt @@ -207,7 +207,7 @@ public abstract class AbstractFlow : Flow, CancellableFlow { @InternalCoroutinesApi public final override suspend fun collect(collector: FlowCollector) { - val safeCollector = SafeCollector(collector, coroutineContext) + val safeCollector = SafeCollectorImpl(collector, coroutineContext) try { collectSafely(safeCollector) } finally { @@ -228,5 +228,5 @@ public abstract class AbstractFlow : Flow, CancellableFlow { * * @throws IllegalStateException if any of the invariants are violated. */ - public abstract suspend fun collectSafely(collector: FlowCollector) + public abstract suspend fun collectSafely(collector: SafeCollector) } diff --git a/kotlinx-coroutines-core/common/src/flow/FlowCollector.kt b/kotlinx-coroutines-core/common/src/flow/FlowCollector.kt index d1c1565cb0..8d39190bc6 100644 --- a/kotlinx-coroutines-core/common/src/flow/FlowCollector.kt +++ b/kotlinx-coroutines-core/common/src/flow/FlowCollector.kt @@ -19,3 +19,13 @@ public interface FlowCollector { */ public suspend fun emit(value: T) } + +// Name TBD +// Contracts: exception transparency preservation, context transparency preservation, isFromDownstream +public interface SafeCollector : FlowCollector { + + public fun Throwable.isFromDownstream(): Boolean + + @Deprecated(message = "", level = DeprecationLevel.HIDDEN) + public fun doNotImplementMe() +} diff --git a/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.common.kt b/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.common.kt index 006da41f1a..380fb81d20 100644 --- a/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.common.kt +++ b/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.common.kt @@ -10,10 +10,10 @@ import kotlinx.coroutines.internal.ScopeCoroutine import kotlin.coroutines.* import kotlin.jvm.* -internal expect class SafeCollector( +internal expect class SafeCollectorImpl( collector: FlowCollector, collectContext: CoroutineContext -) : FlowCollector { +) : SafeCollector { internal val collector: FlowCollector internal val collectContext: CoroutineContext internal val collectContextSize: Int @@ -21,7 +21,7 @@ internal expect class SafeCollector( } @JvmName("checkContext") // For prettier stack traces -internal fun SafeCollector<*>.checkContext(currentContext: CoroutineContext) { +internal fun SafeCollectorImpl<*>.checkContext(currentContext: CoroutineContext) { val result = currentContext.fold(0) fold@{ count, element -> val key = element.key val collectElement = collectContext[key] @@ -109,3 +109,9 @@ internal inline fun unsafeFlow(@BuilderInference crossinline block: suspend } } } + +private class DownstreamExceptionElement(@JvmField val e: Throwable) : CoroutineContext.Element { + companion object Key : CoroutineContext.Key + + override val key: CoroutineContext.Key<*> = Key +} diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt b/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt index 90879a97f3..0ab6766420 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt @@ -74,7 +74,7 @@ internal inline fun Flow.unsafeTransform( public fun Flow.onStart( action: suspend FlowCollector.() -> Unit ): Flow = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke start action - val safeCollector = SafeCollector(this, currentCoroutineContext()) + val safeCollector = SafeCollectorImpl(this, currentCoroutineContext()) try { safeCollector.action() } finally { @@ -156,7 +156,7 @@ public fun Flow.onCompletion( throw e } // Normal completion - val sc = SafeCollector(this, currentCoroutineContext()) + val sc = SafeCollectorImpl(this, currentCoroutineContext()) try { sc.action(null) } finally { @@ -185,7 +185,7 @@ public fun Flow.onEmpty( emit(it) } if (isEmpty) { - val collector = SafeCollector(this, currentCoroutineContext()) + val collector = SafeCollectorImpl(this, currentCoroutineContext()) try { collector.action() } finally { diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Share.kt b/kotlinx-coroutines-core/common/src/flow/operators/Share.kt index 4fa74d8e50..ebcfb59585 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Share.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Share.kt @@ -405,7 +405,7 @@ internal class SubscribedFlowCollector( private val action: suspend FlowCollector.() -> Unit ) : FlowCollector by collector { suspend fun onSubscription() { - val safeCollector = SafeCollector(collector, currentCoroutineContext()) + val safeCollector = SafeCollectorImpl(collector, currentCoroutineContext()) try { safeCollector.action() } finally { diff --git a/kotlinx-coroutines-core/common/test/flow/FlowInvariantsTest.kt b/kotlinx-coroutines-core/common/test/flow/FlowInvariantsTest.kt index b5f1bf7bb8..7edd71e95c 100644 --- a/kotlinx-coroutines-core/common/test/flow/FlowInvariantsTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/FlowInvariantsTest.kt @@ -25,7 +25,7 @@ class FlowInvariantsTest : TestBase() { } private fun abstractFlow(block: suspend FlowCollector.() -> Unit): Flow = object : AbstractFlow() { - override suspend fun collectSafely(collector: FlowCollector) { + override suspend fun collectSafely(collector: SafeCollector) { collector.block() } } diff --git a/kotlinx-coroutines-core/js/src/flow/internal/SafeCollector.kt b/kotlinx-coroutines-core/js/src/flow/internal/SafeCollectorImpl.kt similarity index 79% rename from kotlinx-coroutines-core/js/src/flow/internal/SafeCollector.kt rename to kotlinx-coroutines-core/js/src/flow/internal/SafeCollectorImpl.kt index b74b547d31..db5ba5054e 100644 --- a/kotlinx-coroutines-core/js/src/flow/internal/SafeCollector.kt +++ b/kotlinx-coroutines-core/js/src/flow/internal/SafeCollectorImpl.kt @@ -8,10 +8,10 @@ import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlin.coroutines.* -internal actual class SafeCollector actual constructor( +internal actual class SafeCollectorImpl actual constructor( internal actual val collector: FlowCollector, internal actual val collectContext: CoroutineContext -) : FlowCollector { +) : SafeCollector { // Note, it is non-capturing lambda, so no extra allocation during init of SafeCollector internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 } @@ -27,6 +27,15 @@ internal actual class SafeCollector actual constructor( collector.emit(value) } + override fun Throwable.isFromDownstream(): Boolean { + // TODO + return true + } + + override fun doNotImplementMe() { + TODO() + } + public actual fun releaseIntercepted() { } } diff --git a/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt b/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollectorImpl.kt similarity index 93% rename from kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt rename to kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollectorImpl.kt index ea973287a7..6e4c2c5f10 100644 --- a/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt +++ b/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollectorImpl.kt @@ -18,10 +18,10 @@ private val emitFun = * in order to properly control 'intercepted()' lifecycle. */ @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER", "INVISIBLE_MEMBER", "INVISIBLE_REFERENCE", "UNCHECKED_CAST") -internal actual class SafeCollector actual constructor( +internal actual class SafeCollectorImpl actual constructor( @JvmField internal actual val collector: FlowCollector, @JvmField internal actual val collectContext: CoroutineContext -) : FlowCollector, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame { +) : SafeCollector, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame { override val callerFrame: CoroutineStackFrame? get() = completion as? CoroutineStackFrame @@ -123,12 +123,14 @@ internal actual class SafeCollector actual constructor( """.trimIndent()) } -} - -internal class DownstreamExceptionElement(@JvmField val e: Throwable) : CoroutineContext.Element { - companion object Key : CoroutineContext.Key + override fun Throwable.isFromDownstream(): Boolean { + val ctx = lastEmissionContext + return ctx is DownstreamExceptionElement && ctx.e == this // TODO also check stracktrace recovery + } - override val key: CoroutineContext.Key<*> = Key + override fun doNotImplementMe() { + TODO() + } } private object NoOpContinuation : Continuation { diff --git a/kotlinx-coroutines-core/jvm/test/flow/ExceptionTransparencyTest.kt b/kotlinx-coroutines-core/jvm/test/flow/ExceptionTransparencyTest.kt index aca9eb9a1f..10af603385 100644 --- a/kotlinx-coroutines-core/jvm/test/flow/ExceptionTransparencyTest.kt +++ b/kotlinx-coroutines-core/jvm/test/flow/ExceptionTransparencyTest.kt @@ -74,4 +74,32 @@ class ExceptionTransparencyTest : TestBase() { assertTrue { e.message!!.contains("channelFlow") } finish(3) } + + @Test + fun testExamples() = runTest { + try { + flow { + try { + emit(42) + throw TestException() + } catch (e: Throwable) { + println("E1 " + e.isFromDownstream()) + } + }.collect() + } catch (e: TestException) { + println("Caught") + } + + try { + flow { + try { + emit(42) + } catch (e: Throwable) { + println("E2 " + e.isFromDownstream()) + } + }.onEach { throw TestException() }.collect() + } catch (e: TestException) { + println("Caught 2") + } + } } diff --git a/kotlinx-coroutines-core/native/src/flow/internal/SafeCollector.kt b/kotlinx-coroutines-core/native/src/flow/internal/SafeCollectorImpl.kt similarity index 79% rename from kotlinx-coroutines-core/native/src/flow/internal/SafeCollector.kt rename to kotlinx-coroutines-core/native/src/flow/internal/SafeCollectorImpl.kt index b74b547d31..b9f456ee28 100644 --- a/kotlinx-coroutines-core/native/src/flow/internal/SafeCollector.kt +++ b/kotlinx-coroutines-core/native/src/flow/internal/SafeCollectorImpl.kt @@ -8,10 +8,10 @@ import kotlinx.coroutines.* import kotlinx.coroutines.flow.* import kotlin.coroutines.* -internal actual class SafeCollector actual constructor( +internal actual class SafeCollectorImpl actual constructor( internal actual val collector: FlowCollector, internal actual val collectContext: CoroutineContext -) : FlowCollector { +) : SafeCollector { // Note, it is non-capturing lambda, so no extra allocation during init of SafeCollector internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 } @@ -24,9 +24,19 @@ internal actual class SafeCollector actual constructor( checkContext(currentContext) lastEmissionContext = currentContext } + collector.emit(value) } + override fun Throwable.isFromDownstream(): Boolean { + // TODO + return true + } + + override fun doNotImplementMe() { + TODO() + } + public actual fun releaseIntercepted() { } } From aa09499f22c18a8888c51bb910cae5895446f0ec Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Thu, 12 Aug 2021 14:56:28 +0300 Subject: [PATCH 2/2] ~api dump --- kotlinx-coroutines-core/api/kotlinx-coroutines-core.api | 5 ----- 1 file changed, 5 deletions(-) diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 451145a43a..161942b3c7 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -864,11 +864,6 @@ public abstract class kotlinx/coroutines/flow/AbstractFlow : kotlinx/coroutines/ public abstract fun collectSafely (Lkotlinx/coroutines/flow/SafeCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; } -public abstract interface class kotlinx/coroutines/flow/ExceptionAwareFlowCollector : kotlinx/coroutines/flow/FlowCollector { - public abstract synthetic fun doNotImplementMe ()V - public abstract fun isFromDownstream (Ljava/lang/Throwable;)Z -} - public abstract interface class kotlinx/coroutines/flow/Flow { public abstract fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; }