forked from Kotlin/kotlinx.coroutines
/
DispatchedTask.kt
167 lines (150 loc) · 7.23 KB
/
DispatchedTask.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package kotlinx.coroutines
import kotlinx.coroutines.internal.*
import kotlin.coroutines.*
import kotlin.jvm.*
/** Resume mode: schedule a non-cancellable dispatch (used for `suspendCoroutine`). */
@PublishedApi
internal const val MODE_ATOMIC_DEFAULT: Int = 0

/** Resume mode: schedule a cancellable dispatch (used for `suspendCancellableCoroutine`). */
@PublishedApi
internal const val MODE_CANCELLABLE: Int = 1

/** Resume mode: the thread is already right, but it still has to be marked with the current coroutine. */
@PublishedApi
internal const val MODE_UNDISPATCHED: Int = 2

/** `true` only when this resume mode is [MODE_CANCELLABLE]. */
internal val Int.isCancellableMode: Boolean
    get() = this == MODE_CANCELLABLE

/** `true` when this resume mode is [MODE_ATOMIC_DEFAULT] or [MODE_CANCELLABLE]. */
internal val Int.isDispatchedMode: Boolean
    get() = when (this) {
        MODE_ATOMIC_DEFAULT, MODE_CANCELLABLE -> true
        else -> false
    }
/**
 * A [SchedulerTask] that, when run, takes a previously stored state and uses it to resume
 * the continuation wrapped by [delegate].
 *
 * @param resumeMode the mode this task resumes with; [run] only inspects whether it
 * [is cancellable][isCancellableMode].
 */
internal abstract class DispatchedTask<in T>(
    @JvmField public var resumeMode: Int
) : SchedulerTask() {
    /** The continuation this task resumes; [run] requires it to be a [DispatchedContinuation]. */
    internal abstract val delegate: Continuation<T>

    /** Hands out the stored state (a successful value or a [CompletedExceptionally]). */
    internal abstract fun takeState(): Any?

    /**
     * Invoked by [run] with the taken [state] when the result is dropped because the job
     * is no longer active; [cause] is the job's cancellation exception. Does nothing by default.
     */
    internal open fun cancelResult(state: Any?, cause: Throwable) {}

    /** Interprets [state] as a successful result; the default implementation is an unchecked cast. */
    @Suppress("UNCHECKED_CAST")
    internal open fun <T> getSuccessfulResult(state: Any?): T =
        state as T

    /** Returns the cause when [state] is a [CompletedExceptionally], or `null` otherwise. */
    internal fun getExceptionalResult(state: Any?): Throwable? =
        (state as? CompletedExceptionally)?.cause

    /**
     * Takes the state and resumes the continuation wrapped by [delegate] inside its own
     * coroutine context. Anything thrown along the way, as well as a failure of
     * `taskContext.afterTask()`, is routed to [handleFatalException].
     */
    public final override fun run() {
        val taskContext = this.taskContext
        var fatalException: Throwable? = null
        try {
            val dispatched = delegate as DispatchedContinuation<T>
            val continuation = dispatched.continuation
            val context = continuation.context
            // NOTE: The state must be taken in any case, even if the job is cancelled.
            val state = takeState()
            withCoroutineContext(context, dispatched.countOrElement) {
                val exception = getExceptionalResult(state)
                // The job is consulted only for cancellable resume modes.
                val job = if (resumeMode.isCancellableMode) context[Job] else null
                when {
                    /*
                     * The continuation was originally resumed with an exception. It dominates
                     * cancellation, otherwise the original exception would be silently lost.
                     */
                    exception != null -> continuation.resumeWithException(exception)
                    job != null && !job.isActive -> {
                        val cause = job.getCancellationException()
                        cancelResult(state, cause)
                        continuation.resumeWithStackTrace(cause)
                    }
                    else -> continuation.resume(getSuccessfulResult(state))
                }
            }
        } catch (e: Throwable) {
            // A plain try/catch is used instead of runCatching for a nicer stacktrace and debug experience.
            fatalException = e
        } finally {
            val afterTaskFailure = runCatching { taskContext.afterTask() }.exceptionOrNull()
            handleFatalException(fatalException, afterTaskFailure)
        }
    }

    /**
     * Machinery that handles fatal exceptions in kotlinx.coroutines.
     * There are two kinds of fatal exceptions:
     *
     * 1) Exceptions from kotlinx.coroutines code. Such exceptions indicate that either
     *    the library or the compiler has a bug that breaks internal invariants.
     *    They usually have specific workarounds, but require careful study of the cause and should
     *    be reported to the maintainers and fixed on the library's side anyway.
     *
     * 2) Exceptions from [ThreadContextElement.updateThreadContext] and [ThreadContextElement.restoreThreadContext].
     *    While user code can trigger such an exception by providing an improper implementation of
     *    [ThreadContextElement], it can't be ignored because it may leave the coroutine in an inconsistent state.
     *    If you encounter such an exception, you can either disable this context element or wrap it into
     *    another context element that catches all exceptions and handles them in an application-specific manner.
     *
     * Fatal exception handling can be intercepted with a [CoroutineExceptionHandler] element in the context of
     * a failed coroutine, but such exceptions should be reported anyway.
     *
     * @param exception the primary failure, if any.
     * @param finallyException a failure from the clean-up step, if any; when both are present
     * it is added to [exception] as suppressed.
     */
    internal fun handleFatalException(exception: Throwable?, finallyException: Throwable?) {
        // Nothing failed -- nothing to report.
        val cause = exception ?: finallyException ?: return
        if (exception !== null && finallyException !== null) {
            exception.addSuppressedThrowable(finallyException)
        }
        val reason = CoroutinesInternalError(
            "Fatal exception in coroutines machinery for $this. " +
                "Please read KDoc to 'handleFatalException' method and report this incident to maintainers",
            cause
        )
        handleCoroutineException(this.delegate.context, reason)
    }
}
/**
 * Resumes the [delegate][DispatchedTask.delegate] of this task using the given [mode].
 *
 * When [mode] is a dispatched mode, the delegate is a [DispatchedContinuation] and the
 * cancellability of [mode] matches that of the task's own `resumeMode`, the task itself is
 * handed to the delegate's dispatcher (or resumed unconfined if no dispatch is needed).
 * In every other case the delegate is resumed via [resume].
 */
