Support thread interrupting blocking functions #1947

Closed
elizarov opened this issue Apr 23, 2020 · 9 comments

@elizarov
Contributor

elizarov commented Apr 23, 2020

This idea supersedes the idea that was originally presented in #57 (see discussion and use-cases there). Here is the direct link to the comment by @jxdabc with this proposal: #57 (comment)

The idea is to create a wrapper function that turns regular blocking code into a cancellable suspending function, converting cancellation of the coroutine into interruption of the underlying blocking primitive:

public suspend fun <T> interruptible(block: () -> T): T

We might want to tweak naming a bit, though. Here are the options that I see on the table drawing from the traditions in our naming conventions. Additional ideas are welcome. In these examples I show them used to wrap an interruptible queue.take() call from JDK:

  • interruptible { queue.take() }
  • runInterruptible { queue.take() }
  • withInterrupt { queue.take() }
  • interruptOnCancel { queue.take() }
  • runInterruptingOnCancel { queue.take() }
  • withInterruptOnCancel { queue.take() }
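
For context, the sketch below shows what makes queue.take() "interruptible" at the JDK level, without any coroutines involved: a thread blocked in take() throws InterruptedException once another thread calls interrupt() on it. The helper name takeWasInterrupted is hypothetical and exists only for this illustration.

```kotlin
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper: returns true if the blocked take() was ended by an interrupt.
fun takeWasInterrupted(): Boolean {
    val queue = LinkedBlockingQueue<String>() // stays empty, so take() blocks
    val interrupted = AtomicBoolean(false)

    val worker = Thread {
        try {
            queue.take() // blocks until an element arrives or the thread is interrupted
        } catch (e: InterruptedException) {
            interrupted.set(true)
        }
    }
    worker.start()
    Thread.sleep(100)  // give the worker a moment to reach take()
    worker.interrupt() // this is what the proposed wrapper would do on cancellation
    worker.join()
    return interrupted.get()
}

fun main() {
    println("take() was interrupted: ${takeWasInterrupted()}")
}
```

The proposed wrapper would trigger the same interrupt() call automatically when the calling coroutine is cancelled.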

We should also consider an optional (or maybe even a required?) context parameter to this function. The reason for this is that it will enable single-call conversion of interruptible Java methods into main-safe suspending functions like this:

// With one call here we are moving the call to Dispatchers.IO and supporting interruption
suspend fun <T> BlockingQueue<T>.awaitTake(): T =
    withInterruptOnCancel(Dispatchers.IO) { take() }
@elizarov elizarov changed the title from "Support thread interrupt-honoring blocking function" to "Support thread interrupting blocking functions" Apr 23, 2020
@jxdabc
Contributor

jxdabc commented Apr 23, 2020

We should also consider an optional (or maybe even a required?) context parameter to this function. The reason for this is that it will enable single-call conversion of interruptible Java methods into main-safe suspending functions like this:

// With one call here we are moving the call to Dispatchers.IO and supporting interruption
suspend fun <T> BlockingQueue<T>.awaitTake(): T =
    withInterruptOnCancel(Dispatchers.IO) { take() }

Great!
I'll amend the PR.

jxdabc pushed a commit to jxdabc/kotlinx.coroutines that referenced this issue Apr 23, 2020
This is implementation of issue Kotlin#1947

Signed-off-by: Trol <jiaoxiaodong@xiaomi.com>
@fvasco
Contributor

fvasco commented Apr 23, 2020

Should the interruptible function take a CoroutineContext as a parameter (mimicking the withContext function), or is a Dispatcher enough?

Considering the awaitTake example, is it correct to use withInterruptOnCancel without the Dispatcher argument?
In other words: is it correct to use withInterruptOnCancel with an implicit Dispatcher or the Default one?

I put on the table:

suspend fun <T> runInterruptible(dispatcher: CoroutineDispatcher = Dispatchers.IO, block: () -> T): T

@jxdabc
Contributor

jxdabc commented Apr 24, 2020

Considering the awaitTake example, is it correct to use withInterruptOnCancel without the Dispatcher argument?

It is perfectly fine as far as I can see. I don't understand what the concern is here.

In other words: is it correct to use withInterruptOnCancel with an implicit Dispatcher or the Default one?

I put on the table:

suspend fun <T> runInterruptible(dispatcher: CoroutineDispatcher = Dispatchers.IO, block: () -> T): T

I don't think that is appropriate. If the current context is Dispatchers.Default, the function above makes it impossible to run the blocking block in the current context without knowing what the current context is and passing it explicitly as the argument of runInterruptible.

@fvasco
Contributor

fvasco commented Apr 24, 2020

Hi @jxdabc,
thank you for your response.

The awaitTake function should work independently of the Dispatcher. I consider BlockingQueue<T>.awaitTake(): T = withInterruptOnCancel { take() } a serious error: it may not work on Dispatchers.Default or Dispatchers.IO.

More generally, I expect a suspend fun to be Dispatcher-independent, but I may be wrong.
If suspend fun interruptible runs on the current thread, it cannot work on some Dispatchers.

So, if the purpose of a withInterruptOnCancel function is the awaitTake utility, then a Dispatcher is required; using the Default one or the current thread can be the wrong choice, IMHO.

I don't think that is appropriate. If the current context is Dispatchers.Default, the function above makes it impossible to run the blocking block in the current context without knowing what the current context is and passing it explicitly as the argument of runInterruptible.

I didn't understand your point, sorry.
The block function does not require a CoroutineContext; can you rephrase, please?
Should queue.take() run on a CPU-bound Dispatcher?

@jxdabc
Contributor

jxdabc commented Apr 25, 2020

@fvasco
Sorry, I tried but didn't quite understand your point.

  • The simple version of runInterruptible, which takes no parameter, works in the current context (thread) and can switch to another context (thread) via withContext(Dispatchers.IO) { runInterruptible { queue.take() } }.
  • To make the above easier, @elizarov thought it should take an (optional) context parameter. Thus, we could switch to another context (thread) directly with runInterruptible(Dispatchers.IO) { queue.take() }. That is what the amended PR #1934, "Support thread interrupting blocking functions (#1947)", currently does.
  • suspend fun <T> runInterruptible(dispatcher: CoroutineDispatcher = Dispatchers.IO, block: () -> T): T has a small flaw, as far as I can see: it cannot work in the current context (thread). runInterruptible { queue.take() } would run in an implicit IO context, not in the current context (thread).
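
The first two points can be sketched as follows. This is a minimal illustration, assuming the runInterruptible(context, block) function as it was later released in kotlinx.coroutines; takeNested and takeDirect are hypothetical names used only here.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.runInterruptible
import kotlinx.coroutines.withContext
import java.util.concurrent.LinkedBlockingQueue

// Parameterless form: runs the block in whatever context the caller provides,
// so switching threads requires an explicit withContext around it.
suspend fun takeNested(queue: LinkedBlockingQueue<String>): String =
    withContext(Dispatchers.IO) { runInterruptible { queue.take() } }

// Form with a context parameter: the switch and the interruption support in one call.
suspend fun takeDirect(queue: LinkedBlockingQueue<String>): String =
    runInterruptible(Dispatchers.IO) { queue.take() }

fun main() = runBlocking {
    val queue = LinkedBlockingQueue(listOf("a", "b"))
    println(takeNested(queue)) // takes the first element
    println(takeDirect(queue)) // takes the second element
}
```

Both helpers block an IO thread rather than the caller's thread; the difference is only in how the context switch is expressed.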

@fvasco
Contributor

fvasco commented Apr 25, 2020

Hi @jxdabc,
all your points are true.

I rewrote my questions. These are personal considerations.

Should runInterruptible mimic withContext?

Should runInterruptible run on the current thread "as default"?

I put an idea on the table, it is not a proposal.

In my opinion, BlockingQueue<T>.awaitTake(): T = withInterruptOnCancel { take() } is wrong: this suspending function is not Dispatcher-independent, and it may not work on Dispatchers.Main.
A minimal workaround is to use Dispatchers.IO as the default; it is a reasonable default.

Please consider this code:

scope.launch(ioScope) {
  ...
  withInterruptOnCancel { queue.take() }
  ...
}

In this case withInterruptOnCancel switches the Dispatcher, so Dispatchers.IO can be a sane default, but it may be the _wrong_ choice.
It is possible to require a Dispatcher instead, with no Dispatchers.IO default, i.e.:

suspend fun <T> CoroutineDispatcher.runInterruptible(block: () -> T): T

So we have to understand what the use case for withInterruptOnCancel is: a standalone wrapper or a local utility?

I raise these questions because the Dispatcher really matters for blocking functions, especially if a thread can be blocked for a long time.

Finally, should withInterruptOnCancel mimic withContext? This question follows from the previous one, though.

@jxdabc
Contributor

jxdabc commented Apr 25, 2020

@fvasco I think I caught your point : )

In this case withInterruptOnCancel switches the Dispatcher, so Dispatchers.IO can be a sane default, but it may be the _wrong_ choice.
It is possible to require a Dispatcher instead, with no Dispatchers.IO default, i.e.:

The fact is that, as currently implemented, withInterruptOnCancel { queue.take() } won't switch the context (dispatcher); it just uses the outer context (dispatcher) and won't even dispatch. So awaitTake should be dispatcher-independent, in your words. : )

@qwwdfsad
Member

qwwdfsad commented Apr 25, 2020

Here is our final design of the API:

  • The function will be called runInterruptible to be consistent both with runBlocking and runCatching. Its block parameter should not be suspendable.

  • To avoid the following misuse of the structured concurrency API:

runBlocking {
    runInterruptible {
        launch { ... } // <- runs in the scope of runBlocking!
        queue.take()
    }
}

the block parameter should have a special receiver, InterruptibleScope (not a coroutine scope), marked with an internal @DslMarker annotation (CoroutineScope should also be marked with this annotation).

  • The function should receive a CoroutineContext parameter that is passed directly to withContext. The default value is EmptyCoroutineContext (a default of Dispatchers.IO could annoy DI users, and opting out looks really ugly).
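
As a rough sketch of how this design is used, the example below rewrites the awaitTake helper from the top of the thread with runInterruptible and cancels it while it is blocked. It assumes the function as shipped in kotlinx.coroutines, which to my knowledge takes a plain () -> T block rather than the InterruptibleScope receiver described above; cancelBlockedTake is a hypothetical helper for this illustration only.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.runInterruptible
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue

// One call moves the blocking take() to Dispatchers.IO and makes it cancellable.
suspend fun <T> BlockingQueue<T>.awaitTake(): T =
    runInterruptible(Dispatchers.IO) { take() }

// Hypothetical demo: starts a coroutine that blocks on an empty queue, then cancels it.
fun cancelBlockedTake(): Boolean = runBlocking {
    val queue = LinkedBlockingQueue<String>() // stays empty, so take() blocks
    val job = launch { queue.awaitTake() }
    delay(100)          // let the coroutine reach the blocking call
    job.cancelAndJoin() // cancellation interrupts the blocked IO thread, so join returns
    job.isCancelled
}

fun main() {
    println("job cancelled: ${cancelBlockedTake()}")
}
```

Without interruption support, cancelAndJoin would have to wait until take() returned on its own, which never happens for an empty queue.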

jxdabc pushed a commit to jxdabc/kotlinx.coroutines that referenced this issue Apr 28, 2020
This is implementation of issue Kotlin#1947

Signed-off-by: Trol <jiaoxiaodong@xiaomi.com>
qwwdfsad added a commit that referenced this issue Apr 29, 2020
* Support thread interrupting blocking functions (#1947)

This is the implementation of issue #1947

Signed-off-by: Trol <jiaoxiaodong@xiaomi.com>
Co-authored-by: Trol <jiaoxiaodong@xiaomi.com>
@zhanghai

zhanghai commented May 4, 2020

Hi, thanks for implementing this! It will be very helpful when using blocking IO in coroutines.

I saw that PR #1972 was merged recently, so do you have an estimate of when we will be able to use it in production? If that is at least months away, we may also consider bundling Interruptible.kt temporarily.

This has been released in 1.3.6

recheej pushed a commit to recheej/kotlinx.coroutines that referenced this issue Dec 28, 2020
* Support thread interrupting blocking functions (Kotlin#1947)

This is the implementation of issue Kotlin#1947

Signed-off-by: Trol <jiaoxiaodong@xiaomi.com>
Co-authored-by: Trol <jiaoxiaodong@xiaomi.com>