Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broadcast close with CoroutineStart.DEFAULT produces an unexpected exception #1713

Closed
pauliusrum opened this issue Dec 16, 2019 · 5 comments
Closed
Assignees
Labels

Comments

@pauliusrum
Copy link

Kotlin: 1.3.61
Coroutines: 1.3.3

runBlocking {
    val a = produce { 
        while (true) send(1) 
    }
    val b = a.broadcast(start = CoroutineStart.DEFAULT)
    b.close()
}

Problem: send(1) throws an exception after b.close() is called.
Expected behavior: send(1) should suspend instead of throwing an exception.

@elizarov elizarov self-assigned this Dec 16, 2019
@elizarov
Copy link
Contributor

Why would you expect the sender to suspend instead of throwing an exception? See, conceptually broadcast() is designed as a consuming operation, just like all the other channel operators. The idea is that once you start to broadcast the original channel is not used for any other purpose, so now calling b.close() should not only terminate the broadcast but close the original channel, too.

@pauliusrum
Copy link
Author

pauliusrum commented Dec 16, 2019

I see what you are saying but if we have the same example just with CoroutineStart.LAZY

runBlocking {
    val a = produce {
        while (true) send(1)
    }
    val b = a.broadcast()
    b.close()
}

In this case send(1) suspends instead of throwing an exception right? Why is it in one case it suspends in the other it throws an exception?

@elizarov
Copy link
Contributor

That looks like a bug. Should be the same with lazy -- closing the broadcast channel should be closing the original channel.

@antanas-arvasevicius
Copy link

to clarify a bit:
Reason that this throws exception is here: "onCompletion = consumes()" in broadcast()
This is intended to cancel upstream and gets called only when BroadcastCoroutine completes.
Problem is that then we are calling "close()" it closes all broadcast subscriptions and itself first and only after that BroadcastCoroutine completes and consumes() get called.
I think it must be in this order:

when close() is called:

  1. do cancel upstream (source channel)
  2. then upstream is cancelled - close all subscription and broadcast itself.
  3. finish BroadcastCoroutine

Currently it is:

  1. close broadcast channel and all its subscriptions
  2. complete coroutine
  3. cancel upstream (source channel)
fun <E> ReceiveChannel<E>.broadcast(
    capacity: Int = 1,
    start: CoroutineStart = CoroutineStart.LAZY
): BroadcastChannel<E> =
    GlobalScope.broadcast(Dispatchers.Unconfined, 
capacity = capacity, 
start = start, 
onCompletion = consumes()) {
        for (e in this@broadcast) {
            send(e)
        }
    }

elizarov added a commit that referenced this issue Dec 25, 2019
Documentation on broadcast operators is added that explain the the resulting BroadcastChannel shall be cancelled if it is not needed anymore.

Fixes #1713
elizarov added a commit that referenced this issue Dec 25, 2019
* Documentation on broadcast operators is added that explains that the resulting BroadcastChannel shall be cancelled if it is not needed anymore.
* More tests added for various broadcast cancel/close cases.
* The only functional change is that closing a broadcast channel for lazy coroutine shall start the corresponding coroutine to give it a chance to promptly fail.
* Mark broadcast operators as obsolete. To be replaced with sharing operators on flows (see #1716).

Fixes #1713
@elizarov
Copy link
Contributor

Some additional explanaition here.

When you close a broadcast channel it makes the concurrent send operation in your broadcast { .. } to fail. If you want to stop broadcasting coroutine you should use cancel instead. I'm adding more documentation with respect to this in PR #1717 and also fix a problem with the fact that closing a lazily broadcast coroutine does nothing. The fix is to actually start it so that it has its chance to fail.

elizarov added a commit that referenced this issue Dec 26, 2019
* Documentation on broadcast operators is added that explains that the resulting BroadcastChannel shall be cancelled if it is not needed anymore.
* More tests added for various broadcast cancel/close cases.
* The only functional change is that closing a broadcast channel for lazy coroutine shall start the corresponding coroutine to give it a chance to promptly fail.
* Mark broadcast operators as obsolete. To be replaced with sharing operators on flows (see #1716).

Fixes #1713
elizarov added a commit that referenced this issue Feb 17, 2020
* Documentation on broadcast operators is added that explains that the resulting BroadcastChannel shall be cancelled if it is not needed anymore.
* More tests added for various broadcast cancel/close cases.
* The only functional change is that closing a broadcast channel for lazy coroutine shall start the corresponding coroutine to give it a chance to promptly fail.
* Mark broadcast operators as obsolete. To be replaced with sharing operators on flows (see #1716).

Fixes #1713
elizarov added a commit that referenced this issue Mar 16, 2020
* Documentation on broadcast operators is added that explains that the resulting BroadcastChannel shall be cancelled if it is not needed anymore.
* More tests added for various broadcast cancel/close cases.
* The only functional change is that closing a broadcast channel for lazy coroutine shall start the corresponding coroutine to give it a chance to promptly fail.
* Mark broadcast operators as obsolete. To be replaced with sharing operators on flows (see #1716).

Fixes #1713
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

3 participants