From a13d99e7d615cad422cb8a3f64f055c8159f7152 Mon Sep 17 00:00:00 2001 From: Dmitry Khalanskiy Date: Tue, 25 Jan 2022 12:06:45 +0300 Subject: [PATCH] Fixes --- .../kotlinx-coroutines-rx2/src/RxScheduler.kt | 85 +++++++++--------- .../kotlinx-coroutines-rx3/src/RxScheduler.kt | 86 ++++++++++--------- 2 files changed, 87 insertions(+), 84 deletions(-) diff --git a/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt b/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt index be03d7a451..9531c922a3 100644 --- a/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt +++ b/reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt @@ -8,7 +8,6 @@ import io.reactivex.* import io.reactivex.disposables.* import io.reactivex.plugins.* import kotlinx.coroutines.* -import kotlinx.coroutines.CancellationException import kotlinx.coroutines.channels.* import java.util.concurrent.* import kotlin.coroutines.* @@ -39,7 +38,7 @@ public fun CoroutineDispatcher.asScheduler(): Scheduler = DispatcherScheduler(this) } -private class DispatcherScheduler(val dispatcher: CoroutineDispatcher) : Scheduler() { +private class DispatcherScheduler(@JvmField val dispatcher: CoroutineDispatcher) : Scheduler() { private val schedulerJob = SupervisorJob() @@ -50,16 +49,10 @@ private class DispatcherScheduler(val dispatcher: CoroutineDispatcher) : Schedul */ private val scope = CoroutineScope(schedulerJob + dispatcher) - override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable { - if (!scope.isActive) return Disposables.disposed() - var handle: DisposableHandle? = null - val task = buildTask(block) { handle!! } - val newBlock = Runnable { scope.launch { task.block() } } - val ctx = scope.coroutineContext - @Suppress("INVISIBLE_MEMBER") - ctx.delay.invokeOnTimeout(unit.toMillis(delay), newBlock, ctx).let { handle = it } - return task - } + override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable = + scope.scheduleTask(block, unit.toMillis(delay)) { task -> + Runnable { scope.launch { task() } } + } override fun createWorker(): Worker = DispatcherWorker(dispatcher, schedulerJob) @@ -82,17 +75,10 @@ private class DispatcherScheduler(val dispatcher: CoroutineDispatcher) : Schedul } } - override fun schedule(block: Runnable, delay: Long, unit: TimeUnit): Disposable { - if (!workerScope.isActive) return Disposables.disposed() - val timeMillis = unit.toMillis(delay) - var handle: DisposableHandle? = null - val task = buildTask(block) { handle!! } - val newBlock = Runnable { blockChannel.trySend(task.block) } - val ctx = workerScope.coroutineContext - @Suppress("INVISIBLE_MEMBER") - ctx.delay.invokeOnTimeout(timeMillis, newBlock, ctx).let { handle = it } - return task - } + override fun schedule(block: Runnable, delay: Long, unit: TimeUnit): Disposable = + workerScope.scheduleTask(block, unit.toMillis(delay)) { task -> + Runnable { blockChannel.trySend(task) } + } override fun isDisposed(): Boolean = !workerScope.isActive @@ -101,30 +87,45 @@ private class DispatcherScheduler(val dispatcher: CoroutineDispatcher) : Schedul workerJob.cancel() } } +} - data class Task(val disposable: Disposable, val block: suspend () -> Unit): Disposable by disposable +private typealias Task = suspend () -> Unit - companion object { - private fun buildTask(block: Runnable, handle: () -> DisposableHandle): Task { - val decoratedBlock = RxJavaPlugins.onSchedule(block) - val disposable = Disposables.fromRunnable { - handle().dispose() - } - return Task(disposable) { - if (!disposable.isDisposed) { - try { - runInterruptible { - decoratedBlock.run() - } - } catch (e: Throwable) { - // TODO: what about TimeoutCancellationException? - if (e !is CancellationException) - RxJavaPlugins.onError(e) - } - } +/** + * Schedule [block] so that an adapted version of it, wrapped in [adaptForScheduling], executes after [delayMillis] + * milliseconds. + */ +private fun CoroutineScope.scheduleTask( + block: Runnable, + delayMillis: Long, + adaptForScheduling: (Task) -> Runnable +): Disposable { + val ctx = coroutineContext + var handle: DisposableHandle? = null + val disposable = Disposables.fromRunnable { + // null if delay <= 0 + handle?.dispose() + } + val decoratedBlock = RxJavaPlugins.onSchedule(block) + suspend fun task() { + if (disposable.isDisposed) return + try { + runInterruptible { + decoratedBlock.run() } + } catch (e: Throwable) { + handleUndeliverableException(e, ctx) } } + val toSchedule = adaptForScheduling(::task) + if (!isActive) return Disposables.disposed() + if (delayMillis <= 0) { + toSchedule.run() + } else { + @Suppress("INVISIBLE_MEMBER") + ctx.delay.invokeOnTimeout(delayMillis, toSchedule, ctx).let { handle = it } + } + return disposable } /** diff --git a/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt b/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt index 38aee5b52f..33b815c334 100644 --- a/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt +++ b/reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt @@ -39,7 +39,7 @@ public fun CoroutineDispatcher.asScheduler(): Scheduler = DispatcherScheduler(this) } -private class DispatcherScheduler(val dispatcher: CoroutineDispatcher) : Scheduler() { +private class DispatcherScheduler(@JvmField val dispatcher: CoroutineDispatcher) : Scheduler() { private val schedulerJob = SupervisorJob() @@ -50,16 +50,10 @@ private class DispatcherScheduler(val dispatcher: CoroutineDispatcher) : Schedul */ private val scope = CoroutineScope(schedulerJob + dispatcher) - override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable { - if (!scope.isActive) return Disposable.disposed() - var handle: DisposableHandle? = null - val task = buildTask(block) { handle!! } - val newBlock = Runnable { scope.launch { task.block() } } - val ctx = scope.coroutineContext - @Suppress("INVISIBLE_MEMBER") - ctx.delay.invokeOnTimeout(unit.toMillis(delay), newBlock, ctx).let { handle = it } - return task - } + override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable = + scope.scheduleTask(block, unit.toMillis(delay)) { task -> + Runnable { scope.launch { task() } } + } override fun createWorker(): Worker = DispatcherWorker(dispatcher, schedulerJob) @@ -82,17 +76,10 @@ private class DispatcherScheduler(val dispatcher: CoroutineDispatcher) : Schedul } } - override fun schedule(block: Runnable, delay: Long, unit: TimeUnit): Disposable { - if (!workerScope.isActive) return Disposable.disposed() - val timeMillis = unit.toMillis(delay) - var handle: DisposableHandle? = null - val task = buildTask(block) { handle!! } - val newBlock = Runnable { blockChannel.trySend(task.block) } - val ctx = workerScope.coroutineContext - @Suppress("INVISIBLE_MEMBER") - ctx.delay.invokeOnTimeout(timeMillis, newBlock, ctx).let { handle = it } - return task - } + override fun schedule(block: Runnable, delay: Long, unit: TimeUnit): Disposable = + workerScope.scheduleTask(block, unit.toMillis(delay)) { task -> + Runnable { blockChannel.trySend(task) } + } override fun isDisposed(): Boolean = !workerScope.isActive @@ -101,30 +88,45 @@ private class DispatcherScheduler(val dispatcher: CoroutineDispatcher) : Schedul workerJob.cancel() } } +} - data class Task(val disposable: Disposable, val block: suspend () -> Unit): Disposable by disposable +private typealias Task = suspend () -> Unit - companion object { - private fun buildTask(block: Runnable, handle: () -> DisposableHandle): Task { - val decoratedBlock = RxJavaPlugins.onSchedule(block) - val disposable = Disposable.fromRunnable { - handle().dispose() - } - return Task(disposable) { - if (!disposable.isDisposed) { - try { - runInterruptible { - decoratedBlock.run() - } - } catch (e: Throwable) { - // TODO: what about TimeoutCancellationException? - if (e !is CancellationException) - RxJavaPlugins.onError(e) - } - } +/** + * Schedule [block] so that an adapted version of it, wrapped in [adaptForScheduling], executes after [delayMillis] + * milliseconds. + */ +private fun CoroutineScope.scheduleTask( + block: Runnable, + delayMillis: Long, + adaptForScheduling: (Task) -> Runnable +): Disposable { + val ctx = coroutineContext + var handle: DisposableHandle? = null + val disposable = Disposable.fromRunnable { + // null if delay <= 0 + handle?.dispose() + } + val decoratedBlock = RxJavaPlugins.onSchedule(block) + suspend fun task() { + if (disposable.isDisposed) return + try { + runInterruptible { + decoratedBlock.run() } + } catch (e: Throwable) { + handleUndeliverableException(e, ctx) } } + val toSchedule = adaptForScheduling(::task) + if (!isActive) return Disposable.disposed() + if (delayMillis <= 0) { + toSchedule.run() + } else { + @Suppress("INVISIBLE_MEMBER") + ctx.delay.invokeOnTimeout(delayMillis, toSchedule, ctx).let { handle = it } + } + return disposable } /** @@ -163,4 +165,4 @@ public class SchedulerCoroutineDispatcher( /** @suppress */ override fun hashCode(): Int = System.identityHashCode(scheduler) -} \ No newline at end of file +}