Skip to content

Commit

Permalink
Provide a way to use CoroutineDispatcher as an Rx's Scheduler.
Browse files Browse the repository at this point in the history
Fixes #968
Fixes #548
  • Loading branch information
recheej authored and dkhalanskyjb committed Feb 21, 2022
1 parent 70ae22b commit d5f852c
Show file tree
Hide file tree
Showing 8 changed files with 1,379 additions and 18 deletions.
Expand Up @@ -69,7 +69,9 @@ public final class kotlinx/coroutines/rx2/RxObservableKt {
}

public final class kotlinx/coroutines/rx2/RxSchedulerKt {
public static final fun asCoroutineDispatcher (Lio/reactivex/Scheduler;)Lkotlinx/coroutines/rx2/SchedulerCoroutineDispatcher;
public static final fun asCoroutineDispatcher (Lio/reactivex/Scheduler;)Lkotlinx/coroutines/CoroutineDispatcher;
public static final synthetic fun asCoroutineDispatcher (Lio/reactivex/Scheduler;)Lkotlinx/coroutines/rx2/SchedulerCoroutineDispatcher;
public static final fun asScheduler (Lkotlinx/coroutines/CoroutineDispatcher;)Lio/reactivex/Scheduler;
}

public final class kotlinx/coroutines/rx2/RxSingleKt {
Expand Down
139 changes: 134 additions & 5 deletions reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt
Expand Up @@ -4,16 +4,143 @@

package kotlinx.coroutines.rx2

import io.reactivex.Scheduler
import io.reactivex.*
import io.reactivex.disposables.*
import io.reactivex.plugins.*
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import java.util.concurrent.TimeUnit
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.channels.*
import java.util.concurrent.*
import kotlin.coroutines.*

/**
* Converts an instance of [Scheduler] to an implementation of [CoroutineDispatcher]
* and provides native support of [delay] and [withTimeout].
*/
public fun Scheduler.asCoroutineDispatcher(): SchedulerCoroutineDispatcher = SchedulerCoroutineDispatcher(this)
public fun Scheduler.asCoroutineDispatcher(): CoroutineDispatcher =
if (this is DispatcherScheduler) {
dispatcher
} else {
SchedulerCoroutineDispatcher(this)
}

@Deprecated(level = DeprecationLevel.HIDDEN, message = "Since 1.4.2, binary compatibility with earlier versions")
@JvmName("asCoroutineDispatcher")
public fun Scheduler.asCoroutineDispatcher0(): SchedulerCoroutineDispatcher =
SchedulerCoroutineDispatcher(this)

/**
* Converts an instance of [CoroutineDispatcher] to an implementation of [Scheduler].
*/
public fun CoroutineDispatcher.asScheduler(): Scheduler =
if (this is SchedulerCoroutineDispatcher) {
scheduler
} else {
DispatcherScheduler(this)
}

private class DispatcherScheduler(@JvmField val dispatcher: CoroutineDispatcher) : Scheduler() {

private val schedulerJob = SupervisorJob()

/**
* The scope for everything happening in this [DispatcherScheduler].
*
* Running tasks, too, get launched under this scope, because [shutdown] should cancel the running tasks as well.
*/
private val scope = CoroutineScope(schedulerJob + dispatcher)

/**
* The counter of created workers, for their pretty-printing.
*/
private val workerCounter = atomic(1L)

override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable =
scope.scheduleTask(block, unit.toMillis(delay)) { task ->
Runnable { scope.launch { task() } }
}

override fun createWorker(): Worker = DispatcherWorker(workerCounter.getAndIncrement(), dispatcher, schedulerJob)

override fun shutdown() {
schedulerJob.cancel()
}

private class DispatcherWorker(
private val counter: Long,
private val dispatcher: CoroutineDispatcher,
parentJob: Job
) : Worker() {

private val workerJob = SupervisorJob(parentJob)
private val workerScope = CoroutineScope(workerJob + dispatcher)
private val blockChannel = Channel<suspend () -> Unit>(Channel.UNLIMITED)

init {
workerScope.launch {
blockChannel.consumeEach {
it()
}
}
}

override fun schedule(block: Runnable, delay: Long, unit: TimeUnit): Disposable =
workerScope.scheduleTask(block, unit.toMillis(delay)) { task ->
Runnable { blockChannel.trySend(task) }
}

override fun isDisposed(): Boolean = !workerScope.isActive

override fun dispose() {
blockChannel.close()
workerJob.cancel()
}

override fun toString(): String = "$dispatcher (worker $counter, ${if (isDisposed) "disposed" else "active"})"
}

override fun toString(): String = dispatcher.toString()
}

private typealias Task = suspend () -> Unit

/**
* Schedule [block] so that an adapted version of it, wrapped in [adaptForScheduling], executes after [delayMillis]
* milliseconds.
*/
private fun CoroutineScope.scheduleTask(
block: Runnable,
delayMillis: Long,
adaptForScheduling: (Task) -> Runnable
): Disposable {
val ctx = coroutineContext
var handle: DisposableHandle? = null
val disposable = Disposables.fromRunnable {
// null if delay <= 0
handle?.dispose()
}
val decoratedBlock = RxJavaPlugins.onSchedule(block)
suspend fun task() {
if (disposable.isDisposed) return
try {
runInterruptible {
decoratedBlock.run()
}
} catch (e: Throwable) {
handleUndeliverableException(e, ctx)
}
}

val toSchedule = adaptForScheduling(::task)
if (!isActive) return Disposables.disposed()
if (delayMillis <= 0) {
toSchedule.run()
} else {
@Suppress("INVISIBLE_MEMBER")
ctx.delay.invokeOnTimeout(delayMillis, toSchedule, ctx).let { handle = it }
}
return disposable
}

/**
* Implements [CoroutineDispatcher] on top of an arbitrary [Scheduler].
Expand Down Expand Up @@ -45,8 +172,10 @@ public class SchedulerCoroutineDispatcher(

/** @suppress */
override fun toString(): String = scheduler.toString()

/** @suppress */
override fun equals(other: Any?): Boolean = other is SchedulerCoroutineDispatcher && other.scheduler === scheduler

/** @suppress */
override fun hashCode(): Int = System.identityHashCode(scheduler)
}
}
87 changes: 87 additions & 0 deletions reactive/kotlinx-coroutines-rx2/test/SchedulerStressTest.kt
@@ -0,0 +1,87 @@
/*
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.rx2

import kotlinx.coroutines.*
import org.junit.*
import java.util.concurrent.*

class SchedulerStressTest : TestBase() {
@Before
fun setup() {
ignoreLostThreads("RxCachedThreadScheduler-", "RxCachedWorkerPoolEvictor-", "RxSchedulerPurge-")
}

/**
* Test that we don't get an OOM if we schedule many jobs at once.
* It's expected that if you don't dispose you'd see an OOM error.
*/
@Test
fun testSchedulerDisposed(): Unit = runTest {
val dispatcher = currentDispatcher() as CoroutineDispatcher
val scheduler = dispatcher.asScheduler()
testRunnableDisposed(scheduler::scheduleDirect)
}

