-
Notifications
You must be signed in to change notification settings - Fork 499
/
ScriptBackgroundCoroutineDispatcher.kt
86 lines (81 loc) · 3.58 KB
/
ScriptBackgroundCoroutineDispatcher.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package org.oppia.android.scripts.common
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Runnable
import kotlinx.coroutines.asCoroutineDispatcher
import java.io.Closeable
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import kotlin.coroutines.CoroutineContext
/**
 * A [Closeable] [CoroutineDispatcher] designed for scripts that need to run expensive I/O tasks in
 * parallel. Scripts are strongly encouraged to use only this dispatcher, and to make sure that
 * [close] is called before the script finishes so that no lingering threads keep the process
 * alive.
 *
 * Closing behaves as follows: new tasks are rejected, tasks that are already running or queued
 * are given [closeTimeout] to finish, and anything still running after that is forcibly
 * terminated. If the underlying executor still has not terminated after a second wait of the same
 * length, [close] throws an [IllegalStateException]. If the closing thread is interrupted while
 * waiting, the executor is forcibly shut down and the thread's interrupt flag is restored.
 *
 * Basic usage:
 * ```kotlin
 * ScriptBackgroundCoroutineDispatcher().use { scriptBgDispatcher ->
 *   val deferred = CoroutineScope(scriptBgDispatcher).async {
 *     // Expensive task...
 *   }
 *   // IMPORTANT: The operation must be observed before use{} ends, otherwise the dispatcher will
 *   // close and terminate any pending tasks.
 *   runBlocking { deferred.await() }
 * }
 * ```
 *
 * Usage combined with Kotlin's I/O dispatcher:
 * ```kotlin
 * ScriptBackgroundCoroutineDispatcher().use { scriptBgDispatcher ->
 *   val deferred = CoroutineScope(scriptBgDispatcher).async {
 *     withContext(Dispatchers.IO) {
 *       // Perform I/O using Kotlin's highly parallelized I/O dispatcher, but wait for the result
 *       // using the background script dispatcher (since execution could continue if other I/O
 *       // operations need to be kicked off, or if other work can be done alongside the I/O).
 *     }
 *   }
 *   // IMPORTANT: The operation must be observed before use{} ends, otherwise the dispatcher will
 *   // close and terminate any pending tasks.
 *   runBlocking { deferred.await() }
 * }
 * ```
 *
 * @property closeTimeout how long, measured in [closeTimeoutUnit] units, [close] waits for
 *     in-flight tasks to finish before forcibly ending them
 * @property closeTimeoutUnit the time unit in which [closeTimeout] is expressed
 */
class ScriptBackgroundCoroutineDispatcher(
  private val closeTimeout: Long = 5,
  private val closeTimeoutUnit: TimeUnit = TimeUnit.SECONDS
) : CoroutineDispatcher(), Closeable {
  // Both delegates are lazy: nothing is created until the first dispatch() or close() call.
  private val executor: ExecutorService by lazy { Executors.newCachedThreadPool() }
  private val executorDispatcher by lazy { executor.asCoroutineDispatcher() }

  /** Forwards [block] to the dispatcher backed by this class's cached thread pool. */
  override fun dispatch(context: CoroutineContext, block: Runnable) =
    executorDispatcher.dispatch(context, block)

  /**
   * Shuts down the backing thread pool (waiting up to the configured timeout for running tasks)
   * and then closes the wrapping coroutine dispatcher.
   */
  override fun close() {
    // The pool is drained first; the dispatcher wrapper is only closed once that has returned.
    executor.tryShutdownFully(closeTimeout, closeTimeoutUnit)
    executorDispatcher.close()
  }

  private companion object {
    /**
     * Performs a two-phase shutdown of this [ExecutorService]: an orderly shutdown followed, if
     * the service has not terminated within [timeout] [unit]s, by a forced one.
     *
     * The approach follows https://stackoverflow.com/a/33690603 and
     * https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html.
     */
    private fun ExecutorService.tryShutdownFully(timeout: Long, unit: TimeUnit) {
      // Phase 1: stop accepting new work and let existing tasks run to completion.
      shutdown()
      try {
        if (awaitTermination(timeout, unit)) return

        // Phase 2: the grace period elapsed, so cancel whatever is still running and wait again.
        shutdownNow()
        val terminatedAfterForce = awaitTermination(timeout, unit)
        if (!terminatedAfterForce) {
          error("ExecutorService didn't fully shutdown: $this.")
        }
      } catch (e: InterruptedException) {
        // The waiting thread was interrupted: force the shutdown and preserve the interrupt
        // status for callers further up the stack.
        shutdownNow()
        Thread.currentThread().interrupt()
      }
    }
  }
}