New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Way to use CoroutineDispatcher as RxJava2 scheduler (#548) #857
Conversation
Tests are failing, please udpate binary compatibility data ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
General feedback:
-
It is not clear why jobs are required in the first place.
CoroutineDispatcher
API surface should be enough to implement the scheduler. -
asScheduler
result should be disposable if and only if underlying dispatcher is disposable, otherwise API contrdicts itself. -
How to ensure that
CoroutineDispatcherScheduler
fully compliesScheduler
API contract?
E.g. take a look atExecutorScheduler
: it is far from a trivialfun schedule(...) = executor.execute(block)
. Why? Does rx2 have some test suite for schedulers? Can some scheduler tests from rx2 be run againstasScheduler
scheduler?
|
||
override fun dispose() { | ||
job.cancel() | ||
Schedulers.single() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's the point of this line?
|
||
internal class DispatcherWorker(val dispatcher: CoroutineDispatcher) : Worker(), CoroutineScope { | ||
|
||
private val job = Job() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the point of having a root job?
I see multiple caveats:
- Additional runtime overhead
- If a scheduled job will fail with an exception, it will cancel its parent and no more items will be scheduled.
override fun schedule(run: Runnable, delay: Long, unit: TimeUnit): Disposable { | ||
val taskJob = launch { | ||
delay(unit.toMillis(delay)) | ||
run.run() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What would happen if run
throws an exception?
.create<String> { | ||
expect(2) | ||
val t1 = Thread.currentThread() | ||
Assert.assertThat(t1, IsEqual(coroutineThread)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ExperimentalCoroutineDispatcher
does not have such guarantee.
Please use newSingleThreadContext
instead
import org.junit.Test | ||
|
||
class DispatcherTest : TestBase() { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add more test cases: exception handling, Unconfined
wrapping, etc.
If you introduce a new API, please make sure it works in a lot of scenarios, not only the basic ones.
} | ||
|
||
internal class DisposableJob(val job: Job) : Disposable { | ||
override fun isDisposed(): Boolean = job.isCompleted |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that isDisposed
can return false
after dispose
call for a long time.
Does it match disposable contract?
Closing this as outdated |
I made implementation for #548 issue