Skip to content

Commit

Permalink
Make SafeCollector platform-specific declaration and enforce exceptio…
Browse files Browse the repository at this point in the history
…n transparency invariant on JVM

    * Make it in allocation-free manner by using crafty trick with casting KSuspendFunction to Function and pass reusable object as a completion

Fixes #1657
  • Loading branch information
qwwdfsad committed Feb 10, 2020
1 parent 09cb4bf commit a07e94c
Show file tree
Hide file tree
Showing 10 changed files with 400 additions and 126 deletions.
Binary file added benchmarks.jar
Binary file not shown.
7 changes: 6 additions & 1 deletion kotlinx-coroutines-core/common/src/flow/Builders.kt
Expand Up @@ -51,7 +51,12 @@ public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit
// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) {
SafeCollector(collector, coroutineContext).block()
val safeCollector = SafeCollector(collector, coroutineContext)
try {
safeCollector.block()
} finally {
safeCollector.release()
}
}
}

Expand Down
@@ -0,0 +1,112 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.flow.internal

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.internal.ScopeCoroutine
import kotlin.coroutines.*
import kotlin.jvm.*

internal expect class SafeCollector<T>(
collector: FlowCollector<T>,
collectContext: CoroutineContext
) : FlowCollector<T> {
internal val collector: FlowCollector<T>
internal val collectContext: CoroutineContext
internal val collectContextSize: Int
public fun release()
}

@JvmName("checkContext") // For prettier stack traces
internal fun SafeCollector<*>.checkContext(currentContext: CoroutineContext) {
val result = currentContext.fold(0) fold@{ count, element ->
val key = element.key
val collectElement = collectContext[key]
if (key !== Job) {
return@fold if (element !== collectElement) Int.MIN_VALUE
else count + 1
}

val collectJob = collectElement as Job?
val emissionParentJob = (element as Job).transitiveCoroutineParent(collectJob)
/*
* Code like
* ```
* coroutineScope {
* launch {
* emit(1)
* }
*
* launch {
* emit(2)
* }
* }
* ```
* is prohibited because 'emit' is not thread-safe by default. Use 'channelFlow' instead if you need concurrent emission
* or want to switch context dynamically (e.g. with `withContext`).
*
* Note that collecting from another coroutine is allowed, e.g.:
* ```
* coroutineScope {
* val channel = produce {
* collect { value ->
* send(value)
* }
* }
* channel.consumeEach { value ->
* emit(value)
* }
* }
* ```
* is a completely valid.
*/
if (emissionParentJob !== collectJob) {
error(
"Flow invariant is violated:\n" +
"\t\tEmission from another coroutine is detected.\n" +
"\t\tChild of $emissionParentJob, expected child of $collectJob.\n" +
"\t\tFlowCollector is not thread-safe and concurrent emissions are prohibited.\n" +
"\t\tTo mitigate this restriction please use 'channelFlow' builder instead of 'flow'"
)
}

/*
* If collect job is null (-> EmptyCoroutineContext, probably run from `suspend fun main`), then invariant is maintained
* (common transitive parent is "null"), but count check will fail, so just do not count job context element when
* flow is collected from EmptyCoroutineContext
*/
if (collectJob == null) count else count + 1
}
if (result != collectContextSize) {
error(
"Flow invariant is violated:\n" +
"\t\tFlow was collected in $collectContext,\n" +
"\t\tbut emission happened in $currentContext.\n" +
"\t\tPlease refer to 'flow' documentation or use 'flowOn' instead"
)
}
}

@JvmName("checkContext") // For prettier stack traces
internal tailrec fun Job?.transitiveCoroutineParent(collectJob: Job?): Job? {
if (this === null) return null
if (this === collectJob) return this
if (this !is ScopeCoroutine<*>) return this
return parent.transitiveCoroutineParent(collectJob)
}

/**
* An analogue of the [flow] builder that does not check the context of execution of the resulting flow.
* Used in our own operators where we trust the context of invocations.
*/
@PublishedApi
internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
return object : Flow<T> {
override suspend fun collect(collector: FlowCollector<T>) {
collector.block()
}
}
}
124 changes: 0 additions & 124 deletions kotlinx-coroutines-core/common/src/flow/internal/SafeCollector.kt

This file was deleted.

2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/test/TestBase.common.kt
Expand Up @@ -50,7 +50,7 @@ public suspend inline fun <reified T : Throwable> assertFailsWith(flow: Flow<*>)
flow.collect()
fail("Should be unreached")
} catch (e: Throwable) {
assertTrue(e is T)
assertTrue(e is T, "Expected exception ${T::class}, but had $e instead")
}
}

Expand Down
31 changes: 31 additions & 0 deletions kotlinx-coroutines-core/common/test/flow/SafeFlowTest.kt
@@ -0,0 +1,31 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlin.test.*

class SafeFlowTest : TestBase() {

@Test
fun testEmissionsFromDifferentStateMachine() = runTest {
val result = flow<Int> {
emit1(1)
emit2(2)
}.onEach { yield() }.toList()
assertEquals(listOf(1, 2), result)
finish(3)
}

private suspend fun FlowCollector<Int>.emit1(expect: Int) {
emit(expect)
expect(expect)
}

private suspend fun FlowCollector<Int>.emit2(expect: Int) {
emit(expect)
expect(expect)
}
}
30 changes: 30 additions & 0 deletions kotlinx-coroutines-core/js/src/flow/internal/SafeCollector.kt
@@ -0,0 +1,30 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.flow.internal

import kotlinx.coroutines.flow.*
import kotlin.coroutines.*

internal actual class SafeCollector<T> actual constructor(
internal actual val collector: FlowCollector<T>,
internal actual val collectContext: CoroutineContext
) : FlowCollector<T> {

// Note, it is non-capturing lambda, so no extra allocation during init of SafeCollector
internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
private var lastEmissionContext: CoroutineContext? = null

override suspend fun emit(value: T) {
val currentContext = coroutineContext
if (lastEmissionContext !== currentContext) {
checkContext(currentContext)
lastEmissionContext = currentContext
}
collector.emit(value)
}

public actual fun release() {
}
}

0 comments on commit a07e94c

Please sign in to comment.