Skip to content

Commit

Permalink
Use less coroutines and jobs
Browse files Browse the repository at this point in the history
Also,
* Don't use `kotlinx-coroutines-test`
* Now shutting down the scheduler will interrupt the tasks
  • Loading branch information
dkhalanskyjb committed Jan 25, 2022
1 parent 895c643 commit 82d4b48
Show file tree
Hide file tree
Showing 8 changed files with 390 additions and 260 deletions.
1 change: 0 additions & 1 deletion reactive/kotlinx-coroutines-rx2/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import org.jetbrains.dokka.gradle.DokkaTaskPartial
dependencies {
api project(':kotlinx-coroutines-reactive')
testImplementation project(':kotlinx-coroutines-reactive').sourceSets.test.output
testImplementation project(':kotlinx-coroutines-test')
testImplementation "org.reactivestreams:reactive-streams-tck:$reactive_streams_version"
api "io.reactivex.rxjava2:rxjava:$rxjava2_version"
}
Expand Down
126 changes: 55 additions & 71 deletions reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import io.reactivex.*
import io.reactivex.disposables.*
import io.reactivex.plugins.*
import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.*
import java.util.concurrent.*
import kotlin.coroutines.*
Expand Down Expand Up @@ -42,95 +43,86 @@ private class DispatcherScheduler(val dispatcher: CoroutineDispatcher) : Schedul

private val schedulerJob = SupervisorJob()

private val scope = CoroutineScope(schedulerJob + dispatcher + CoroutineExceptionHandler { _, throwable ->
RxJavaPlugins.onError(throwable)
})

override fun scheduleDirect(block: Runnable): Disposable =
scheduleDirect(block, 0, TimeUnit.MILLISECONDS)
/**
* The scope for everything happening in this [DispatcherScheduler].
*
* Running tasks, too, get launched under this scope, because [shutdown] should cancel the running tasks as well.
*/
private val scope = CoroutineScope(schedulerJob + dispatcher)

override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable {
if (!scope.isActive) return Disposables.disposed()
val newBlock = RxJavaPlugins.onSchedule(block)
return scope.launch {
delay(unit.toMillis(delay))
newBlock.run()
}.asDisposable()
var handle: DisposableHandle? = null
val task = buildTask(block) { handle!! }
val newBlock = Runnable { scope.launch { task.block() } }
val ctx = scope.coroutineContext
@Suppress("INVISIBLE_MEMBER")
ctx.delay.invokeOnTimeout(unit.toMillis(delay), newBlock, ctx).let { handle = it }
return task
}

override fun createWorker(): Worker =
DispatcherWorker(dispatcher, schedulerJob).also {
it.start()
}
override fun createWorker(): Worker = DispatcherWorker(dispatcher, schedulerJob)

override fun shutdown() {
scope.cancel()
schedulerJob.cancel()
}

private class DispatcherWorker(dispatcher: CoroutineDispatcher, parentJob: Job) : Worker() {

private val workerJob = SupervisorJob(parentJob)
private val workerScope = CoroutineScope(workerJob + dispatcher)
private val blockChannel = Channel<SchedulerChannelTask>(Channel.UNLIMITED)
private val blockChannel = Channel<suspend () -> Unit>(Channel.UNLIMITED)

fun start() {
init {
workerScope.launch {
while (isActive) {
val task = blockChannel.receive()
task.execute()
task()
}
}
}

override fun isDisposed(): Boolean = !workerScope.isActive

override fun schedule(block: Runnable): Disposable =
schedule(block, 0, TimeUnit.MILLISECONDS)

override fun schedule(block: Runnable, delay: Long, unit: TimeUnit): Disposable {
if (!workerScope.isActive) return Disposables.disposed()

val newBlock = RxJavaPlugins.onSchedule(block)

val taskJob = Job(workerJob)
val task = SchedulerChannelTask(newBlock, taskJob)

if (delay <= 0L) {
blockChannel.offer(task)
} else {
// Use `taskJob` as the parent here so the delay will also get cancelled if the Disposable
// is disposed.
workerScope.launch(taskJob) {
// Delay *before* enqueuing the task, so other tasks (e.g. via schedule without delay)
// aren't blocked by the delay.
delay(unit.toMillis(delay))
// Once the task is ready to run, it still needs to be executed via the queue to comply
// with the Scheduler contract of running all worker tasks in a non-overlapping manner.
blockChannel.offer(task)
}
}

return taskJob.asDisposable()
val timeMillis = unit.toMillis(delay)
var handle: DisposableHandle? = null
val task = buildTask(block) { handle!! }
val newBlock = Runnable { blockChannel.trySend(task.block) }
val ctx = workerScope.coroutineContext
@Suppress("INVISIBLE_MEMBER")
ctx.delay.invokeOnTimeout(timeMillis, newBlock, ctx).let { handle = it }
return task
}

override fun isDisposed(): Boolean = !workerScope.isActive

override fun dispose() {
workerScope.cancel()
blockChannel.close()
workerJob.cancel()
}
}
}

