diff --git a/benchmarks.jar b/benchmarks.jar new file mode 100644 index 0000000000..d5fc805e76 Binary files /dev/null and b/benchmarks.jar differ diff --git a/kotlinx-coroutines-core/common/src/flow/Builders.kt b/kotlinx-coroutines-core/common/src/flow/Builders.kt index 49ad2922e9..194b11ee13 100644 --- a/kotlinx-coroutines-core/common/src/flow/Builders.kt +++ b/kotlinx-coroutines-core/common/src/flow/Builders.kt @@ -51,7 +51,12 @@ public fun flow(@BuilderInference block: suspend FlowCollector.() -> Unit // Named anonymous object private class SafeFlow(private val block: suspend FlowCollector.() -> Unit) : Flow { override suspend fun collect(collector: FlowCollector) { - SafeCollector(collector, coroutineContext).block() + val safeCollector = SafeCollector(collector, coroutineContext) + try { + safeCollector.block() + } finally { + safeCollector.release() + } } } diff --git a/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.common.kt b/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.common.kt new file mode 100644 index 0000000000..629e8caa52 --- /dev/null +++ b/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.common.kt @@ -0,0 +1,112 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow.internal + +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.internal.ScopeCoroutine +import kotlin.coroutines.* +import kotlin.jvm.* + +internal expect class SafeCollector( + collector: FlowCollector, + collectContext: CoroutineContext +) : FlowCollector { + internal val collector: FlowCollector + internal val collectContext: CoroutineContext + internal val collectContextSize: Int + public fun release() +} + +@JvmName("checkContext") // For prettier stack traces +internal fun SafeCollector<*>.checkContext(currentContext: CoroutineContext) { + val result = currentContext.fold(0) fold@{ count, element -> + val key = element.key + val collectElement = collectContext[key] + if (key !== Job) { + return@fold if (element !== collectElement) Int.MIN_VALUE + else count + 1 + } + + val collectJob = collectElement as Job? + val emissionParentJob = (element as Job).transitiveCoroutineParent(collectJob) + /* + * Code like + * ``` + * coroutineScope { + * launch { + * emit(1) + * } + * + * launch { + * emit(2) + * } + * } + * ``` + * is prohibited because 'emit' is not thread-safe by default. Use 'channelFlow' instead if you need concurrent emission + * or want to switch context dynamically (e.g. with `withContext`). + * + * Note that collecting from another coroutine is allowed, e.g.: + * ``` + * coroutineScope { + * val channel = produce { + * collect { value -> + * send(value) + * } + * } + * channel.consumeEach { value -> + * emit(value) + * } + * } + * ``` + * is a completely valid. + */ + if (emissionParentJob !== collectJob) { + error( + "Flow invariant is violated:\n" + + "\t\tEmission from another coroutine is detected.\n" + + "\t\tChild of $emissionParentJob, expected child of $collectJob.\n" + + "\t\tFlowCollector is not thread-safe and concurrent emissions are prohibited.\n" + + "\t\tTo mitigate this restriction please use 'channelFlow' builder instead of 'flow'" + ) + } + + /* + * If collect job is null (-> EmptyCoroutineContext, probably run from `suspend fun main`), then invariant is maintained + * (common transitive parent is "null"), but count check will fail, so just do not count job context element when + * flow is collected from EmptyCoroutineContext + */ + if (collectJob == null) count else count + 1 + } + if (result != collectContextSize) { + error( + "Flow invariant is violated:\n" + + "\t\tFlow was collected in $collectContext,\n" + + "\t\tbut emission happened in $currentContext.\n" + + "\t\tPlease refer to 'flow' documentation or use 'flowOn' instead" + ) + } +} + +@JvmName("checkContext") // For prettier stack traces +internal tailrec fun Job?.transitiveCoroutineParent(collectJob: Job?): Job? { + if (this === null) return null + if (this === collectJob) return this + if (this !is ScopeCoroutine<*>) return this + return parent.transitiveCoroutineParent(collectJob) +} + +/** + * An analogue of the [flow] builder that does not check the context of execution of the resulting flow. + * Used in our own operators where we trust the context of invocations. + */ +@PublishedApi +internal inline fun unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector.() -> Unit): Flow { + return object : Flow { + override suspend fun collect(collector: FlowCollector) { + collector.block() + } + } +} diff --git a/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt b/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt deleted file mode 100644 index fec0ee96e0..0000000000 --- a/kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. - */ - -package kotlinx.coroutines.flow.internal - -import kotlinx.coroutines.* -import kotlinx.coroutines.flow.* -import kotlinx.coroutines.internal.* -import kotlin.coroutines.* - -internal class SafeCollector( - private val collector: FlowCollector, - private val collectContext: CoroutineContext -) : FlowCollector { - - // Note, it is non-capturing lambda, so no extra allocation during init of SafeCollector - private val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 } - private var lastEmissionContext: CoroutineContext? = null - - override suspend fun emit(value: T) { - /* - * Benign data-race here: - * We read potentially racy published coroutineContext, but we only use it for - * referential comparison (=> thus safe) and are not using it for structural comparisons. - */ - val currentContext = coroutineContext - // This check is triggered once per flow on happy path. - if (lastEmissionContext !== currentContext) { - checkContext(currentContext) - lastEmissionContext = currentContext - } - collector.emit(value) // TCE - } - - private fun checkContext(currentContext: CoroutineContext) { - val result = currentContext.fold(0) fold@{ count, element -> - val key = element.key - val collectElement = collectContext[key] - if (key !== Job) { - return@fold if (element !== collectElement) Int.MIN_VALUE - else count + 1 - } - - val collectJob = collectElement as Job? - val emissionParentJob = (element as Job).transitiveCoroutineParent(collectJob) - /* - * Things like - * ``` - * coroutineScope { - * launch { - * emit(1) - * } - * - * launch { - * emit(2) - * } - * } - * ``` - * are prohibited because 'emit' is not thread-safe by default. Use channelFlow instead if you need concurrent emission - * or want to switch context dynamically (e.g. with `withContext`). - * - * Note that collecting from another coroutine is allowed, e.g.: - * ``` - * coroutineScope { - * val channel = produce { - * collect { value -> - * send(value) - * } - * } - * channel.consumeEach { value -> - * emit(value) - * } - * } - * ``` - * is a completely valid. - */ - if (emissionParentJob !== collectJob) { - error( - "Flow invariant is violated:\n" + - "\t\tEmission from another coroutine is detected.\n" + - "\t\tChild of $emissionParentJob, expected child of $collectJob.\n" + - "\t\tFlowCollector is not thread-safe and concurrent emissions are prohibited.\n" + - "\t\tTo mitigate this restriction please use 'channelFlow' builder instead of 'flow'" - ) - } - - /* - * If collect job is null (-> EmptyCoroutineContext, probably run from `suspend fun main`), then invariant is maintained - * (common transitive parent is "null"), but count check will fail, so just do not count job context element when - * flow is collected from EmptyCoroutineContext - */ - if (collectJob == null) count else count + 1 - } - if (result != collectContextSize) { - error( - "Flow invariant is violated:\n" + - "\t\tFlow was collected in $collectContext,\n" + - "\t\tbut emission happened in $currentContext.\n" + - "\t\tPlease refer to 'flow' documentation or use 'flowOn' instead" - ) - } - } - - private tailrec fun Job?.transitiveCoroutineParent(collectJob: Job?): Job? { - if (this === null) return null - if (this === collectJob) return this - if (this !is ScopeCoroutine<*>) return this - return parent.transitiveCoroutineParent(collectJob) - } -} - -/** - * An analogue of the [flow] builder that does not check the context of execution of the resulting flow. - * Used in our own operators where we trust the context of invocations. - */ -@PublishedApi -internal inline fun unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector.() -> Unit): Flow { - return object : Flow { - override suspend fun collect(collector: FlowCollector) { - collector.block() - } - } -} diff --git a/kotlinx-coroutines-core/common/test/TestBase.common.kt b/kotlinx-coroutines-core/common/test/TestBase.common.kt index 0fdce91fb4..a6119ee8a6 100644 --- a/kotlinx-coroutines-core/common/test/TestBase.common.kt +++ b/kotlinx-coroutines-core/common/test/TestBase.common.kt @@ -50,7 +50,7 @@ public suspend inline fun assertFailsWith(flow: Flow<*>) flow.collect() fail("Should be unreached") } catch (e: Throwable) { - assertTrue(e is T) + assertTrue(e is T, "Expected exception ${T::class}, but had $e instead") } } diff --git a/kotlinx-coroutines-core/common/test/flow/SafeFlowTest.kt b/kotlinx-coroutines-core/common/test/flow/SafeFlowTest.kt new file mode 100644 index 0000000000..eaba11b950 --- /dev/null +++ b/kotlinx-coroutines-core/common/test/flow/SafeFlowTest.kt @@ -0,0 +1,31 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlin.test.* + +class SafeFlowTest : TestBase() { + + @Test + fun testEmissionsFromDifferentStateMachine() = runTest { + val result = flow { + emit1(1) + emit2(2) + }.onEach { yield() }.toList() + assertEquals(listOf(1, 2), result) + finish(3) + } + + private suspend fun FlowCollector.emit1(expect: Int) { + emit(expect) + expect(expect) + } + + private suspend fun FlowCollector.emit2(expect: Int) { + emit(expect) + expect(expect) + } +} diff --git a/kotlinx-coroutines-core/js/src/flow/internal/SafeCollector.kt b/kotlinx-coroutines-core/js/src/flow/internal/SafeCollector.kt new file mode 100644 index 0000000000..e42815d7ef --- /dev/null +++ b/kotlinx-coroutines-core/js/src/flow/internal/SafeCollector.kt @@ -0,0 +1,30 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow.internal + +import kotlinx.coroutines.flow.* +import kotlin.coroutines.* + +internal actual class SafeCollector actual constructor( + internal actual val collector: FlowCollector, + internal actual val collectContext: CoroutineContext +) : FlowCollector { + + // Note, it is non-capturing lambda, so no extra allocation during init of SafeCollector + internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 } + private var lastEmissionContext: CoroutineContext? = null + + override suspend fun emit(value: T) { + val currentContext = coroutineContext + if (lastEmissionContext !== currentContext) { + checkContext(currentContext) + lastEmissionContext = currentContext + } + collector.emit(value) + } + + public actual fun release() { + } +} diff --git a/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt b/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt new file mode 100644 index 0000000000..a499ed4ed1 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt @@ -0,0 +1,113 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow.internal + +import kotlinx.coroutines.flow.* +import kotlin.coroutines.* +import kotlin.coroutines.intrinsics.* +import kotlin.coroutines.jvm.internal.* + +@Suppress("UNCHECKED_CAST") +internal actual class SafeCollector actual constructor( + @JvmField internal actual val collector: FlowCollector, + @JvmField internal actual val collectContext: CoroutineContext +) : FlowCollector { + + // Note, it is non-capturing lambda, so no extra allocation during init of SafeCollector + @JvmField + internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 } + private var lastEmissionContext: CoroutineContext? = null + private var intercepted: InterceptedContinuationOwner = InterceptedContinuationOwner() + private val emitFun = run { + collector::emit as Function2, Any?> + } + + /* + * Implementor of ContinuationImpl (that will be preserved by ABI implementation nearly forever) + * in order to properly control 'intercepted()' lifecycle. + */ + @Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER", "INVISIBLE_MEMBER", "INVISIBLE_REFERENCE") + private inner class InterceptedContinuationOwner : ContinuationImpl(NoOpContinuation, EmptyCoroutineContext) { + @JvmField + public var realCompletion: Continuation? = null + + override val context: CoroutineContext + get() = realCompletion?.context ?: EmptyCoroutineContext + + override fun invokeSuspend(result: Result): Any? { + result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) } + realCompletion?.resumeWith(result as Result) + return COROUTINE_SUSPENDED + } + + // Escalate visibility to manually release intercepted continuation + public override fun releaseIntercepted() { + super.releaseIntercepted() + } + } + + /** + * This is a crafty implementation of state-machine reusing. + * First it checks that it is not used concurrently (which we explicitly prohibit) and + * then just cache an instance of the completion in order to avoid extra allocation on each emit, + * making it effectively garbage-free on its hot-path. + */ + override suspend fun emit(value: T) { + return suspendCoroutineUninterceptedOrReturn sc@{ uCont -> + try { + emit(uCont, value) + } catch (e: Throwable) { + // Save the fact that exception from emit (or even check context) has been thrown + lastEmissionContext = DownstreamExceptionElement(e) + throw e + } + } + } + + private fun emit(uCont: Continuation, value: T): Any? { + val currentContext = uCont.context + // This check is triggered once per flow on happy path. + val previousContext = lastEmissionContext + if (previousContext !== currentContext) { + if (previousContext is DownstreamExceptionElement) { + exceptionTransparencyViolated(previousContext, value) + } + checkContext(currentContext) + lastEmissionContext = currentContext + } + intercepted.realCompletion = uCont + return emitFun(value, intercepted as Continuation) + } + + public actual fun release() { + intercepted.releaseIntercepted() + } + + private fun exceptionTransparencyViolated(exception: DownstreamExceptionElement, value: Any?) { + error( + """ + Flow exception transparency is violated: + Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected. + Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead. + For a more detailed explanation, please refer to Flow documentation. + """.trimIndent() + ) + } + +} + +internal class DownstreamExceptionElement(@JvmField val e: Throwable) : CoroutineContext.Element { + companion object Key : CoroutineContext.Key + + override val key: CoroutineContext.Key<*> = Key +} + +private object NoOpContinuation : Continuation { + override val context: CoroutineContext = EmptyCoroutineContext + + override fun resumeWith(result: Result) { + // Nothing + } +} diff --git a/kotlinx-coroutines-core/jvm/test/flow/ExceptionTransparencyTest.kt b/kotlinx-coroutines-core/jvm/test/flow/ExceptionTransparencyTest.kt new file mode 100644 index 0000000000..aca9eb9a1f --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/flow/ExceptionTransparencyTest.kt @@ -0,0 +1,77 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import kotlin.test.* + +class ExceptionTransparencyTest : TestBase() { + + @Test + fun testViolation() = runTest { + val flow = flow { + try { + expect(1) + emit(1) + expectUnreached() + } catch (e: CancellationException) { + expect(3) + emit(2) + } + }.take(1) + + assertFailsWith { flow.collect { expect(2) } } + finish(4) + } + + @Test + fun testViolationResumeWith() = runTest { + val flow = flow { + try { + expect(1) + emit(1) + yield() + expectUnreached() + } catch (e: CancellationException) { + expect(3) + emit(2) + } + }.take(1) + + assertFailsWith { + flow.collect { + yield() + expect(2) + } + } + finish(4) + } + + @Test + fun testViolationAfterInvariantVariation() = runTest { + val flow = flow { + coroutineScope { + try { + expect(1) + launch { + expect(2) + emit(1) + }.join() + expectUnreached() + } catch (e: Throwable) { + try { + emit(2) + } catch (e: IllegalStateException) { + assertTrue { e.message!!.contains("exception transparency") } + emit(3) + } + } + } + } + val e = assertFailsWith { flow.collect { expectUnreached() } } + assertTrue { e.message!!.contains("channelFlow") } + finish(3) + } +} diff --git a/kotlinx-coroutines-core/native/src/flow/internal/SafeCollector.kt b/kotlinx-coroutines-core/native/src/flow/internal/SafeCollector.kt new file mode 100644 index 0000000000..e42815d7ef --- /dev/null +++ b/kotlinx-coroutines-core/native/src/flow/internal/SafeCollector.kt @@ -0,0 +1,30 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow.internal + +import kotlinx.coroutines.flow.* +import kotlin.coroutines.* + +internal actual class SafeCollector actual constructor( + internal actual val collector: FlowCollector, + internal actual val collectContext: CoroutineContext +) : FlowCollector { + + // Note, it is non-capturing lambda, so no extra allocation during init of SafeCollector + internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 } + private var lastEmissionContext: CoroutineContext? = null + + override suspend fun emit(value: T) { + val currentContext = coroutineContext + if (lastEmissionContext !== currentContext) { + checkContext(currentContext) + lastEmissionContext = currentContext + } + collector.emit(value) + } + + public actual fun release() { + } +}