Skip to content

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
dkhalanskyjb committed Jan 25, 2022
1 parent 82d4b48 commit a13d99e
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 84 deletions.
85 changes: 43 additions & 42 deletions reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt
Expand Up @@ -8,7 +8,6 @@ import io.reactivex.*
import io.reactivex.disposables.*
import io.reactivex.plugins.*
import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.channels.*
import java.util.concurrent.*
import kotlin.coroutines.*
Expand Down Expand Up @@ -39,7 +38,7 @@ public fun CoroutineDispatcher.asScheduler(): Scheduler =
DispatcherScheduler(this)
}

private class DispatcherScheduler(val dispatcher: CoroutineDispatcher) : Scheduler() {
private class DispatcherScheduler(@JvmField val dispatcher: CoroutineDispatcher) : Scheduler() {

private val schedulerJob = SupervisorJob()

Expand All @@ -50,16 +49,10 @@ private class DispatcherScheduler(val dispatcher: CoroutineDispatcher) : Schedul
*/
private val scope = CoroutineScope(schedulerJob + dispatcher)

override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable {
if (!scope.isActive) return Disposables.disposed()
var handle: DisposableHandle? = null
val task = buildTask(block) { handle!! }
val newBlock = Runnable { scope.launch { task.block() } }
val ctx = scope.coroutineContext
@Suppress("INVISIBLE_MEMBER")
ctx.delay.invokeOnTimeout(unit.toMillis(delay), newBlock, ctx).let { handle = it }
return task
}
override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable =
scope.scheduleTask(block, unit.toMillis(delay)) { task ->
Runnable { scope.launch { task() } }
}

override fun createWorker(): Worker = DispatcherWorker(dispatcher, schedulerJob)

Expand All @@ -82,17 +75,10 @@ private class DispatcherScheduler(val dispatcher: CoroutineDispatcher) : Schedul
}
}

override fun schedule(block: Runnable, delay: Long, unit: TimeUnit): Disposable {
if (!workerScope.isActive) return Disposables.disposed()
val timeMillis = unit.toMillis(delay)
var handle: DisposableHandle? = null
val task = buildTask(block) { handle!! }
val newBlock = Runnable { blockChannel.trySend(task.block) }
val ctx = workerScope.coroutineContext
@Suppress("INVISIBLE_MEMBER")
ctx.delay.invokeOnTimeout(timeMillis, newBlock, ctx).let { handle = it }
return task
}
override fun schedule(block: Runnable, delay: Long, unit: TimeUnit): Disposable =
workerScope.scheduleTask(block, unit.toMillis(delay)) { task ->
Runnable { blockChannel.trySend(task) }
}

override fun isDisposed(): Boolean = !workerScope.isActive

Expand All @@ -101,30 +87,45 @@ private class DispatcherScheduler(val dispatcher: CoroutineDispatcher) : Schedul
workerJob.cancel()
}
}
}

data class Task(val disposable: Disposable, val block: suspend () -> Unit): Disposable by disposable
private typealias Task = suspend () -> Unit

