Skip to content

Commit

Permalink
Style
Browse files Browse the repository at this point in the history
  • Loading branch information
dkhalanskyjb committed Feb 21, 2022
1 parent 0c3ba51 commit e79d8de
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 4 deletions.
13 changes: 11 additions & 2 deletions reactive/kotlinx-coroutines-rx2/src/RxScheduler.kt
Expand Up @@ -50,19 +50,27 @@ private class DispatcherScheduler(@JvmField val dispatcher: CoroutineDispatcher)
*/
private val scope = CoroutineScope(schedulerJob + dispatcher)

/**
* The counter of created workers, for their pretty-printing.
*/
private val workerCounter = atomic(1L)

override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable =
scope.scheduleTask(block, unit.toMillis(delay)) { task ->
Runnable { scope.launch { task() } }
}

private val workerCounter = atomic(1L)
override fun createWorker(): Worker = DispatcherWorker(workerCounter.getAndIncrement(), dispatcher, schedulerJob)

override fun shutdown() {
schedulerJob.cancel()
}

private class DispatcherWorker(val counter: Long, val dispatcher: CoroutineDispatcher, parentJob: Job) : Worker() {
private class DispatcherWorker(
private val counter: Long,
private val dispatcher: CoroutineDispatcher,
parentJob: Job
) : Worker() {

private val workerJob = SupervisorJob(parentJob)
private val workerScope = CoroutineScope(workerJob + dispatcher)
Expand Down Expand Up @@ -122,6 +130,7 @@ private fun CoroutineScope.scheduleTask(
handleUndeliverableException(e, ctx)
}
}

val toSchedule = adaptForScheduling(::task)
if (!isActive) return Disposables.disposed()
if (delayMillis <= 0) {
Expand Down
13 changes: 11 additions & 2 deletions reactive/kotlinx-coroutines-rx3/src/RxScheduler.kt
Expand Up @@ -51,19 +51,27 @@ private class DispatcherScheduler(@JvmField val dispatcher: CoroutineDispatcher)
*/
private val scope = CoroutineScope(schedulerJob + dispatcher)

/**
* The counter of created workers, for their pretty-printing.
*/
private val workerCounter = atomic(1L)

override fun scheduleDirect(block: Runnable, delay: Long, unit: TimeUnit): Disposable =
scope.scheduleTask(block, unit.toMillis(delay)) { task ->
Runnable { scope.launch { task() } }
}

private val workerCounter = atomic(1L)
override fun createWorker(): Worker = DispatcherWorker(workerCounter.getAndIncrement(), dispatcher, schedulerJob)

override fun shutdown() {
schedulerJob.cancel()
}

private class DispatcherWorker(val counter: Long, val dispatcher: CoroutineDispatcher, parentJob: Job) : Worker() {
private class DispatcherWorker(
private val counter: Long,
private val dispatcher: CoroutineDispatcher,
parentJob: Job
) : Worker() {

private val workerJob = SupervisorJob(parentJob)
private val workerScope = CoroutineScope(workerJob + dispatcher)
Expand Down Expand Up @@ -123,6 +131,7 @@ private fun CoroutineScope.scheduleTask(
handleUndeliverableException(e, ctx)
}
}

val toSchedule = adaptForScheduling(::task)
if (!isActive) return Disposable.disposed()
if (delayMillis <= 0) {
Expand Down

0 comments on commit e79d8de

Please sign in to comment.