Allow converting CoroutineDispatcher to RxJava scheduler #968

Closed
ansman opened this issue Feb 4, 2019 · 8 comments

ansman commented Feb 4, 2019

Currently there is a way to create a CoroutineDispatcher from an RxJava Scheduler, which is great, but it would also be convenient to be able to do the reverse. RxJava allows you to replace the schedulers returned by the Schedulers singleton, meaning a dispatcher-backed scheduler could be installed for the whole system.
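
For illustration, in RxJava 2 that system-wide override goes through RxJavaPlugins. This is only a minimal sketch, assuming RxJava 2 is on the classpath: `installAsIoScheduler` is a hypothetical helper name, and `Schedulers.from(executor)` is just a stand-in for whatever dispatcher-backed scheduler ends up being used.

```kotlin
import io.reactivex.Scheduler
import io.reactivex.plugins.RxJavaPlugins
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.Executors

// Hypothetical helper: every later call to Schedulers.io() returns `replacement`.
fun installAsIoScheduler(replacement: Scheduler) {
    RxJavaPlugins.setIoSchedulerHandler { replacement }
}

fun main() {
    val executor = Executors.newSingleThreadExecutor()
    // Stand-in scheduler (assumption); a dispatcher-backed scheduler would be passed the same way.
    val custom: Scheduler = Schedulers.from(executor)

    installAsIoScheduler(custom)
    println("io() is overridden: ${Schedulers.io() === custom}")

    // Undo the override and release the executor thread.
    RxJavaPlugins.reset()
    executor.shutdown()
}
```

The other built-in schedulers have matching hooks, such as `setComputationSchedulerHandler`.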

@ansman
Contributor Author

ansman commented Feb 4, 2019

Here is a sample implementation:

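import io.reactivex.Scheduler
import io.reactivex.disposables.Disposable
import io.reactivex.disposables.Disposables
import java.util.concurrent.TimeUnit
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlinx.coroutines.*

// RxJava 2 Scheduler that runs its work on the given CoroutineDispatcher.
class DispatcherBackedScheduler(private val dispatcher: CoroutineDispatcher) : Scheduler() {
    override fun createWorker(): Worker = DispatcherBackedWorker(dispatcher)

    private class DispatcherBackedWorker(private val dispatcher: CoroutineDispatcher) : Worker(), CoroutineScope {
        // Each worker owns its own job, so disposing one worker does not affect the others.
        private val job = SupervisorJob()
        override val coroutineContext: CoroutineContext = EmptyCoroutineContext + job
        override fun isDisposed(): Boolean = !job.isActive
        override fun dispose() = job.cancel()

        override fun schedule(run: Runnable, delay: Long, unit: TimeUnit): Disposable =
            if (delay <= 0) {
                // Immediate work goes straight to the dispatcher and cannot be cancelled afterwards.
                dispatcher.dispatch(coroutineContext, run)
                Disposables.disposed()
            } else {
                // Delayed work runs as a child coroutine of the worker's job.
                launch(dispatcher) {
                    kotlinx.coroutines.delay(unit.toMillis(delay))
                    run.run()
                }.asDisposable()
            }
    }
}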

private fun Job.asDisposable(): Disposable = object : Disposable {
    override fun isDisposed(): Boolean = !isActive
    override fun dispose() = cancel()
}

@Guang1234567

Guang1234567 commented Mar 4, 2019

@ansman

Should there be a CoroutineScope.asScheduler, like below?

private fun Job.asDisposable(): Disposable = object : Disposable {
    override fun isDisposed(): Boolean = !isActive
    override fun dispose() = cancel()
}

fun CoroutineScope.asScheduler(): Scheduler {
    return ScopeBackedScheduler(this)
}

class ScopeBackedScheduler(private val scope: CoroutineScope) : Scheduler() {
    override fun createWorker(): Worker = ScopeBackedWorker(scope)

    private class ScopeBackedWorker(private val scope: CoroutineScope) : Worker() {
        override fun isDisposed(): Boolean = !scope.isActive
        override fun dispose() = scope.cancel()

        override fun schedule(run: Runnable, delay: Long, unit: TimeUnit): Disposable {
            if (isDisposed) {
                return Disposables.disposed()
            }

            val job = scope.launch {
                if (delay > 0) {
                    delay(unit.toMillis(delay))
                }
                run.run()
            }
            return job.asDisposable()
        }
    }
}

I just want the lifecycle of the ScopeBackedScheduler to match the Activity's, like below:

class MainActivity : AppCompatActivity(), CoroutineScope {

    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Main + job

    private lateinit var job: Job

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        job = Job()

        RxView.clicks(btnABC)
            .observeOn(this.asScheduler())  // The stream is expected to complete when `job.cancel()` is called in `onDestroy()`
            .subscribe(...)
    }

    override fun onDestroy() {
        job.cancel()
        super.onDestroy()
    }
}

@Guang1234567

Guang1234567 commented Mar 5, 2019

@ansman

The kotlinx-coroutines-rx2 module is interesting.

It also adds some excellent features (a new backpressure strategy), so we can use it directly without writing a custom CoroutineDispatcherScheduler!

But be careful with it in a production environment.

@ansman
Contributor Author

ansman commented Mar 5, 2019

@Guang1234567 Your implementation of ScopeBackedScheduler has several problems:
• It can only handle a single dispatched job at a time
• A worker has a shorter lifespan than a scope, so you can't cancel the scope when disposing the worker
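
To illustrate the second point, here is a minimal sketch using only kotlinx.coroutines. The function name `taskRanAfterDispose` is hypothetical, and `scope.cancel()` stands in for what `ScopeBackedWorker.dispose()` does. Once one worker has cancelled the shared scope, work launched through that scope is cancelled before it runs:

```kotlin
import kotlinx.coroutines.*

// Hypothetical demo: reports whether a task launched after "dispose" actually ran.
fun taskRanAfterDispose(): Boolean = runBlocking {
    val scope = CoroutineScope(Dispatchers.Default + Job())

    // Stand-in for ScopeBackedWorker.dispose(), which cancels the whole scope.
    scope.cancel()

    var ran = false
    // A later schedule() call on any worker sharing this scope.
    scope.launch { ran = true }.join()
    ran
}

fun main() {
    println("task ran after dispose: ${taskRanAfterDispose()}")
}
```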

Also, you never dispose your click observer.
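
One common way to handle that in RxJava 2 is to collect subscriptions in a CompositeDisposable and clear it when the owner goes away. A minimal sketch, with assumptions: the `PublishSubject` is only a stand-in for the click stream, and `countHandledClicks` is a hypothetical name.

```kotlin
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.subjects.PublishSubject

// Hypothetical demo: counts how many clicks are handled before and after disposal.
fun countHandledClicks(): Int {
    val clicks = PublishSubject.create<Unit>() // stand-in for the button's click stream
    val disposables = CompositeDisposable()
    var handled = 0

    // Keep the Disposable returned by subscribe() instead of dropping it.
    disposables.add(clicks.subscribe { handled++ })

    clicks.onNext(Unit)   // delivered to the subscriber
    disposables.clear()   // what onDestroy() would do
    clicks.onNext(Unit)   // no longer delivered

    return handled
}

fun main() {
    println("handled clicks: ${countHandledClicks()}")
}
```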

@Guang1234567

Guang1234567 commented May 17, 2019

@ansman

Thanks for your reply.

I have fixed the problems following your suggestions:

fun CoroutineScope.asScheduler(): Scheduler {
    return ScopeBackedScheduler(this)
}

class ScopeBackedScheduler(private val scope: CoroutineScope) : Scheduler() {
    override fun createWorker(): Worker = ScopeBackedWorker(scope)

    private class ScopeBackedWorker(private val scope: CoroutineScope) : Worker() {
        private val job = SupervisorJob(scope.coroutineContext[Job])
        override fun isDisposed(): Boolean = !job.isActive
        override fun dispose() = job.cancel()

        override fun schedule(run: Runnable, delay: Long, unit: TimeUnit): Disposable {
            if (isDisposed) {
                return Disposables.disposed()
            }

            return scope.launch(job) {
                if (delay > 0) {
                    delay(unit.toMillis(delay))
                }
                run.run()
            }.asDisposable()
        }
    }
}

It solves these problems:

  • The Job now lives exactly as long as the Worker. Because it is a child of the scope's job, disposing the worker cancels only that job instead of the whole scope.
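
The parent/child relationship described above can be checked with kotlinx.coroutines alone. A minimal sketch, where `childJobLifecycles` is a hypothetical name and the two `SupervisorJob(...)` calls mirror the `job` field of `ScopeBackedWorker`:

```kotlin
import kotlinx.coroutines.*

// Hypothetical demo. Returns a pair:
//   first  = is the scope still active after one worker job is cancelled?
//   second = is another worker job still active after the scope is cancelled?
fun childJobLifecycles(): Pair<Boolean, Boolean> {
    val scope = CoroutineScope(Dispatchers.Default + Job())

    // Disposing a worker cancels only its own child job.
    val firstWorkerJob = SupervisorJob(scope.coroutineContext[Job])
    firstWorkerJob.cancel()
    val scopeStillActive = scope.isActive

    // Cancelling the scope cancels every child job, including this one.
    val secondWorkerJob = SupervisorJob(scope.coroutineContext[Job])
    scope.cancel()
    val workerStillActive = secondWorkerJob.isActive

    return scopeStillActive to workerStillActive
}

fun main() {
    val (scopeStillActive, workerStillActive) = childJobLifecycles()
    println("scope active after worker dispose: $scopeStillActive")
    println("worker active after scope cancel: $workerStillActive")
}
```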

@bkolarov

Are there any plans to get this finished?

@ansman
Contributor Author

ansman commented Oct 15, 2020

I'm happy to provide a pull request as long as someone from the Coroutines team approves the idea

@elizarov
Contributor

> I'm happy to provide a pull request as long as someone from the Coroutines team approves the idea

There is a PR #1923 in review, close to being ready to merge.
