Skip to content

Commit

Permalink
Properly detect non-released reusable continuations in non-reusable o… (
Browse files Browse the repository at this point in the history
Kotlin#2772)

* Properly detect non-released reusable continuations in non-reusable ones and await for reusability to have a consistent state
* Ensure that the caller to DispatchedContinuation.isReusable is reusable itself
* Using the previous invariant, simplify DispatchedContinuation.isReusable to a single null-check
* It also restores the invariant that `cc.isReusable() == cc.resumeMode.isReusableMode`

Fixes Kotlin#2736
Fixes Kotlin#2768
  • Loading branch information
qwwdfsad authored and pablobaxter committed Sep 14, 2022
1 parent b6ff228 commit f661918
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 22 deletions.
Expand Up @@ -107,15 +107,15 @@ internal open class CancellableContinuationImpl<in T>(
}
}

private fun isReusable(): Boolean = delegate is DispatchedContinuation<*> && delegate.isReusable(this)
private fun isReusable(): Boolean = resumeMode.isReusableMode && (delegate as DispatchedContinuation<*>).isReusable()

/**
* Resets cancellability state in order to [suspendCancellableCoroutineReusable] to work.
* Invariant: used only by [suspendCancellableCoroutineReusable] in [REUSABLE_CLAIMED] state.
*/
@JvmName("resetStateReusable") // Prettier stack traces
internal fun resetStateReusable(): Boolean {
assert { resumeMode == MODE_CANCELLABLE_REUSABLE } // invalid mode for CancellableContinuationImpl
assert { resumeMode == MODE_CANCELLABLE_REUSABLE }
assert { parentHandle !== NonDisposableHandle }
val state = _state.value
assert { state !is NotCompleted }
Expand Down Expand Up @@ -164,8 +164,7 @@ internal open class CancellableContinuationImpl<in T>(
* Attempt to postpone cancellation for reusable cancellable continuation
*/
private fun cancelLater(cause: Throwable): Boolean {
if (!resumeMode.isReusableMode) return false
// Ensure that we are postponing cancellation to the right instance
// Ensure that we are postponing cancellation to the right reusable instance
if (!isReusable()) return false
val dispatched = delegate as DispatchedContinuation<*>
return dispatched.postponeCancellation(cause)
Expand Down
Expand Up @@ -58,40 +58,30 @@ internal class DispatchedContinuation<in T>(
*/
private val _reusableCancellableContinuation = atomic<Any?>(null)

public val reusableCancellableContinuation: CancellableContinuationImpl<*>?
private val reusableCancellableContinuation: CancellableContinuationImpl<*>?
get() = _reusableCancellableContinuation.value as? CancellableContinuationImpl<*>

public fun isReusable(requester: CancellableContinuationImpl<*>): Boolean {
fun isReusable(): Boolean {
/*
Invariant: caller.resumeMode.isReusableMode
* Reusability control:
* `null` -> no reusability at all, `false`
* If current state is not CCI, then we are within `suspendCancellableCoroutineReusable`, true
* Else, if result is CCI === requester, then it's our reusable continuation
* Identity check my fail for the following pattern:
* ```
* loop:
* suspendCancellableCoroutineReusable { } // Reusable, outer coroutine stores the child handle
* suspendCancellableCoroutine { } // **Not reusable**, handle should be disposed after {}, otherwise
* it will leak because it won't be freed by `releaseInterceptedContinuation`
* ```
* anything else -> reusable.
*/
val value = _reusableCancellableContinuation.value ?: return false
if (value is CancellableContinuationImpl<*>) return value === requester
return true
return _reusableCancellableContinuation.value != null
}


/**
* Awaits until previous call to `suspendCancellableCoroutineReusable` will
* stop mutating cached instance
*/
public fun awaitReusability() {
_reusableCancellableContinuation.loop { it ->
fun awaitReusability() {
_reusableCancellableContinuation.loop {
if (it !== REUSABLE_CLAIMED) return
}
}

public fun release() {
fun release() {
/*
* Called from `releaseInterceptedContinuation`, can be concurrent with
* the code in `getResult` right after `trySuspend` returned `true`, so we have
Expand Down
41 changes: 41 additions & 0 deletions kotlinx-coroutines-core/jvm/test/ReusableContinuationStressTest.kt
@@ -0,0 +1,41 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import kotlinx.coroutines.flow.*
import org.junit.*

class ReusableContinuationStressTest : TestBase() {

private val iterations = 1000 * stressTestMultiplierSqrt

@Test // Originally reported by @denis-bezrukov in #2736
fun testDebounceWithStateFlow() = runBlocking<Unit> {
withContext(Dispatchers.Default) {
repeat(iterations) {
launch { // <- load the dispatcher and OS scheduler
runStressTestOnce(1, 1)
}
}
}
}

private suspend fun runStressTestOnce(delay: Int, debounce: Int) = coroutineScope {
val stateFlow = MutableStateFlow(0)
val emitter = launch {
repeat(1000) { i ->
stateFlow.emit(i)
delay(delay.toLong())
}
}
var last = 0
stateFlow.debounce(debounce.toLong()).take(100).collect { i ->
if (i - last > 100) {
last = i
}
}
emitter.cancel()
}
}

0 comments on commit f661918

Please sign in to comment.