Skip to content

Commit

Permalink
Prevent StackOverflowError in CompletableFuture.asDeferred and proper… (
Browse files Browse the repository at this point in the history
#2731)

Prevent StackOverflowError in CompletableFuture.asDeferred and properly report exceptions from completion handlers

* It turned out that 'cancel' on completed future tries to help and invoke 'whenComplete' handlers that also invoke 'cancel' on the very same future
* Use top-level exception handler as a last resort to deliver an exception

Fixes #2730
  • Loading branch information
qwwdfsad committed May 27, 2021
1 parent 5121005 commit 937180f
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 14 deletions.
6 changes: 4 additions & 2 deletions integration/kotlinx-coroutines-guava/src/ListenableFuture.kt
Expand Up @@ -136,11 +136,13 @@ public fun <T> ListenableFuture<T>.asDeferred(): Deferred<T> {
override fun onSuccess(result: T?) {
// Here we work with flexible types, so we unchecked cast to trick the type system
@Suppress("UNCHECKED_CAST")
deferred.complete(result as T)
runCatching { deferred.complete(result as T) }
.onFailure { handleCoroutineException(EmptyCoroutineContext, it) }
}

override fun onFailure(t: Throwable) {
deferred.completeExceptionally(t)
runCatching { deferred.completeExceptionally(t) }
.onFailure { handleCoroutineException(EmptyCoroutineContext, it) }
}
}, MoreExecutors.directExecutor())

Expand Down
@@ -0,0 +1,46 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.guava

import com.google.common.util.concurrent.*
import kotlinx.coroutines.*
import org.junit.*
import org.junit.Test
import kotlin.test.*

class FutureAsDeferredUnhandledCompletionExceptionTest : TestBase() {

// This is a separate test in order to avoid interference with uncaught exception handlers in other tests
private val exceptionHandler = Thread.getDefaultUncaughtExceptionHandler()
private lateinit var caughtException: Throwable

@Before
fun setUp() {
Thread.setDefaultUncaughtExceptionHandler { _, e -> caughtException = e }
}

@After
fun tearDown() {
Thread.setDefaultUncaughtExceptionHandler(exceptionHandler)
}

@Test
fun testLostExceptionOnSuccess() = runTest {
val future = SettableFuture.create<Int>()
val deferred = future.asDeferred()
deferred.invokeOnCompletion { throw TestException() }
future.set(1)
assertTrue { caughtException is CompletionHandlerException && caughtException.cause is TestException }
}

@Test
fun testLostExceptionOnFailure() = runTest {
val future = SettableFuture.create<Int>()
val deferred = future.asDeferred()
deferred.invokeOnCompletion { throw TestException() }
future.setException(TestException2())
assertTrue { caughtException is CompletionHandlerException && caughtException.cause is TestException }
}
}
20 changes: 20 additions & 0 deletions integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt
Expand Up @@ -11,6 +11,7 @@ import org.junit.Ignore
import org.junit.Test
import java.util.concurrent.*
import java.util.concurrent.CancellationException
import java.util.concurrent.atomic.*
import kotlin.test.*

