Cancellation problem #2276

Closed
jxdabc opened this issue Sep 30, 2020 · 4 comments

@jxdabc
Contributor

jxdabc commented Sep 30, 2020

Background

In most cases, cancelling a suspend function must preserve some program invariant.

For example, Channel.send guarantees that if it throws a [CancellationException], the [element] was not sent to this channel.

This is the most common kind of invariant. I will use it in the following discussion and, following the documentation of such cancellable suspend methods, call it atomic cancellation.

Problem

Consider two simple cancellable methods, implemented as follows:

/**
 * Reads from disk into [channel] in IO context/dispatcher.
 *
 * Nothing will be sent to [channel] if cancelled.
 */
suspend fun readFromDiskIntoChannel(
        channel: Channel<String>
): Unit = withContext(Dispatchers.IO) {
    // blocking read
    val something = readFromDisk()
    channel.send(something)
}
/**
 * Reads from disk into [channel] with a timeout.
 *
 * Nothing is sent to [channel] if it times out.
 *
 * Returns false on timeout, true otherwise.
 */
suspend fun readFromDiskIntoChannel(channel: Channel<String>, timeoutMillis: Long): Boolean {
    // disable outside cancellation
    return withContext(NonCancellable) {

        try {
            // set up the timeout
            withTimeout(timeoutMillis) {
                // blocking read
                val something = readFromDisk()
                channel.send(something)
                true
            }
        } catch (e: TimeoutCancellationException) {
            false
        }

    }
}

Unfortunately, both implementations above are wrong: either method can be cancelled (or time out) even though something has already been sent to its channel, and for different, subtle reasons.

Reason 1:
For the second method, if the timeout fires just after channel.send returns, withTimeout throws TimeoutCancellationException instead of returning true. The inner coroutine has been cancelled, and cancellation supersedes the block's normal return value when the final result of the inner coroutine is computed. The same is true for the first method if the outer coroutine is cancelled just after channel.send returns.

Reason 2:
For the first method, if the outer coroutine is cancelled just after the inner block completes, but before the outer dispatcher resumes the outer coroutine, withContext throws CancellationException instead of returning normally. This is because the result is dispatched back to the outer coroutine in a cancellable way.
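Both reasons can be observed with a minimal sketch, assuming kotlinx-coroutines-core 1.5 or later on the JVM (for Channel.tryReceive). Instead of relying on timing, the block cancels its caller deterministically right after send; sentButCancelled is a hypothetical name used only for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Returns (elementWasSent, withContextThrew).
fun sentButCancelled(): Pair<Boolean, Boolean> = runBlocking {
    val channel = Channel<String>(Channel.UNLIMITED)
    var threw = false
    launch {
        try {
            withContext(Dispatchers.Default) {
                channel.send("data")  // the side effect happens...
                this@launch.cancel()  // ...then the caller is cancelled before withContext returns
            }
        } catch (e: CancellationException) {
            threw = true              // withContext reports cancellation despite the completed send
        }
    }.join()
    channel.tryReceive().isSuccess to threw
}

fun main() {
    println(sentButCancelled()) // (true, true)
}
```

The element is in the channel, yet the caller only sees a CancellationException, which breaks the "nothing is sent if cancelled" contract.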

Given these reasons, the implementations have to be modified as follows:

/**
 * Reads from disk into [channel] in IO context/dispatcher.
 *
 * Nothing will be sent to [channel] if cancelled.
 */
suspend fun readFromDiskIntoChannel(
        channel: Channel<String>
): Unit {
    var finished = false

    try {
        withContext(Dispatchers.IO) {
            // blocking read
            val something = readFromDisk()
            channel.send(something)
            finished = true
        }
    } catch (e: CancellationException) {
        if (finished) { return }
        else { throw e }
    }
}
/**
 * Reads from disk into [channel] with a timeout.
 *
 * Nothing is sent to [channel] if it times out.
 *
 * Returns false on timeout, true otherwise.
 */
suspend fun readFromDiskIntoChannel(channel: Channel<String>, timeoutMillis: Long): Boolean {
    // disable outside cancellation
    return withContext(NonCancellable) {

        var finished = false
        try {
            // set up the timeout
            withTimeout(timeoutMillis) {
                // blocking read
                val something = readFromDisk()
                channel.send(something)
                finished = true
                true
            }
        } catch (e: TimeoutCancellationException) {
            finished // true if the timeout fired after channel.send had already returned
        }

    }
}

This is complex and goes against the grain of structured concurrency.
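The flag-based workaround can at least be factored into a helper. The sketch below uses a hypothetical withContextKeepingResult (not a kotlinx.coroutines API): it behaves like withContext, except that once the block has returned, its result is kept even if the call is cancelled afterwards. keepResultDemo is likewise a hypothetical name.

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext

// Hypothetical helper: the `finished` flag pattern from above, made generic.
@Suppress("UNCHECKED_CAST")
suspend fun <T> withContextKeepingResult(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {
    var finished = false
    var result: T? = null
    try {
        return withContext(context) {
            block().also {
                result = it
                finished = true
            }
        }
    } catch (e: CancellationException) {
        if (finished) return result as T // the block completed: report its result
        throw e                          // the block did not complete: stay cancelled
    }
}

fun keepResultDemo(): String? = runBlocking {
    var value: String? = null
    launch {
        value = withContextKeepingResult(Dispatchers.Default) {
            this@launch.cancel() // cancel the caller while the block is running
            "done"
        }
    }.join()
    value
}

fun main() {
    println(keepResultDemo()) // done
}
```

When the helper returns normally, the caller's coroutine is still cancelled, so its next cancellable suspension point throws CancellationException; only the result of the completed block is preserved.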

Possible Improvement

Improvement 1: Reason 1 suggests that, at least in this scenario, the inner coroutine is expected to return whatever its block returns. So, would it be possible to provide a new kind of Job whose result is whatever the coroutine block returns?

Considering that the following is a common structured-concurrency pattern, such a Job seems reasonable:

coroutineScope {
    val res1 = async {
        // return ...
        // throw ...
    }

    val res2 = async {
        // return ...
        // throw ...
    }

    // the result or exception is returned/thrown from the top coroutine
    res1.await() + res2.await()
}
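For reference, this pattern already works with coroutineScope: the value of the block's last expression (or the first failure among its children) becomes the result of the whole scope. A runnable sketch, with sumOfTwo and failingSum as illustrative names:

```kotlin
import kotlinx.coroutines.*

// The block's last expression becomes the value of coroutineScope.
suspend fun sumOfTwo(): Int = coroutineScope {
    val res1 = async { 1 }
    val res2 = async { 2 }
    res1.await() + res2.await()
}

// A failing child makes coroutineScope rethrow that failure to the caller.
suspend fun failingSum(): Int = coroutineScope {
    val res1 = async { 1 }
    val res2 = async<Int> { throw IllegalStateException("boom") }
    res1.await() + res2.await()
}

fun main() = runBlocking {
    println(sumOfTwo()) // 3
    val error = runCatching { failingSum() }.exceptionOrNull()
    println(error?.message) // boom
}
```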

Improvement 2: Reason 2 suggests that library users should at least be able to control where a coroutine can be cancelled. It may be better to allow this kind of cancellation only when true is explicitly passed to a cancellable: Boolean parameter of withContext-like methods. That would fulfill the promise that coroutine cancellation is cooperative.
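The closest existing tool is withContext(NonCancellable), and it shows why a separate flag is being asked for. In the sketch below (nonCancellableDemo is a hypothetical name, and the caller's dispatcher is left unchanged), an already-cancelled caller still gets a normal return, but the block cannot see that cancellation at all: isActive stays true, so the inner work is no longer cancellable either.

```kotlin
import kotlinx.coroutines.*

// Returns (blockRan, blockSawActiveJob, callReturned) for an already-cancelled caller.
fun nonCancellableDemo(): Triple<Boolean, Boolean, Boolean> = runBlocking {
    var ran = false
    var sawActive = false
    var returned = false
    launch {
        cancel() // the caller is cancelled before the call
        withContext(NonCancellable) {
            ran = true
            sawActive = isActive // the block cannot observe the caller's cancellation
        }
        returned = true // withContext(NonCancellable) returned normally
    }.join()
    Triple(ran, sawActive, returned)
}

fun main() {
    println(nonCancellableDemo()) // (true, true, true)
}
```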

Another Cancellation Problem

I've recently been working with the following suspend method:

/**
 * Connects [socketChannel] to [addr].
 *
 * If connecting fails or is cancelled, [socketChannel] is closed.
 */
suspend fun connectOrClose(socketChannel: SocketChannel, addr: SocketAddress)

To connect with a timeout, I have to write code like this:

var started = false
try {
    // Whether `withTimeout` can be cancelled before its block starts running
    // is not documented. Since `withContext` can, assume that `withTimeout`
    // can as well (or may in the future).
    withTimeout(timeoutMillis) {
        started = true
        connectOrClose(socket, addr)
    }
} catch (e: CancellationException) {
    if (!started) {
        socket.close()
    }
}

This is a different kind of cancellation invariant from atomic cancellation, but it suggests the same improvements as above.
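The started flag matters in at least one case that can be checked today: withTimeout with a non-positive timeout throws TimeoutCancellationException immediately, without running its block. A sketch under that assumption, where FakeSocket and this connectOrClose are hypothetical stand-ins for SocketChannel and the real connect method:

```kotlin
import kotlinx.coroutines.*
import java.io.Closeable

// Hypothetical stand-in for SocketChannel that records whether close() was called.
class FakeSocket : Closeable {
    var closed = false
        private set
    override fun close() { closed = true }
}

// Hypothetical stand-in for connectOrClose: closes the socket on failure or cancellation.
suspend fun connectOrClose(socket: FakeSocket) {
    try {
        delay(10) // simulated connect
    } catch (e: Throwable) {
        socket.close()
        throw e
    }
}

// Returns false on timeout; the socket is closed whenever the connect did not succeed.
suspend fun connectWithTimeout(socket: FakeSocket, timeoutMillis: Long): Boolean {
    var started = false
    return try {
        withTimeout(timeoutMillis) {
            started = true
            connectOrClose(socket)
            true
        }
    } catch (e: CancellationException) {
        // If the block never started, connectOrClose had no chance to close the socket.
        if (!started) socket.close()
        if (e is TimeoutCancellationException) false else throw e
    }
}

fun main() = runBlocking {
    val a = FakeSocket()
    println(connectWithTimeout(a, 0) to a.closed)     // (false, true): the block never ran
    val b = FakeSocket()
    println(connectWithTimeout(b, 1_000) to b.closed) // (true, false)
}
```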

@elizarov
Contributor

elizarov commented Oct 1, 2020

The current guarantee on atomic cancellation will be reworked and will not be maintained anymore in the upcoming release. See PR #1937 for details. It also contains updated (rewritten) documentation.

@jxdabc
Contributor Author

jxdabc commented Oct 2, 2020

The current guarantee on atomic cancellation will be reworked and will not be maintained anymore in the upcoming release. See PR #1937 for details. It also contains updated (rewritten) documentation.

@elizarov

I read #1937 thoroughly. As I understand it, the idea is:

  • So that coroutines bound to a thread or a single-threaded dispatcher throw CancellationException when cancelled from another coroutine on the same thread/dispatcher or from ordinary code (e.g., Android onDestroy()), atomic resume will no longer be used.
  • Instead, it adds an onCancellation parameter to resume, which is invoked if the continuation is cancelled before or after it is resumed.
  • Related methods, e.g., Channel.send() and Mutex.lock(), have their behavior under cancellation redefined.

I think #1937 is orthogonal to this issue.

Although #1937 redefines the cancellation invariants:

  • For Channel.send(), from 'the element is not sent to this channel' to 'the element is either sent to this channel or passed to onUndeliveredElement'.
  • For Mutex.lock(), from 'the lock is not held' to 'the lock is released' (almost no difference from the caller's point of view).

The problem discussed above still exists and is essentially the same.
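For reference, the redefined Channel.send() invariant can be observed directly, assuming kotlinx.coroutines 1.4 or later (where the Channel factory accepts onUndeliveredElement): elements still buffered when the channel is cancelled are handed to onUndeliveredElement rather than silently dropped. undeliveredDemo is a hypothetical name.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

fun undeliveredDemo(): List<String> {
    val undelivered = mutableListOf<String>()
    val channel = Channel<String>(capacity = 2, onUndeliveredElement = { undelivered += it })
    runBlocking {
        channel.send("a") // buffered, never received
        channel.send("b")
    }
    channel.cancel() // cancelling hands every buffered element to onUndeliveredElement
    return undelivered.sorted()
}

fun main() {
    println(undeliveredDemo()) // [a, b]
}
```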

For example:

/**
 * Connects [socket] to [address] in the IO context/dispatcher and sends it to [channel].
 *
 * [socket] is closed if connecting fails; otherwise, even if cancelled, the socket is sent to [channel]
 * or to [onUndeliveredElement] of [Channel] to avoid a resource leak.
 */
suspend fun openConnectionAndSendToChannel(
    socket: SocketChannel, address: SocketAddress, channel: Channel<SocketChannel>
) = withContext(Dispatchers.IO) {
    // blocking connect
    connectOrCloseIfFailed(socket, address)
    channel.send(socket)
}

This implementation is not correct either, much like the very last example in the first post of this issue. If the outer coroutine is cancelled just before withContext(Dispatchers.IO), the inner block never runs. The method is then cancelled but fails to 'send the socket to [channel] or to [onUndeliveredElement] of [Channel] to avoid a resource leak'. I could not find a workaround without changing the contract of this method.
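This case is easy to reproduce deterministically by cancelling the caller right before withContext. In the sketch below, cancelledBeforeWithContext is a hypothetical name and the ran flag stands in for the connect-and-send work.

```kotlin
import kotlinx.coroutines.*

// Returns (blockRan, withContextThrew) when the caller is cancelled just before withContext.
fun cancelledBeforeWithContext(): Pair<Boolean, Boolean> = runBlocking {
    var ran = false
    var threw = false
    launch {
        cancel() // cancellation arrives just before withContext is entered
        try {
            withContext(Dispatchers.IO) {
                ran = true // stands in for connect + channel.send(socket)
            }
        } catch (e: CancellationException) {
            threw = true // the block never ran, so nothing was sent or handed over
        }
    }.join()
    ran to threw
}

fun main() {
    println(cancelledBeforeWithContext()) // (false, true)
}
```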

This suggests the same improvement as Improvement 2 in the first post of this issue.

Another example:

/**
 * Locks with a timeout.
 *
 * The lock is not held if it times out.
 *
 * Returns false on timeout, true otherwise.
 */
suspend fun lockWithTimeout(lock: Mutex, timeoutMillis: Long): Boolean {
    // disable outside cancellation
    return withContext(NonCancellable) {

        try {
            // setup timeout
            withTimeout(timeoutMillis) {
                lock.lock()
                true
            }
        } catch (e: TimeoutCancellationException) {
            false
        }

    }
}

This implementation is not correct either. The reason and the workaround are the same as for the second example in the Problem section of the first post (the timeout version of readFromDiskIntoChannel): if the timeout fires just after lock.lock() returns, the method returns false while the lock is held.

This also suggests the same improvements as in the first post of this issue.
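Applied to lockWithTimeout, the flag-based workaround from the first post might look like the following sketch. It assumes kotlinx.coroutines 1.4+ semantics, under which a lock() call that throws CancellationException does not leave the lock held.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex

// Returns false on timeout, true otherwise; on false the lock is not held.
suspend fun lockWithTimeout(lock: Mutex, timeoutMillis: Long): Boolean =
    withContext(NonCancellable) { // disable outside cancellation
        var locked = false
        try {
            withTimeout(timeoutMillis) {
                lock.lock()
                locked = true // lock() returned, so the lock is held from here on
                true
            }
        } catch (e: TimeoutCancellationException) {
            locked // true if the timeout fired after lock() had already returned
        }
    }

fun main() = runBlocking {
    val mutex = Mutex()
    println(lockWithTimeout(mutex, 100)) // true: the mutex was free
    println(lockWithTimeout(mutex, 50))  // false: still held, so the attempt times out
    println(mutex.isLocked)              // true: held by the first call only
}
```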

jxdabc changed the title from "(Atomic) cancellation problem" to "Cancellation problem" on Oct 6, 2020
@elizarov
Contributor

elizarov commented Oct 9, 2020

I've recently updated the docs on how to work around the problem when using timeouts with resources, to close #2233.
You can read updated docs here: https://github.com/Kotlin/kotlinx.coroutines/blob/develop/docs/cancellation-and-timeouts.md#asynchronous-timeout-and-resources

@jxdabc
Contributor Author

jxdabc commented Oct 12, 2020

I've recently updated the docs on how to work around the problem when using timeouts with resources, to close #2233.
You can read updated docs here: https://github.com/Kotlin/kotlinx.coroutines/blob/develop/docs/cancellation-and-timeouts.md#asynchronous-timeout-and-resources

@elizarov

I read #2233 and the updated asynchronous-timeout-and-resources docs thoroughly. The idea is similar to the workaround above.
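A sketch in the spirit of that docs section (not its exact code): acquire the resource inside withTimeout, keep the reference in a variable declared outside the block, and release it in finally. Resource and useWithTimeout are hypothetical names.

```kotlin
import kotlinx.coroutines.*

// Hypothetical resource that counts how many instances are currently open.
class Resource : AutoCloseable {
    companion object {
        var open = 0
    }

    init { open++ }

    override fun close() { open-- }
}

// Acquires a Resource inside withTimeout but releases it in finally, outside
// the timeout block, so a timeout that fires after acquisition cannot leak it.
suspend fun useWithTimeout(timeoutMillis: Long, work: suspend (Resource) -> Unit) {
    var resource: Resource? = null
    try {
        withTimeout(timeoutMillis) {
            val r = Resource()
            resource = r
            work(r)
        }
    } finally {
        resource?.close()
    }
}

fun main() = runBlocking {
    runCatching { useWithTimeout(50) { delay(1_000) } } // times out while "working"
    println(Resource.open) // 0: released despite the timeout
}
```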

This kind of workaround solves some of these problems, but does not seem to work for others, e.g., the openConnectionAndSendToChannel example above.

Moreover, as far as I can see, this kind of workaround is somewhat anti-structured, counter-intuitive, and error-prone: one has to read the asynchronous-timeout-and-resources docs in full and be very careful about cancellation.

To make withContext-like and withTimeout-like methods structured and intuitive under cancellation, and to let users fully cooperate with cancellation:

  • Would it be possible to provide a new kind of Job, similar to SupervisorJob, whose result is whatever the coroutine block returns? (detailed in the 'Possible Improvement' section of the first post of this issue)
  • Would it be possible to add a cancellable: Boolean parameter to withContext-like methods, making the method itself non-cancellable (while the inner coroutine remains cancellable) when cancellable = false? (detailed in the 'Possible Improvement' section of the first post of this issue)
