Skip to content

Commit

Permalink
Provide API to create limited view of experimental dispatcher
Browse files Browse the repository at this point in the history
Fixes #475
  • Loading branch information
qwwdfsad authored and elizarov committed Aug 20, 2018
1 parent 8dd5b06 commit cac1a0f
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,27 @@ class ExperimentalCoroutineDispatcher(

/**
* Creates new coroutine execution context with limited parallelism to execute tasks which may potentially block.
* Resulting [CoroutineDispatcher] doesn't own any resources (its threads) and piggybacks on the original [ExperimentalCoroutineDispatcher],
* executing tasks in this context, giving original dispatcher hint to adjust its behaviour.
* Resulting [CoroutineDispatcher] doesn't own any resources (its threads) and provides a view of the original [ExperimentalCoroutineDispatcher],
* giving it additional hints to adjust its behaviour.
*
* @param parallelism parallelism level, indicating how many threads can execute tasks in given context in parallel.
* @param parallelism parallelism level, indicating how many threads can execute tasks in the resulting dispatcher parallel.
*/
fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher {
public fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher {
require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
return LimitingBlockingDispatcher(this, parallelism, TaskMode.PROBABLY_BLOCKING)
return LimitingDispatcher(this, parallelism, TaskMode.PROBABLY_BLOCKING)
}

/**
* Creates new coroutine execution context with limited parallelism to execute CPU-intensive tasks.
* Resulting [CoroutineDispatcher] doesn't own any resources (its threads) and provides a view of the original [ExperimentalCoroutineDispatcher],
* giving it additional hints to adjust its behaviour.
*
* @param parallelism parallelism level, indicating how many threads can execute tasks in the resulting dispatcher parallel.
*/
public fun limited(parallelism: Int): CoroutineDispatcher {
require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
require(parallelism <= corePoolSize) { "Expected parallelism level lesser than core pool size ($corePoolSize), but have $parallelism" }
return LimitingDispatcher(this, parallelism, TaskMode.NON_BLOCKING)
}

internal fun dispatchWithContext(block: Runnable, context: TaskContext, fair: Boolean): Unit =
Expand All @@ -73,7 +86,7 @@ class ExperimentalCoroutineDispatcher(
}
}

private class LimitingBlockingDispatcher(
private class LimitingDispatcher(
val dispatcher: ExperimentalCoroutineDispatcher,
val parallelism: Int,
override val taskMode: TaskMode
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package kotlinx.coroutines.experimental.scheduling

import kotlinx.atomicfu.*
import kotlinx.coroutines.experimental.*
import org.junit.Test
import kotlin.coroutines.experimental.*
import kotlin.test.*

class LimitingCoroutineDispatcherStressTest : SchedulerTestBase() {

init {
corePoolSize = 3
}

private val blocking = blockingDispatcher(2)
private val cpuView = view(2)
private val cpuView2 = view(2)
private val concurrentWorkers = atomic(0)
private val iterations = 25_000 * stressTestMultiplierSqrt

@Test
fun testCpuLimitNotExtended() = runBlocking<Unit> {
val tasks = ArrayList<Deferred<*>>(iterations * 2)
repeat(iterations) {
tasks += task(cpuView, 3)
tasks += task(cpuView2, 3)
}

tasks.awaitAll()
}

@Test
fun testCpuLimitWithBlocking() = runBlocking<Unit> {
val tasks = ArrayList<Deferred<*>>(iterations * 2)
repeat(iterations) {
tasks += task(cpuView, 4)
tasks += task(blocking, 4)
}

tasks.awaitAll()
}

private fun task(ctx: CoroutineContext, maxLimit: Int): Deferred<Unit> {
return async(ctx) {
try {
val currentlyExecuting = concurrentWorkers.incrementAndGet()
assertTrue(currentlyExecuting <= maxLimit, "Executing: $currentlyExecuting, max limit: $maxLimit")
} finally {
concurrentWorkers.decrementAndGet()
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package kotlinx.coroutines.experimental.scheduling

import kotlinx.coroutines.experimental.*
import org.junit.*
import java.util.concurrent.*

class LimitingDispatcherTest : SchedulerTestBase() {

@Test(expected = IllegalArgumentException::class)
fun testTooLargeView() {
view(corePoolSize + 1)
}

@Test(expected = IllegalArgumentException::class)
fun testNegativeView() {
view(-1)
}

@Test(expected = IllegalArgumentException::class)
fun testZeroView() {
view(0)
}

@Test(timeout = 10_000)
fun testBlockingInterleave() = runBlocking {
corePoolSize = 3
val view = view(2)
val blocking = blockingDispatcher(4)
val barrier = CyclicBarrier(6)
val tasks = ArrayList<Job>(6)
repeat(2) {
tasks += async(view) {
barrier.await()
}

repeat(2) {
tasks += async(blocking) {
barrier.await()
}
}
}

tasks.joinAll()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ abstract class SchedulerTestBase : TestBase() {
}

private val exception = atomic<Throwable?>(null)
private val handler = CoroutineExceptionHandler({ _, e -> exception.value = e })
private val handler = CoroutineExceptionHandler { _, e -> exception.value = e }

protected var corePoolSize = 1
protected var maxPoolSize = 1024
Expand Down Expand Up @@ -89,6 +89,11 @@ abstract class SchedulerTestBase : TestBase() {
return _dispatcher!!.blocking(parallelism) + handler
}

protected fun view(parallelism: Int): CoroutineContext {
val intitialize = dispatcher
return _dispatcher!!.limited(parallelism) + handler
}

fun initialPoolSize() = corePoolSize.coerceAtMost(2)

@After
Expand Down

0 comments on commit cac1a0f

Please sign in to comment.