forked from Kotlin/kotlinx.coroutines
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Provide a way to use CoroutineDispatcher as an Rx's Scheduler.
Fixes Kotlin#968 Fixes Kotlin#548
- Loading branch information
1 parent
da5e7e0
commit 5c4efe6
Showing
8 changed files
with
1,379 additions
and
18 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
87 changes: 87 additions & 0 deletions
87
reactive/kotlinx-coroutines-rx2/test/SchedulerStressTest.kt
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
/* | ||
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. | ||
*/ | ||
|
||
package kotlinx.coroutines.rx2 | ||
|
||
import kotlinx.coroutines.* | ||
import org.junit.* | ||
import java.util.concurrent.* | ||
|
||
class SchedulerStressTest : TestBase() { | ||
@Before | ||
fun setup() { | ||
ignoreLostThreads("RxCachedThreadScheduler-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") | ||
} | ||
|
||
/** | ||
* Test that we don't get an OOM if we schedule many jobs at once. | ||
* It's expected that if you don't dispose you'd see an OOM error. | ||
*/ | ||
@Test | ||
fun testSchedulerDisposed(): Unit = runTest { | ||
val dispatcher = currentDispatcher() as CoroutineDispatcher | ||
val scheduler = dispatcher.asScheduler() | ||
testRunnableDisposed(scheduler::scheduleDirect) | ||
} | ||
|
||
@Test | ||
fun testSchedulerWorkerDisposed(): Unit = runTest { | ||
val dispatcher = currentDispatcher() as CoroutineDispatcher | ||
val scheduler = dispatcher.asScheduler() | ||
val worker = scheduler.createWorker() | ||
testRunnableDisposed(worker::schedule) | ||
} | ||
|
||
private suspend fun testRunnableDisposed(block: RxSchedulerBlockNoDelay) { | ||
val n = 2000 * stressTestMultiplier | ||
repeat(n) { | ||
val a = ByteArray(1000000) //1MB | ||
val disposable = block(Runnable { | ||
keepMe(a) | ||
expectUnreached() | ||
}) | ||
disposable.dispose() | ||
yield() // allow the scheduled task to observe that it was disposed | ||
} | ||
} | ||
|
||
/** | ||
* Test function that holds a reference. Used for testing OOM situations | ||
*/ | ||
private fun keepMe(a: ByteArray) { | ||
Thread.sleep(a.size / (a.size + 1) + 10L) | ||
} | ||
|
||
/** | ||
* Test that we don't get an OOM if we schedule many delayed jobs at once. It's expected that if you don't dispose that you'd | ||
* see a OOM error. | ||
*/ | ||
@Test | ||
fun testSchedulerDisposedDuringDelay(): Unit = runTest { | ||
val dispatcher = currentDispatcher() as CoroutineDispatcher | ||
val scheduler = dispatcher.asScheduler() | ||
testRunnableDisposedDuringDelay(scheduler::scheduleDirect) | ||
} | ||
|
||
@Test | ||
fun testSchedulerWorkerDisposedDuringDelay(): Unit = runTest { | ||
val dispatcher = currentDispatcher() as CoroutineDispatcher | ||
val scheduler = dispatcher.asScheduler() | ||
val worker = scheduler.createWorker() | ||
testRunnableDisposedDuringDelay(worker::schedule) | ||
} | ||
|
||
private fun testRunnableDisposedDuringDelay(block: RxSchedulerBlockWithDelay) { | ||
val n = 2000 * stressTestMultiplier | ||
repeat(n) { | ||
val a = ByteArray(1000000) //1MB | ||
val delayMillis: Long = 10 | ||
val disposable = block(Runnable { | ||
keepMe(a) | ||
expectUnreached() | ||
}, delayMillis, TimeUnit.MILLISECONDS) | ||
disposable.dispose() | ||
} | ||
} | ||
} |
Oops, something went wrong.