Skip to content

Commit

Permalink
Introduce Task.await and Task.asDeferred with CancellationTokenSource (
Browse files Browse the repository at this point in the history
…Kotlin#2786)

* Support bi-directional cancellation for Task.asDeferred and Task.await via passed in CancellationTokenSource

Fixes Kotlin#2527

Co-authored-by: Alex Vanyo <vanyoalex@gmail.com>
Co-authored-by: dkhalanskyjb <52952525+dkhalanskyjb@users.noreply.github.com>
  • Loading branch information
3 people authored and pablobaxter committed Sep 14, 2022
1 parent f661918 commit 8cf0f36
Show file tree
Hide file tree
Showing 4 changed files with 349 additions and 27 deletions.
10 changes: 10 additions & 0 deletions integration/kotlinx-coroutines-play-services/README.md
Expand Up @@ -6,6 +6,7 @@ Extension functions:

| **Name** | **Description**
| -------- | ---------------
| [Task.asDeferred][asDeferred] | Converts a Task into a Deferred
| [Task.await][await] | Awaits for completion of the Task (cancellable)
| [Deferred.asTask][asTask] | Converts a deferred value to a Task

Expand All @@ -25,5 +26,14 @@ val snapshot = try {
// Do stuff
```

If the `Task` supports cancellation via passing a `CancellationToken`, pass the corresponding `CancellationTokenSource` to `asDeferred` or `await` to support bi-directional cancellation:

```kotlin
val cancellationTokenSource = CancellationTokenSource()
val currentLocationTask = fusedLocationProviderClient.getCurrentLocation(PRIORITY_HIGH_ACCURACY, cancellationTokenSource.token)
val currentLocation = currentLocationTask.await(cancellationTokenSource) // cancelling `await` also cancels `currentLocationTask`, and vice versa
```

[asDeferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-play-services/kotlinx.coroutines.tasks/com.google.android.gms.tasks.-task/as-deferred.html
[await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-play-services/kotlinx.coroutines.tasks/com.google.android.gms.tasks.-task/await.html
[asTask]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-play-services/kotlinx.coroutines.tasks/kotlinx.coroutines.-deferred/as-task.html
@@ -1,6 +1,8 @@
public final class kotlinx/coroutines/tasks/TasksKt {
public static final fun asDeferred (Lcom/google/android/gms/tasks/Task;)Lkotlinx/coroutines/Deferred;
public static final fun asDeferred (Lcom/google/android/gms/tasks/Task;Lcom/google/android/gms/tasks/CancellationTokenSource;)Lkotlinx/coroutines/Deferred;
public static final fun asTask (Lkotlinx/coroutines/Deferred;)Lcom/google/android/gms/tasks/Task;
public static final fun await (Lcom/google/android/gms/tasks/Task;Lcom/google/android/gms/tasks/CancellationTokenSource;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun await (Lcom/google/android/gms/tasks/Task;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

99 changes: 72 additions & 27 deletions integration/kotlinx-coroutines-play-services/src/Tasks.kt
Expand Up @@ -6,15 +6,8 @@

package kotlinx.coroutines.tasks

import com.google.android.gms.tasks.CancellationTokenSource
import com.google.android.gms.tasks.RuntimeExecutionException
import com.google.android.gms.tasks.Task
import com.google.android.gms.tasks.TaskCompletionSource
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Job
import kotlinx.coroutines.suspendCancellableCoroutine
import com.google.android.gms.tasks.*
import kotlinx.coroutines.*
import kotlin.coroutines.*

/**
Expand Down Expand Up @@ -45,39 +38,85 @@ public fun <T> Deferred<T>.asTask(): Task<T> {
/**
* Converts this task to an instance of [Deferred].
* If task is cancelled then resulting deferred will be cancelled as well.
* However, the opposite is not true: if the deferred is cancelled, the [Task] will not be cancelled.
* For bi-directional cancellation, an overload that accepts [CancellationTokenSource] can be used.
*/
public fun <T> Task<T>.asDeferred(): Deferred<T> {
public fun <T> Task<T>.asDeferred(): Deferred<T> = asDeferredImpl(null)

/**
* Converts this task to an instance of [Deferred] with a [CancellationTokenSource] to control cancellation.
* The cancellation of this function is bi-directional:
* * If the given task is cancelled, the resulting deferred will be cancelled.
* * If the resulting deferred is cancelled, the provided [cancellationTokenSource] will be cancelled.
*
* Providing a [CancellationTokenSource] that is unrelated to the receiving [Task] is not supported and
* leads to an unspecified behaviour.
*/
@ExperimentalCoroutinesApi // Since 1.5.1, tentatively until 1.6.0
public fun <T> Task<T>.asDeferred(cancellationTokenSource: CancellationTokenSource): Deferred<T> =
asDeferredImpl(cancellationTokenSource)

private fun <T> Task<T>.asDeferredImpl(cancellationTokenSource: CancellationTokenSource?): Deferred<T> {
val deferred = CompletableDeferred<T>()
if (isComplete) {
val e = exception
return if (e == null) {
@Suppress("UNCHECKED_CAST")
CompletableDeferred<T>().apply { if (isCanceled) cancel() else complete(result as T) }
if (e == null) {
if (isCanceled) {
deferred.cancel()
} else {
@Suppress("UNCHECKED_CAST")
deferred.complete(result as T)
}
} else {
CompletableDeferred<T>().apply { completeExceptionally(e) }
deferred.completeExceptionally(e)
}
} else {
addOnCompleteListener {
val e = it.exception
if (e == null) {
@Suppress("UNCHECKED_CAST")
if (it.isCanceled) deferred.cancel() else deferred.complete(it.result as T)
} else {
deferred.completeExceptionally(e)
}
}
}

val result = CompletableDeferred<T>()
addOnCompleteListener {
val e = it.exception
if (e == null) {
@Suppress("UNCHECKED_CAST")
if (isCanceled) result.cancel() else result.complete(it.result as T)
} else {
result.completeExceptionally(e)
if (cancellationTokenSource != null) {
deferred.invokeOnCompletion {
cancellationTokenSource.cancel()
}
}
return result
// Prevent casting to CompletableDeferred and manual completion.
return object : Deferred<T> by deferred {}
}

/**
* Awaits for completion of the task without blocking a thread.
* Awaits the completion of the task without blocking a thread.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* stops waiting for the completion stage and immediately resumes with [CancellationException].
*
* For bi-directional cancellation, an overload that accepts [CancellationTokenSource] can be used.
*/
public suspend fun <T> Task<T>.await(): T = awaitImpl(null)

/**
* Awaits the completion of the task that is linked to the given [CancellationTokenSource] to control cancellation.
*
* This suspending function is cancellable and cancellation is bi-directional:
* * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* cancels the [cancellationTokenSource] and throws a [CancellationException].
* * If the task is cancelled, then this function will throw a [CancellationException].
*
* Providing a [CancellationTokenSource] that is unrelated to the receiving [Task] is not supported and
* leads to an unspecified behaviour.
*/
public suspend fun <T> Task<T>.await(): T {
@ExperimentalCoroutinesApi // Since 1.5.1, tentatively until 1.6.0
public suspend fun <T> Task<T>.await(cancellationTokenSource: CancellationTokenSource): T = awaitImpl(cancellationTokenSource)

private suspend fun <T> Task<T>.awaitImpl(cancellationTokenSource: CancellationTokenSource?): T {
// fast path
if (isComplete) {
val e = exception
Expand All @@ -95,13 +134,19 @@ public suspend fun <T> Task<T>.await(): T {

return suspendCancellableCoroutine { cont ->
addOnCompleteListener {
val e = exception
val e = it.exception
if (e == null) {
@Suppress("UNCHECKED_CAST")
if (isCanceled) cont.cancel() else cont.resume(result as T)
if (it.isCanceled) cont.cancel() else cont.resume(it.result as T)
} else {
cont.resumeWithException(e)
}
}

if (cancellationTokenSource != null) {
cont.invokeOnCancellation {
cancellationTokenSource.cancel()
}
}
}
}

0 comments on commit 8cf0f36

Please sign in to comment.