Skip to content

Commit

Permalink
Reusable continuation leak (#1858)
Browse files Browse the repository at this point in the history
Detect suspendCancellableCoroutine right after suspendCancellableCoroutineReusable within the same state machine and properly cleanup its child handle when its block completes

Fixes #1855
  • Loading branch information
qwwdfsad committed Mar 11, 2020
1 parent aff8202 commit 03f4e84
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 5 deletions.
Expand Up @@ -6,10 +6,11 @@ package benchmarks.flow.scrabble

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.flow.Flow
import org.openjdk.jmh.annotations.*
import java.lang.Long.*
import java.util.*
import java.util.concurrent.*
import kotlin.math.*

@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
Expand Down
Expand Up @@ -5,10 +5,11 @@
package benchmarks.flow.scrabble

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import org.openjdk.jmh.annotations.*
import java.lang.Long.*
import java.util.*
import java.util.concurrent.*
import java.util.concurrent.TimeUnit

@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
Expand Down
Expand Up @@ -85,12 +85,13 @@ internal open class CancellableContinuationImpl<in T>(
// This method does nothing. Leftover for binary compatibility with old compiled code
}

private fun isReusable(): Boolean = delegate is DispatchedContinuation<*> && delegate.isReusable
private fun isReusable(): Boolean = delegate is DispatchedContinuation<*> && delegate.isReusable(this)

/**
* Resets cancellability state in order to [suspendAtomicCancellableCoroutineReusable] to work.
* Invariant: used only by [suspendAtomicCancellableCoroutineReusable] in [REUSABLE_CLAIMED] state.
*/
@JvmName("resetState") // Prettier stack traces
internal fun resetState(): Boolean {
assert { parentHandle !== NonDisposableHandle }
val state = _state.value
Expand Down
Expand Up @@ -63,8 +63,24 @@ internal class DispatchedContinuation<in T>(
public val reusableCancellableContinuation: CancellableContinuationImpl<*>?
get() = _reusableCancellableContinuation.value as? CancellableContinuationImpl<*>

public val isReusable: Boolean
get() = _reusableCancellableContinuation.value != null
public fun isReusable(requester: CancellableContinuationImpl<*>): Boolean {
/*
* Reusability control:
* `null` -> no reusability at all, false
* If current state is not CCI, then we are within `suspendAtomicCancellableCoroutineReusable`, true
* Else, if result is CCI === requester.
* Identity check my fail for the following pattern:
* ```
* loop:
* suspendAtomicCancellableCoroutineReusable { } // Reusable, outer coroutine stores the child handle
* suspendCancellableCoroutine { } // **Not reusable**, handle should be disposed after {}, otherwise
* it will leak because it won't be freed by `releaseInterceptedContinuation`
* ```
*/
val value = _reusableCancellableContinuation.value ?: return false
if (value is CancellableContinuationImpl<*>) return value === requester
return true
}

/**
* Claims the continuation for [suspendAtomicCancellableCoroutineReusable] block,
Expand Down
Expand Up @@ -192,4 +192,17 @@ class ReusableCancellableContinuationTest : TestBase() {
FieldWalker.assertReachableCount(0, receiver) { it is CancellableContinuation<*> }
finish(3)
}

@Test
fun testReusableAndRegularSuspendCancellableCoroutineMemoryLeak() = runTest {
val channel = produce {
repeat(10) {
send(Unit)
}
}
for (value in channel) {
delay(1)
}
FieldWalker.assertReachableCount(1, coroutineContext[Job], { it is ChildContinuation })
}
}

0 comments on commit 03f4e84

Please sign in to comment.