Skip to content

Commit

Permalink
Initial prototype for public SafeCollector with Throwable.isFromDowns…
Browse files Browse the repository at this point in the history
…tream (JVM-only yet) with explicit guarantees on context/exception transparency enforcement

Pre-design for #2860
  • Loading branch information
qwwdfsad committed Aug 12, 2021
1 parent e123c8a commit 02eb7bf
Show file tree
Hide file tree
Showing 12 changed files with 100 additions and 25 deletions.
12 changes: 11 additions & 1 deletion kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -861,7 +861,12 @@ public final class kotlinx/coroutines/debug/internal/DebuggerInfo : java/io/Seri
public abstract class kotlinx/coroutines/flow/AbstractFlow : kotlinx/coroutines/flow/CancellableFlow, kotlinx/coroutines/flow/Flow {
public fun <init> ()V
public final fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun collectSafely (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun collectSafely (Lkotlinx/coroutines/flow/SafeCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class kotlinx/coroutines/flow/ExceptionAwareFlowCollector : kotlinx/coroutines/flow/FlowCollector {
public abstract synthetic fun doNotImplementMe ()V
public abstract fun isFromDownstream (Ljava/lang/Throwable;)Z
}

public abstract interface class kotlinx/coroutines/flow/Flow {
Expand Down Expand Up @@ -1060,6 +1065,11 @@ public abstract interface class kotlinx/coroutines/flow/MutableStateFlow : kotli
public abstract fun setValue (Ljava/lang/Object;)V
}

public abstract interface class kotlinx/coroutines/flow/SafeCollector : kotlinx/coroutines/flow/FlowCollector {
public abstract synthetic fun doNotImplementMe ()V
public abstract fun isFromDownstream (Ljava/lang/Throwable;)Z
}

public abstract interface class kotlinx/coroutines/flow/SharedFlow : kotlinx/coroutines/flow/Flow {
public abstract fun getReplayCache ()Ljava/util/List;
}
Expand Down
6 changes: 3 additions & 3 deletions kotlinx-coroutines-core/common/src/flow/Builders.kt
Expand Up @@ -53,11 +53,11 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
*
* If you want to switch the context of execution of a flow, use the [flowOn] operator.
*/
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
public fun <T> flow(@BuilderInference block: suspend SafeCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
private class SafeFlow<T>(private val block: suspend SafeCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: SafeCollector<T>) {
collector.block()
}
}
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/common/src/flow/Flow.kt
Expand Up @@ -207,7 +207,7 @@ public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {

@InternalCoroutinesApi
public final override suspend fun collect(collector: FlowCollector<T>) {
val safeCollector = SafeCollector(collector, coroutineContext)
val safeCollector = SafeCollectorImpl(collector, coroutineContext)
try {
collectSafely(safeCollector)
} finally {
Expand All @@ -228,5 +228,5 @@ public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
*
* @throws IllegalStateException if any of the invariants are violated.
*/
public abstract suspend fun collectSafely(collector: FlowCollector<T>)
public abstract suspend fun collectSafely(collector: SafeCollector<T>)
}
10 changes: 10 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/FlowCollector.kt
Expand Up @@ -19,3 +19,13 @@ public interface FlowCollector<in T> {
*/
public suspend fun emit(value: T)
}

// Name TBD
// Contracts: exception transparency preservation, context transparency preservation, isFromDownstream
public interface SafeCollector<in T> : FlowCollector<T> {

public fun Throwable.isFromDownstream(): Boolean

@Deprecated(message = "", level = DeprecationLevel.HIDDEN)
public fun doNotImplementMe()
}
Expand Up @@ -10,18 +10,18 @@ import kotlinx.coroutines.internal.ScopeCoroutine
import kotlin.coroutines.*
import kotlin.jvm.*

internal expect class SafeCollector<T>(
internal expect class SafeCollectorImpl<T>(
collector: FlowCollector<T>,
collectContext: CoroutineContext
) : FlowCollector<T> {
) : SafeCollector<T> {
internal val collector: FlowCollector<T>
internal val collectContext: CoroutineContext
internal val collectContextSize: Int
public fun releaseIntercepted()
}

@JvmName("checkContext") // For prettier stack traces
internal fun SafeCollector<*>.checkContext(currentContext: CoroutineContext) {
internal fun SafeCollectorImpl<*>.checkContext(currentContext: CoroutineContext) {
val result = currentContext.fold(0) fold@{ count, element ->
val key = element.key
val collectElement = collectContext[key]
Expand Down Expand Up @@ -109,3 +109,9 @@ internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend
}
}
}

private class DownstreamExceptionElement(@JvmField val e: Throwable) : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<DownstreamExceptionElement>

override val key: CoroutineContext.Key<*> = Key
}
6 changes: 3 additions & 3 deletions kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt
Expand Up @@ -74,7 +74,7 @@ internal inline fun <T, R> Flow<T>.unsafeTransform(
public fun <T> Flow<T>.onStart(
action: suspend FlowCollector<T>.() -> Unit
): Flow<T> = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke start action
val safeCollector = SafeCollector<T>(this, currentCoroutineContext())
val safeCollector = SafeCollectorImpl<T>(this, currentCoroutineContext())
try {
safeCollector.action()
} finally {
Expand Down Expand Up @@ -156,7 +156,7 @@ public fun <T> Flow<T>.onCompletion(
throw e
}
// Normal completion
val sc = SafeCollector(this, currentCoroutineContext())
val sc = SafeCollectorImpl(this, currentCoroutineContext())
try {
sc.action(null)
} finally {
Expand Down Expand Up @@ -185,7 +185,7 @@ public fun <T> Flow<T>.onEmpty(
emit(it)
}
if (isEmpty) {
val collector = SafeCollector(this, currentCoroutineContext())
val collector = SafeCollectorImpl(this, currentCoroutineContext())
try {
collector.action()
} finally {
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/flow/operators/Share.kt
Expand Up @@ -405,7 +405,7 @@ internal class SubscribedFlowCollector<T>(
private val action: suspend FlowCollector<T>.() -> Unit
) : FlowCollector<T> by collector {
suspend fun onSubscription() {
val safeCollector = SafeCollector(collector, currentCoroutineContext())
val safeCollector = SafeCollectorImpl(collector, currentCoroutineContext())
try {
safeCollector.action()
} finally {
Expand Down
Expand Up @@ -25,7 +25,7 @@ class FlowInvariantsTest : TestBase() {
}

private fun <T> abstractFlow(block: suspend FlowCollector<T>.() -> Unit): Flow<T> = object : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
override suspend fun collectSafely(collector: SafeCollector<T>) {
collector.block()
}
}
Expand Down
Expand Up @@ -8,10 +8,10 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.coroutines.*

internal actual class SafeCollector<T> actual constructor(
internal actual class SafeCollectorImpl<T> actual constructor(
internal actual val collector: FlowCollector<T>,
internal actual val collectContext: CoroutineContext
) : FlowCollector<T> {
) : SafeCollector<T> {

// Note, it is non-capturing lambda, so no extra allocation during init of SafeCollector
internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
Expand All @@ -27,6 +27,15 @@ internal actual class SafeCollector<T> actual constructor(
collector.emit(value)
}

override fun Throwable.isFromDownstream(): Boolean {
// TODO
return true
}

override fun doNotImplementMe() {
TODO()
}

public actual fun releaseIntercepted() {
}
}
Expand Up @@ -18,10 +18,10 @@ private val emitFun =
* in order to properly control 'intercepted()' lifecycle.
*/
@Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER", "INVISIBLE_MEMBER", "INVISIBLE_REFERENCE", "UNCHECKED_CAST")
internal actual class SafeCollector<T> actual constructor(
internal actual class SafeCollectorImpl<T> actual constructor(
@JvmField internal actual val collector: FlowCollector<T>,
@JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {
) : SafeCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {

override val callerFrame: CoroutineStackFrame? get() = completion as? CoroutineStackFrame

Expand Down Expand Up @@ -123,12 +123,14 @@ internal actual class SafeCollector<T> actual constructor(
""".trimIndent())
}

}

internal class DownstreamExceptionElement(@JvmField val e: Throwable) : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<DownstreamExceptionElement>
override fun Throwable.isFromDownstream(): Boolean {
val ctx = lastEmissionContext
return ctx is DownstreamExceptionElement && ctx.e == this // TODO also check stracktrace recovery
}

override val key: CoroutineContext.Key<*> = Key
override fun doNotImplementMe() {
TODO()
}
}

private object NoOpContinuation : Continuation<Any?> {
Expand Down
28 changes: 28 additions & 0 deletions kotlinx-coroutines-core/jvm/test/flow/ExceptionTransparencyTest.kt
Expand Up @@ -74,4 +74,32 @@ class ExceptionTransparencyTest : TestBase() {
assertTrue { e.message!!.contains("channelFlow") }
finish(3)
}

@Test
fun testExamples() = runTest {
try {
flow {
try {
emit(42)
throw TestException()
} catch (e: Throwable) {
println("E1 " + e.isFromDownstream())
}
}.collect()
} catch (e: TestException) {
println("Caught")
}

try {
flow {
try {
emit(42)
} catch (e: Throwable) {
println("E2 " + e.isFromDownstream())
}
}.onEach { throw TestException() }.collect()
} catch (e: TestException) {
println("Caught 2")
}
}
}
Expand Up @@ -8,10 +8,10 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.coroutines.*

internal actual class SafeCollector<T> actual constructor(
internal actual class SafeCollectorImpl<T> actual constructor(
internal actual val collector: FlowCollector<T>,
internal actual val collectContext: CoroutineContext
) : FlowCollector<T> {
) : SafeCollector<T> {

// Note, it is non-capturing lambda, so no extra allocation during init of SafeCollector
internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
Expand All @@ -24,9 +24,19 @@ internal actual class SafeCollector<T> actual constructor(
checkContext(currentContext)
lastEmissionContext = currentContext
}

collector.emit(value)
}

override fun Throwable.isFromDownstream(): Boolean {
// TODO
return true
}

override fun doNotImplementMe() {
TODO()
}

public actual fun releaseIntercepted() {
}
}

0 comments on commit 02eb7bf

Please sign in to comment.