diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt index f74155eb8f..1a0169b65d 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt @@ -107,7 +107,7 @@ internal open class CancellableContinuationImpl( } } - private fun isReusable(): Boolean = delegate is DispatchedContinuation<*> && delegate.isReusable(this) + private fun isReusable(): Boolean = resumeMode.isReusableMode && (delegate as DispatchedContinuation<*>).isReusable() /** * Resets cancellability state in order to [suspendCancellableCoroutineReusable] to work. @@ -115,7 +115,7 @@ internal open class CancellableContinuationImpl( */ @JvmName("resetStateReusable") // Prettier stack traces internal fun resetStateReusable(): Boolean { - assert { resumeMode == MODE_CANCELLABLE_REUSABLE } // invalid mode for CancellableContinuationImpl + assert { resumeMode == MODE_CANCELLABLE_REUSABLE } assert { parentHandle !== NonDisposableHandle } val state = _state.value assert { state !is NotCompleted } @@ -164,8 +164,7 @@ internal open class CancellableContinuationImpl( * Attempt to postpone cancellation for reusable cancellable continuation */ private fun cancelLater(cause: Throwable): Boolean { - if (!resumeMode.isReusableMode) return false - // Ensure that we are postponing cancellation to the right instance + // Ensure that we are postponing cancellation to the right reusable instance if (!isReusable()) return false val dispatched = delegate as DispatchedContinuation<*> return dispatched.postponeCancellation(cause) diff --git a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt index 45b9699c84..c689a38186 100644 --- a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt +++ b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt @@ -58,40 +58,30 @@ internal class DispatchedContinuation( */ private val _reusableCancellableContinuation = atomic(null) - public val reusableCancellableContinuation: CancellableContinuationImpl<*>? + private val reusableCancellableContinuation: CancellableContinuationImpl<*>? get() = _reusableCancellableContinuation.value as? CancellableContinuationImpl<*> - public fun isReusable(requester: CancellableContinuationImpl<*>): Boolean { + fun isReusable(): Boolean { /* + Invariant: caller.resumeMode.isReusableMode * Reusability control: * `null` -> no reusability at all, `false` - * If current state is not CCI, then we are within `suspendCancellableCoroutineReusable`, true - * Else, if result is CCI === requester, then it's our reusable continuation - * Identity check my fail for the following pattern: - * ``` - * loop: - * suspendCancellableCoroutineReusable { } // Reusable, outer coroutine stores the child handle - * suspendCancellableCoroutine { } // **Not reusable**, handle should be disposed after {}, otherwise - * it will leak because it won't be freed by `releaseInterceptedContinuation` - * ``` + * anything else -> reusable. */ - val value = _reusableCancellableContinuation.value ?: return false - if (value is CancellableContinuationImpl<*>) return value === requester - return true + return _reusableCancellableContinuation.value != null } - /** * Awaits until previous call to `suspendCancellableCoroutineReusable` will * stop mutating cached instance */ - public fun awaitReusability() { - _reusableCancellableContinuation.loop { it -> + fun awaitReusability() { + _reusableCancellableContinuation.loop { if (it !== REUSABLE_CLAIMED) return } } - public fun release() { + fun release() { /* * Called from `releaseInterceptedContinuation`, can be concurrent with * the code in `getResult` right after `trySuspend` returned `true`, so we have diff --git a/kotlinx-coroutines-core/jvm/test/ReusableContinuationStressTest.kt b/kotlinx-coroutines-core/jvm/test/ReusableContinuationStressTest.kt new file mode 100644 index 0000000000..a256815db3 --- /dev/null +++ b/kotlinx-coroutines-core/jvm/test/ReusableContinuationStressTest.kt @@ -0,0 +1,41 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines + +import kotlinx.coroutines.flow.* +import org.junit.* + +class ReusableContinuationStressTest : TestBase() { + + private val iterations = 1000 * stressTestMultiplierSqrt + + @Test // Originally reported by @denis-bezrukov in #2736 + fun testDebounceWithStateFlow() = runBlocking { + withContext(Dispatchers.Default) { + repeat(iterations) { + launch { // <- load the dispatcher and OS scheduler + runStressTestOnce(1, 1) + } + } + } + } + + private suspend fun runStressTestOnce(delay: Int, debounce: Int) = coroutineScope { + val stateFlow = MutableStateFlow(0) + val emitter = launch { + repeat(1000) { i -> + stateFlow.emit(i) + delay(delay.toLong()) + } + } + var last = 0 + stateFlow.debounce(debounce.toLong()).take(100).collect { i -> + if (i - last > 100) { + last = i + } + } + emitter.cancel() + } +}