Skip to content

Commit

Permalink
Avoid OOM in thread-pool dispatchers, try to reflectively invoke setR…
Browse files Browse the repository at this point in the history
…emoveFutureOnCancel on executor instance and use default dispatcher if attempt failed

Fixes #571
  • Loading branch information
qwwdfsad authored and elizarov committed Sep 28, 2018
1 parent 7ff678f commit e29f497
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 18 deletions.
64 changes: 46 additions & 18 deletions core/kotlinx-coroutines-core/src/Executors.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

package kotlinx.coroutines.experimental

import kotlinx.coroutines.experimental.internal.*
import java.io.*
import java.io.Closeable
import java.util.concurrent.*
import kotlin.coroutines.experimental.*

Expand All @@ -18,7 +20,7 @@ import kotlin.coroutines.experimental.*
public abstract class ExecutorCoroutineDispatcher: CloseableCoroutineDispatcher(), Closeable {
/**
* Closes this coroutine dispatcher and shuts down its executor.
*
*
* It may throw an exception if this dispatcher is global and cannot be closed.
*/
public abstract override fun close()
Expand Down Expand Up @@ -74,41 +76,67 @@ public fun ExecutorService.asCoroutineDispatcher_Deprecated(): CloseableCoroutin
public fun Executor.toCoroutineDispatcher(): CoroutineDispatcher =
asCoroutineDispatcher()

private class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcherBase()
private class ExecutorCoroutineDispatcherImpl(override val executor: Executor) : ExecutorCoroutineDispatcherBase() {
init {
initFutureCancellation()
}
}

/**
* @suppress **This is unstable API and it is subject to change.**
*/
@InternalCoroutinesApi
public abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatcher(), Delay {

private var removesFutureOnCancellation: Boolean = false

internal fun initFutureCancellation() {
removesFutureOnCancellation = removeFutureOnCancel(executor)
}

override fun dispatch(context: CoroutineContext, block: Runnable) =
try { executor.execute(timeSource.trackTask(block)) }
catch (e: RejectedExecutionException) {
timeSource.unTrackTask()
DefaultExecutor.execute(block)
}

/*
* removesFutureOnCancellation is required to avoid memory leak.
* On Java 7+ we reflectively invoke ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true) and we're fine.
* On Java 6 we're scheduling time-based coroutines to our own thread safe heap which supports cancellation.
*/
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val timeout =
try { (executor as? ScheduledExecutorService)
?.schedule(ResumeUndispatchedRunnable(this, continuation), timeMillis, TimeUnit.MILLISECONDS) }
catch (e: RejectedExecutionException) { null }
if (timeout != null)
continuation.cancelFutureOnCancellation(timeout)
else
DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation)
val future = if (removesFutureOnCancellation) {
scheduleBlock(ResumeUndispatchedRunnable(this, continuation), timeMillis, TimeUnit.MILLISECONDS)
} else {
null
}
// If everything went fine and the scheduling attempt was not rejected -- use it
if (future != null) {
continuation.cancelFutureOnCancellation(future)
return
}
// Otherwise fallback to default executor
DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation)
}

override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
val timeout =
try { (executor as? ScheduledExecutorService)
?.schedule(block, timeMillis, TimeUnit.MILLISECONDS) }
catch (e: RejectedExecutionException) { null }
return if (timeout != null)
DisposableFutureHandle(timeout)
else
DefaultExecutor.invokeOnTimeout(timeMillis, block)
val future = if (removesFutureOnCancellation) {
scheduleBlock(block, timeMillis, TimeUnit.MILLISECONDS)
} else {
null
}

return if (future != null ) DisposableFutureHandle(future) else DefaultExecutor.invokeOnTimeout(timeMillis, block)
}

private fun scheduleBlock(block: Runnable, time: Long, unit: TimeUnit): ScheduledFuture<*>? {
return try {
(executor as? ScheduledExecutorService)?.schedule(block, time, unit)
} catch (e: RejectedExecutionException) {
null
}
}

override fun close() {
Expand Down
5 changes: 5 additions & 0 deletions core/kotlinx-coroutines-core/src/ThreadPoolDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package kotlinx.coroutines.experimental

import kotlinx.coroutines.experimental.internal.*
import java.util.concurrent.*
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.experimental.*
Expand Down Expand Up @@ -88,6 +89,10 @@ public class ThreadPoolDispatcher internal constructor(
PoolThread(this, target, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet())
}

init {
initFutureCancellation()
}

/**
* Closes this dispatcher -- shuts down all threads in this pool and releases resources.
*/
Expand Down
18 changes: 18 additions & 0 deletions core/kotlinx-coroutines-core/src/internal/Concurrent.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package kotlinx.coroutines.experimental.internal

import java.lang.reflect.*
import java.util.*
import java.util.concurrent.*
import kotlin.concurrent.withLock as withLockJvm
Expand All @@ -16,3 +17,20 @@ internal actual typealias ReentrantLock = java.util.concurrent.locks.ReentrantLo
internal actual inline fun <T> ReentrantLock.withLock(action: () -> T) = this.withLockJvm(action)

internal actual fun <E> identitySet(expectedSize: Int): MutableSet<E> = Collections.newSetFromMap(IdentityHashMap(expectedSize))

private val REMOVE_FUTURE_ON_CANCEL: Method? = try {
ScheduledThreadPoolExecutor::class.java.getMethod("setRemoveOnCancelPolicy", Boolean::class.java)
} catch (e: Throwable) {
null
}

@Suppress("NAME_SHADOWING")
internal fun removeFutureOnCancel(executor: Executor): Boolean {
try {
val executor = executor as? ScheduledExecutorService ?: return false
(REMOVE_FUTURE_ON_CANCEL ?: return false).invoke(executor, true)
return true
} catch (e: Throwable) {
return true
}
}

0 comments on commit e29f497

Please sign in to comment.