Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial prototype for public SafeCollector with Throwable.isFromDowns… #2875

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 6 additions & 1 deletion kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Expand Up @@ -861,7 +861,7 @@ public final class kotlinx/coroutines/debug/internal/DebuggerInfo : java/io/Seri
public abstract class kotlinx/coroutines/flow/AbstractFlow : kotlinx/coroutines/flow/CancellableFlow, kotlinx/coroutines/flow/Flow {
public fun <init> ()V
public final fun collect (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun collectSafely (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun collectSafely (Lkotlinx/coroutines/flow/SafeCollector;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public abstract interface class kotlinx/coroutines/flow/Flow {
Expand Down Expand Up @@ -1060,6 +1060,11 @@ public abstract interface class kotlinx/coroutines/flow/MutableStateFlow : kotli
public abstract fun setValue (Ljava/lang/Object;)V
}

public abstract interface class kotlinx/coroutines/flow/SafeCollector : kotlinx/coroutines/flow/FlowCollector {
public abstract synthetic fun doNotImplementMe ()V
public abstract fun isFromDownstream (Ljava/lang/Throwable;)Z
}

public abstract interface class kotlinx/coroutines/flow/SharedFlow : kotlinx/coroutines/flow/Flow {
public abstract fun getReplayCache ()Ljava/util/List;
}
Expand Down
6 changes: 3 additions & 3 deletions kotlinx-coroutines-core/common/src/flow/Builders.kt
Expand Up @@ -53,11 +53,11 @@ import kotlinx.coroutines.flow.internal.unsafeFlow as flow
*
* If you want to switch the context of execution of a flow, use the [flowOn] operator.
*/
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
public fun <T> flow(@BuilderInference block: suspend SafeCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
private class SafeFlow<T>(private val block: suspend SafeCollector<T>.() -> Unit) : AbstractFlow<T>() {
override suspend fun collectSafely(collector: SafeCollector<T>) {
collector.block()
}
}
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/common/src/flow/Flow.kt
Expand Up @@ -207,7 +207,7 @@ public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {

@InternalCoroutinesApi
public final override suspend fun collect(collector: FlowCollector<T>) {
val safeCollector = SafeCollector(collector, coroutineContext)
val safeCollector = SafeCollectorImpl(collector, coroutineContext)
try {
collectSafely(safeCollector)
} finally {
Expand All @@ -228,5 +228,5 @@ public abstract class AbstractFlow<T> : Flow<T>, CancellableFlow<T> {
*
* @throws IllegalStateException if any of the invariants are violated.
*/
public abstract suspend fun collectSafely(collector: FlowCollector<T>)
public abstract suspend fun collectSafely(collector: SafeCollector<T>)
}
10 changes: 10 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/FlowCollector.kt
Expand Up @@ -19,3 +19,13 @@ public interface FlowCollector<in T> {
*/
public suspend fun emit(value: T)
}

// Name TBD
// Contracts: exception transparency preservation, context transparency preservation, isFromDownstream
public interface SafeCollector<in T> : FlowCollector<T> {

public fun Throwable.isFromDownstream(): Boolean

@Deprecated(message = "", level = DeprecationLevel.HIDDEN)
public fun doNotImplementMe()
}
Expand Up @@ -10,18 +10,18 @@ import kotlinx.coroutines.internal.ScopeCoroutine
import kotlin.coroutines.*
import kotlin.jvm.*

internal expect class SafeCollector<T>(
internal expect class SafeCollectorImpl<T>(
collector: FlowCollector<T>,
collectContext: CoroutineContext
) : FlowCollector<T> {
) : SafeCollector<T> {
internal val collector: FlowCollector<T>
internal val collectContext: CoroutineContext
internal val collectContextSize: Int
public fun releaseIntercepted()
}

@JvmName("checkContext") // For prettier stack traces
internal fun SafeCollector<*>.checkContext(currentContext: CoroutineContext) {
internal fun SafeCollectorImpl<*>.checkContext(currentContext: CoroutineContext) {
val result = currentContext.fold(0) fold@{ count, element ->
val key = element.key
val collectElement = collectContext[key]
Expand Down Expand Up @@ -109,3 +109,9 @@ internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend
}
}
}

private class DownstreamExceptionElement(@JvmField val e: Throwable) : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<DownstreamExceptionElement>

override val key: CoroutineContext.Key<*> = Key
}
6 changes: 3 additions & 3 deletions kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt
Expand Up @@ -74,7 +74,7 @@ internal inline fun <T, R> Flow<T>.unsafeTransform(
public fun <T> Flow<T>.onStart(
action: suspend FlowCollector<T>.() -> Unit
): Flow<T> = unsafeFlow { // Note: unsafe flow is used here, but safe collector is used to invoke start action
val safeCollector = SafeCollector<T>(this, currentCoroutineContext())
val safeCollector = SafeCollectorImpl<T>(this, currentCoroutineContext())
try {
safeCollector.action()
} finally {
Expand Down Expand Up @@ -156,7 +156,7 @@ public fun <T> Flow<T>.onCompletion(
throw e
}
// Normal completion
val sc = SafeCollector(this, currentCoroutineContext())
val sc = SafeCollectorImpl(this, currentCoroutineContext())
try {
sc.action(null)
} finally {
Expand Down Expand Up @@ -185,7 +185,7 @@ public fun <T> Flow<T>.onEmpty(
emit(it)
}
if (isEmpty) {
val collector = SafeCollector(this, currentCoroutineContext())
val collector = SafeCollectorImpl(this, currentCoroutineContext())
try {
collector.action()
} finally {
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/flow/operators/Share.kt
Expand Up @@ -405,7 +405,7 @@ internal class SubscribedFlowCollector<T>(
private val action: suspend FlowCollector<T>.() -> Unit
) : FlowCollector<T> by collector {
suspend fun onSubscription() {
val safeCollector = SafeCollector(collector, currentCoroutineContext())
val safeCollector = SafeCollectorImpl(collector, currentCoroutineContext())
try {
safeCollector.action()
} finally {
Expand Down
Expand Up @@ -25,7 +25,7 @@ class FlowInvariantsTest : TestBase() {
}

private fun <T> abstractFlow(block: suspend FlowCollector<T>.() -> Unit): Flow<T> = object : AbstractFlow<T>() {
override suspend fun collectSafely(collector: FlowCollector<T>) {
override suspend fun collectSafely(collector: SafeCollector<T>) {
collector.block()
}
}
Expand Down
Expand Up @@ -8,10 +8,10 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.coroutines.*

internal actual class SafeCollector<T> actual constructor(
internal actual class SafeCollectorImpl<T> actual constructor(
internal actual val collector: FlowCollector<T>,
internal actual val collectContext: CoroutineContext
) : FlowCollector<T> {
) : SafeCollector<T> {

// Note, it is non-capturing lambda, so no extra allocation during init of SafeCollector
internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
Expand All @@ -27,6 +27,15 @@ internal actual class SafeCollector<T> actual constructor(
collector.emit(value)
}

override fun Throwable.isFromDownstream(): Boolean {
// TODO
return true
}

override fun doNotImplementMe() {
TODO()
}

public actual fun releaseIntercepted() {
}
}
Expand Up @@ -18,10 +18,10 @@ private val emitFun =
* in order to properly control 'intercepted()' lifecycle.
*/
@Suppress("CANNOT_OVERRIDE_INVISIBLE_MEMBER", "INVISIBLE_MEMBER", "INVISIBLE_REFERENCE", "UNCHECKED_CAST")
internal actual class SafeCollector<T> actual constructor(
internal actual class SafeCollectorImpl<T> actual constructor(
@JvmField internal actual val collector: FlowCollector<T>,
@JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {
) : SafeCollector<T>, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {

override val callerFrame: CoroutineStackFrame? get() = completion as? CoroutineStackFrame

Expand Down Expand Up @@ -123,12 +123,14 @@ internal actual class SafeCollector<T> actual constructor(
""".trimIndent())
}

}

internal class DownstreamExceptionElement(@JvmField val e: Throwable) : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<DownstreamExceptionElement>
override fun Throwable.isFromDownstream(): Boolean {
val ctx = lastEmissionContext
return ctx is DownstreamExceptionElement && ctx.e == this // TODO also check stracktrace recovery
}

override val key: CoroutineContext.Key<*> = Key
override fun doNotImplementMe() {
TODO()
}
}

private object NoOpContinuation : Continuation<Any?> {
Expand Down
28 changes: 28 additions & 0 deletions kotlinx-coroutines-core/jvm/test/flow/ExceptionTransparencyTest.kt
Expand Up @@ -74,4 +74,32 @@ class ExceptionTransparencyTest : TestBase() {
assertTrue { e.message!!.contains("channelFlow") }
finish(3)
}

@Test
fun testExamples() = runTest {
try {
flow {
try {
emit(42)
throw TestException()
} catch (e: Throwable) {
println("E1 " + e.isFromDownstream())
}
}.collect()
} catch (e: TestException) {
println("Caught")
}

try {
flow {
try {
emit(42)
} catch (e: Throwable) {
println("E2 " + e.isFromDownstream())
}
}.onEach { throw TestException() }.collect()
} catch (e: TestException) {
println("Caught 2")
}
}
}
Expand Up @@ -8,10 +8,10 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.coroutines.*

internal actual class SafeCollector<T> actual constructor(
internal actual class SafeCollectorImpl<T> actual constructor(
internal actual val collector: FlowCollector<T>,
internal actual val collectContext: CoroutineContext
) : FlowCollector<T> {
) : SafeCollector<T> {

// Note, it is non-capturing lambda, so no extra allocation during init of SafeCollector
internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
Expand All @@ -24,9 +24,19 @@ internal actual class SafeCollector<T> actual constructor(
checkContext(currentContext)
lastEmissionContext = currentContext
}

collector.emit(value)
}

override fun Throwable.isFromDownstream(): Boolean {
// TODO
return true
}

override fun doNotImplementMe() {
TODO()
}

public actual fun releaseIntercepted() {
}
}