kotlinx.coroutines.CoroutinesInternalError: Fatal exception in coroutines machinery for CancellableContinuation #4091

Open
rusmonster opened this issue Apr 5, 2024 · 12 comments

@rusmonster

rusmonster commented Apr 5, 2024

Describe the bug

The following test works on Android, but crashes on iOS:

    @Test
    fun reproduce() = runBlocking {
        val dispatcher = newSingleThreadContext("MyDispatcher")

        launch(dispatcher) {
            try {
                withContext(Dispatchers.Default) {
                    println("!!! before delay")
                    delay(1000)
                    println("!!! after delay")
                }
            } catch (e: Exception) {
                println("!!! Ignored exception: $e")
            }
        }

        delay(200)

        println("!!! before close")
        dispatcher.close()
        println("!!! after close")
        println("!!! DONE !!!")
    }

Output from Android Emulator Arm64 (no crashes, everything works as expected):

        I  !!! before delay
        I  !!! before close
        I  !!! after close
        I  !!! DONE !!!
        I  !!! after delay
        I  !!! Ignored exception: java.util.concurrent.CancellationException: The task was rejected

run finished: 1 tests, 0 failed, 0 ignored

Output from iOS Simulator Arm64 (test crashed):

1 test completed, 1 failed
FAILURE: Build failed with an exception.
* What went wrong:
Execution failed for task ':sync-android-kt:iosSimulatorArm64Test'.
> Test running process exited unexpectedly.
  Current test: reproduce
  Process output:
  !!! before delay
  !!! before close
  !!! after close
  !!! DONE !!!
  !!! after delay
  kotlinx.coroutines.CoroutinesInternalError: Fatal exception in coroutines machinery for CancellableContinuation(DispatchedContinuation[DarwinGlobalQueueDispatcher@53c0308, Continuation @ 6]){Completed}@79400a0. Please read KDoc to 'handleFatalException' method and report this incident to maintainers
      at 0   test.kexe                           0x102735883        kfun:kotlin.Error#<init>(kotlin.String?;kotlin.Throwable?){} + 143 (/opt/buildAgent/work/2fed3917837e7e79/kotlin/kotlin-native/runtime/src/main/kotlin/kotlin/Exceptions.kt:14:63)
      at 1   test.kexe                           0x10298ba27        kfun:kotlinx.coroutines.CoroutinesInternalError#<init>(kotlin.String;kotlin.Throwable){} + 123 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/Exceptions.common.kt:23:77)
      at 2   test.kexe                           0x1029f242f        kfun:kotlinx.coroutines.DispatchedTask#handleFatalException(kotlin.Throwable?;kotlin.Throwable?){} + 755 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt:142:22)
      at 3   test.kexe                           0x1029f2113        kfun:kotlinx.coroutines.DispatchedTask#run(){} + 2455 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt:113:13)
      at 4   test.kexe                           0x102a1b9bb        kfun:kotlinx.coroutines.Runnable#run(){}-trampoline + 91 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/native/src/Runnable.kt:10:19)
      at 5   test.kexe                           0x102a17067        kfun:kotlinx.coroutines.DarwinGlobalQueueDispatcher.dispatch$lambda$0#internal + 119 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/nativeDarwin/src/Dispatchers.kt:22:23)
      at 6   test.kexe                           0x102a170c3        kfun:kotlinx.coroutines.DarwinGlobalQueueDispatcher.$dispatch$lambda$0$FUNCTION_REFERENCE$0.invoke#internal + 71 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/nativeDarwin/src/Dispatchers.kt:21:13)
      at 7   test.kexe                           0x102a17193        kfun:kotlinx.coroutines.DarwinGlobalQueueDispatcher.$dispatch$lambda$0$FUNCTION_REFERENCE$0.$<bridge-UNN>invoke(){}#internal + 71 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/nativeDarwin/src/Dispatchers.kt:21:13)
      at 8   test.kexe                           0x102865807        kfun:kotlin.Function0#invoke(){}1:0-trampoline + 99 (/Users/teamcity/.gradle/daemon/8.2.1/[K][Suspend]Functions:1:1)
      at 9   test.kexe                           0x102a18d13        _6f72672e6a6574627261696e732e6b6f746c696e783a6b6f746c696e782d636f726f7574696e65732d636f72652f6f70742f6275696c644167656e742f776f726b2f343465633665383530643563363366302f6b6f746c696e782d636f726f7574696e65732d636f72652f6e617469766544617277696e2f7372632f44697370617463686572732e6b74_knbridge8 + 191 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/nativeDarwin/src/Dispatchers.kt:21:13)
      at 10  libdispatch.dylib                   0x18016b4f3        _dispatch_call_block_and_release + 23 
      at 11  libdispatch.dylib                   0x18016cd3b        _dispatch_client_callout + 15 
      at 12  libdispatch.dylib                   0x18017f5d3        _dispatch_root_queue_drain + 1075 
      at 13  libdispatch.dylib                   0x18017fd57        _dispatch_worker_thread2 + 231 
      at 14  libsystem_pthread.dylib             0x1045cf8e7        _pthread_wqthread + 223 
      at 15  libsystem_pthread.dylib             0x1045ce6e3        start_wqthread + 7 
      Suppressed: kotlinx.coroutines.internal.DiagnosticCoroutineContextException: [DispatchedCoroutine{Completed}@4661750, DarwinGlobalQueueDispatcher@53c0308]
          at 0   test.kexe                           0x10273c19b        kfun:kotlin.Throwable#<init>(kotlin.String?){} + 119 (/opt/buildAgent/work/2fed3917837e7e79/kotlin/kotlin-native/runtime/src/main/kotlin/kotlin/Throwable.kt:28:37)
          at 1   test.kexe                           0x102735977        kfun:kotlin.Exception#<init>(kotlin.String?){} + 115 (/opt/buildAgent/work/2fed3917837e7e79/kotlin/kotlin-native/runtime/src/main/kotlin/kotlin/Exceptions.kt:23:44)
  Uncaught Kotlin exception:         at 2   test.kexe                           0x102735b97        kfun:kotlin.RuntimeException#<init>(kotlin.String?){} + 115 (/opt/buildAgent/work/2fed3917837e7e79/kotlin/kotlin-native/runtime/src/main/kotlin/kotlin/Exceptions.kt:34:44)
          at 3   test.kexe                           0x102a15593        kfun:kotlinx.coroutines.internal.DiagnosticCoroutineContextException#<init>(kotlin.coroutines.CoroutineContext){} + 167 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/native/src/internal/CoroutineExceptionHandlerImpl.kt:27:5)
          at 4   test.kexe                           0x1029ee17f        kfun:kotlinx.coroutines.internal#handleUncaughtCoroutineException(kotlin.coroutines.CoroutineContext;kotlin.Throwable){} + 647 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/internal/CoroutineExceptionHandlerImpl.common.kt:43:33)
          at 5   test.kexe                           0x1029843ab        kfun:kotlinx.coroutines#handleCoroutineException(kotlin.coroutines.CoroutineContext;kotlin.Throwable){} + 515 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/CoroutineExceptionHandler.kt:28:5)
          at 6   test.kexe                           0x1029f247b        kfun:kotlinx.coroutines.DispatchedTask#handleFatalException(kotlin.Throwable?;kotlin.Throwable?){} + 831 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt:144:9)
          at 7   test.kexe                           0x1029f2113        kfun:kotlinx.coroutines.DispatchedTask#run(){} + 2455 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt:113:13)
          ... and 12 more common stack frames skipped
  Caused by: kotlin.IllegalStateException: Dispatcher MyDispatcher was closed, attempted to schedule: DispatchedContinuation[MultiWorkerDispatcher@4584120, Continuation @ 7]
      at 0   test.kexe                           0x10273c19b        kfun:kotlin.Throwable#<init>(kotlin.String?){} + 119 (/opt/buildAgent/work/2fed3917837e7e79/kotlin/kotlin-native/runtime/src/main/kotlin/kotlin/Throwable.kt:28:37)
      at 1   test.kexe                           0x102735977        kfun:kotlin.Exception#<init>(kotlin.String?){} + 115 (/opt/buildAgent/work/2fed3917837e7e79/kotlin/kotlin-native/runtime/src/main/kotlin/kotlin/Exceptions.kt:23:44)
      at 2   test.kexe                           0x102735b97        kfun:kotlin.RuntimeException#<init>(kotlin.String?){} + 115 (/opt/buildAgent/work/2fed3917837e7e79/kotlin/kotlin-native/runtime/src/main/kotlin/kotlin/Exceptions.kt:34:44)
      at 3   test.kexe                           0x1027361bf        kfun:kotlin.IllegalStateException#<init>(kotlin.String?){} + 115 (/opt/buildAgent/work/2fed3917837e7e79/kotlin/kotlin-native/runtime/src/main/kotlin/kotlin/Exceptions.kt:70:44)
      at 4   test.kexe                           0x102a1243f        kfun:kotlinx.coroutines.MultiWorkerDispatcher.dispatch#internal + 819 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/native/src/MultithreadedDispatchers.kt:128:23)
      at 5   test.kexe                           0x102a1aad7        kfun:kotlinx.coroutines.CoroutineDispatcher#dispatch(kotlin.coroutines.CoroutineContext;kotlinx.coroutines.Runnable){}-trampoline + 67 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/CoroutineDispatcher.kt:<unknown>)
      at 6   test.kexe                           0x1029f08f7        kfun:kotlinx.coroutines.internal#resumeCancellableWith__at__kotlin.coroutines.Continuation<0:0>(kotlin.Result<0:0>;kotlin.Function1<kotlin.Throwable,kotlin.Unit>?){0§<kotlin.Any?>} + 1019 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt:278:64)
      at 7   test.kexe                           0x1029f0f7b        kfun:kotlinx.coroutines.internal#resumeCancellableWith$default__at__kotlin.coroutines.Continuation<0:0>(kotlin.Result<0:0>;kotlin.Function1<kotlin.Throwable,kotlin.Unit>?;kotlin.Int){0§<kotlin.Any?>} + 287 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt:274:8)
      at 8   test.kexe                           0x102978e77        kfun:kotlinx.coroutines.DispatchedCoroutine#afterResume(kotlin.Any?){} + 311 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/Builders.common.kt:257:29)
      at 9   test.kexe                           0x102a190ab        kfun:kotlinx.coroutines.AbstractCoroutine#afterResume(kotlin.Any?){}-trampoline + 59 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/AbstractCoroutine.kt:<unknown>)
      at 10  test.kexe                           0x102976a8b        kfun:kotlinx.coroutines.AbstractCoroutine#resumeWith(kotlin.Result<1:0>){} + 303 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/AbstractCoroutine.kt:99:9)
      at 11  test.kexe                           0x102868e03        kfun:kotlin.coroutines.Continuation#resumeWith(kotlin.Result<1:0>){}-trampoline + 99 (/opt/buildAgent/work/2fed3917837e7e79/kotlin/libraries/stdlib/src/kotlin/coroutines/Continuation.kt:26:12)
      at 12  test.kexe                           0x102740fd7        kfun:kotlin.coroutines.native.internal.BaseContinuationImpl#resumeWith(kotlin.Result<kotlin.Any?>){} + 1163 (/opt/buildAgent/work/2fed3917837e7e79/kotlin/kotlin-native/runtime/src/main/kotlin/kotlin/coroutines/ContinuationImpl.kt:43:32)
      at 13  test.kexe                           0x102868e03        kfun:kotlin.coroutines.Continuation#resumeWith(kotlin.Result<1:0>){}-trampoline + 99 (/opt/buildAgent/work/2fed3917837e7e79/kotlin/libraries/stdlib/src/kotlin/coroutines/Continuation.kt:26:12)
      at 14  test.kexe                           0x1029f1ed3        kfun:kotlinx.coroutines.DispatchedTask#run(){} + 1879 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt:104:71)
      at 15  test.kexe                           0x102a1b9bb        kfun:kotlinx.coroutines.Runnable#run(){}-trampoline + 91 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/native/src/Runnable.kt:10:19)
      ... and 11 more common stack frames skipped
  Child process terminated with signal 6: Abort trap
* Try:
> Run with --stacktrace option to get the stack trace.
> Run with --info or --debug option to get more log output.
> Run with --scan to get full insights.
* Get more help at https://help.gradle.org
BUILD FAILED in 1m 3s

Maybe the root cause is that on iOS an IllegalStateException is thrown when execution returns from the withContext(Dispatchers.Default) block, whereas on Android the output above shows a CancellationException being thrown at the same place.

Versions

    androidGradlePluginVersion = "8.1.2"
    kotlinVersion = '1.9.23'
    kotlinCoroutinesVersion = "1.8.0"
    skieVersion = "0.6.2"
    
    classpath "com.android.tools.build:gradle:$androidGradlePluginVersion"
    classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlinVersion"

    implementation "org.jetbrains.kotlin:kotlin-stdlib:$kotlinVersion"
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinCoroutinesVersion"
    
    plugins {
      id 'org.jetbrains.kotlin.multiplatform'
      id 'com.android.library'
      id "co.touchlab.skie" version "$skieVersion"
    }

rusmonster added the bug label Apr 5, 2024
@dkhalanskyjb
Collaborator

Interestingly, for me, this test crashes on the JVM as well.

@qwwdfsad
Member

qwwdfsad commented Apr 8, 2024

It also crashes for me on the JVM. On Android, it's either timing (so "Ignored exception" is printed) or a slightly different executor/dispatcher setup.

The cause is straightforward -- there are active coroutines running on the executor that is concurrently closed.
So, any attempt to (re)dispatch any coroutine on that dispatcher fails. Moreover, often there is no place in the code (or this place might be unrelated) to rethrow such an exception, thus an error. It cannot be reasonably ignored -- it indicates something went really wrong (e.g. it might imply that your finally blocks in suspend functions didn't execute).

So I won't triage it as a bug. We can document it better and maybe (but it's a really debatable topic) treat it not as an internal error, but instead invoke the global exception handler immediately (which, on Android, will crash the app).
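A minimal sketch of one way to avoid the race described above, assuming it is acceptable to wait for the coroutine before closing: cancel the job and join it while the dispatcher is still open, and only then call close(). The helper name closeAfterJoin and the steps list are hypothetical and exist only for illustration; this is not an officially recommended recipe from the maintainers.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: records the order in which the shutdown steps happen.
@OptIn(DelicateCoroutinesApi::class, ExperimentalCoroutinesApi::class)
fun closeAfterJoin(): List<String> = runBlocking {
    val steps = mutableListOf<String>()
    val dispatcher = newSingleThreadContext("MyDispatcher")

    val job = launch(dispatcher) {
        try {
            withContext(Dispatchers.Default) { delay(1000) }
        } finally {
            // The coroutine is resumed on MyDispatcher, which is still open here.
            steps += "finally"
        }
    }

    delay(200)
    // Wait until the coroutine has fully completed, so that nothing
    // can be dispatched to MyDispatcher after this point.
    job.cancelAndJoin()
    dispatcher.close()
    steps += "closed"
    steps
}

fun main() {
    println(closeAfterJoin())
}
```

The only difference from the reproducer is the cancelAndJoin() call before close(): the return from withContext is dispatched to a dispatcher that is still alive, so the finally block runs and no dispatch is rejected.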

@dkhalanskyjb
Collaborator

So I won't triage it as a bug.

The error message clearly states that any such error is worth reporting to us:

kotlinx.coroutines.CoroutinesInternalError: Fatal exception in coroutines machinery for CancellableContinuation(DispatchedContinuation[DarwinGlobalQueueDispatcher@53c0308, Continuation @ 6]){Completed}@79400a0. Please read KDoc to 'handleFatalException' method and report this incident to maintainers

So, I don't think we can ignore it when people do just that.

Also, I think this actually is an issue on our side: since there is no guarantee that dispatch doesn't throw, the places where it throws shouldn't be treated as surprises (that is, internal errors).

@qwwdfsad
Member

qwwdfsad commented Apr 8, 2024

True, we have to acknowledge that.

since there is no guarantee that dispatch doesn't throw

The documentation (somewhat vaguely, though) states the following:

This method should generally be exception-safe. An exception thrown from this method may leave the coroutines that use this dispatcher in an inconsistent and hard-to-debug state.

We can re-visit the corresponding places again, though there are not that many alternatives -- internal error or immediate crash (handleCoroutineException, which is also invoked by the internal error path).
We have another place like this -- ThreadContextElement (the documentation to handleFatalException mentions it)

@rusmonster
Author

rusmonster commented Apr 10, 2024

It cannot be reasonably ignored -- it indicates something went really wrong (e.g. it might imply that your finally blocks in suspend functions didn't execute).

So what's the correct way to close the dispatcher?

My real use case is simple: I'm implementing a client which sets up a websocket connection to the backend and creates a singleThreadDispatcher.

When an event is received from the websocket, the client parses it on the singleThreadDispatcher, then updates the database in withContext(Dispatchers.IO) and emits an onUpdated event to the outside.

The client also has a shutdown() method, where it closes the websocket connection and the singleThreadDispatcher.
Obviously, shutdown() could be called at any moment.

So my code looks like:

class Client {
    private val dispatcher = newSingleThreadContext("MyDispatcher")
    private val scope = CoroutineScope(dispatcher)
    private var websocket: WebSocket? = null

    val onUpdated = MutableSharedFlow<Event>()

    private fun onWebsocketMessage(message: String) {
        scope.launch {
            val event = parseMessage(message)

            withContext(Dispatchers.IO) {
                updateDB(event)
            }

            processEvent(event)
            onUpdated.emit(event)
        }
    }

    fun shutdown() {
        websocket?.close()
        scope.cancel()
        dispatcher.close()
    }
}

Is there a better approach than just maintaining a collection of ioDispatcherJobs and calling joinAll on them in the shutdown() method?

Like:

class Client {
    private val dispatcher = newSingleThreadContext("MyDispatcher")
    private val scope = CoroutineScope(dispatcher + SupervisorJob())
    private var websocket: WebSocket? = null

    val onUpdated = MutableSharedFlow<Event>()

    private val ioDispatcherJobs = mutableListOf<Job>()

    private fun onWebsocketMessage(message: String) {
        scope.launch {
            val event = parseMessage(message)

            val job = launch(Dispatchers.IO, CoroutineStart.LAZY) {
                updateDB(event)
            }

            ioDispatcherJobs += job
            job.join()
            ioDispatcherJobs -= job

            processEvent(event)
            onUpdated.emit(event)
        }
    }

    suspend fun shutdown() {
        websocket?.close()

        withContext(dispatcher) {
            ioDispatcherJobs.joinAll()
        }

        scope.cancel()
        dispatcher.close()
    }
}

@dkhalanskyjb
Collaborator

May I suggest not using newSingleThreadContext at all, instead doing Dispatchers.IO.limitedParallelism(1)? This way, you won't need to close the dispatcher at all.
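As a rough sketch of what that suggestion could look like for the Client above (JVM target assumed): Event here is a hypothetical stand-in, the parse and database steps are reduced to placeholders, and the processed list is added only so the result can be inspected. The opt-in annotation is included because limitedParallelism was still marked experimental in some library versions.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow

// Hypothetical stand-in for the Event type used in the thread.
data class Event(val payload: String)

@OptIn(ExperimentalCoroutinesApi::class)
class Client {
    // A view of Dispatchers.IO that runs at most one task at a time.
    // It owns no thread of its own, so there is nothing to close.
    private val serial = Dispatchers.IO.limitedParallelism(1)
    private val scope = CoroutineScope(serial + SupervisorJob())

    val onUpdated = MutableSharedFlow<Event>()

    // Added for illustration only: events handled so far.
    val processed = mutableListOf<Event>()

    fun onWebsocketMessage(message: String): Job = scope.launch {
        val event = Event(message) // placeholder for parseMessage(message)

        withContext(Dispatchers.IO) {
            // placeholder for updateDB(event)
        }

        processed += event // placeholder for processEvent(event)
        onUpdated.emit(event)
    }

    fun shutdown() {
        // Cancelling the scope is enough; no dispatcher.close() is needed.
        scope.cancel()
    }
}

fun main() = runBlocking {
    val client = Client()
    client.onWebsocketMessage("hello").join()
    client.shutdown()
    println(client.processed)
}
```

With this shape, shutdown() can be called at any time: a coroutine returning from withContext(Dispatchers.IO) is dispatched to a view of a dispatcher that is never closed.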

@rusmonster
Author

Correct me if I'm wrong, but I don't see in the documentation that limitedParallelism(1) guarantees execution on a single thread.
That would mean I have to synchronize access to data everywhere, for example in my processEvent method:

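import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

val allEvents = mutableListOf<Event>()
val mutex = Mutex()

// withLock is a suspending call, so processEvent has to be a suspend function
suspend fun processEvent(event: Event) {
    mutex.withLock {
        allEvents += event
    }
}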

Correct?

Which is exactly what I'm trying to avoid by using newSingleThreadContext

@dkhalanskyjb
Collaborator

I don't see in documentation that limitedParallelism(1) guarantees execution on single thread.

It doesn't guarantee that this will always run on the same thread, so if you have things like thread local variables, yes, limitedParallelism on its own won't help you; but it does guarantee that the parallelism will be at most 1, or, in other words, at most one thread at a time will execute the code scheduled on that dispatcher. So no, you don't need mutexes: only one thread at a time (though possibly a different one between calls) can call processEvent.
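To illustrate the point about not needing a mutex, here is a small sketch (JVM target assumed) in which many coroutines increment a plain, unsynchronized variable on a limitedParallelism(1) view. The function name countEvents is hypothetical. The yield() call is there only to let coroutines interleave and possibly resume on different threads.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: increments a plain Int from n coroutines
// that all run on a view with parallelism 1.
@OptIn(ExperimentalCoroutinesApi::class)
fun countEvents(n: Int): Int = runBlocking {
    val serial = Dispatchers.IO.limitedParallelism(1)
    var counter = 0 // no Mutex, no atomic

    coroutineScope {
        repeat(n) {
            launch(serial) {
                yield()   // suspension point; the coroutine may resume on another thread
                counter++ // never executed by two threads at the same time
            }
        }
    }
    counter
}

fun main() {
    println(countEvents(10_000))
}
```

No increments are lost because the read and the write in counter++ are not separated by a suspension point, and at most one task of the view runs at any moment. The same code with launch(Dispatchers.IO) instead of launch(serial) would be a data race.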

@rusmonster
Author

Thank you, that makes sense. Two more questions regarding limitedParallelism(1) then:

  1. Does it guarantee FIFO order of operations?
  2. Since the code could be executed on different threads, I still have to use atomic vars; otherwise it's not guaranteed that I read the last value written to a var by a different thread. Correct?
var eventCounter by atomic(0)

fun processEvent(event: Event) {
  eventCounter++
}

@dkhalanskyjb
Collaborator

  1. Yes, it stores a queue internally.
  2. No, the happens-before relationship is guaranteed by the coroutines machinery.
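Both answers can be observed with a short sketch (JVM target assumed). It launches coroutines in a fixed order from a single thread onto a limitedParallelism(1) view and appends to a plain list. The function name dispatchOrder is hypothetical, and the ordering shown relies on the internal queue mentioned in the answer above rather than on anything verified here against every library version.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: records the order in which tasks dispatched
// to a parallelism-1 view actually run.
@OptIn(ExperimentalCoroutinesApi::class)
fun dispatchOrder(n: Int): List<Int> = runBlocking {
    val serial = Dispatchers.IO.limitedParallelism(1)
    val seen = mutableListOf<Int>() // plain list, no synchronization

    coroutineScope {
        for (i in 0 until n) {
            // Tasks are submitted in increasing order from one thread.
            launch(serial) { seen += i }
        }
    }
    // coroutineScope has joined all children, so their writes are visible here.
    seen
}

fun main() {
    println(dispatchOrder(5))
}
```

Note that the order applies to tasks as they are dispatched: once a coroutine suspends and resumes, its continuation is queued again behind whatever was dispatched in the meantime.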

@rusmonster
Author

Thank you so much! I'll try to go with limitedParallelism(1)

Regarding the initial issue: I would expect the same behaviour on all platforms. So one possible solution is to make the test crash on Android the same way it crashes on iOS and the JVM.

In that case, common code debugged once on Android will work on other platforms without changes.

@dkhalanskyjb
Collaborator

@rusmonster, could you please explain why you thought that limitedParallelism was unsuitable? @qwwdfsad found this misleading piece of information: https://github.com/KStateMachine/kstatemachine/blob/master/docs/index.md#use-single-threaded-coroutinescope Are there any other ones?
