Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ClosedSendChannelException for callbackFlow #1770

Closed
ThanosFisherman opened this issue Jan 20, 2020 · 5 comments
Closed

ClosedSendChannelException for callbackFlow #1770

ThanosFisherman opened this issue Jan 20, 2020 · 5 comments

Comments

@ThanosFisherman
Copy link

ThanosFisherman commented Jan 20, 2020

I'm trying to convert an Android CountDownTimer into emitting values via callbackFlow but it throws ClosedSendChannelException: Channel was closed when collecting the items. I can't really find a valid reason why this happens.

@ExperimentalCoroutinesApi
class TimerFlow private constructor(millisInFuture: Long, countDownInterval: Long) {

    private val tick: Flow<Long> = callbackFlow {

        object : CountDownTimer(millisInFuture, countDownInterval) {
            override fun onFinish() {
                offer(0L)
            }

            override fun onTick(millisUntilFinished: Long) {
                Log.i("TimerFlow","$millisUntilFinished")
                offer(millisUntilFinished)
            }
        }.start()
    }

    companion object {
        /**
         * Create a [Flow] that will be a countdown until a specified time in the future.
         *
         * @param millisInFuture The milliseconds in the future that this will countdown to.
         * @param countDownInterval The minimum amount of time between emissions.
         */
        @JvmStatic
        fun create(millisInFuture: Long, countDownInterval: Long) =
            TimerFlow(millisInFuture, countDownInterval).tick
    }
}

Is this a bug or am I missing something? I'm new to Coroutines.

@circusmagnus
Copy link

circusmagnus commented Jan 20, 2020

Reason is that callbackFlow block closes the (hidden under the hood) channel, as soon, as everything within

callbackFlow {
...
}

brackets gets executed. So you start your timer and callbackFlow thinks you are done, so it closes the channel, and your timer cannot offer it any values.

In order to keep your callbackFlow alive, while the Timer is ticking, you need to suspend it. Preferably like this:

callbackFlow {
   //register listener / timer
    awaitClose { //unregister your listener, which is calling 'offer' }
}

The awaitClose {} will keep callbackFlow active, until it gets cancelled manually (either via your listener or by the consumer of the flow).

I agree, that it is not very intuitive, especially for former RxJava users, where Observable.create behaves in opposite way - it will stay alive by default, but you need to complete it manually.

Perhaps having callbackFlow behave, like Observable.create would be a good thing (and channelFlow staying with the current implementation). I do not know.

PS: edited wrong function name

@qwwdfsad
Copy link
Member

The above answer is generally correct.
This behaviour is described in the documentation and proposes a way to resolve it:

Use awaitClose as the last statement to keep it running. awaitClose argument is called when either flow consumer cancels flow collection or when callback-based API invokes SendChannel.close manually.

This is done intentionally to indicate that not only consumer can complete the flow, but it also can be cancelled (and then callback should be unregistered).
What we can do better here is to add a clear exception message if callbackFlow is closed without by its block

@circusmagnus
Copy link

Maybe callbackFlow should accept two lambdas as it parameters:

  • what to do onStart (current lambda)
  • what to do onClose(as most of the time you need to unregister your callback, so it should have been a mandatory thing, not some optional function invocation).

And it would be 'always active' by default.

Again, I'm not sure. Just food for thought.

@qwwdfsad
Copy link
Member

qwwdfsad commented Jan 20, 2020

Thanks for the input. We were considering a similar API initially but found that it has a way too many limitations.

Consider the following example from the documentation:

fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow {
    val callback = object : Callback {
        override fun onNextValue(value: T) {
            offer(value)
        }
        ... other methods ...
    }
    api.register(callback)
    // Suspend until either onCompleted or external cancellation are invoked
    awaitClose { api.unregister(callback) }
}

To properly close this API, you should have an instance of the callback in your hands. So fun <T> callbackFlow(onClose: () -> Unit, onStart: (ProducerScope<T>.() -> Unit)) is not going to work.

It can be changed to fun <T, C> callbackFlow(onClose: (C) -> Unit, onStart: (ProducerScope<T>.() -> C) though.

But what if both api and callback are created within a onStart? What if onStart and onClose share some common local variables, e.g. diagnostic info? API immediately becomes bloated and unreadable, so awaitClose was the lesser evil

@ThanosFisherman
Copy link
Author

ThanosFisherman commented Jan 20, 2020

Thanks for the heads up guys. It makes sense now. Coming from RxJava I would expect a behavior similar to observable where you manually have to call onComplete() to terminate it.

Now that the rationale was given I'm not sure if it is a good idea to make channelFlow/callbackFlow stay alive by default or if it should even be intended as an observable alternative. You guys decide that.

What we can do better here is to add a clear exception message if callbackFlow is closed without by its block

Yes that would be a good idea I think.

So I modified a little bit my code to look like this:

@ExperimentalCoroutinesApi
class TimerFlow private constructor(millisInFuture: Long, countDownInterval: Long) {


    private val tick: Flow<Long> = callbackFlow {

        if (Looper.myLooper() == null) {
            throw IllegalStateException("Can't create TimerFlow inside thread that has not called Looper.prepare() Just use Dispatchers.Main")
        }

        object : CountDownTimer(millisInFuture, countDownInterval) {
            override fun onFinish() {
                cancel()
            }

            override fun onTick(millisUntilFinished: Long) {
                offer(millisUntilFinished)
            }
        }.start()

        awaitClose()
    }

    companion object {
        /**
         * Create a [Flow] that will be a countdown until a specified time in the future.
         *
         * @param millisInFuture The milliseconds in the future that this will countdown to.
         * @param countDownInterval The minimum amount of time between emissions.
         */
        @JvmStatic
        fun create(millisInFuture: Long, countDownInterval: Long) =
            TimerFlow(millisInFuture, countDownInterval).tick
    }
}

Afterwards I call the above class like this:

class MainActivity : AppCompatActivity() {

    @ExperimentalCoroutinesApi
    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)
        btnStart.setOnClickListener {
            CoroutineScope(Dispatchers.Main).launch {
                setCountDown(5000, 1000)
            }
        }
    }

    @ExperimentalCoroutinesApi
    private suspend fun setCountDown(millisInFuture: Long, countDownInterval: Long) {

        TimerFlow.create(millisInFuture, countDownInterval).collect {
            Log.i("main", it.toString())
            textView.text = it.toString()
        }
    }
}

qwwdfsad added a commit that referenced this issue Jan 20, 2020
qwwdfsad added a commit that referenced this issue Jan 20, 2020
qwwdfsad added a commit that referenced this issue Jan 29, 2020
qwwdfsad added a commit that referenced this issue Feb 11, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants