kotlin.coroutines.channels.awaitClose: JobCancellationException #1762

Closed
sellmair opened this issue Jan 15, 2020 · 7 comments
Labels
docs KDoc and API reference

Comments

@sellmair
Member

sellmair commented Jan 15, 2020

The awaitClose function currently throws a JobCancellationException if the job was canceled before calling the function.

I would have personally expected that the block passed to awaitClose { } gets executed without throwing this exception.

Sample code:

internal fun SensorManager.consumeValuesAsFlow(
    sensor: Sensor,
    measureInterval: Duration = 20L with TimeUnit.MilliSeconds
): Flow<FloatArray> {
    return callbackFlow {
        val sensorEventListener = object : SensorEventListener {
            override fun onAccuracyChanged(sensor: Sensor?, accuracy: Int) = Unit
            override fun onSensorChanged(event: SensorEvent) {
                catch { offer(event.values) }
            }
        }

        registerListener(
            sensorEventListener,
            sensor,
            measureInterval[TimeUnit.MicroSeconds].toInt(),
            sensorHandler
        )
        try {
            awaitClose { unregisterListener(sensorEventListener) }
        } catch (t: Throwable) {
            Log.e("SensorManager", "consumeValuesAsFlow", t) // called with JobCancellationException
        }
    }
}

Coroutines version 1.3.3
Kotlin version 1.3.61

@qwwdfsad
Member

"I would have personally expected that the block passed to awaitClose { } gets executed without throwing this exception."

Could you please elaborate on why? awaitClose is a regular cancellable suspend function and thus it behaves like one.

The additional non-suspending block parameter is there to indicate that:

  1. A coroutine can be cancelled, so suspension calls in a close sequence may be inappropriate
  2. Code in the block is not semantically the same as writing the same code after awaitClose
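
A minimal sketch of the second point, assuming kotlinx.coroutines is on the classpath. The names `cleanupDemo` and `log` are hypothetical and exist only for this illustration. The collector is cancelled while the producer is suspended in awaitClose: the block still runs, but awaitClose then throws a CancellationException, so the statement written after it is skipped.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

// Hypothetical flow that never emits; it only records which code paths run.
fun cleanupDemo(log: MutableList<String>): Flow<Int> = callbackFlow {
    try {
        awaitClose { log.add("block") } // runs when the channel is closed or cancelled
        log.add("after")                // skipped when awaitClose throws
    } catch (e: CancellationException) {
        log.add("cancelled")
        throw e                         // rethrow so cancellation propagates normally
    }
}

fun main() = runBlocking<Unit> {
    val log = mutableListOf<String>()
    val job = launch { cleanupDemo(log).collect { } }
    delay(100)          // give the producer time to suspend in awaitClose
    job.cancelAndJoin() // cancel the collector and wait for cleanup to finish
    println(log)        // [block, cancelled]
}
```

Cleanup that must always happen therefore belongs inside the block (or in a finally), not after the awaitClose call.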

@sellmair
Member Author

I think what confused me is the statement in the docs:

"Suspends the current coroutine until the channel is either closed or cancelled and invokes the given block before resuming the coroutine."

This made it sound like a good fit for a cleanup function: it waits until the coroutine/channel gets closed/canceled and then does some last piece of work. It never came to my mind that it would throw an exception if it is already canceled or closed! Intuitively, I thought it would just go on to call the block (which it does, but it also throws the exception).

Maybe my intuition on this function is just different because I used it in that specific context ☺️

@qwwdfsad qwwdfsad added the docs KDoc and API reference label Jan 15, 2020
@qwwdfsad
Member

Thanks! I will improve the documentation of awaitClose to be more straightforward, then

@ParadiseHell

The awaitClose method is fine: it will not throw JobCancellationException, because it is a cancellable suspending function.

The offer method, however, is a problem, especially with multiple threads: the job may already have been cancelled while another thread still calls offer, which then throws JobCancellationException.

The JobCancellationException is also really confusing because it carries almost no stack trace, just kotlinx.coroutines.JobCancellationException: Job was cancelled; job=SupervisorJobImpl{Cancelling}@..., so I hope you can include more information in JobCancellationException when it is thrown and the coroutine does not ignore the exception.

@ParadiseHell

ParadiseHell commented May 4, 2020

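import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

@ExperimentalCoroutinesApi
fun main(args: Array<String>) = runBlocking {
  val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
  scope.launch {
    getFlow().collect {
      println(it)
    }
  }
  Thread.sleep(10)
  scope.cancel() // cancel while the producer thread is still emitting
  Thread.sleep(1000)
}

@ExperimentalCoroutinesApi
fun getFlow(): Flow<String> {
  return callbackFlow {
    // Plain thread that keeps calling offer, even after the scope is cancelled
    Thread {
      for (i in 0 until 10) {
        val temp = "flow ==> $i"
        offer(temp) // may throw JobCancellationException after scope.cancel()
        Thread.sleep(5)
      }
    }.start()
    Thread.sleep(1000)
    // no awaitClose for better testing
  }
}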

This is an example I just tested, because I have been getting lots of crashes caused by JobCancellationException.

It does not happen every time, but it happens really frequently.

@elizarov
Contributor

elizarov commented May 7, 2020

Yes, offer is defined to throw an exception on a closed channel: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/offer.html
This could be surprising and there've been other requests to fix it.
Please, follow #974
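
For readers on newer releases: kotlinx.coroutines 1.5.0 and later (an assumption beyond the 1.3.3 used in this thread) provide trySend, which reports a closed channel through its result instead of throwing. A minimal sketch, with the hypothetical helper name `trySendDemo`:

```kotlin
import kotlinx.coroutines.channels.Channel

// Returns (accepted while open, reported as closed after close()).
fun trySendDemo(): Pair<Boolean, Boolean> {
    val channel = Channel<String>(Channel.UNLIMITED)
    val accepted = channel.trySend("first").isSuccess
    channel.close()
    val closed = channel.trySend("second").isClosed // no exception is thrown here
    return accepted to closed
}

fun main() {
    println(trySendDemo()) // (true, true)
}
```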

@dyvoker

dyvoker commented Nov 25, 2020

I wrote an extension function to prevent JobCancellationException in our app. Maybe it will be useful for someone who is still looking for a solution. You must replace offer with offerSafe inside of callbackFlow.

inline fun <reified E : Any> ProducerScope<E>.offerSafe(element: E) {
    if (isActive) {
        offer(element)
    }
}
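
Note that offerSafe checks isActive and then calls offer as two separate steps, so the job can still be cancelled in between when offer is called from another thread. A hedged alternative sketch, assuming kotlinx.coroutines 1.5.0 or later where trySend is available; `trySendSafe` and `numbers` are hypothetical names used only for illustration:

```kotlin
import kotlinx.coroutines.channels.ProducerScope
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical replacement for offerSafe: true if the element was accepted,
// false if the channel is closed or its buffer is full. It does not throw
// on a closed channel.
fun <E> ProducerScope<E>.trySendSafe(element: E): Boolean =
    trySend(element).isSuccess

// Hypothetical flow fed from a plain thread, mirroring the repro earlier in the thread.
fun numbers(): Flow<Int> = callbackFlow {
    Thread {
        for (i in 0 until 10) {
            if (!trySendSafe(i)) break // collector is gone: stop producing
            Thread.sleep(5)
        }
    }.start()
    awaitClose { } // keep the flow open until the collector finishes or cancels
}

fun main() = runBlocking<Unit> {
    println(numbers().take(3).toList()) // [0, 1, 2]
}
```

Because the closed state is read from the result of the send itself, there is no window between a check and the send in which cancellation can slip through.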