@Test
fun testSchedulerWorkerDisposed(): Unit = runTest {
val dispatcher = currentDispatcher() as CoroutineDispatcher
val scheduler = dispatcher.asScheduler()
val worker = scheduler.createWorker()
testRunnableDisposed(worker::schedule)
}

private suspend fun testRunnableDisposed(block: RxSchedulerBlockNoDelay) {
val n = 2000 * stressTestMultiplier
repeat(n) {
val a = ByteArray(1000000) //1MB
val disposable = block(Runnable {
keepMe(a)
expectUnreached()
})
disposable.dispose()
yield() // allow the scheduled task to observe that it was disposed
}
}

/**
* Test function that holds a reference. Used for testing OOM situations
*/
private fun keepMe(a: ByteArray) {
Thread.sleep(a.size / (a.size + 1) + 10L)
}

/**
* Test that we don't get an OOM if we schedule many delayed jobs at once. It's expected that if you don't dispose that you'd
* see a OOM error.
*/
@Test
fun testSchedulerDisposedDuringDelay(): Unit = runTest {
val dispatcher = currentDispatcher() as CoroutineDispatcher
val scheduler = dispatcher.asScheduler()
testRunnableDisposedDuringDelay(scheduler::scheduleDirect)
}

@Test
fun testSchedulerWorkerDisposedDuringDelay(): Unit = runTest {
val dispatcher = currentDispatcher() as CoroutineDispatcher
val scheduler = dispatcher.asScheduler()
val worker = scheduler.createWorker()
testRunnableDisposedDuringDelay(worker::schedule)
}

private fun testRunnableDisposedDuringDelay(block: RxSchedulerBlockWithDelay) {
val n = 2000 * stressTestMultiplier
repeat(n) {
val a = ByteArray(1000000) //1MB
val delayMillis: Long = 10
val disposable = block(Runnable {
keepMe(a)
expectUnreached()
}, delayMillis, TimeUnit.MILLISECONDS)
disposable.dispose()
}
}
}

0 comments on commit d5f852c

Please sign in to comment.