companion object {
private fun buildTask(block: Runnable, handle: () -> DisposableHandle): Task {
val decoratedBlock = RxJavaPlugins.onSchedule(block)
val disposable = Disposables.fromRunnable {
handle().dispose()
}
return Task(disposable) {
if (!disposable.isDisposed) {
try {
runInterruptible {
decoratedBlock.run()
}
} catch (e: Throwable) {
// TODO: what about TimeoutCancellationException?
if (e !is CancellationException)
RxJavaPlugins.onError(e)
}
}
/**
* Schedule [block] so that an adapted version of it, wrapped in [adaptForScheduling], executes after [delayMillis]
* milliseconds.
*/
private fun CoroutineScope.scheduleTask(
block: Runnable,
delayMillis: Long,
adaptForScheduling: (Task) -> Runnable
): Disposable {
val ctx = coroutineContext
var handle: DisposableHandle? = null
val disposable = Disposables.fromRunnable {
// null if delay <= 0
handle?.dispose()
}
val decoratedBlock = RxJavaPlugins.onSchedule(block)
suspend fun task() {
if (disposable.isDisposed) return
try {
runInterruptible {
decoratedBlock.run()
}
} catch (e: Throwable) {
handleUndeliverableException(e, ctx)
}
}
val toSchedule = adaptForScheduling(::task)
if (!isActive) return Disposables.disposed()
if (delayMillis <= 0) {
toSchedule.run()
} else {
@Suppress("INVISIBLE_MEMBER")
ctx.delay.invokeOnTimeout(delayMillis, toSchedule, ctx).let { handle = it }
}
return disposable
}

/**
Expand Down
86 changes: 44 additions & 42 deletions reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt
Expand Up @@ -39,7 +39,7 @@ public fun CoroutineDispatcher.asScheduler(): Scheduler =
DispatcherScheduler(this)
}

private class DispatcherScheduler(val dispatcher: CoroutineDispatcher) : Scheduler() {
private class DispatcherScheduler(@JvmField val dispatcher: CoroutineDispatcher) : Scheduler() {

private val schedulerJob = SupervisorJob()

Expand All @@ -50,16 +50,10 @@ private class DispatcherScheduler(val dispatcher: CoroutineDispatcher) : Schedul
*/
private val scope = CoroutineScope(schedulerJob + dispatcher)

override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable {
if (!scope.isActive) return Disposable.disposed()
var handle: DisposableHandle? = null
val task = buildTask(block) { handle!! }
val newBlock = Runnable { scope.launch { task.block() } }
val ctx = scope.coroutineContext
@Suppress("INVISIBLE_MEMBER")
ctx.delay.invokeOnTimeout(unit.toMillis(delay), newBlock, ctx).let { handle = it }
return task
}
override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable =
scope.scheduleTask(block, unit.toMillis(delay)) { task ->
Runnable { scope.launch { task() } }
}

override fun createWorker(): Worker = DispatcherWorker(dispatcher, schedulerJob)

Expand All @@ -82,17 +76,10 @@ private class DispatcherScheduler(val dispatcher: CoroutineDispatcher) : Schedul
}
}

override fun schedule(block: Runnable, delay: Long, unit: TimeUnit): Disposable {
if (!workerScope.isActive) return Disposable.disposed()
val timeMillis = unit.toMillis(delay)
var handle: DisposableHandle? = null
val task = buildTask(block) { handle!! }
val newBlock = Runnable { blockChannel.trySend(task.block) }
val ctx = workerScope.coroutineContext
@Suppress("INVISIBLE_MEMBER")
ctx.delay.invokeOnTimeout(timeMillis, newBlock, ctx).let { handle = it }
return task
}
override fun schedule(block: Runnable, delay: Long, unit: TimeUnit): Disposable =
workerScope.scheduleTask(block, unit.toMillis(delay)) { task ->
Runnable { blockChannel.trySend(task) }
}

override fun isDisposed(): Boolean = !workerScope.isActive

Expand All @@ -101,30 +88,45 @@ private class DispatcherScheduler(val dispatcher: CoroutineDispatcher) : Schedul
workerJob.cancel()
}
}
}

data class Task(val disposable: Disposable, val block: suspend () -> Unit): Disposable by disposable
private typealias Task = suspend () -> Unit

companion object {
private fun buildTask(block: Runnable, handle: () -> DisposableHandle): Task {
val decoratedBlock = RxJavaPlugins.onSchedule(block)
val disposable = Disposable.fromRunnable {
handle().dispose()
}
return Task(disposable) {
if (!disposable.isDisposed) {
try {
runInterruptible {
decoratedBlock.run()
}
} catch (e: Throwable) {
// TODO: what about TimeoutCancellationException?
if (e !is CancellationException)
RxJavaPlugins.onError(e)
}
}
/**
* Schedule [block] so that an adapted version of it, wrapped in [adaptForScheduling], executes after [delayMillis]
* milliseconds.
*/
private fun CoroutineScope.scheduleTask(
block: Runnable,
delayMillis: Long,
adaptForScheduling: (Task) -> Runnable
): Disposable {
val ctx = coroutineContext
var handle: DisposableHandle? = null
val disposable = Disposable.fromRunnable {
// null if delay <= 0
handle?.dispose()
}
val decoratedBlock = RxJavaPlugins.onSchedule(block)
suspend fun task() {
if (disposable.isDisposed) return
try {
runInterruptible {
decoratedBlock.run()
}
} catch (e: Throwable) {
handleUndeliverableException(e, ctx)
}
}
val toSchedule = adaptForScheduling(::task)
if (!isActive) return Disposable.disposed()
if (delayMillis <= 0) {
toSchedule.run()
} else {
@Suppress("INVISIBLE_MEMBER")
ctx.delay.invokeOnTimeout(delayMillis, toSchedule, ctx).let { handle = it }
}
return disposable
}

/**
Expand Down Expand Up @@ -163,4 +165,4 @@ public class SchedulerCoroutineDispatcher(

/** @suppress */
override fun hashCode(): Int = System.identityHashCode(scheduler)
}
}

0 comments on commit a13d99e

Please sign in to comment.