/**
* Represents a task to be queued sequentially on a [Channel] for a [Scheduler.Worker].
*
* Delayed tasks do not block [Channel] from processing other tasks
*/
private class SchedulerChannelTask(
private val block: Runnable,
job: Job
) : JobDisposable(job) {
fun execute() {
if (job.isActive) {
block.run()
data class Task(val disposable: Disposable, val block: suspend () -> Unit): Disposable by disposable

companion object {
private fun buildTask(block: Runnable, handle: () -> DisposableHandle): Task {
val decoratedBlock = RxJavaPlugins.onSchedule(block)
val disposable = Disposables.fromRunnable {
handle().dispose()
}
return Task(disposable) {
if (!disposable.isDisposed) {
try {
runInterruptible {
decoratedBlock.run()
}
} catch (e: Throwable) {
// TODO: what about TimeoutCancellationException?
if (e !is CancellationException)
RxJavaPlugins.onError(e)
}
}
}
}
}
}
Expand Down Expand Up @@ -165,18 +157,10 @@ public class SchedulerCoroutineDispatcher(

/** @suppress */
override fun toString(): String = scheduler.toString()

/** @suppress */
override fun equals(other: Any?): Boolean = other is SchedulerCoroutineDispatcher && other.scheduler === scheduler

/** @suppress */
override fun hashCode(): Int = System.identityHashCode(scheduler)
}

private open class JobDisposable(protected val job: Job) : Disposable {
override fun isDisposed(): Boolean = !job.isActive

override fun dispose() {
job.cancel()
}
}

private fun Job.asDisposable(): Disposable = JobDisposable(this)
}
12 changes: 5 additions & 7 deletions reactive/kotlinx-coroutines-rx2/test/SchedulerStressTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package kotlinx.coroutines.rx2

import kotlinx.coroutines.*
import kotlinx.coroutines.test.*
import org.junit.*
import java.util.concurrent.*

Expand All @@ -16,8 +15,8 @@ class SchedulerStressTest : TestBase() {
}

/**
* Test that we don't get an OOM if we schedule many jobs at once. It's expected that if you don't dispose that you'd
* see a OOM error.
* Test that we don't get an OOM if we schedule many jobs at once.
* It's expected that if you don't dispose you'd see an OOM error.
*/
@Test
fun testSchedulerDisposed(): Unit = runTest {
Expand Down Expand Up @@ -73,20 +72,20 @@ class SchedulerStressTest : TestBase() {
* see a OOM error.
*/
@Test
fun testSchedulerDisposedDuringDelay(): Unit = runBlockingTest {
fun testSchedulerDisposedDuringDelay(): Unit = runTest {
val dispatcher = currentDispatcher() as CoroutineDispatcher
val scheduler = dispatcher.asScheduler()
testRunnableDisposedDuringDelay(scheduler::scheduleDirect)
}

@Test
fun testSchedulerWorkerDisposedDuringDelay(): Unit = runBlockingTest {
fun testSchedulerWorkerDisposedDuringDelay(): Unit = runTest {
val dispatcher = currentDispatcher() as CoroutineDispatcher
val scheduler = dispatcher.asScheduler()
testRunnableDisposedDuringDelay(scheduler.createWorker()::schedule)
}

private suspend fun TestCoroutineScope.testRunnableDisposedDuringDelay(block: RxSchedulerBlockWithDelay) {
private suspend fun testRunnableDisposedDuringDelay(block: RxSchedulerBlockWithDelay) {
expect(1)

val dispatcher = currentDispatcher() as CoroutineDispatcher
Expand All @@ -103,7 +102,6 @@ class SchedulerStressTest : TestBase() {
}
}, delayMillis, TimeUnit.MILLISECONDS)
disposable.dispose()
advanceTimeBy(delayMillis)
yield()
}
}
Expand Down

0 comments on commit 82d4b48

Please sign in to comment.