class ListenableFutureTest : TestBase() {
Expand Down Expand Up @@ -755,4 +756,23 @@ class ListenableFutureTest : TestBase() {
future(start = CoroutineStart.ATOMIC) { }
future(start = CoroutineStart.UNDISPATCHED) { }
}

@Test
fun testStackOverflow() = runTest {
val future = SettableFuture.create<Int>()
val completed = AtomicLong()
val count = 10000L
val children = ArrayList<Job>()
for (i in 0 until count) {
children += launch(Dispatchers.Default) {
future.asDeferred().await()
completed.incrementAndGet()
}
}
future.set(1)
withTimeout(60_000) {
children.forEach { it.join() }
assertEquals(count, completed.get())
}
}
}
19 changes: 12 additions & 7 deletions integration/kotlinx-coroutines-jdk8/src/future/Future.kt
Expand Up @@ -126,13 +126,18 @@ public fun <T> CompletionStage<T>.asDeferred(): Deferred<T> {
}
val result = CompletableDeferred<T>()
whenComplete { value, exception ->
if (exception == null) {
// the future has completed normally
result.complete(value)
} else {
// the future has completed with an exception, unwrap it consistently with fast path
// Note: In the fast-path the implementation of CompletableFuture.get() does unwrapping
result.completeExceptionally((exception as? CompletionException)?.cause ?: exception)
try {
if (exception == null) {
// the future has completed normally
result.complete(value)
} else {
// the future has completed with an exception, unwrap it consistently with fast path
// Note: In the fast-path the implementation of CompletableFuture.get() does unwrapping
result.completeExceptionally((exception as? CompletionException)?.cause ?: exception)
}
} catch (e: Throwable) {
// We come here iff the internals of Deferred threw an exception during its completion
handleCoroutineException(EmptyCoroutineContext, e)
}
}
result.cancelFutureOnCompletion(future)
Expand Down
@@ -0,0 +1,38 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package future

import kotlinx.coroutines.*
import kotlinx.coroutines.future.*
import org.junit.*
import org.junit.Test
import java.util.concurrent.*
import kotlin.test.*

class FutureAsDeferredUnhandledCompletionExceptionTest : TestBase() {

// This is a separate test in order to avoid interference with uncaught exception handlers in other tests
private val exceptionHandler = Thread.getDefaultUncaughtExceptionHandler()
private lateinit var caughtException: Throwable

@Before
fun setUp() {
Thread.setDefaultUncaughtExceptionHandler { _, e -> caughtException = e }
}

@After
fun tearDown() {
Thread.setDefaultUncaughtExceptionHandler(exceptionHandler)
}

@Test
fun testLostException() = runTest {
val future = CompletableFuture<Int>()
val deferred = future.asDeferred()
deferred.invokeOnCompletion { throw TestException() }
future.complete(1)
assertTrue { caughtException is CompletionHandlerException && caughtException.cause is TestException }
}
}
19 changes: 19 additions & 0 deletions integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt
Expand Up @@ -575,4 +575,23 @@ class FutureTest : TestBase() {
future(start = CoroutineStart.ATOMIC) { }
future(start = CoroutineStart.UNDISPATCHED) { }
}

@Test
fun testStackOverflow() = runTest {
val future = CompletableFuture<Int>()
val completed = AtomicLong()
val count = 10000L
val children = ArrayList<Job>()
for (i in 0 until count) {
children += launch(Dispatchers.Default) {
future.asDeferred().await()
completed.incrementAndGet()
}
}
future.complete(1)
withTimeout(60_000) {
children.forEach { it.join() }
assertEquals(count, completed.get())
}
}
}
10 changes: 5 additions & 5 deletions kotlinx-coroutines-core/jvm/src/Future.kt
Expand Up @@ -13,20 +13,20 @@ import java.util.concurrent.*
* Cancels a specified [future] when this job is cancelled.
* This is a shortcut for the following code with slightly more efficient implementation (one fewer object created).
* ```
* invokeOnCompletion { future.cancel(false) }
* invokeOnCompletion { if (it != null) future.cancel(false) }
* ```
*
* @suppress **This an internal API and should not be used from general code.**
*/
@InternalCoroutinesApi
public fun Job.cancelFutureOnCompletion(future: Future<*>): DisposableHandle =
invokeOnCompletion(handler = CancelFutureOnCompletion(future)) // TODO make it work only on cancellation as well?
invokeOnCompletion(handler = CancelFutureOnCompletion(future))

/**
* Cancels a specified [future] when this job is cancelled.
* This is a shortcut for the following code with slightly more efficient implementation (one fewer object created).
* ```
* invokeOnCancellation { future.cancel(false) }
* invokeOnCancellation { if (it != null) future.cancel(false) }
* ```
*/
public fun CancellableContinuation<*>.cancelFutureOnCancellation(future: Future<*>): Unit =
Expand All @@ -38,15 +38,15 @@ private class CancelFutureOnCompletion(
override fun invoke(cause: Throwable?) {
// Don't interrupt when cancelling future on completion, because no one is going to reset this
// interruption flag and it will cause spurious failures elsewhere
future.cancel(false)
if (cause != null) future.cancel(false)
}
}

private class CancelFutureOnCancel(private val future: Future<*>) : CancelHandler() {
override fun invoke(cause: Throwable?) {
// Don't interrupt when cancelling future on completion, because no one is going to reset this
// interruption flag and it will cause spurious failures elsewhere
future.cancel(false)
if (cause != null) future.cancel(false)
}
override fun toString() = "CancelFutureOnCancel[$future]"
}

0 comments on commit 937180f

Please sign in to comment.