diff --git a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt index 35e0aeb379..8f11e0a916 100644 --- a/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt +++ b/integration/kotlinx-coroutines-guava/src/ListenableFuture.kt @@ -136,11 +136,13 @@ public fun ListenableFuture.asDeferred(): Deferred { override fun onSuccess(result: T?) { // Here we work with flexible types, so we unchecked cast to trick the type system @Suppress("UNCHECKED_CAST") - deferred.complete(result as T) + runCatching { deferred.complete(result as T) } + .onFailure { handleCoroutineException(EmptyCoroutineContext, it) } } override fun onFailure(t: Throwable) { - deferred.completeExceptionally(t) + runCatching { deferred.completeExceptionally(t) } + .onFailure { handleCoroutineException(EmptyCoroutineContext, it) } } }, MoreExecutors.directExecutor()) diff --git a/integration/kotlinx-coroutines-guava/test/FutureAsDeferredUnhandledCompletionExceptionTest.kt b/integration/kotlinx-coroutines-guava/test/FutureAsDeferredUnhandledCompletionExceptionTest.kt new file mode 100644 index 0000000000..d6469a947e --- /dev/null +++ b/integration/kotlinx-coroutines-guava/test/FutureAsDeferredUnhandledCompletionExceptionTest.kt @@ -0,0 +1,46 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.guava + +import com.google.common.util.concurrent.* +import kotlinx.coroutines.* +import org.junit.* +import org.junit.Test +import kotlin.test.* + +class FutureAsDeferredUnhandledCompletionExceptionTest : TestBase() { + + // This is a separate test in order to avoid interference with uncaught exception handlers in other tests + private val exceptionHandler = Thread.getDefaultUncaughtExceptionHandler() + private lateinit var caughtException: Throwable + + @Before + fun setUp() { + Thread.setDefaultUncaughtExceptionHandler { _, e -> caughtException = e } + } + + @After + fun tearDown() { + Thread.setDefaultUncaughtExceptionHandler(exceptionHandler) + } + + @Test + fun testLostExceptionOnSuccess() = runTest { + val future = SettableFuture.create() + val deferred = future.asDeferred() + deferred.invokeOnCompletion { throw TestException() } + future.set(1) + assertTrue { caughtException is CompletionHandlerException && caughtException.cause is TestException } + } + + @Test + fun testLostExceptionOnFailure() = runTest { + val future = SettableFuture.create() + val deferred = future.asDeferred() + deferred.invokeOnCompletion { throw TestException() } + future.setException(TestException2()) + assertTrue { caughtException is CompletionHandlerException && caughtException.cause is TestException } + } +} diff --git a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt index c463174a8d..69ba193071 100644 --- a/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt +++ b/integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt @@ -11,6 +11,7 @@ import org.junit.Ignore import org.junit.Test import java.util.concurrent.* import java.util.concurrent.CancellationException +import java.util.concurrent.atomic.* import kotlin.test.* class ListenableFutureTest : TestBase() { @@ -755,4 +756,23 @@ class ListenableFutureTest : TestBase() { future(start = CoroutineStart.ATOMIC) { } future(start = CoroutineStart.UNDISPATCHED) { } } + + @Test + fun testStackOverflow() = runTest { + val future = SettableFuture.create() + val completed = AtomicLong() + val count = 10000L + val children = ArrayList() + for (i in 0 until count) { + children += launch(Dispatchers.Default) { + future.asDeferred().await() + completed.incrementAndGet() + } + } + future.set(1) + withTimeout(60_000) { + children.forEach { it.join() } + assertEquals(count, completed.get()) + } + } } diff --git a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt index 7e9c349c66..caf2a3c359 100644 --- a/integration/kotlinx-coroutines-jdk8/src/future/Future.kt +++ b/integration/kotlinx-coroutines-jdk8/src/future/Future.kt @@ -126,13 +126,18 @@ public fun CompletionStage.asDeferred(): Deferred { } val result = CompletableDeferred() whenComplete { value, exception -> - if (exception == null) { - // the future has completed normally - result.complete(value) - } else { - // the future has completed with an exception, unwrap it consistently with fast path - // Note: In the fast-path the implementation of CompletableFuture.get() does unwrapping - result.completeExceptionally((exception as? CompletionException)?.cause ?: exception) + try { + if (exception == null) { + // the future has completed normally + result.complete(value) + } else { + // the future has completed with an exception, unwrap it consistently with fast path + // Note: In the fast-path the implementation of CompletableFuture.get() does unwrapping + result.completeExceptionally((exception as? CompletionException)?.cause ?: exception) + } + } catch (e: Throwable) { + // We come here iff the internals of Deferred threw an exception during its completion + handleCoroutineException(EmptyCoroutineContext, e) } } result.cancelFutureOnCompletion(future) diff --git a/integration/kotlinx-coroutines-jdk8/test/future/FutureAsDeferredUnhandledCompletionExceptionTest.kt b/integration/kotlinx-coroutines-jdk8/test/future/FutureAsDeferredUnhandledCompletionExceptionTest.kt new file mode 100644 index 0000000000..bf810af7aa --- /dev/null +++ b/integration/kotlinx-coroutines-jdk8/test/future/FutureAsDeferredUnhandledCompletionExceptionTest.kt @@ -0,0 +1,38 @@ +/* + * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package future + +import kotlinx.coroutines.* +import kotlinx.coroutines.future.* +import org.junit.* +import org.junit.Test +import java.util.concurrent.* +import kotlin.test.* + +class FutureAsDeferredUnhandledCompletionExceptionTest : TestBase() { + + // This is a separate test in order to avoid interference with uncaught exception handlers in other tests + private val exceptionHandler = Thread.getDefaultUncaughtExceptionHandler() + private lateinit var caughtException: Throwable + + @Before + fun setUp() { + Thread.setDefaultUncaughtExceptionHandler { _, e -> caughtException = e } + } + + @After + fun tearDown() { + Thread.setDefaultUncaughtExceptionHandler(exceptionHandler) + } + + @Test + fun testLostException() = runTest { + val future = CompletableFuture() + val deferred = future.asDeferred() + deferred.invokeOnCompletion { throw TestException() } + future.complete(1) + assertTrue { caughtException is CompletionHandlerException && caughtException.cause is TestException } + } +} diff --git a/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt b/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt index 08e5cdad93..372e79ef1d 100644 --- a/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt +++ b/integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt @@ -575,4 +575,23 @@ class FutureTest : TestBase() { future(start = CoroutineStart.ATOMIC) { } future(start = CoroutineStart.UNDISPATCHED) { } } + + @Test + fun testStackOverflow() = runTest { + val future = CompletableFuture() + val completed = AtomicLong() + val count = 10000L + val children = ArrayList() + for (i in 0 until count) { + children += launch(Dispatchers.Default) { + future.asDeferred().await() + completed.incrementAndGet() + } + } + future.complete(1) + withTimeout(60_000) { + children.forEach { it.join() } + assertEquals(count, completed.get()) + } + } } diff --git a/kotlinx-coroutines-core/jvm/src/Future.kt b/kotlinx-coroutines-core/jvm/src/Future.kt index 948ef6065c..b27a970845 100644 --- a/kotlinx-coroutines-core/jvm/src/Future.kt +++ b/kotlinx-coroutines-core/jvm/src/Future.kt @@ -13,20 +13,20 @@ import java.util.concurrent.* * Cancels a specified [future] when this job is cancelled. * This is a shortcut for the following code with slightly more efficient implementation (one fewer object created). * ``` - * invokeOnCompletion { future.cancel(false) } + * invokeOnCompletion { if (it != null) future.cancel(false) } * ``` * * @suppress **This an internal API and should not be used from general code.** */ @InternalCoroutinesApi public fun Job.cancelFutureOnCompletion(future: Future<*>): DisposableHandle = - invokeOnCompletion(handler = CancelFutureOnCompletion(future)) // TODO make it work only on cancellation as well? + invokeOnCompletion(handler = CancelFutureOnCompletion(future)) /** * Cancels a specified [future] when this job is cancelled. * This is a shortcut for the following code with slightly more efficient implementation (one fewer object created). * ``` - * invokeOnCancellation { future.cancel(false) } + * invokeOnCancellation { if (it != null) future.cancel(false) } * ``` */ public fun CancellableContinuation<*>.cancelFutureOnCancellation(future: Future<*>): Unit = @@ -38,7 +38,7 @@ private class CancelFutureOnCompletion( override fun invoke(cause: Throwable?) { // Don't interrupt when cancelling future on completion, because no one is going to reset this // interruption flag and it will cause spurious failures elsewhere - future.cancel(false) + if (cause != null) future.cancel(false) } } @@ -46,7 +46,7 @@ private class CancelFutureOnCancel(private val future: Future<*>) : CancelHandle override fun invoke(cause: Throwable?) { // Don't interrupt when cancelling future on completion, because no one is going to reset this // interruption flag and it will cause spurious failures elsewhere - future.cancel(false) + if (cause != null) future.cancel(false) } override fun toString() = "CancelFutureOnCancel[$future]" }