Support optional thread interrupt in newSingleThreadContext as its friends #57

Closed
elizarov opened this issue Apr 17, 2017 · 14 comments

Comments

@elizarov
Contributor

See discussion here: https://discuss.kotlinlang.org/t/calling-blocking-code-in-coroutines/2368

@elizarov elizarov changed the title Support optional thread interrupt in newSingleThreadContext/newFixedThreadPoolContext Support optional thread interrupt in newSingleThreadContext as its friends Apr 17, 2017
@Tolriq

Tolriq commented Oct 28, 2018

@elizarov sorry to post here but I'm sure this could help a lot of other people.

I really struggle to understand how to properly cancel blocking IO calls, since normal cancellation does not interrupt the thread.

The goal is to convert some functions in a module to coroutines that mix blocking IO with some computation.

I get isActive/yield and the ways to cancel the computations, but not how to propagate the cancellation to the blocking IO call.

If I understand the coroutine approach correctly, it should look something like this:

suspend fun x(): y {
    ....
    val result = withContext(Dispatchers.IO) {
        // blocking IO call goes here
    }
    // CPU computation that checks isActive or calls yield()
}

But from everything I have read, cancelling the coroutine that calls x() would still have to wait for the blocking IO call to finish, because that call only supports thread interruption.

Is there any usable documentation or sample that fully covers this need (for Android, so without Guava if possible)?

(The plan is not to use IO but an existing executor with asCoroutineDispatcher().)

@khovanskiy

khovanskiy commented Oct 15, 2019

I ran into the same issue as in the references, so I implemented the following workaround:

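import java.util.concurrent.Executors
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withTimeout

val externalThreadPool = Executors.newCachedThreadPool()

// Runs the blocking block on a separate pool and returns its result.
// On timeout or cancellation the worker thread is interrupted.
suspend fun <T> withTimeoutOrInterrupt(timeMillis: Long, block: () -> T): T =
    withTimeout(timeMillis) {
        suspendCancellableCoroutine<T> { cont ->
            val future = externalThreadPool.submit {
                try {
                    val result = block()
                    cont.resumeWith(Result.success(result))
                } catch (e: InterruptedException) {
                    cont.resumeWithException(CancellationException())
                } catch (e: Throwable) {
                    cont.resumeWithException(e)
                }
            }
            // cancel(true) interrupts the thread that is running the block.
            cont.invokeOnCancellation {
                future.cancel(true)
            }
        }
    }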

It behaves like the usual withTimeout, but additionally supports running blocking code.
Note: it should be called only when you know that the inner code blocks and correctly handles a thrown InterruptedException. In most cases the plain withTimeout function is preferred.

@elizarov
Contributor Author

elizarov commented Oct 16, 2019

@khovanskiy This "works", but it is extremely dangerous code to use in serious high-load production. Here's the implementation of that future.cancel method from OpenJDK for your reference:
https://github.com/openjdk/jdk/blob/67a89143dde6e545adbfc3c79bb89d954307f8bc/src/java.base/share/classes/java/util/concurrent/FutureTask.java#L164-L182

As you can see, it makes the classic "check & act" concurrent programming mistake: it interrupts the thread that was running this task at the time of the check, so under load it may, in fact, interrupt some other, unrelated piece of code.

@elizarov
Contributor Author

UPDATE: I stand corrected. False alarm. This particular usage of Executors.newCachedThreadPool() with future.cancel(true) seems to be fine. All bases are covered. See here: https://twitter.com/relizarov/status/1184460504238100480

@jxdabc
Contributor

jxdabc commented Apr 19, 2020

@elizarov

I think this ability is of great need on Android.

According to

The practical usage on Android would be something like:

class MyAndroidActivity : Activity() {
    private val scope = MainScope()

    override fun onResume() {
        super.onResume()
        scope.launch {
            // #1 may block
            val info = withContext(Dispatchers.IO) {
                loadFromInternet()
            }
            showOnUI(info)
        }
    }

    override fun onDestroy() {
        super.onDestroy()
        // Cancels the scope, but does not interrupt a thread blocked at #1.
        scope.cancel()
    }
}

Line #1 may block for a relatively long time. If many activities like this are entered and exited in quick succession (e.g., someone browsing for interesting news who opens and closes detail pages without staying long, or someone running a monkey test), the dispatcher fills up with cancelled but still blocked coroutines, so loads started by new activities are starved and unable to run.
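
The starvation described above can be reproduced in miniature. The following is only a sketch under stated assumptions: a single-thread executor stands in for a saturated dispatcher, Thread.sleep stands in for the blocking load, and the name starvationOrder is hypothetical. It records the order of events to show that a cancelled but blocked coroutine keeps its thread until the blocking call returns.

```kotlin
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.Executors
import kotlinx.coroutines.*

// Hypothetical helper: returns the order in which two coroutines made progress
// on a dispatcher that has only one thread.
fun starvationOrder(): List<String> {
    val events = CopyOnWriteArrayList<String>()
    val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
    try {
        runBlocking {
            val started = CompletableDeferred<Unit>()
            val first = launch(dispatcher) {
                started.complete(Unit)
                Thread.sleep(300) // blocking call that ignores coroutine cancellation
                events.add("first unblocked")
            }
            started.await()
            first.cancel() // marks the coroutine cancelled; the thread keeps sleeping
            val second = launch(dispatcher) {
                events.add("second started")
            }
            joinAll(first, second)
        }
    } finally {
        dispatcher.close() // also shuts down the underlying executor
    }
    return events
}
```

Calling starvationOrder() should return "first unblocked" followed by "second started": the second coroutine cannot start until the cancelled one releases the only thread.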

jxdabc pushed a commit to jxdabc/kotlinx.coroutines that referenced this issue Apr 19, 2020
)

See issue Kotlin#57 for details

Signed-off-by: Trol <jiaoxiaodong@xiaomi.com>
@jxdabc
Contributor

jxdabc commented Apr 19, 2020

@elizarov

To implement this, I think we could:

  1. Make it optional and off by default, so it is used only when explicitly requested.
  2. Watch the job for cancellation and interrupt the executing thread when, and only when, a continuation of the coroutine is running.
  3. Transform InterruptedException into CancellationException at the end of the coroutine so that everything else works as before.
  • The update/restore hooks of ThreadContextElement are the perfect place to implement 2.
  • Implementing 3 means transforming the exception before AbstractCoroutine#makeCompletingOnce, which requires a little modification.
  • Implementing 1 with a context element is easy.

@jxdabc
Contributor

jxdabc commented Apr 19, 2020

I made a PR to implement this: #1922
The added context element looks something like this:

/**
 * This [CoroutineContext] element makes a coroutine interruptible.
 *
 * With this element, the thread executing the coroutine is interrupted when the coroutine is canceled, making
 * blocking procedures stop. Exceptions that indicate an interrupted procedure, e.g., InterruptedException on JVM,
 * are transformed into [CancellationException] at the end of the coroutine. Thus, everything else goes as if this
 * element is not present. In particular, the parent coroutine won't be canceled by those exceptions.
 *
 * This is an abstract element and will be implemented by each individual platform (or won't be implemented).
 * The JVM implementation is named CoroutineInterruptible.
 *
 * Example:
 * ```
 * GlobalScope.launch(Dispatchers.IO + CoroutineInterruptible) {
 *     async {
 *         // This block will throw [CancellationException] instead of an exception indicating
 *         // interruption, such as InterruptedException on JVM.
 *         withContext(CoroutineName) {
 *             doSomethingUseful()
 *
 *             // This blocking procedure will be interrupted when this coroutine is canceled
 *             // by Exception thrown by the below async block.
 *             doSomethingElseUsefulInterruptible()
 *         }
 *     }
 *
 *     async {
 *        delay(500L)
 *        throw Exception()
 *     }
 * }
 * ```
 */
abstract class CoroutineInterruptController : AbstractCoroutineContextElement(Key)

jxdabc pushed a commit to jxdabc/kotlinx.coroutines that referenced this issue Apr 22, 2020
…ellation (Kotlin#57)

This is implementation of issue Kotlin#57 and non-intrusive variant of Kotlin#1922

Signed-off-by: Trol <jiaoxiaodong@xiaomi.com>
@jxdabc
Contributor

jxdabc commented Apr 22, 2020

PR (non-intrusive implementation of this feature): #1934

As mentioned above, this feature is of great use, at least on Android as far as I can see. There is another, non-intrusive way to implement it: add a function that makes a blocking code block cancellable (it becomes a cancellation point of the coroutine) by interrupting the block and throwing CancellationException when the coroutine is cancelled.

The added function looks like this:

/**
 * Makes a blocking code block cancellable (become a cancellation point of the coroutine).
 *
 * The blocking code block will be interrupted and this function will throw [CancellationException]
 * if the coroutine is cancelled.
 *
 * Example:
 * ```
 * GlobalScope.launch(Dispatchers.IO) {
 *     async {
 *         // This function will throw [CancellationException].
 *         interruptible {
 *             doSomethingUseful()
 *
 *             // This blocking procedure will be interrupted when this coroutine is canceled
 *             // by Exception thrown by the below async block.
 *             doSomethingElseUsefulInterruptible()
 *         }
 *     }
 *
 *     async {
 *        delay(500L)
 *        throw Exception()
 *     }
 * }
 * ```
 */
public suspend fun <T> interruptible(block: () -> T): T

@elizarov
Contributor Author

👍 The interruptible { ... } block seems to be a lean and isolated way to implement it. Let's move it to a separate issue, though -> #1947. I'm closing this one.

@AndroidDeveloperLB

Is it possible for Kotlin Coroutines to handle thread interruption?
If I call some function that uses "sleep", for example, is it possible to cancel it so that it will also call "interrupt"?

@qwwdfsad
Member

qwwdfsad commented Jul 13, 2020

@AndroidDeveloperLB yes, you can use the runInterruptible builder for this purpose
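
For readers landing here, a minimal sketch of what that could look like. It assumes a kotlinx.coroutines version that ships runInterruptible, uses Thread.sleep as a stand-in for any interruptible blocking call, and cancelBlockingSleep is a hypothetical name chosen for illustration.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: starts a blocking sleep inside runInterruptible, cancels it,
// and reports whether the coroutine observed a CancellationException.
suspend fun cancelBlockingSleep(): Boolean = coroutineScope {
    val started = CompletableDeferred<Unit>()
    var sawCancellation = false
    val job = launch(Dispatchers.IO) {
        try {
            runInterruptible {
                started.complete(Unit)
                Thread.sleep(60_000) // interrupted when the coroutine is cancelled
            }
        } catch (e: CancellationException) {
            sawCancellation = true
            throw e // keep cancellation cooperative
        }
    }
    started.await()      // make sure the blocking section has been entered
    job.cancelAndJoin()  // cancellation interrupts the sleeping thread
    sawCancellation
}
```

Run from a coroutine, for example runBlocking { cancelBlockingSleep() }, this should return promptly rather than after a minute, because the InterruptedException raised inside the block is rethrown as CancellationException.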

@AndroidDeveloperLB

@qwwdfsad This almost works.
Almost because then it forces you to cancel by interruption, always.
Via AsyncTask, you always had a choice of whether cancelling normally or by interruptions.
Here it seems you have to set it up and stay with one of them, forever.

@elizarov
Contributor Author

What do you mean by "forces you to cancel by interruption"? Can you elaborate, please?

@AndroidDeveloperLB

@elizarov Either you use runInterruptible or you don't.
If you use it, cancel will interrupt.
If you don't, cancel will not interrupt.
The creation of the instance forces cancel to work in one way or the other. You have no choice after it's created.

On AsyncTask, you have a choice for every instance. cancel(true) will interrupt, and cancel(false) will not.
The creation of the instance doesn't force you to choose how it will be canceled.
You can choose to cancel with interruption in some case, and without in another.
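
The per-call choice described above can be illustrated without Android. This is only a sketch: a plain java.util.concurrent Future stands in for AsyncTask, and the name runAndCancel is hypothetical. The same kind of task is cancelled once with interruption and once without.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical helper: submits a short blocking task, cancels it with the given flag,
// and returns true if the task's thread was interrupted.
fun runAndCancel(mayInterruptIfRunning: Boolean): Boolean {
    val pool = Executors.newSingleThreadExecutor()
    val started = CountDownLatch(1)
    val done = CountDownLatch(1)
    var interrupted = false
    try {
        val future = pool.submit(Runnable {
            started.countDown()
            try {
                Thread.sleep(300) // stand-in for blocking work
            } catch (e: InterruptedException) {
                interrupted = true
            } finally {
                done.countDown()
            }
        })
        started.await()                      // the task is running now
        future.cancel(mayInterruptIfRunning) // the choice is made per call
        done.await(5, TimeUnit.SECONDS)      // wait until the task body has exited
        return interrupted
    } finally {
        pool.shutdownNow()
    }
}
```

Here runAndCancel(true) should return true and runAndCancel(false) should return false: in the second case the future is marked cancelled, but the running task is left to finish its sleep on its own.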
