Skip to content

Commit

Permalink
Avoid OOM in thread-pool dispatchers, try to reflectively invoke setR…
Browse files Browse the repository at this point in the history
…emoveFutureOnCancel on executor instance and use default dispatcher if attempt failed

Fixes #571
  • Loading branch information
qwwdfsad committed Sep 25, 2018
1 parent 8dceb17 commit 08e5b4f
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 19 deletions.
63 changes: 45 additions & 18 deletions core/kotlinx-coroutines-core/src/Executors.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

package kotlinx.coroutines.experimental

import kotlinx.coroutines.experimental.internal.*
import kotlinx.coroutines.experimental.timeunit.TimeUnit
import java.io.*
import java.io.Closeable
import java.util.concurrent.*
import kotlin.coroutines.experimental.*

Expand Down Expand Up @@ -63,40 +64,66 @@ public fun ExecutorService.asCoroutineDispatcher_Deprecated(): CloseableCoroutin
public fun Executor.toCoroutineDispatcher(): CoroutineDispatcher =
asCoroutineDispatcher()

private class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcherBase()
private class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcherBase() {
init {
initFutureCancellation()
}
}

/**
* @suppress **This is unstable API and it is subject to change.**
*/
public abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatcher(), Delay {

private var removesFutureOnCancellation: Boolean = false

internal fun initFutureCancellation() {
removesFutureOnCancellation = removeFutureOnCancel(executor)
}

override fun dispatch(context: CoroutineContext, block: Runnable) =
try { executor.execute(timeSource.trackTask(block)) }
catch (e: RejectedExecutionException) {
timeSource.unTrackTask()
DefaultExecutor.execute(block)
}

/*
* removesFutureOnCancellation is required to avoid memory leak.
* On Java 7+ we reflectively invoke ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true) and we're fine.
* On Java 6 we're scheduling time-based coroutines to our own thread safe heap which supports cancellation.
*/
override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
val timeout =
try { (executor as? ScheduledExecutorService)
?.schedule(ResumeUndispatchedRunnable(this, continuation), time, unit) }
catch (e: RejectedExecutionException) { null }
if (timeout != null)
continuation.cancelFutureOnCancellation(timeout)
else
DefaultExecutor.scheduleResumeAfterDelay(time, unit, continuation)
val future = if (removesFutureOnCancellation) {
scheduleBlock(ResumeUndispatchedRunnable(this, continuation), time, unit)
} else {
null
}

if (future != null) {
continuation.cancelFutureOnCancellation(future)
return
}

DefaultExecutor.scheduleResumeAfterDelay(time, unit, continuation)
}

override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle {
val timeout =
try { (executor as? ScheduledExecutorService)
?.schedule(block, time, unit) }
catch (e: RejectedExecutionException) { null }
return if (timeout != null)
DisposableFutureHandle(timeout)
else
DefaultExecutor.invokeOnTimeout(time, unit, block)
val future = if (removesFutureOnCancellation) {
scheduleBlock(block, time, unit)
} else {
null
}

return if (future != null ) DisposableFutureHandle(future) else DefaultExecutor.invokeOnTimeout(time, unit, block)
}

private fun scheduleBlock(block: Runnable, time: Long, unit: TimeUnit): ScheduledFuture<*>? {
return try {
(executor as? ScheduledExecutorService)?.schedule(block, time, unit)
} catch (e: RejectedExecutionException) {
null
}
}

override fun close() {
Expand Down
7 changes: 6 additions & 1 deletion core/kotlinx-coroutines-core/src/ThreadPoolDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package kotlinx.coroutines.experimental

import kotlinx.coroutines.experimental.internal.*
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.experimental.*
Expand Down Expand Up @@ -62,12 +63,16 @@ public class ThreadPoolDispatcher internal constructor(
private val nThreads: Int,
private val name: String
) : ExecutorCoroutineDispatcherBase() {
private val threadNo = AtomicInteger()

private val threadNo = AtomicInteger()
override val executor: Executor = Executors.newScheduledThreadPool(nThreads) { target ->
PoolThread(this, target, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet())
}

init {
initFutureCancellation()
}

/**
* Closes this dispatcher -- shuts down all threads in this pool and releases resources.
*/
Expand Down
18 changes: 18 additions & 0 deletions core/kotlinx-coroutines-core/src/internal/Concurrent.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package kotlinx.coroutines.experimental.internal

import java.lang.reflect.*
import java.util.concurrent.*
import kotlin.concurrent.withLock as withLockJvm

Expand All @@ -13,3 +14,20 @@ internal actual fun <E> subscriberList(): SubscribersList<E> = CopyOnWriteArrayL
internal actual typealias ReentrantLock = java.util.concurrent.locks.ReentrantLock

internal actual inline fun <T> ReentrantLock.withLock(action: () -> T) = this.withLockJvm(action)

private val REMOVE_FUTURE_ON_CANCEL: Method? = try {
ScheduledThreadPoolExecutor::class.java.getMethod("setRemoveOnCancelPolicy", Boolean::class.java)
} catch (e: Throwable) {
null
}

@Suppress("NAME_SHADOWING")
internal fun removeFutureOnCancel(executor: Executor): Boolean {
try {
val executor = executor as? ScheduledExecutorService ?: return false
(REMOVE_FUTURE_ON_CANCEL ?: return false).invoke(executor, true)
return true
} catch (e: Throwable) {
return true
}
}

0 comments on commit 08e5b4f

Please sign in to comment.