internal fun <T> DispatchedTask<T>.dispatch(mode: Int) {
    val target = this.delegate
    if (!mode.isDispatchedMode ||
        target !is DispatchedContinuation<*> ||
        mode.isCancellableMode != resumeMode.isCancellableMode
    ) {
        resume(target, mode)
        return
    }
    // Dispatch directly, using this instance's Runnable implementation.
    val context = target.context
    target.dispatcher.let { dispatcher ->
        if (dispatcher.isDispatchNeeded(context)) dispatcher.dispatch(context, this) else resumeUnconfined()
    }
}
/**
 * Slow path: takes the state of this task and resumes [delegate] with it according to [useMode].
 *
 * An exceptional state is passed through `recoverStackTrace` before being wrapped into a failed
 * [Result]; any other state is converted with `getSuccessfulResult`.
 *
 * @throws IllegalStateException if [useMode] is none of the `MODE_*` constants handled here
 * (the state has already been taken by then).
 */
@Suppress("UNCHECKED_CAST")
internal fun <T> DispatchedTask<T>.resume(delegate: Continuation<T>, useMode: Int) {
    val taken = takeState()
    val failure = getExceptionalResult(taken)
    val outcome: Result<T> =
        if (failure == null) {
            Result.success(getSuccessfulResult<T>(taken))
        } else {
            Result.failure(recoverStackTrace(failure, delegate))
        }
    when (useMode) {
        MODE_ATOMIC_DEFAULT -> delegate.resumeWith(outcome)
        MODE_CANCELLABLE -> delegate.resumeCancellableWith(outcome)
        MODE_UNDISPATCHED -> (delegate as DispatchedContinuation).resumeUndispatchedWith(outcome)
        else -> error("Invalid mode $useMode")
    }
}
/**
 * Resumes this task without dispatching, going through the thread-local event loop.
 *
 * If an unconfined loop is already active, the task is queued on it to avoid stack overflow;
 * otherwise the task is resumed right away and the loop is run until no unconfined tasks remain.
 */
private fun DispatchedTask<*>.resumeUnconfined() {
    val loop = ThreadLocalEventLoop.eventLoop
    if (!loop.isUnconfinedLoopActive) {
        // No loop was active -- resume here and drain every unconfined task that gets queued.
        runUnconfinedEventLoop(loop) {
            resume(delegate, MODE_UNDISPATCHED)
        }
        return
    }
    // An unconfined loop is already active -- queue the continuation instead of nesting the call.
    loop.dispatchUnconfined(this)
}
/**
 * Runs [block] and then keeps processing unconfined events of [eventLoop] until none are left.
 *
 * The loop's unconfined use count is incremented for the duration of the call and always
 * decremented afterwards. Any exception thrown in between is reported through
 * `handleFatalException` instead of being rethrown.
 */
internal inline fun DispatchedTask<*>.runUnconfinedEventLoop(
    eventLoop: EventLoop,
    block: () -> Unit
) {
    eventLoop.incrementUseCount(unconfined = true)
    try {
        block()
        // Keep going until all unconfined continuations were executed.
        while (eventLoop.processUnconfinedEvent()) {
            // the work happens in the loop condition
        }
    } catch (e: Throwable) {
        /*
         * Not expected during normal operation -- this only happens if the implementation has a bug.
         * Report it as a fatal exception.
         */
        handleFatalException(e, null)
    } finally {
        eventLoop.decrementUseCount(unconfined = true)
    }
}
/**
 * Resumes this continuation with a failure holding [exception] after it has been passed
 * through `recoverStackTrace` for this continuation.
 */
@Suppress("NOTHING_TO_INLINE")
internal inline fun Continuation<*>.resumeWithStackTrace(exception: Throwable) {
    val recovered = recoverStackTrace(exception, this)
    resumeWith(Result.failure(recovered))
}