From 085ef9a0c2aba46146af5773d426ab17a964eef2 Mon Sep 17 00:00:00 2001 From: Jakub Blejder Date: Fri, 23 Nov 2018 13:37:04 +0100 Subject: [PATCH] Way to use CoroutineDispatcher as RxJava2 scheduler --- .../src/RxCoroutineDispatcher.kt | 64 +++++++++++++++++++ .../kotlinx-coroutines-rx2/src/RxScheduler.kt | 6 +- .../test/DispatcherTest.kt | 58 +++++++++++++++++ 3 files changed, 127 insertions(+), 1 deletion(-) create mode 100644 reactive/kotlinx-coroutines-rx2/src/RxCoroutineDispatcher.kt create mode 100644 reactive/kotlinx-coroutines-rx2/test/DispatcherTest.kt diff --git a/reactive/kotlinx-coroutines-rx2/src/RxCoroutineDispatcher.kt b/reactive/kotlinx-coroutines-rx2/src/RxCoroutineDispatcher.kt new file mode 100644 index 0000000000..8eef1f00d6 --- /dev/null +++ b/reactive/kotlinx-coroutines-rx2/src/RxCoroutineDispatcher.kt @@ -0,0 +1,64 @@ +/* + * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. + */ + +package kotlinx.coroutines.rx2 + +import io.reactivex.Scheduler +import io.reactivex.disposables.Disposable +import io.reactivex.schedulers.Schedulers +import kotlinx.coroutines.* +import java.util.concurrent.TimeUnit +import kotlin.coroutines.CoroutineContext + +/** + * Converts an instance of [CoroutineDispatcher] to an implementation of [Scheduler]. + * If dispatcher is instance of [SchedulerCoroutineDispatcher] it just extracts underlying [Scheduler]. + */ +public fun CoroutineDispatcher.asScheduler(): Scheduler = when (this) { + is SchedulerCoroutineDispatcher -> scheduler + else -> CoroutineDispatcherScheduler(this) +} + +/** + * Implements [Scheduler] on top of an arbitrary [CoroutineDispatcher]. + * @param dispatcher a dispatcher. + */ +public class CoroutineDispatcherScheduler( + /** + * Underlying dispatcher of current [Scheduler]. + */ + public val dispatcher: CoroutineDispatcher +) : Scheduler() { + + override fun createWorker(): Worker { + return DispatcherWorker(dispatcher) + } + + internal class DispatcherWorker(val dispatcher: CoroutineDispatcher) : Worker(), CoroutineScope { + + private val job = Job() + + override val coroutineContext: CoroutineContext by lazy { dispatcher + job } + + override fun isDisposed(): Boolean = job.isCompleted + + override fun schedule(run: Runnable, delay: Long, unit: TimeUnit): Disposable { + val taskJob = launch { + delay(unit.toMillis(delay)) + run.run() + } + return DisposableJob(taskJob) + } + + override fun dispose() { + job.cancel() + Schedulers.single() + } + } + + internal class DisposableJob(val job: Job) : Disposable { + override fun isDisposed(): Boolean = job.isCompleted + override fun dispose() = job.cancel() + } +} diff --git a/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt b/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt index 53fbaf6505..9a03741708 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt @@ -12,8 +12,12 @@ import kotlin.coroutines.CoroutineContext /** * Converts an instance of [Scheduler] to an implementation of [CoroutineDispatcher] * and provides native support of [delay] and [withTimeout]. + * If scheduler is instance of [CoroutineDispatcherScheduler] it just extracts underlying [CoroutineDispatcher]. */ -public fun Scheduler.asCoroutineDispatcher(): SchedulerCoroutineDispatcher = SchedulerCoroutineDispatcher(this) +public fun Scheduler.asCoroutineDispatcher(): CoroutineDispatcher = when (this) { + is CoroutineDispatcherScheduler -> dispatcher + else -> SchedulerCoroutineDispatcher(this) +} /** * Implements [CoroutineDispatcher] on top of an arbitrary [Scheduler]. diff --git a/reactive/kotlinx-coroutines-rx2/test/DispatcherTest.kt b/reactive/kotlinx-coroutines-rx2/test/DispatcherTest.kt new file mode 100644 index 0000000000..8ec3b0eee0 --- /dev/null +++ b/reactive/kotlinx-coroutines-rx2/test/DispatcherTest.kt @@ -0,0 +1,58 @@ +package kotlinx.coroutines.rx2 + +import io.reactivex.Observable +import kotlinx.coroutines.TestBase +import kotlinx.coroutines.ignoreLostThreads +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.scheduling.ExperimentalCoroutineDispatcher +import org.hamcrest.core.IsEqual +import org.junit.After +import org.junit.Assert +import org.junit.Before +import org.junit.Test + +class DispatcherTest : TestBase() { + + lateinit var dispatcher: ExperimentalCoroutineDispatcher + + @Before + fun setup() { + ignoreLostThreads("RxCachedThreadScheduler-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-") + dispatcher = ExperimentalCoroutineDispatcher(1, 1, 1, "TestDispatcher") + } + + @After + fun after() { + dispatcher.close() + } + + @Test + fun testDispatcher() { + val scheduler = dispatcher.asScheduler() + + lateinit var coroutineThread: Thread + runBlocking(dispatcher) { + coroutineThread = Thread.currentThread() + } + + expect(1) + Observable + .create { + expect(2) + val t1 = Thread.currentThread() + Assert.assertThat(t1, IsEqual(coroutineThread)) + it.onNext("Message") + it.onComplete() + } + .subscribeOn(scheduler) + .doOnNext { + expect(3) + val t2 = Thread.currentThread() + Assert.assertThat(t2, IsEqual(coroutineThread)) + } + .blockingSubscribe { + expect(4) + } + finish(5) + } +}