From 5c4efe654c936d1695770431adfa3fe852807724 Mon Sep 17 00:00:00 2001 From: rechee Date: Sun, 19 Apr 2020 00:58:52 -0700 Subject: [PATCH] Provide a way to use CoroutineDispatcher as an Rx's Scheduler. Fixes #968 Fixes #548 --- .../api/kotlinx-coroutines-rx2.api | 4 +- .../kotlinx-coroutines-rx2/src/RxScheduler.kt | 139 +++++- .../test/SchedulerStressTest.kt | 87 ++++ .../test/SchedulerTest.kt | 470 +++++++++++++++++- .../api/kotlinx-coroutines-rx3.api | 4 +- .../kotlinx-coroutines-rx3/src/RxScheduler.kt | 138 ++++- .../test/SchedulerStressTest.kt | 87 ++++ .../test/SchedulerTest.kt | 468 ++++++++++++++++- 8 files changed, 1379 insertions(+), 18 deletions(-) create mode 100644 reactive/kotlinx-coroutines-rx2/test/SchedulerStressTest.kt create mode 100644 reactive/kotlinx-coroutines-rx3/test/SchedulerStressTest.kt diff --git a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api index 7cc594496e..c2d1c4bf1d 100644 --- a/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api +++ b/reactive/kotlinx-coroutines-rx2/api/kotlinx-coroutines-rx2.api @@ -69,7 +69,9 @@ public final class kotlinx/coroutines/rx2/RxObservableKt { } public final class kotlinx/coroutines/rx2/RxSchedulerKt { - public static final fun asCoroutineDispatcher (Lio/reactivex/Scheduler;)Lkotlinx/coroutines/rx2/SchedulerCoroutineDispatcher; + public static final fun asCoroutineDispatcher (Lio/reactivex/Scheduler;)Lkotlinx/coroutines/CoroutineDispatcher; + public static final synthetic fun asCoroutineDispatcher (Lio/reactivex/Scheduler;)Lkotlinx/coroutines/rx2/SchedulerCoroutineDispatcher; + public static final fun asScheduler (Lkotlinx/coroutines/CoroutineDispatcher;)Lio/reactivex/Scheduler; } public final class kotlinx/coroutines/rx2/RxSingleKt { diff --git a/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt b/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt index 0262fc12f7..d7d5f6cfbf 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt @@ -4,16 +4,143 @@ package kotlinx.coroutines.rx2 -import io.reactivex.Scheduler +import io.reactivex.* +import io.reactivex.disposables.* +import io.reactivex.plugins.* +import kotlinx.atomicfu.* import kotlinx.coroutines.* -import java.util.concurrent.TimeUnit -import kotlin.coroutines.CoroutineContext +import kotlinx.coroutines.channels.* +import java.util.concurrent.* +import kotlin.coroutines.* /** * Converts an instance of [Scheduler] to an implementation of [CoroutineDispatcher] * and provides native support of [delay] and [withTimeout]. */ -public fun Scheduler.asCoroutineDispatcher(): SchedulerCoroutineDispatcher = SchedulerCoroutineDispatcher(this) +public fun Scheduler.asCoroutineDispatcher(): CoroutineDispatcher = + if (this is DispatcherScheduler) { + dispatcher + } else { + SchedulerCoroutineDispatcher(this) + } + +@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.4.2, binary compatibility with earlier versions") +@JvmName("asCoroutineDispatcher") +public fun Scheduler.asCoroutineDispatcher0(): SchedulerCoroutineDispatcher = + SchedulerCoroutineDispatcher(this) + +/** + * Converts an instance of [CoroutineDispatcher] to an implementation of [Scheduler]. + */ +public fun CoroutineDispatcher.asScheduler(): Scheduler = + if (this is SchedulerCoroutineDispatcher) { + scheduler + } else { + DispatcherScheduler(this) + } + +private class DispatcherScheduler(@JvmField val dispatcher: CoroutineDispatcher) : Scheduler() { + + private val schedulerJob = SupervisorJob() + + /** + * The scope for everything happening in this [DispatcherScheduler]. + * + * Running tasks, too, get launched under this scope, because [shutdown] should cancel the running tasks as well. + */ + private val scope = CoroutineScope(schedulerJob + dispatcher) + + /** + * The counter of created workers, for their pretty-printing. + */ + private val workerCounter = atomic(1L) + + override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable = + scope.scheduleTask(block, unit.toMillis(delay)) { task -> + Runnable { scope.launch { task() } } + } + + override fun createWorker(): Worker = DispatcherWorker(workerCounter.getAndIncrement(), dispatcher, schedulerJob) + + override fun shutdown() { + schedulerJob.cancel() + } + + private class DispatcherWorker( + private val counter: Long, + private val dispatcher: CoroutineDispatcher, + parentJob: Job + ) : Worker() { + + private val workerJob = SupervisorJob(parentJob) + private val workerScope = CoroutineScope(workerJob + dispatcher) + private val blockChannel = Channel Unit>(Channel.UNLIMITED) + + init { + workerScope.launch { + blockChannel.consumeEach { + it() + } + } + } + + override fun schedule(block: Runnable, delay: Long, unit: TimeUnit): Disposable = + workerScope.scheduleTask(block, unit.toMillis(delay)) { task -> + Runnable { blockChannel.trySend(task) } + } + + override fun isDisposed(): Boolean = !workerScope.isActive + + override fun dispose() { + blockChannel.close() + workerJob.cancel() + } + + override fun toString(): String = "$dispatcher (worker $counter, ${if (isDisposed) "disposed" else "active"})" + } + + override fun toString(): String = dispatcher.toString() +} + +private typealias Task = suspend () -> Unit + +/** + * Schedule [block] so that an adapted version of it, wrapped in [adaptForScheduling], executes after [delayMillis] + * milliseconds. + */ +private fun CoroutineScope.scheduleTask( + block: Runnable, + delayMillis: Long, + adaptForScheduling: (Task) -> Runnable +): Disposable { + val ctx = coroutineContext + var handle: DisposableHandle? = null + val disposable = Disposables.fromRunnable { + // null if delay <= 0 + handle?.dispose() + } + val decoratedBlock = RxJavaPlugins.onSchedule(block) + suspend fun task() { + if (disposable.isDisposed) return + try { + runInterruptible { + decoratedBlock.run() + } + } catch (e: Throwable) { + handleUndeliverableException(e, ctx) + } + } + + val toSchedule = adaptForScheduling(::task) + if (!isActive) return Disposables.disposed() + if (delayMillis <= 0) { + toSchedule.run() + } else { + @Suppress("INVISIBLE_MEMBER") + ctx.delay.invokeOnTimeout(delayMillis, toSchedule, ctx).let { handle = it } + } + return disposable +} /** * Implements [CoroutineDispatcher] on top of an arbitrary [Scheduler]. @@ -45,8 +172,10 @@ public class SchedulerCoroutineDispatcher( /** @suppress */ override fun toString(): String = scheduler.toString() + /** @suppress */ override fun equals(other: Any?): Boolean = other is SchedulerCoroutineDispatcher && other.scheduler === scheduler + /** @suppress */ override fun hashCode(): Int = System.identityHashCode(scheduler) -} +} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-rx2/test/SchedulerStressTest.kt b/reactive/kotlinx-coroutines-rx2/test/SchedulerStressTest.kt new file mode 100644 index 0000000000..ea33a9fa58 --- /dev/null +++ b/reactive/kotlinx-coroutines-rx2/test/SchedulerStressTest.kt @@ -0,0 +1,87 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.rx2 + +import kotlinx.coroutines.* +import org.junit.* +import java.util.concurrent.* + +class SchedulerStressTest : TestBase() { + @Before + fun setup() { + ignoreLostThreads("RxCachedThreadScheduler-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") + } + + /** + * Test that we don't get an OOM if we schedule many jobs at once. + * It's expected that if you don't dispose you'd see an OOM error. + */ + @Test + fun testSchedulerDisposed(): Unit = runTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + testRunnableDisposed(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerDisposed(): Unit = runTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + val worker = scheduler.createWorker() + testRunnableDisposed(worker::schedule) + } + + private suspend fun testRunnableDisposed(block: RxSchedulerBlockNoDelay) { + val n = 2000 * stressTestMultiplier + repeat(n) { + val a = ByteArray(1000000) //1MB + val disposable = block(Runnable { + keepMe(a) + expectUnreached() + }) + disposable.dispose() + yield() // allow the scheduled task to observe that it was disposed + } + } + + /** + * Test function that holds a reference. Used for testing OOM situations + */ + private fun keepMe(a: ByteArray) { + Thread.sleep(a.size / (a.size + 1) + 10L) + } + + /** + * Test that we don't get an OOM if we schedule many delayed jobs at once. It's expected that if you don't dispose that you'd + * see a OOM error. + */ + @Test + fun testSchedulerDisposedDuringDelay(): Unit = runTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + testRunnableDisposedDuringDelay(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerDisposedDuringDelay(): Unit = runTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + val worker = scheduler.createWorker() + testRunnableDisposedDuringDelay(worker::schedule) + } + + private fun testRunnableDisposedDuringDelay(block: RxSchedulerBlockWithDelay) { + val n = 2000 * stressTestMultiplier + repeat(n) { + val a = ByteArray(1000000) //1MB + val delayMillis: Long = 10 + val disposable = block(Runnable { + keepMe(a) + expectUnreached() + }, delayMillis, TimeUnit.MILLISECONDS) + disposable.dispose() + } + } +} \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-rx2/test/SchedulerTest.kt b/reactive/kotlinx-coroutines-rx2/test/SchedulerTest.kt index 26dbe8f4cf..194186718d 100644 --- a/reactive/kotlinx-coroutines-rx2/test/SchedulerTest.kt +++ b/reactive/kotlinx-coroutines-rx2/test/SchedulerTest.kt @@ -4,10 +4,18 @@ package kotlinx.coroutines.rx2 -import io.reactivex.schedulers.Schedulers +import io.reactivex.* +import io.reactivex.disposables.* +import io.reactivex.plugins.* +import io.reactivex.schedulers.* import kotlinx.coroutines.* -import org.junit.Before +import kotlinx.coroutines.sync.* +import org.junit.* import org.junit.Test +import java.lang.Runnable +import java.util.concurrent.* +import java.util.concurrent.atomic.AtomicReference +import kotlin.coroutines.* import kotlin.test.* class SchedulerTest : TestBase() { @@ -17,7 +25,7 @@ class SchedulerTest : TestBase() { } @Test - fun testIoScheduler(): Unit = runBlocking { + fun testIoScheduler(): Unit = runTest { expect(1) val mainThread = Thread.currentThread() withContext(Schedulers.io().asCoroutineDispatcher()) { @@ -31,4 +39,458 @@ class SchedulerTest : TestBase() { } finish(4) } -} \ No newline at end of file + + /** Tests [toString] implementations of [CoroutineDispatcher.asScheduler] and its [Scheduler.Worker]. */ + @Test + fun testSchedulerToString() { + val name = "Dispatchers.Default" + val scheduler = Dispatchers.Default.asScheduler() + assertContains(scheduler.toString(), name) + val worker = scheduler.createWorker() + val activeWorkerName = worker.toString() + assertContains(worker.toString(), name) + worker.dispose() + val disposedWorkerName = worker.toString() + assertNotEquals(activeWorkerName, disposedWorkerName) + } + + private fun runSchedulerTest(nThreads: Int = 1, action: (Scheduler) -> Unit) { + val future = CompletableFuture() + try { + newFixedThreadPoolContext(nThreads, "test").use { dispatcher -> + RxJavaPlugins.setErrorHandler { + if (!future.completeExceptionally(it)) { + handleUndeliverableException(it, dispatcher) + } + } + action(dispatcher.asScheduler()) + } + } finally { + RxJavaPlugins.setErrorHandler(null) + } + future.complete(Unit) + future.getNow(Unit) // rethrow any encountered errors + } + + private fun ensureSeparateThread(schedule: (Runnable, Long, TimeUnit) -> Unit, scheduleNoDelay: (Runnable) -> Unit) { + val mainThread = Thread.currentThread() + val cdl1 = CountDownLatch(1) + val cdl2 = CountDownLatch(1) + expect(1) + val thread = AtomicReference(null) + fun checkThread() { + val current = Thread.currentThread() + thread.getAndSet(current)?.let { assertEquals(it, current) } + } + schedule({ + assertNotSame(mainThread, Thread.currentThread()) + checkThread() + cdl2.countDown() + }, 300, TimeUnit.MILLISECONDS) + scheduleNoDelay { + expect(2) + checkThread() + assertNotSame(mainThread, Thread.currentThread()) + cdl1.countDown() + } + cdl1.await() + cdl2.await() + finish(3) + } + + /** + * Tests [Scheduler.scheduleDirect] for [CoroutineDispatcher.asScheduler] on a single-threaded dispatcher. + */ + @Test + fun testSingleThreadedDispatcherDirect(): Unit = runSchedulerTest(1) { + ensureSeparateThread(it::scheduleDirect, it::scheduleDirect) + } + + /** + * Tests [Scheduler.Worker.schedule] for [CoroutineDispatcher.asScheduler] running its tasks on the correct thread. + */ + @Test + fun testSingleThreadedWorker(): Unit = runSchedulerTest(1) { + val worker = it.createWorker() + ensureSeparateThread(worker::schedule, worker::schedule) + } + + private fun checkCancelling(schedule: (Runnable, Long, TimeUnit) -> Disposable) { + // cancel the task before it has a chance to run. + val handle1 = schedule({ + throw IllegalStateException("should have been successfully cancelled") + }, 10_000, TimeUnit.MILLISECONDS) + handle1.dispose() + // cancel the task after it started running. + val cdl1 = CountDownLatch(1) + val cdl2 = CountDownLatch(1) + val handle2 = schedule({ + cdl1.countDown() + cdl2.await() + if (Thread.interrupted()) + throw IllegalStateException("cancelling the task should not interrupt the thread") + }, 100, TimeUnit.MILLISECONDS) + cdl1.await() + handle2.dispose() + cdl2.countDown() + } + + /** + * Test cancelling [Scheduler.scheduleDirect] for [CoroutineDispatcher.asScheduler]. + */ + @Test + fun testCancellingDirect(): Unit = runSchedulerTest { + checkCancelling(it::scheduleDirect) + } + + /** + * Test cancelling [Scheduler.Worker.schedule] for [CoroutineDispatcher.asScheduler]. + */ + @Test + fun testCancellingWorker(): Unit = runSchedulerTest { + val worker = it.createWorker() + checkCancelling(worker::schedule) + } + + /** + * Test shutting down [CoroutineDispatcher.asScheduler]. + */ + @Test + fun testShuttingDown() { + val n = 5 + runSchedulerTest(nThreads = n) { scheduler -> + val cdl1 = CountDownLatch(n) + val cdl2 = CountDownLatch(1) + val cdl3 = CountDownLatch(n) + repeat(n) { + scheduler.scheduleDirect { + cdl1.countDown() + try { + cdl2.await() + } catch (e: InterruptedException) { + // this is the expected outcome + cdl3.countDown() + } + } + } + cdl1.await() + scheduler.shutdown() + if (!cdl3.await(1, TimeUnit.SECONDS)) { + cdl2.countDown() + error("the tasks were not cancelled when the scheduler was shut down") + } + } + } + + /** Tests that there are no uncaught exceptions if [Disposable.dispose] on a worker happens when tasks are present. */ + @Test + fun testDisposingWorker() = runTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + val worker = scheduler.createWorker() + yield() // so that the worker starts waiting on the channel + assertFalse(worker.isDisposed) + worker.dispose() + assertTrue(worker.isDisposed) + } + + /** Tests trying to use a [Scheduler.Worker]/[Scheduler] after [Scheduler.Worker.dispose]/[Scheduler.shutdown]. */ + @Test + fun testSchedulingAfterDisposing() = runSchedulerTest { + expect(1) + val worker = it.createWorker() + // use CDL to ensure that the worker has properly initialized + val cdl1 = CountDownLatch(1) + setScheduler(2, 3) + val disposable1 = worker.schedule { + cdl1.countDown() + } + cdl1.await() + expect(4) + assertFalse(disposable1.isDisposed) + setScheduler(6, -1) + // check that the worker automatically disposes of the tasks after being disposed + assertFalse(worker.isDisposed) + worker.dispose() + assertTrue(worker.isDisposed) + expect(5) + val disposable2 = worker.schedule { + expectUnreached() + } + assertTrue(disposable2.isDisposed) + setScheduler(7, 8) + // ensure that the scheduler still works + val cdl2 = CountDownLatch(1) + val disposable3 = it.scheduleDirect { + cdl2.countDown() + } + cdl2.await() + expect(9) + assertFalse(disposable3.isDisposed) + // check that the scheduler automatically disposes of the tasks after being shut down + it.shutdown() + setScheduler(10, -1) + val disposable4 = it.scheduleDirect { + expectUnreached() + } + assertTrue(disposable4.isDisposed) + RxJavaPlugins.setScheduleHandler(null) + finish(11) + } + + @Test + fun testSchedulerWithNoDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithNoDelay(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerWithNoDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithNoDelay(scheduler.createWorker()::schedule) + } + + private suspend fun testRunnableWithNoDelay(block: RxSchedulerBlockNoDelay) { + expect(1) + suspendCancellableCoroutine { + block(Runnable { + expect(2) + it.resume(Unit) + }) + } + yield() + finish(3) + } + + @Test + fun testSchedulerWithDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithDelay(scheduler::scheduleDirect, 300) + } + + @Test + fun testSchedulerWorkerWithDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithDelay(scheduler.createWorker()::schedule, 300) + } + + @Test + fun testSchedulerWithZeroDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithDelay(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerWithZeroDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithDelay(scheduler.createWorker()::schedule) + } + + private suspend fun testRunnableWithDelay(block: RxSchedulerBlockWithDelay, delayMillis: Long = 0) { + expect(1) + suspendCancellableCoroutine { + block({ + expect(2) + it.resume(Unit) + }, delayMillis, TimeUnit.MILLISECONDS) + } + finish(3) + } + + @Test + fun testAsSchedulerWithNegativeDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithDelay(scheduler::scheduleDirect, -1) + } + + @Test + fun testAsSchedulerWorkerWithNegativeDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithDelay(scheduler.createWorker()::schedule, -1) + } + + @Test + fun testSchedulerImmediateDispose(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableImmediateDispose(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerImmediateDispose(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableImmediateDispose(scheduler.createWorker()::schedule) + } + + private fun testRunnableImmediateDispose(block: RxSchedulerBlockNoDelay) { + val disposable = block { + expectUnreached() + } + disposable.dispose() + } + + @Test + fun testConvertDispatcherToOriginalScheduler(): Unit = runTest { + val originalScheduler = Schedulers.io() + val dispatcher = originalScheduler.asCoroutineDispatcher() + val scheduler = dispatcher.asScheduler() + assertSame(originalScheduler, scheduler) + } + + @Test + fun testConvertSchedulerToOriginalDispatcher(): Unit = runTest { + val originalDispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = originalDispatcher.asScheduler() + val dispatcher = scheduler.asCoroutineDispatcher() + assertSame(originalDispatcher, dispatcher) + } + + @Test + fun testSchedulerExpectRxPluginsCall(): Unit = runTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + testRunnableExpectRxPluginsCall(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerExpectRxPluginsCall(): Unit = runTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + testRunnableExpectRxPluginsCall(scheduler.createWorker()::schedule) + } + + private suspend fun testRunnableExpectRxPluginsCall(block: RxSchedulerBlockNoDelay) { + expect(1) + setScheduler(2, 4) + suspendCancellableCoroutine { + block(Runnable { + expect(5) + it.resume(Unit) + }) + expect(3) + } + RxJavaPlugins.setScheduleHandler(null) + finish(6) + } + + @Test + fun testSchedulerExpectRxPluginsCallWithDelay(): Unit = runTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + testRunnableExpectRxPluginsCallDelay(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerExpectRxPluginsCallWithDelay(): Unit = runTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + val worker = scheduler.createWorker() + testRunnableExpectRxPluginsCallDelay(worker::schedule) + } + + private suspend fun testRunnableExpectRxPluginsCallDelay(block: RxSchedulerBlockWithDelay) { + expect(1) + setScheduler(2, 4) + suspendCancellableCoroutine { + block({ + expect(5) + it.resume(Unit) + }, 10, TimeUnit.MILLISECONDS) + expect(3) + } + RxJavaPlugins.setScheduleHandler(null) + finish(6) + } + + private fun setScheduler(expectedCountOnSchedule: Int, expectCountOnRun: Int) { + RxJavaPlugins.setScheduleHandler { + expect(expectedCountOnSchedule) + Runnable { + expect(expectCountOnRun) + it.run() + } + } + } + + /** + * Tests that [Scheduler.Worker] runs all work sequentially. + */ + @Test + fun testWorkerSequentialOrdering() = runTest { + expect(1) + val scheduler = Dispatchers.Default.asScheduler() + val worker = scheduler.createWorker() + val iterations = 100 + for (i in 0..iterations) { + worker.schedule { + expect(2 + i) + } + } + suspendCoroutine { + worker.schedule { + it.resume(Unit) + } + } + finish((iterations + 2) + 1) + } + + /** + * Test that ensures that delays are actually respected (tasks scheduled sooner in the future run before tasks scheduled later, + * even when the later task is submitted before the earlier one) + */ + @Test + fun testSchedulerRespectsDelays(): Unit = runTest { + val scheduler = Dispatchers.Default.asScheduler() + testRunnableRespectsDelays(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerRespectsDelays(): Unit = runTest { + val scheduler = Dispatchers.Default.asScheduler() + testRunnableRespectsDelays(scheduler.createWorker()::schedule) + } + + private suspend fun testRunnableRespectsDelays(block: RxSchedulerBlockWithDelay) { + expect(1) + val semaphore = Semaphore(2, 2) + block({ + expect(3) + semaphore.release() + }, 100, TimeUnit.MILLISECONDS) + block({ + expect(2) + semaphore.release() + }, 1, TimeUnit.MILLISECONDS) + semaphore.acquire() + semaphore.acquire() + finish(4) + } + + /** + * Tests that cancelling a runnable in one worker doesn't affect work in another scheduler. + * + * This is part of expected behavior documented. + */ + @Test + fun testMultipleWorkerCancellation(): Unit = runTest { + expect(1) + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + suspendCancellableCoroutine { + val workerOne = scheduler.createWorker() + workerOne.schedule({ + expect(3) + it.resume(Unit) + }, 50, TimeUnit.MILLISECONDS) + val workerTwo = scheduler.createWorker() + workerTwo.schedule({ + expectUnreached() + }, 1000, TimeUnit.MILLISECONDS) + workerTwo.dispose() + expect(2) + } + finish(4) + } +} + +typealias RxSchedulerBlockNoDelay = (Runnable) -> Disposable +typealias RxSchedulerBlockWithDelay = (Runnable, Long, TimeUnit) -> Disposable \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api index f6f3f1d06d..5776214b0a 100644 --- a/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api +++ b/reactive/kotlinx-coroutines-rx3/api/kotlinx-coroutines-rx3.api @@ -58,7 +58,9 @@ public final class kotlinx/coroutines/rx3/RxObservableKt { } public final class kotlinx/coroutines/rx3/RxSchedulerKt { - public static final fun asCoroutineDispatcher (Lio/reactivex/rxjava3/core/Scheduler;)Lkotlinx/coroutines/rx3/SchedulerCoroutineDispatcher; + public static final fun asCoroutineDispatcher (Lio/reactivex/rxjava3/core/Scheduler;)Lkotlinx/coroutines/CoroutineDispatcher; + public static final synthetic fun asCoroutineDispatcher (Lio/reactivex/rxjava3/core/Scheduler;)Lkotlinx/coroutines/rx3/SchedulerCoroutineDispatcher; + public static final fun asScheduler (Lkotlinx/coroutines/CoroutineDispatcher;)Lio/reactivex/rxjava3/core/Scheduler; } public final class kotlinx/coroutines/rx3/RxSingleKt { diff --git a/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt b/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt index 24c3f11834..abaf02450a 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt @@ -4,16 +4,144 @@ package kotlinx.coroutines.rx3 -import io.reactivex.rxjava3.core.Scheduler +import io.reactivex.rxjava3.core.* +import io.reactivex.rxjava3.disposables.* +import io.reactivex.rxjava3.plugins.* +import kotlinx.atomicfu.* import kotlinx.coroutines.* -import java.util.concurrent.TimeUnit -import kotlin.coroutines.CoroutineContext +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.channels.* +import java.util.concurrent.* +import kotlin.coroutines.* /** * Converts an instance of [Scheduler] to an implementation of [CoroutineDispatcher] * and provides native support of [delay] and [withTimeout]. */ -public fun Scheduler.asCoroutineDispatcher(): SchedulerCoroutineDispatcher = SchedulerCoroutineDispatcher(this) +public fun Scheduler.asCoroutineDispatcher(): CoroutineDispatcher = + if (this is DispatcherScheduler) { + dispatcher + } else { + SchedulerCoroutineDispatcher(this) + } + +@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.4.2, binary compatibility with earlier versions") +@JvmName("asCoroutineDispatcher") +public fun Scheduler.asCoroutineDispatcher0(): SchedulerCoroutineDispatcher = + SchedulerCoroutineDispatcher(this) + +/** + * Converts an instance of [CoroutineDispatcher] to an implementation of [Scheduler]. + */ +public fun CoroutineDispatcher.asScheduler(): Scheduler = + if (this is SchedulerCoroutineDispatcher) { + scheduler + } else { + DispatcherScheduler(this) + } + +private class DispatcherScheduler(@JvmField val dispatcher: CoroutineDispatcher) : Scheduler() { + + private val schedulerJob = SupervisorJob() + + /** + * The scope for everything happening in this [DispatcherScheduler]. + * + * Running tasks, too, get launched under this scope, because [shutdown] should cancel the running tasks as well. + */ + private val scope = CoroutineScope(schedulerJob + dispatcher) + + /** + * The counter of created workers, for their pretty-printing. + */ + private val workerCounter = atomic(1L) + + override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable = + scope.scheduleTask(block, unit.toMillis(delay)) { task -> + Runnable { scope.launch { task() } } + } + + override fun createWorker(): Worker = DispatcherWorker(workerCounter.getAndIncrement(), dispatcher, schedulerJob) + + override fun shutdown() { + schedulerJob.cancel() + } + + private class DispatcherWorker( + private val counter: Long, + private val dispatcher: CoroutineDispatcher, + parentJob: Job + ) : Worker() { + + private val workerJob = SupervisorJob(parentJob) + private val workerScope = CoroutineScope(workerJob + dispatcher) + private val blockChannel = Channel Unit>(Channel.UNLIMITED) + + init { + workerScope.launch { + blockChannel.consumeEach { + it() + } + } + } + + override fun schedule(block: Runnable, delay: Long, unit: TimeUnit): Disposable = + workerScope.scheduleTask(block, unit.toMillis(delay)) { task -> + Runnable { blockChannel.trySend(task) } + } + + override fun isDisposed(): Boolean = !workerScope.isActive + + override fun dispose() { + blockChannel.close() + workerJob.cancel() + } + + override fun toString(): String = "$dispatcher (worker $counter, ${if (isDisposed) "disposed" else "active"})" + } + + override fun toString(): String = dispatcher.toString() +} + +private typealias Task = suspend () -> Unit + +/** + * Schedule [block] so that an adapted version of it, wrapped in [adaptForScheduling], executes after [delayMillis] + * milliseconds. + */ +private fun CoroutineScope.scheduleTask( + block: Runnable, + delayMillis: Long, + adaptForScheduling: (Task) -> Runnable +): Disposable { + val ctx = coroutineContext + var handle: DisposableHandle? = null + val disposable = Disposable.fromRunnable { + // null if delay <= 0 + handle?.dispose() + } + val decoratedBlock = RxJavaPlugins.onSchedule(block) + suspend fun task() { + if (disposable.isDisposed) return + try { + runInterruptible { + decoratedBlock.run() + } + } catch (e: Throwable) { + handleUndeliverableException(e, ctx) + } + } + + val toSchedule = adaptForScheduling(::task) + if (!isActive) return Disposable.disposed() + if (delayMillis <= 0) { + toSchedule.run() + } else { + @Suppress("INVISIBLE_MEMBER") + ctx.delay.invokeOnTimeout(delayMillis, toSchedule, ctx).let { handle = it } + } + return disposable +} /** * Implements [CoroutineDispatcher] on top of an arbitrary [Scheduler]. @@ -45,8 +173,10 @@ public class SchedulerCoroutineDispatcher( /** @suppress */ override fun toString(): String = scheduler.toString() + /** @suppress */ override fun equals(other: Any?): Boolean = other is SchedulerCoroutineDispatcher && other.scheduler === scheduler + /** @suppress */ override fun hashCode(): Int = System.identityHashCode(scheduler) } diff --git a/reactive/kotlinx-coroutines-rx3/test/SchedulerStressTest.kt b/reactive/kotlinx-coroutines-rx3/test/SchedulerStressTest.kt new file mode 100644 index 0000000000..5abb511d72 --- /dev/null +++ b/reactive/kotlinx-coroutines-rx3/test/SchedulerStressTest.kt @@ -0,0 +1,87 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.rx3 + +import kotlinx.coroutines.* +import org.junit.* +import java.util.concurrent.* + +class SchedulerStressTest : TestBase() { + @Before + fun setup() { + ignoreLostThreads("RxCachedThreadScheduler-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") + } + + /** + * Test that we don't get an OOM if we schedule many jobs at once. + * It's expected that if you don't dispose you'd see an OOM error. + */ + @Test + fun testSchedulerDisposed(): Unit = runTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + testRunnableDisposed(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerDisposed(): Unit = runTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + val worker = scheduler.createWorker() + testRunnableDisposed(worker::schedule) + } + + private suspend fun testRunnableDisposed(block: RxSchedulerBlockNoDelay) { + val n = 2000 * stressTestMultiplier + repeat(n) { + val a = ByteArray(1000000) //1MB + val disposable = block(Runnable { + keepMe(a) + expectUnreached() + }) + disposable.dispose() + yield() // allow the scheduled task to observe that it was disposed + } + } + + /** + * Test function that holds a reference. Used for testing OOM situations + */ + private fun keepMe(a: ByteArray) { + Thread.sleep(a.size / (a.size + 1) + 10L) + } + + /** + * Test that we don't get an OOM if we schedule many delayed jobs at once. It's expected that if you don't dispose that you'd + * see a OOM error. + */ + @Test + fun testSchedulerDisposedDuringDelay(): Unit = runTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + testRunnableDisposedDuringDelay(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerDisposedDuringDelay(): Unit = runTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + val worker = scheduler.createWorker() + testRunnableDisposedDuringDelay(worker::schedule) + } + + private fun testRunnableDisposedDuringDelay(block: RxSchedulerBlockWithDelay) { + val n = 2000 * stressTestMultiplier + repeat(n) { + val a = ByteArray(1000000) //1MB + val delayMillis: Long = 10 + val disposable = block(Runnable { + keepMe(a) + expectUnreached() + }, delayMillis, TimeUnit.MILLISECONDS) + disposable.dispose() + } + } +} diff --git a/reactive/kotlinx-coroutines-rx3/test/SchedulerTest.kt b/reactive/kotlinx-coroutines-rx3/test/SchedulerTest.kt index 9e95c213d0..c966cdfd08 100644 --- a/reactive/kotlinx-coroutines-rx3/test/SchedulerTest.kt +++ b/reactive/kotlinx-coroutines-rx3/test/SchedulerTest.kt @@ -4,10 +4,18 @@ package kotlinx.coroutines.rx3 -import io.reactivex.rxjava3.schedulers.Schedulers +import io.reactivex.rxjava3.core.* +import io.reactivex.rxjava3.disposables.* +import io.reactivex.rxjava3.plugins.* +import io.reactivex.rxjava3.schedulers.* import kotlinx.coroutines.* -import org.junit.Before +import kotlinx.coroutines.sync.* +import org.junit.* import org.junit.Test +import java.lang.Runnable +import java.util.concurrent.* +import java.util.concurrent.atomic.AtomicReference +import kotlin.coroutines.* import kotlin.test.* class SchedulerTest : TestBase() { @@ -17,7 +25,7 @@ class SchedulerTest : TestBase() { } @Test - fun testIoScheduler(): Unit = runBlocking { + fun testIoScheduler(): Unit = runTest { expect(1) val mainThread = Thread.currentThread() withContext(Schedulers.io().asCoroutineDispatcher()) { @@ -31,4 +39,458 @@ class SchedulerTest : TestBase() { } finish(4) } + + /** Tests [toString] implementations of [CoroutineDispatcher.asScheduler] and its [Scheduler.Worker]. */ + @Test + fun testSchedulerToString() { + val name = "Dispatchers.Default" + val scheduler = Dispatchers.Default.asScheduler() + assertContains(scheduler.toString(), name) + val worker = scheduler.createWorker() + val activeWorkerName = worker.toString() + assertContains(worker.toString(), name) + worker.dispose() + val disposedWorkerName = worker.toString() + assertNotEquals(activeWorkerName, disposedWorkerName) + } + + private fun runSchedulerTest(nThreads: Int = 1, action: (Scheduler) -> Unit) { + val future = CompletableFuture() + try { + newFixedThreadPoolContext(nThreads, "test").use { dispatcher -> + RxJavaPlugins.setErrorHandler { + if (!future.completeExceptionally(it)) { + handleUndeliverableException(it, dispatcher) + } + } + action(dispatcher.asScheduler()) + } + } finally { + RxJavaPlugins.setErrorHandler(null) + } + future.complete(Unit) + future.getNow(Unit) // rethrow any encountered errors + } + + private fun ensureSeparateThread(schedule: (Runnable, Long, TimeUnit) -> Unit, scheduleNoDelay: (Runnable) -> Unit) { + val mainThread = Thread.currentThread() + val cdl1 = CountDownLatch(1) + val cdl2 = CountDownLatch(1) + expect(1) + val thread = AtomicReference(null) + fun checkThread() { + val current = Thread.currentThread() + thread.getAndSet(current)?.let { assertEquals(it, current) } + } + schedule({ + assertNotSame(mainThread, Thread.currentThread()) + checkThread() + cdl2.countDown() + }, 300, TimeUnit.MILLISECONDS) + scheduleNoDelay { + expect(2) + checkThread() + assertNotSame(mainThread, Thread.currentThread()) + cdl1.countDown() + } + cdl1.await() + cdl2.await() + finish(3) + } + + /** + * Tests [Scheduler.scheduleDirect] for [CoroutineDispatcher.asScheduler] on a single-threaded dispatcher. + */ + @Test + fun testSingleThreadedDispatcherDirect(): Unit = runSchedulerTest(1) { + ensureSeparateThread(it::scheduleDirect, it::scheduleDirect) + } + + /** + * Tests [Scheduler.Worker.schedule] for [CoroutineDispatcher.asScheduler] running its tasks on the correct thread. + */ + @Test + fun testSingleThreadedWorker(): Unit = runSchedulerTest(1) { + val worker = it.createWorker() + ensureSeparateThread(worker::schedule, worker::schedule) + } + + private fun checkCancelling(schedule: (Runnable, Long, TimeUnit) -> Disposable) { + // cancel the task before it has a chance to run. + val handle1 = schedule({ + throw IllegalStateException("should have been successfully cancelled") + }, 10_000, TimeUnit.MILLISECONDS) + handle1.dispose() + // cancel the task after it started running. + val cdl1 = CountDownLatch(1) + val cdl2 = CountDownLatch(1) + val handle2 = schedule({ + cdl1.countDown() + cdl2.await() + if (Thread.interrupted()) + throw IllegalStateException("cancelling the task should not interrupt the thread") + }, 100, TimeUnit.MILLISECONDS) + cdl1.await() + handle2.dispose() + cdl2.countDown() + } + + /** + * Test cancelling [Scheduler.scheduleDirect] for [CoroutineDispatcher.asScheduler]. + */ + @Test + fun testCancellingDirect(): Unit = runSchedulerTest { + checkCancelling(it::scheduleDirect) + } + + /** + * Test cancelling [Scheduler.Worker.schedule] for [CoroutineDispatcher.asScheduler]. + */ + @Test + fun testCancellingWorker(): Unit = runSchedulerTest { + val worker = it.createWorker() + checkCancelling(worker::schedule) + } + + /** + * Test shutting down [CoroutineDispatcher.asScheduler]. + */ + @Test + fun testShuttingDown() { + val n = 5 + runSchedulerTest(nThreads = n) { scheduler -> + val cdl1 = CountDownLatch(n) + val cdl2 = CountDownLatch(1) + val cdl3 = CountDownLatch(n) + repeat(n) { + scheduler.scheduleDirect { + cdl1.countDown() + try { + cdl2.await() + } catch (e: InterruptedException) { + // this is the expected outcome + cdl3.countDown() + } + } + } + cdl1.await() + scheduler.shutdown() + if (!cdl3.await(1, TimeUnit.SECONDS)) { + cdl2.countDown() + error("the tasks were not cancelled when the scheduler was shut down") + } + } + } + + /** Tests that there are no uncaught exceptions if [Disposable.dispose] on a worker happens when tasks are present. */ + @Test + fun testDisposingWorker() = runTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + val worker = scheduler.createWorker() + yield() // so that the worker starts waiting on the channel + assertFalse(worker.isDisposed) + worker.dispose() + assertTrue(worker.isDisposed) + } + + /** Tests trying to use a [Scheduler.Worker]/[Scheduler] after [Scheduler.Worker.dispose]/[Scheduler.shutdown]. */ + @Test + fun testSchedulingAfterDisposing() = runSchedulerTest { + expect(1) + val worker = it.createWorker() + // use CDL to ensure that the worker has properly initialized + val cdl1 = CountDownLatch(1) + setScheduler(2, 3) + val disposable1 = worker.schedule { + cdl1.countDown() + } + cdl1.await() + expect(4) + assertFalse(disposable1.isDisposed) + setScheduler(6, -1) + // check that the worker automatically disposes of the tasks after being disposed + assertFalse(worker.isDisposed) + worker.dispose() + assertTrue(worker.isDisposed) + expect(5) + val disposable2 = worker.schedule { + expectUnreached() + } + assertTrue(disposable2.isDisposed) + setScheduler(7, 8) + // ensure that the scheduler still works + val cdl2 = CountDownLatch(1) + val disposable3 = it.scheduleDirect { + cdl2.countDown() + } + cdl2.await() + expect(9) + assertFalse(disposable3.isDisposed) + // check that the scheduler automatically disposes of the tasks after being shut down + it.shutdown() + setScheduler(10, -1) + val disposable4 = it.scheduleDirect { + expectUnreached() + } + assertTrue(disposable4.isDisposed) + RxJavaPlugins.setScheduleHandler(null) + finish(11) + } + + @Test + fun testSchedulerWithNoDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithNoDelay(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerWithNoDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithNoDelay(scheduler.createWorker()::schedule) + } + + private suspend fun testRunnableWithNoDelay(block: RxSchedulerBlockNoDelay) { + expect(1) + suspendCancellableCoroutine { + block(Runnable { + expect(2) + it.resume(Unit) + }) + } + yield() + finish(3) + } + + @Test + fun testSchedulerWithDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithDelay(scheduler::scheduleDirect, 300) + } + + @Test + fun testSchedulerWorkerWithDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithDelay(scheduler.createWorker()::schedule, 300) + } + + @Test + fun testSchedulerWithZeroDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithDelay(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerWithZeroDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithDelay(scheduler.createWorker()::schedule) + } + + private suspend fun testRunnableWithDelay(block: RxSchedulerBlockWithDelay, delayMillis: Long = 0) { + expect(1) + suspendCancellableCoroutine { + block({ + expect(2) + it.resume(Unit) + }, delayMillis, TimeUnit.MILLISECONDS) + } + finish(3) + } + + @Test + fun testAsSchedulerWithNegativeDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithDelay(scheduler::scheduleDirect, -1) + } + + @Test + fun testAsSchedulerWorkerWithNegativeDelay(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableWithDelay(scheduler.createWorker()::schedule, -1) + } + + @Test + fun testSchedulerImmediateDispose(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableImmediateDispose(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerImmediateDispose(): Unit = runTest { + val scheduler = (currentDispatcher() as CoroutineDispatcher).asScheduler() + testRunnableImmediateDispose(scheduler.createWorker()::schedule) + } + + private fun testRunnableImmediateDispose(block: RxSchedulerBlockNoDelay) { + val disposable = block { + expectUnreached() + } + disposable.dispose() + } + + @Test + fun testConvertDispatcherToOriginalScheduler(): Unit = runTest { + val originalScheduler = Schedulers.io() + val dispatcher = originalScheduler.asCoroutineDispatcher() + val scheduler = dispatcher.asScheduler() + assertSame(originalScheduler, scheduler) + } + + @Test + fun testConvertSchedulerToOriginalDispatcher(): Unit = runTest { + val originalDispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = originalDispatcher.asScheduler() + val dispatcher = scheduler.asCoroutineDispatcher() + assertSame(originalDispatcher, dispatcher) + } + + @Test + fun testSchedulerExpectRxPluginsCall(): Unit = runTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + testRunnableExpectRxPluginsCall(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerExpectRxPluginsCall(): Unit = runTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + testRunnableExpectRxPluginsCall(scheduler.createWorker()::schedule) + } + + private suspend fun testRunnableExpectRxPluginsCall(block: RxSchedulerBlockNoDelay) { + expect(1) + setScheduler(2, 4) + suspendCancellableCoroutine { + block(Runnable { + expect(5) + it.resume(Unit) + }) + expect(3) + } + RxJavaPlugins.setScheduleHandler(null) + finish(6) + } + + @Test + fun testSchedulerExpectRxPluginsCallWithDelay(): Unit = runTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + testRunnableExpectRxPluginsCallDelay(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerExpectRxPluginsCallWithDelay(): Unit = runTest { + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + val worker = scheduler.createWorker() + testRunnableExpectRxPluginsCallDelay(worker::schedule) + } + + private suspend fun testRunnableExpectRxPluginsCallDelay(block: RxSchedulerBlockWithDelay) { + expect(1) + setScheduler(2, 4) + suspendCancellableCoroutine { + block({ + expect(5) + it.resume(Unit) + }, 10, TimeUnit.MILLISECONDS) + expect(3) + } + RxJavaPlugins.setScheduleHandler(null) + finish(6) + } + + private fun setScheduler(expectedCountOnSchedule: Int, expectCountOnRun: Int) { + RxJavaPlugins.setScheduleHandler { + expect(expectedCountOnSchedule) + Runnable { + expect(expectCountOnRun) + it.run() + } + } + } + + /** + * Tests that [Scheduler.Worker] runs all work sequentially. + */ + @Test + fun testWorkerSequentialOrdering() = runTest { + expect(1) + val scheduler = Dispatchers.Default.asScheduler() + val worker = scheduler.createWorker() + val iterations = 100 + for (i in 0..iterations) { + worker.schedule { + expect(2 + i) + } + } + suspendCoroutine { + worker.schedule { + it.resume(Unit) + } + } + finish((iterations + 2) + 1) + } + + /** + * Test that ensures that delays are actually respected (tasks scheduled sooner in the future run before tasks scheduled later, + * even when the later task is submitted before the earlier one) + */ + @Test + fun testSchedulerRespectsDelays(): Unit = runTest { + val scheduler = Dispatchers.Default.asScheduler() + testRunnableRespectsDelays(scheduler::scheduleDirect) + } + + @Test + fun testSchedulerWorkerRespectsDelays(): Unit = runTest { + val scheduler = Dispatchers.Default.asScheduler() + testRunnableRespectsDelays(scheduler.createWorker()::schedule) + } + + private suspend fun testRunnableRespectsDelays(block: RxSchedulerBlockWithDelay) { + expect(1) + val semaphore = Semaphore(2, 2) + block({ + expect(3) + semaphore.release() + }, 100, TimeUnit.MILLISECONDS) + block({ + expect(2) + semaphore.release() + }, 1, TimeUnit.MILLISECONDS) + semaphore.acquire() + semaphore.acquire() + finish(4) + } + + /** + * Tests that cancelling a runnable in one worker doesn't affect work in another scheduler. + * + * This is part of expected behavior documented. + */ + @Test + fun testMultipleWorkerCancellation(): Unit = runTest { + expect(1) + val dispatcher = currentDispatcher() as CoroutineDispatcher + val scheduler = dispatcher.asScheduler() + suspendCancellableCoroutine { + val workerOne = scheduler.createWorker() + workerOne.schedule({ + expect(3) + it.resume(Unit) + }, 50, TimeUnit.MILLISECONDS) + val workerTwo = scheduler.createWorker() + workerTwo.schedule({ + expectUnreached() + }, 1000, TimeUnit.MILLISECONDS) + workerTwo.dispose() + expect(2) + } + finish(4) + } } + +typealias RxSchedulerBlockNoDelay = (Runnable) -> Disposable +typealias RxSchedulerBlockWithDelay = (Runnable, Long, TimeUnit) -> Disposable \ No newline at end of file