Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Way to use CoroutineDispatcher as RxJava2 scheduler (#548) #2

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
64 changes: 64 additions & 0 deletions reactive/kotlinx-coroutines-rx2/src/RxCoroutineDispatcher.kt
@@ -0,0 +1,64 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.rx2

import io.reactivex.Scheduler
import io.reactivex.disposables.Disposable
import io.reactivex.schedulers.Schedulers
import kotlinx.coroutines.*
import java.util.concurrent.TimeUnit
import kotlin.coroutines.CoroutineContext

/**
* Converts an instance of [CoroutineDispatcher] to an implementation of [Scheduler].
* If dispatcher is instance of [SchedulerCoroutineDispatcher] it just extracts underlying [Scheduler].
*/
public fun CoroutineDispatcher.asScheduler(): Scheduler = when (this) {
is SchedulerCoroutineDispatcher -> scheduler
else -> CoroutineDispatcherScheduler(this)
}

/**
* Implements [Scheduler] on top of an arbitrary [CoroutineDispatcher].
* @param dispatcher a dispatcher.
*/
public class CoroutineDispatcherScheduler(
/**
* Underlying dispatcher of current [Scheduler].
*/
public val dispatcher: CoroutineDispatcher
) : Scheduler() {

override fun createWorker(): Worker {
return DispatcherWorker(dispatcher)
}

internal class DispatcherWorker(val dispatcher: CoroutineDispatcher) : Worker(), CoroutineScope {

private val job = Job()

override val coroutineContext: CoroutineContext by lazy { dispatcher + job }

override fun isDisposed(): Boolean = job.isCompleted

override fun schedule(run: Runnable, delay: Long, unit: TimeUnit): Disposable {
val taskJob = launch {
delay(unit.toMillis(delay))
run.run()
}
return DisposableJob(taskJob)
}

override fun dispose() {
job.cancel()
Schedulers.single()
}
}

internal class DisposableJob(val job: Job) : Disposable {
override fun isDisposed(): Boolean = job.isCompleted
override fun dispose() = job.cancel()
}
}
6 changes: 5 additions & 1 deletion reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt
Expand Up @@ -12,8 +12,12 @@ import kotlin.coroutines.CoroutineContext
/**
* Converts an instance of [Scheduler] to an implementation of [CoroutineDispatcher]
* and provides native support of [delay] and [withTimeout].
* If scheduler is instance of [CoroutineDispatcherScheduler] it just extracts underlying [CoroutineDispatcher].
*/
public fun Scheduler.asCoroutineDispatcher(): SchedulerCoroutineDispatcher = SchedulerCoroutineDispatcher(this)
public fun Scheduler.asCoroutineDispatcher(): CoroutineDispatcher = when (this) {
is CoroutineDispatcherScheduler -> dispatcher
else -> SchedulerCoroutineDispatcher(this)
}

/**
* Implements [CoroutineDispatcher] on top of an arbitrary [Scheduler].
Expand Down
58 changes: 58 additions & 0 deletions reactive/kotlinx-coroutines-rx2/test/DispatcherTest.kt
@@ -0,0 +1,58 @@
package kotlinx.coroutines.rx2

import io.reactivex.Observable
import kotlinx.coroutines.TestBase
import kotlinx.coroutines.ignoreLostThreads
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.scheduling.ExperimentalCoroutineDispatcher
import org.hamcrest.core.IsEqual
import org.junit.After
import org.junit.Assert
import org.junit.Before
import org.junit.Test

class DispatcherTest : TestBase() {

lateinit var dispatcher: ExperimentalCoroutineDispatcher

@Before
fun setup() {
ignoreLostThreads("RxCachedThreadScheduler-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
dispatcher = ExperimentalCoroutineDispatcher(1, 1, 1, "TestDispatcher")
}

@After
fun after() {
dispatcher.close()
}

@Test
fun testDispatcher() {
val scheduler = dispatcher.asScheduler()

lateinit var coroutineThread: Thread
runBlocking(dispatcher) {
coroutineThread = Thread.currentThread()
}

expect(1)
Observable
.create<String> {
expect(2)
val t1 = Thread.currentThread()
Assert.assertThat(t1, IsEqual(coroutineThread))
it.onNext("Message")
it.onComplete()
}
.subscribeOn(scheduler)
.doOnNext {
expect(3)
val t2 = Thread.currentThread()
Assert.assertThat(t2, IsEqual(coroutineThread))
}
.blockingSubscribe {
expect(4)
}
finish(5)
}
}