From 6843648f14867ea52837fb5d71a9c2a3407f8e75 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Thu, 22 Oct 2020 04:09:45 -0700 Subject: [PATCH] Release intercepted SafeCollector when onCompletion block is done (#2323) * Do not use invokeSafely in onCompletion Co-authored-by: Roman Elizarov --- .../common/src/flow/operators/Emitters.kt | 7 ++- .../jvm/src/flow/internal/SafeCollector.kt | 1 - .../OnCompletionInterceptedReleaseTest.kt | 45 +++++++++++++++++++ 3 files changed, 51 insertions(+), 2 deletions(-) create mode 100644 kotlinx-coroutines-core/jvm/test/flow/OnCompletionInterceptedReleaseTest.kt diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt b/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt index 3ffe5fe943..8be19f08e0 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt @@ -158,7 +158,12 @@ public fun Flow.onCompletion( throw e } // Normal completion - SafeCollector(this, currentCoroutineContext()).invokeSafely(action, null) + val sc = SafeCollector(this, currentCoroutineContext()) + try { + sc.action(null) + } finally { + sc.releaseIntercepted() + } } /** diff --git a/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt b/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt index b275a481cc..ab42b6345f 100644 --- a/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt +++ b/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt @@ -55,7 +55,6 @@ internal actual class SafeCollector actual constructor( */ override suspend fun emit(value: T) { return suspendCoroutineUninterceptedOrReturn sc@{ uCont -> - // Update information about caller for stackwalking try { emit(uCont, value) } catch (e: Throwable) { diff --git a/kotlinx-coroutines-core/jvm/test/flow/OnCompletionInterceptedReleaseTest.kt b/kotlinx-coroutines-core/jvm/test/flow/OnCompletionInterceptedReleaseTest.kt new file mode 100644 index 0000000000..a6268b5156 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/flow/OnCompletionInterceptedReleaseTest.kt @@ -0,0 +1,45 @@ +/* + * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.flow + +import kotlinx.coroutines.* +import org.junit.Test +import kotlin.coroutines.* +import kotlin.test.* + +class OnCompletionInterceptedReleaseTest : TestBase() { + @Test + fun testLeak() = runTest { + expect(1) + var cont: Continuation? = null + val interceptor = CountingInterceptor() + val job = launch(interceptor, start = CoroutineStart.UNDISPATCHED) { + emptyFlow() + .onCompletion { emit(1) } + .collect { value -> + expect(2) + assertEquals(1, value) + suspendCoroutine { cont = it } + } + } + cont!!.resume(Unit) + assertTrue(job.isCompleted) + assertEquals(interceptor.intercepted, interceptor.released) + finish(3) + } + + class CountingInterceptor : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { + var intercepted = 0 + var released = 0 + override fun interceptContinuation(continuation: Continuation): Continuation { + intercepted++ + return Continuation(continuation.context) { continuation.resumeWith(it) } + } + + override fun releaseInterceptedContinuation(continuation: Continuation<*>) { + released++ + } + } +} \ No newline at end of file