diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt.kt index 62cc2e5c50..2573d30da0 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt.kt @@ -6,10 +6,11 @@ package benchmarks.flow.scrabble import kotlinx.coroutines.* import kotlinx.coroutines.flow.* +import kotlinx.coroutines.flow.Flow import org.openjdk.jmh.annotations.* -import java.lang.Long.* import java.util.* import java.util.concurrent.* +import kotlin.math.* @Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) @Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/SequencePlaysScrabble.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/SequencePlaysScrabble.kt index 87d0e61232..fa944fac84 100644 --- a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/SequencePlaysScrabble.kt +++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/SequencePlaysScrabble.kt @@ -5,10 +5,11 @@ package benchmarks.flow.scrabble import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* import org.openjdk.jmh.annotations.* import java.lang.Long.* import java.util.* -import java.util.concurrent.* +import java.util.concurrent.TimeUnit @Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) @Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS) diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt index 0cc9b57dec..1f67dd3c6c 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt @@ -85,12 +85,13 @@ internal open class CancellableContinuationImpl( // This method does nothing. Leftover for binary compatibility with old compiled code } - private fun isReusable(): Boolean = delegate is DispatchedContinuation<*> && delegate.isReusable + private fun isReusable(): Boolean = delegate is DispatchedContinuation<*> && delegate.isReusable(this) /** * Resets cancellability state in order to [suspendAtomicCancellableCoroutineReusable] to work. * Invariant: used only by [suspendAtomicCancellableCoroutineReusable] in [REUSABLE_CLAIMED] state. */ + @JvmName("resetState") // Prettier stack traces internal fun resetState(): Boolean { assert { parentHandle !== NonDisposableHandle } val state = _state.value diff --git a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt index acb6c48513..50758146af 100644 --- a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt +++ b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt @@ -63,8 +63,24 @@ internal class DispatchedContinuation( public val reusableCancellableContinuation: CancellableContinuationImpl<*>? get() = _reusableCancellableContinuation.value as? CancellableContinuationImpl<*> - public val isReusable: Boolean - get() = _reusableCancellableContinuation.value != null + public fun isReusable(requester: CancellableContinuationImpl<*>): Boolean { + /* + * Reusability control: + * `null` -> no reusability at all, false + * If current state is not CCI, then we are within `suspendAtomicCancellableCoroutineReusable`, true + * Else, if result is CCI === requester. + * Identity check my fail for the following pattern: + * ``` + * loop: + * suspendAtomicCancellableCoroutineReusable { } // Reusable, outer coroutine stores the child handle + * suspendCancellableCoroutine { } // **Not reusable**, handle should be disposed after {}, otherwise + * it will leak because it won't be freed by `releaseInterceptedContinuation` + * ``` + */ + val value = _reusableCancellableContinuation.value ?: return false + if (value is CancellableContinuationImpl<*>) return value === requester + return true + } /** * Claims the continuation for [suspendAtomicCancellableCoroutineReusable] block, diff --git a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt index 997f746118..5f5620c632 100644 --- a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt +++ b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt @@ -192,4 +192,17 @@ class ReusableCancellableContinuationTest : TestBase() { FieldWalker.assertReachableCount(0, receiver) { it is CancellableContinuation<*> } finish(3) } + + @Test + fun testReusableAndRegularSuspendCancellableCoroutineMemoryLeak() = runTest { + val channel = produce { + repeat(10) { + send(Unit) + } + } + for (value in channel) { + delay(1) + } + FieldWalker.assertReachableCount(1, coroutineContext[Job], { it is ChildContinuation }) + } }