consume: the unreliable way to cancel upstream sources for channel operators #279

Closed
elizarov opened this issue Mar 12, 2018 · 10 comments

@elizarov
Contributor

Background

Currently, all built-in operations on ReceiveChannel are implemented using basically the same code pattern. Take a look at the implementation of filter, for example:

fun <E> ReceiveChannel<E>.filter(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
    produce(context) {
        consumeEach {
            if (predicate(it)) send(it)
        }
    }

Under the hood, consumeEach uses consume to make sure that the upstream channel is cancelled when iteration ends, even when the filter coroutine crashes or is cancelled:

suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit) =
    consume {
        for (element in this) action(element)
    }

where the definition of consume is likewise simple:

inline fun <E, R> ReceiveChannel<E>.consume(block: ReceiveChannel<E>.() -> R): R =
    try {
        block()
    } finally {
        cancel()
    }

This design ensures that when you have a chain of operators applied to some source channel, like in val res = src.filter { ... }, cancelling the resulting res channel propagates the cancellation upstream and cancels the source src channel too, thus making sure that any resources used by the source are promptly released.

Problem

It all works great until we have an unhappy combination of factors. Consider the following code:

fun main(args: Array<String>) = runBlocking<Unit> {
    // Create the source channel producing numbers 0..9
    val src = produce { repeat(10) { send(it) } }
    // Create the resulting channel by filtering even numbers from the source
    val res = src.filter { it % 2 == 0 }
    // Immediately cancel the resulting channel
    res.cancel()
    // Check if the source was cancelled
    println("source was cancelled = ${src.isClosedForReceive}")
}

Run the code and see that it behaves as expected, printing:

source was cancelled = true

Now, replace the definition of the res channel with a new one that passes the coroutineContext parameter to the filter invocation, so that the filtering coroutine runs in the context of the runBlocking dispatcher:

    val res = src.filter(coroutineContext) { it % 2 == 0 }

Running this code again produces:

source was cancelled = false

What is going on here? The problem is in the combination of three factors: produce(context) { consume { ... } }, our runBlocking dispatcher, and the fact that the resulting channel is immediately cancelled.

When produce is invoked, it does not immediately start running the coroutine, but schedules its execution using the specified context. In the above code we've passed the runBlocking context, so the coroutine gets scheduled for execution on the main thread instead of being executed right away. Now, if we cancel the produce coroutine while it is still waiting to be executed, then it does not run at all, so it never executes consume and never cancels the source channel.

It works without problems when we don't specify the context explicitly because, by default, the Unconfined context is used, which starts executing the coroutine's code immediately, up to its first suspension point.
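To make the race concrete, here is a minimal single-threaded sketch in plain Kotlin, without kotlinx.coroutines. All names (ToyChannel, ToyDispatcher, ToyJob, toyFilter, runScenario) are hypothetical, and the model is a deliberate simplification: a "coroutine" is a task that skips its body if it was cancelled before it started, the body wraps its work in try/finally the way consume does, and the startImmediately flag loosely stands in for Unconfined.

```kotlin
// Toy model of the scheduling race; hypothetical names, NOT kotlinx.coroutines code.

// Stands in for the upstream source channel; only tracks cancellation.
class ToyChannel {
    var isCancelled = false
        private set
    fun cancel() { isCancelled = true }
}

// Single-threaded event loop, loosely like the runBlocking dispatcher.
class ToyDispatcher {
    private val queue = ArrayDeque<() -> Unit>()
    fun dispatch(task: () -> Unit) { queue.addLast(task) }
    fun runAll() { while (queue.isNotEmpty()) queue.removeFirst()() }
}

// Handle for the filtering "coroutine".
class ToyJob {
    var isCancelled = false
        private set
    fun cancel() { isCancelled = true }
}

// Models produce(context) { consume { ... } } on top of src.
fun toyFilter(src: ToyChannel, dispatcher: ToyDispatcher, startImmediately: Boolean): ToyJob {
    val job = ToyJob()
    val body: () -> Unit = {
        // A coroutine cancelled before it starts never runs its body at all.
        if (!job.isCancelled) {
            try {
                // ... the filtering loop would run here ...
            } finally {
                src.cancel() // what consume's finally block does
            }
        }
    }
    if (startImmediately) body() else dispatcher.dispatch(body)
    return job
}

// Returns whether the source ended up cancelled.
fun runScenario(startImmediately: Boolean): Boolean {
    val dispatcher = ToyDispatcher()
    val src = ToyChannel()
    val res = toyFilter(src, dispatcher, startImmediately)
    res.cancel()        // cancel the resulting channel right away
    dispatcher.runAll() // then let the event loop run what was scheduled
    return src.isCancelled
}

fun main() {
    println("immediate start:  source was cancelled = ${runScenario(startImmediately = true)}")
    println("dispatched start: source was cancelled = ${runScenario(startImmediately = false)}")
}
```

With an immediate start, the body enters its try block before cancel() is called, so the finally block runs and the source is cancelled (true). With a dispatched start, the cancellation wins the race and the finally block is never reached (false). Unlike the real Unconfined case, the toy body runs to completion instead of suspending at its first receive.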

Solution 1: CoroutineStart.ATOMIC

We can use produce(context, start = CoroutineStart.ATOMIC) { ... } in the implementation of all the operators. It makes sure that the coroutine cannot be cancelled before it starts to execute. This change provides immediate relief for this problem, but it has a slight unintended side effect: this "non-cancellable" period extends until the first suspension in the coroutine, which might be too long. It also feels extremely fragile, because it is quite easy to forget that start parameter.
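The difference between the default start and an atomic start can be sketched with the same kind of toy model. This is a hypothetical simplification in plain Kotlin (ToyJob, ToyCancelled, suspensionPoint and runScenario are made-up names), not how CoroutineStart.ATOMIC is implemented: with the default start, a job cancelled before it runs is skipped; with an atomic start, the body always begins and only notices the cancellation at its first suspension point, which is also where the "non-cancellable" window mentioned above ends.

```kotlin
// Toy model of DEFAULT vs ATOMIC start; hypothetical names, NOT kotlinx.coroutines code.

class ToyChannel {
    var isCancelled = false
        private set
    fun cancel() { isCancelled = true }
}

class ToyDispatcher {
    private val queue = ArrayDeque<() -> Unit>()
    fun dispatch(task: () -> Unit) { queue.addLast(task) }
    fun runAll() { while (queue.isNotEmpty()) queue.removeFirst()() }
}

class ToyCancelled : RuntimeException("cancelled")

class ToyJob {
    var isCancelled = false
        private set
    fun cancel() { isCancelled = true }
    // A suspension point: this is where a running coroutine observes cancellation.
    fun suspensionPoint() { if (isCancelled) throw ToyCancelled() }
}

// Returns whether the source ended up cancelled after an immediate cancel.
fun runScenario(atomic: Boolean): Boolean {
    val dispatcher = ToyDispatcher()
    val src = ToyChannel()
    val job = ToyJob()
    dispatcher.dispatch {
        // DEFAULT: a job cancelled before start is skipped entirely.
        // ATOMIC: the body always starts; code before the first suspension runs even if cancelled.
        if (atomic || !job.isCancelled) {
            try {
                job.suspensionPoint() // e.g. the first receive from src
            } catch (e: ToyCancelled) {
                // the real coroutine would simply complete as cancelled here
            } finally {
                src.cancel() // consume's finally block
            }
        }
    }
    job.cancel()
    dispatcher.runAll()
    return src.isCancelled
}

fun main() {
    println("DEFAULT: source was cancelled = ${runScenario(atomic = false)}")
    println("ATOMIC:  source was cancelled = ${runScenario(atomic = true)}")
}
```

The sketch prints false for DEFAULT and true for ATOMIC. It also shows the fragility: the fix lives entirely in the start mode chosen at each call site, so one operator that forgets it silently regresses.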

Solution 2: Abolish consume, provide onCompletion

We can add additional optional parameter onCompletion: (Throwable?) -> Unit to produce and other similar coroutine builders and completely change the implementation pattern for filter and other operations like this:

fun <E> ReceiveChannel<E>.filter(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
    produce(context, onCompletion = { cancel(it) }) {
        // iterate the source channel; a bare `this` here would be the ProducerScope
        for (element in this@filter) {
            if (predicate(element)) send(element)
        }
    }
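Why an onCompletion handler closes the gap can be sketched with a toy job whose completion handler is installed when the job is created, before its body is ever scheduled. This is a hypothetical plain-Kotlin model (ToyJob, complete, toyFilter and runScenario are made-up names), assuming that cancelling a not-yet-started job completes it immediately and that the handler runs exactly once however the job completes.

```kotlin
// Toy model of produce(context, onCompletion = ...); hypothetical names, NOT kotlinx.coroutines code.

class ToyChannel {
    var isCancelled = false
        private set
    fun cancel() { isCancelled = true }
}

class ToyDispatcher {
    private val queue = ArrayDeque<() -> Unit>()
    fun dispatch(task: () -> Unit) { queue.addLast(task) }
    fun runAll() { while (queue.isNotEmpty()) queue.removeFirst()() }
}

// The handler is installed at creation and runs exactly once, however the job completes.
class ToyJob(private val onCompletion: (Throwable?) -> Unit) {
    var isCompleted = false
        private set
    fun complete(cause: Throwable?) {
        if (isCompleted) return
        isCompleted = true
        onCompletion(cause)
    }
    // Cancelling a job that has not started yet completes it right away.
    fun cancel() = complete(RuntimeException("cancelled"))
}

// Models filter written as produce(context, onCompletion = { cancel(it) }) { ... }.
fun toyFilter(src: ToyChannel, dispatcher: ToyDispatcher): ToyJob {
    val job = ToyJob(onCompletion = { src.cancel() })
    dispatcher.dispatch {
        if (job.isCompleted) return@dispatch // cancelled before start: body skipped, handler already ran
        // ... the filtering loop would run here ...
        job.complete(null) // normal completion fires the handler too
    }
    return job
}

// Returns whether the source ended up cancelled.
fun runScenario(cancelImmediately: Boolean): Boolean {
    val dispatcher = ToyDispatcher()
    val src = ToyChannel()
    val res = toyFilter(src, dispatcher)
    if (cancelImmediately) res.cancel()
    dispatcher.runAll()
    return src.isCancelled
}

fun main() {
    println("cancelled immediately: source was cancelled = ${runScenario(cancelImmediately = true)}")
    println("ran to completion:     source was cancelled = ${runScenario(cancelImmediately = false)}")
}
```

Both scenarios print true: the upstream cancellation no longer depends on the body getting a chance to run.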

The advantage of this pattern is that it scales better to operators on multiple channels and makes them less error-prone to write. For example, the current implementation of zip looks like this:

fun <E, R, V> ReceiveChannel<E>.zip(other: ReceiveChannel<R>, context: CoroutineContext = Unconfined, transform: (a: E, b: R) -> V): ReceiveChannel<V> =
    produce(context) {
        other.consume {
            val otherIterator = other.iterator()
            this@zip.consumeEach { element1 ->
                if (!otherIterator.hasNext()) return@consumeEach
                val element2 = otherIterator.next()
                send(transform(element1, element2))
            }
        }
    }

It is quite nested and magic for its reliance on consume and consumeEach to close the source channels. With onCompletion we can write instead:

fun <E, R, V> ReceiveChannel<E>.zip(other: ReceiveChannel<R>, context: CoroutineContext = Unconfined, transform: (a: E, b: R) -> V): ReceiveChannel<V> =
    produce(context, onCompletion = { 
        cancel(it)
        other.cancel(it)
    }) {
        val otherIterator = other.iterator()
        for (element1 in this@zip) {
            if (!otherIterator.hasNext()) return@produce
            val element2 = otherIterator.next()
            send(transform(element1, element2))
        }
    }

Here, the fact that the source channels are cancelled on completion is explicit in the code instead of being hidden, and multiple source channels do not require additional nesting of consume blocks.

That leaves open the question of what to do with consume and consumeEach. One approach is to deprecate consume for its error-proneness. However, we should keep the finally { cancel() } behavior in consumeEach, because consumeEach is a terminal operator that should always make sure that the original channel is fully consumed.

Discussion

Making sure that upstream channels are properly closed is the bane and an inherent complexity of "hot" channels, especially when they are backed by actual resources, like network streams. It is not a problem with "cold" channels (see #254), which, by definition, do not allocate any resources until subscribed to. However, just switching to cold channels everywhere does not provide any relief if the domain problem at hand actually calls for a hot channel. Cold channels that are backed by hot channels suffer from the same problem: their underlying hot channels have to be closed somehow when they are no longer needed.

@elizarov elizarov added the bug label Mar 12, 2018
@elizarov elizarov changed the title consume: the unreliable way to cancel upstreams source for channel operators consume: the unreliable way to cancel upstream sources for channel operators Mar 12, 2018
@jcornaz
Contributor

jcornaz commented Mar 12, 2018

In my opinion, solution 2 is better, especially because it makes writing custom operators even easier and less error-prone.

But I would personally keep consume and consumeEach, because they may be used in places other than produce. No matter where a channel is used, it should always be easy to make sure that the channel is cancelled in case of error. In a regular suspending function, consume would still be an easy and safe way to consume a channel.
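The argument for keeping consume in ordinary functions rests on its try/finally shape: once the block has started, the source is cancelled on normal return, on an early (non-local) return, and on an exception. The sketch below checks that with a hypothetical ToySource class standing in for a ReceiveChannel (not kotlinx.coroutines code) and a consume written the same way as the definition quoted in the issue; firstNegative is a made-up example function.

```kotlin
// Toy stand-in for a ReceiveChannel; hypothetical, NOT kotlinx.coroutines code.
class ToySource<E>(private val items: List<E>) : Iterable<E> {
    var isCancelled = false
        private set
    fun cancel() { isCancelled = true }
    override fun iterator(): Iterator<E> = items.iterator()
}

// Same shape as the consume definition quoted in the issue.
inline fun <E, R> ToySource<E>.consume(block: ToySource<E>.() -> R): R =
    try {
        block()
    } finally {
        cancel()
    }

// An ordinary function (not a producer) that may return early or fail midway.
fun firstNegative(src: ToySource<Int>): Int? {
    src.consume {
        for (x in this) {
            require(x != 0) { "zero is not allowed" }
            if (x < 0) return x // non-local return still runs consume's finally block
        }
    }
    return null
}

fun main() {
    val ok = ToySource(listOf(3, -1, 5))
    println(firstNegative(ok))   // -1
    println(ok.isCancelled)      // true

    val bad = ToySource(listOf(1, 0, -2))
    println(runCatching { firstNegative(bad) }.isFailure) // true
    println(bad.isCancelled)     // true
}
```

The guarantee only holds once the block is entered, which is exactly why consume alone is not enough inside a producer that may be cancelled before it starts.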

@fvasco
Contributor

fvasco commented Mar 12, 2018

Is it possible to address this issue using the job property (#260)? (Solution 3)

fun <E> ReceiveChannel<E>.filter(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
        produce(context, parent = job) {
            for (element in this@filter) {
                if (predicate(element)) send(element)
            }
        }.also { produce ->
            produce.job.invokeOnCompletion {
                this@filter.job.cancel(it)
            }
        }

or something similar...

@elizarov
Contributor Author

elizarov commented Mar 12, 2018

@fvasco A solution via an explicit job does not address the core problem: the code in the produce { ... } block may never run at all if the coroutine is cancelled immediately. If you don't use the produce builder but use the launch builder manually, then you can install invokeOnCompletion yourself:

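fun <E> ReceiveChannel<E>.filter(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> {
    val channel = Channel<E>()
    val job = launch(context) {
        for (element in this@filter) {
            if (predicate(element)) channel.send(element)
        }
    }
    // installed right away, so it runs even if the coroutine is cancelled before it starts
    job.invokeOnCompletion { cause ->
        channel.close(cause)      // let downstream consumers see completion or failure
        this@filter.cancel(cause) // always cancel the upstream source
    }
    return channel
}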

So, the solution with produce(onCompletion = { ... }) { ... } is basically just a less verbose shortcut for the above solution with launch.

@fvasco
Contributor

fvasco commented Mar 12, 2018

@elizarov If produce is cancelled immediately, then produce.job is completed immediately. Should we provide a different behavior?

Personally, I don't like the produce(onCompletion = { ... }) syntax (a listener as a parameter); however, I consider using jobs an opportunity to define a job's dependencies declaratively instead of imperatively, like Gradle does:

task taskX(dependsOn: ':projectB:taskY')

elizarov added a commit that referenced this issue Mar 12, 2018
…osing

of the source channels under all circumstances;
`onCompletion` added to `produce` builder;
`ReceiveChannel.cancel: CompletionHandler` extension val;

Fixes #279
@jcornaz
Contributor

jcornaz commented Mar 13, 2018

By the way, I'd like to take the opportunity to ask: why is there a SubscriptionReceiveChannel, which is conceptually a ReceiveChannel that implements Closeable by providing close as an alias for cancel?

I think it is redundant, because with any ReceiveChannel the user should ensure that the channel is cancelled after use, and consume for ReceiveChannel basically achieves the same as use does for SubscriptionReceiveChannel.

Or am I missing something?

@elizarov
Contributor Author

@jcornaz Good observation. It is a redundant legacy artifact: #283

@elizarov
Contributor Author

@fvasco I'm trying to make it more declarative by providing out-of-the-box completion handlers. In the current draft version I'm working on, the implementation of filter looks like this:

fun <E> ReceiveChannel<E>.filter(context: CoroutineContext = Unconfined, predicate: suspend (E) -> Boolean): ReceiveChannel<E> =
    produce(context, onCompletion = cancellingHandler) {
        for (e in this@filter) {
            if (predicate(e)) send(e)
        }
    }

I'm also open to alternative approaches, like produce(context, dependsOn = this) { ... }, but this is definitely less flexible.

What I like about onCompletion is that you can use it to work with a wide variety of resources. For example, when reading a file, you can first open it and then, if it was opened successfully, start a producer coroutine that closes the file on completion.
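The file scenario can be sketched in plain Kotlin as a toy model. ToyFile, ToyProducer and toyReadLines are hypothetical names, not kotlinx.coroutines or java.io APIs, and the "coroutine" is just a task on a queue. The point it illustrates is the ordering: open first, so a failed open leaks nothing; then create the producer with its close handler already attached, so the file is closed even if the producer is cancelled before its body runs.

```kotlin
// Toy model of "open first, then start a producer that closes on completion";
// hypothetical names, NOT kotlinx.coroutines code.

class ToyFile(private val lines: List<String>) {
    var isClosed = false
        private set
    fun readLines(): List<String> {
        check(!isClosed) { "file is closed" }
        return lines
    }
    fun close() { isClosed = true }
}

class ToyProducer(private val onCompletion: (Throwable?) -> Unit) {
    val sent = mutableListOf<String>()
    var isDone = false
        private set
    fun finish(cause: Throwable?) {
        if (isDone) return
        isDone = true
        onCompletion(cause) // runs exactly once, even if the body never started
    }
    fun cancel() = finish(RuntimeException("cancelled"))
}

fun toyReadLines(open: () -> ToyFile, queue: ArrayDeque<() -> Unit>): ToyProducer {
    val file = open() // if opening throws, no producer exists and nothing needs closing
    val producer = ToyProducer(onCompletion = { file.close() })
    queue.addLast {
        if (!producer.isDone) {
            try {
                producer.sent += file.readLines()
                producer.finish(null)
            } catch (e: Exception) {
                producer.finish(e)
            }
        }
    }
    return producer
}

fun main() {
    val queue = ArrayDeque<() -> Unit>()
    val file = ToyFile(listOf("a", "b"))
    val producer = toyReadLines({ file }, queue)
    producer.cancel() // cancelled before the body ever runs
    while (queue.isNotEmpty()) queue.removeFirst()()
    println(file.isClosed) // true
    println(producer.sent) // []
}
```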

@elizarov
Contributor Author

elizarov commented Mar 13, 2018

So, I've played further with different names, and so far I like the following best:

produce(context, onCompletion = consumes()) { ... }

For multiple source channels operations (like zip) it is going to look like this:

produce(context, onCompletion = consumesAll(this, other)) { ... }

The reason for this particular naming is that in the future we'd like to fail fast when multiple consumers try to work with the same channel (see #167), so consumes() would do double duty: marking the source channel as being consumed and providing a completion handler that cancels the channel, discarding any remaining elements, when the consumer completes.
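As completion handlers, consumes() and consumesAll(...) boil down to functions of the completion cause that cancel their channels. The sketch below is a hypothetical plain-Kotlin model (ToyChannel and ToyCompletionHandler are made-up stand-ins, and it does not model the fail-fast marking from #167). One design choice it assumes for the multi-channel case: every channel is cancelled even if cancelling one of them throws, and the first failure is rethrown with the rest attached as suppressed.

```kotlin
// Toy versions of consumes()/consumesAll() as completion handlers;
// hypothetical names, NOT the kotlinx.coroutines API.
typealias ToyCompletionHandler = (cause: Throwable?) -> Unit

class ToyChannel {
    var isCancelled = false
        private set
    var cancelledWith: Throwable? = null
        private set
    fun cancel(cause: Throwable? = null) {
        if (isCancelled) return
        isCancelled = true
        cancelledWith = cause
    }
}

// Single source: the handler forwards the consumer's completion cause upstream.
fun ToyChannel.consumes(): ToyCompletionHandler = { cause -> cancel(cause) }

// Several sources (e.g. zip): cancel every channel even if one cancel() throws,
// then rethrow the first failure with the others attached as suppressed.
fun consumesAll(vararg channels: ToyChannel): ToyCompletionHandler = { cause ->
    var failure: Throwable? = null
    for (channel in channels) {
        try {
            channel.cancel(cause)
        } catch (e: Throwable) {
            val first = failure
            if (first == null) failure = e else first.addSuppressed(e)
        }
    }
    failure?.let { throw it }
}

fun main() {
    val a = ToyChannel()
    val b = ToyChannel()
    val failure = IllegalStateException("consumer failed")
    consumesAll(a, b)(failure) // what a zip-like producer would invoke when it completes
    println("${a.isCancelled} ${b.isCancelled}") // true true
    println(b.cancelledWith === failure)         // true
}
```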

@fvasco
Contributor

fvasco commented Mar 13, 2018

Hi @elizarov,
I like your enhancements more.

Probably we had different approaches to this problem.
You are looking for a channel disruptor; I am looking for a channel constructor (the also clause in my example above).
In that case we fall into issue #87.

@pull-vert

I hope this new onCompletion parameter (a kind of callback function) will not lead to an "async callback style" use of coroutines, where the onCompletion lambda is used for the statements that should come next.

@elizarov elizarov self-assigned this Mar 21, 2018
streetsofboston pushed a commit to IntrepidPursuits/kotlinx.coroutines that referenced this issue Apr 20, 2018
of the source channels under all circumstances;
`onCompletion` added to `produce` builder;
`ReceiveChannel.consumes(): CompletionHandler` extension fun.

Fixes Kotlin#279
4 participants