Skip to content

Commit

Permalink
Breaking change: Guava future coroutine builder shouldn't report to C…
Browse files Browse the repository at this point in the history
…oroutineExceptionHandler (Kotlin#2840)

This change makes `future` coroutine builder consistent with `java.util.concurrent.FutureTask` which also drops exceptions that happen after successful cancellation.

Fixes Kotlin#2774
Fixes Kotlin#2791
  • Loading branch information
vadimsemenov authored and pablobaxter committed Sep 14, 2022
1 parent f790768 commit bbf4ade
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 44 deletions.
45 changes: 24 additions & 21 deletions integration/kotlinx-coroutines-guava/src/ListenableFuture.kt
Expand Up @@ -14,7 +14,7 @@ import kotlin.coroutines.*
/**
* Starts [block] in a new coroutine and returns a [ListenableFuture] pointing to its result.
*
* The coroutine is immediately started. Passing [CoroutineStart.LAZY] to [start] throws
* The coroutine is started immediately. Passing [CoroutineStart.LAZY] to [start] throws
* [IllegalArgumentException], because Futures don't have a way to start lazily.
*
* When the created coroutine [isCompleted][Job.isCompleted], it will try to
Expand All @@ -35,10 +35,12 @@ import kotlin.coroutines.*
* See [newCoroutineContext][CoroutineScope.newCoroutineContext] for a description of debugging
* facilities.
*
* Note that the error and cancellation semantics of [future] are _subtly different_ than [asListenableFuture]'s.
* In particular, any exception that happens in the coroutine after returned future is
* successfully cancelled will be passed to the [CoroutineExceptionHandler] from the [context].
* See [ListenableFutureCoroutine] for details.
* Note that the error and cancellation semantics of [future] are _different_ than [async]'s.
* In contrast to [Deferred], [Future] doesn't have an intermediate `Cancelling` state. If
* the returned `Future` is successfully cancelled, and `block` throws afterward, the thrown
* error is dropped, and getting the `Future`'s value will throw a `CancellationException` with
* no cause. This is to match the specification and behavior of
* `java.util.concurrent.FutureTask`.
*
* @param context added overlaying [CoroutineScope.coroutineContext] to form the new context.
* @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
Expand Down Expand Up @@ -241,8 +243,8 @@ public suspend fun <T> ListenableFuture<T>.await(): T {

return suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
addListener(
ToContinuation(this, cont),
MoreExecutors.directExecutor())
ToContinuation(this, cont),
MoreExecutors.directExecutor())
cont.invokeOnCancellation {
cancel(false)
}
Expand Down Expand Up @@ -284,16 +286,13 @@ private class ToContinuation<T>(
* By documented contract, a [Future] has been cancelled if
* and only if its `isCancelled()` method returns true.
*
* Any error that occurs after successfully cancelling a [ListenableFuture] will be passed
* to the [CoroutineExceptionHandler] from the context. The contract of [Future] does not permit
* it to return an error after it is successfully cancelled.
*
* By calling [asListenableFuture] on a [Deferred], any error that occurs after successfully
* cancelling the [ListenableFuture] representation of the [Deferred] will _not_ be passed to
* the [CoroutineExceptionHandler]. Cancelling a [Deferred] places that [Deferred] in the
* cancelling/cancelled states defined by [Job], which _can_ show the error. It's assumed that
* the [Deferred] pointing to the task will be used to observe any error outcome occurring after
* cancellation.
* Any error that occurs after successfully cancelling a [ListenableFuture] is lost.
* The contract of [Future] does not permit it to return an error after it is successfully cancelled.
* On the other hand, we can't report an unhandled exception to [CoroutineExceptionHandler],
* otherwise [Future.cancel] can lead to an app crash which arguably is a contract violation.
* In contrast to [Future] which can't change its outcome after a successful cancellation,
* cancelling a [Deferred] places that [Deferred] in the cancelling/cancelled states defined by [Job],
* which _can_ show the error.
*
* This may be counterintuitive, but it maintains the error and cancellation contracts of both
* the [Deferred] and [ListenableFuture] types, while permitting both kinds of promise to point
Expand All @@ -312,10 +311,14 @@ private class ListenableFutureCoroutine<T>(
}

override fun onCancelled(cause: Throwable, handled: Boolean) {
if (!future.completeExceptionallyOrCancel(cause) && !handled) {
// prevents loss of exception that was not handled by parent & could not be set to JobListenableFuture
handleCoroutineException(context, cause)
}
// Note: if future was cancelled in a race with a cancellation of this
// coroutine, and the future was successfully cancelled first, the cause of coroutine
// cancellation is dropped in this promise. A Future can only be completed once.
//
// This is consistent with FutureTask behaviour. A race between a Future.cancel() and
// a FutureTask.setException() for the same Future will similarly drop the
// cause of a failure-after-cancellation.
future.completeExceptionallyOrCancel(cause)
}
}

Expand Down
79 changes: 56 additions & 23 deletions integration/kotlinx-coroutines-guava/test/ListenableFutureTest.kt
Expand Up @@ -555,19 +555,15 @@ class ListenableFutureTest : TestBase() {
}

@Test
fun testUnhandledExceptionOnExternalCancellation() = runTest(
unhandled = listOf(
{ it -> it is TestException } // exception is unhandled because there is no parent
)
) {
fun testUnhandledExceptionOnExternalCancellation() = runTest {
expect(1)
// No parent here (NonCancellable), so nowhere to propagate exception
val result = future(NonCancellable + Dispatchers.Unconfined) {
try {
delay(Long.MAX_VALUE)
} finally {
expect(2)
throw TestException() // this exception cannot be handled
throw TestException() // this exception cannot be handled and is set to be lost.
}
}
result.cancel(true)
Expand Down Expand Up @@ -708,23 +704,6 @@ class ListenableFutureTest : TestBase() {
assertEquals(testException, thrown.cause)
}

@Test
fun stressTestJobListenableFutureIsCancelledDoesNotThrow() = runTest {
repeat(1000) {
val deferred = CompletableDeferred<String>()
val asListenableFuture = deferred.asListenableFuture()
// We heed two threads to test a race condition.
withContext(Dispatchers.Default) {
val cancellationJob = launch {
asListenableFuture.cancel(false)
}
while (!cancellationJob.isCompleted) {
asListenableFuture.isCancelled // Shouldn't throw.
}
}
}
}

private inline fun <reified T: Throwable> ListenableFuture<*>.checkFutureException() {
val e = assertFailsWith<ExecutionException> { get() }
val cause = e.cause!!
Expand Down Expand Up @@ -775,4 +754,58 @@ class ListenableFutureTest : TestBase() {
assertEquals(count, completed.get())
}
}

@Test
fun futurePropagatesExceptionToParentAfterCancellation() = runTest {
val latch = CompletableDeferred<Boolean>()
val parent = Job()
val scope = CoroutineScope(parent)
val exception = TestException("propagated to parent")
val future = scope.future {
withContext(NonCancellable) {
latch.await()
throw exception
}
}
future.cancel(true)
latch.complete(true)
parent.join()
assertTrue(parent.isCancelled)
assertEquals(exception, parent.getCancellationException().cause)
}

// Stress tests.

@Test
fun testFutureDoesNotReportToCoroutineExceptionHandler() = runTest {
repeat(1000) {
supervisorScope { // Don't propagate failures in children to parent and other children.
val innerFuture = SettableFuture.create<Unit>()
val outerFuture = async { innerFuture.await() }

withContext(Dispatchers.Default) {
launch { innerFuture.setException(TestException("can be lost")) }
launch { outerFuture.cancel() }
// nothing should be reported to CoroutineExceptionHandler, otherwise `Future.cancel` contract violation.
}
}
}
}

@Test
fun testJobListenableFutureIsCancelledDoesNotThrow() = runTest {
repeat(1000) {
val deferred = CompletableDeferred<String>()
val asListenableFuture = deferred.asListenableFuture()
// We heed two threads to test a race condition.
withContext(Dispatchers.Default) {
val cancellationJob = launch {
asListenableFuture.cancel(false)
}
while (!cancellationJob.isCompleted) {
asListenableFuture.isCancelled // Shouldn't throw.
}
}
}
}
}

0 comments on commit bbf4ade

Please sign in to comment.