3.x Feature: introduce abstract time source for scheduler impl #7154

Closed
SergejIsbrecht opened this issue Jan 21, 2021 · 11 comments · Fixed by #7169

Comments

@SergejIsbrecht
Contributor

SergejIsbrecht commented Jan 21, 2021

Version:
rxjava:3.0.9

Related:
#2943

Precondition:

  • Task with delay scheduled in a Scheduler

Problem:
When the Linux kernel resumes from suspend, System.currentTimeMillis is adjusted on Linux x86 OpenJDK. In my case this is bad, because watchdogs/timers fire when they shouldn't. I would like to use a different time source, but can't, because System.currentTimeMillis is hard-coded.

    public long now(@NonNull TimeUnit unit) {
        return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

Solution:
I would like to introduce a TimeSource interface, which can be set via RxJavaPlugins.

interface TimeSource {
    fun now(unit : TimeUnit) : Long
}

Impl

class NanoSource : TimeSource {
    override fun now(unit: TimeUnit): Long {
        return unit.convert(System.nanoTime(), TimeUnit.NANOSECONDS)
    }
}

In order not to break the current behavior, the default TimeSource would still be backed by System.currentTimeMillis().
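For readers who prefer Java, the proposal could be sketched roughly as follows. This is only an illustration: `TimeSource`, `WallClockSource`, `NanoSource` and the `TimeSources` holder are hypothetical names and are not part of RxJava. The holder merely stands in for whatever hook would eventually expose the setting.

```java
import java.util.Objects;
import java.util.concurrent.TimeUnit;

// Hypothetical interface from the proposal; not an RxJava type.
interface TimeSource {
    long now(TimeUnit unit);
}

// Wall-clock source, mirroring what Scheduler.now does today.
final class WallClockSource implements TimeSource {
    @Override
    public long now(TimeUnit unit) {
        return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
}

// Monotonic source; its values are only meaningful as differences.
final class NanoSource implements TimeSource {
    @Override
    public long now(TimeUnit unit) {
        return unit.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
    }
}

// Hypothetical holder standing in for a plugin-style hook.
final class TimeSources {
    // Default keeps the existing wall-clock behavior.
    private static volatile TimeSource current = new WallClockSource();

    private TimeSources() { }

    static TimeSource get() {
        return current;
    }

    static void set(TimeSource source) {
        current = Objects.requireNonNull(source, "source");
    }
}
```

With this shape, calling `TimeSources.set(new NanoSource())` once at startup would switch every caller of `TimeSources.get().now(...)` to the monotonic clock, while untouched applications keep the wall clock.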

This idea was pitched by: @artem-zinnatullin in #2943 (comment)

If this request is fine with the community, I would try to implement it quickly and create a PR for it.

@SergejIsbrecht SergejIsbrecht changed the title 3.x Feature: introduce abstract time interface for scheduler 3.x Feature: introduce abstract time source for scheduler impl Jan 21, 2021
@akarnokd
Member

You can override now when implementing Scheduler and Worker and delegate the other methods to an existing Scheduler and Worker.

@SergejIsbrecht
Contributor Author

@akarnokd ,
thank you for your response.

I actually do not want to implement/delegate, because this will probably only work for 'new' schedulers. What about Schedulers.single or Schedulers.from (ExecutorScheduler)?

For example, when I use Schedulers.single.worker(), the implementation of public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) is inherited from super, which uses System.currentTimeMillis. That said, I am unable to re-use an already existing Scheduler, because it uses a clock that is not desired. In my case deep sleep is a feature which will occur quite often. Other developers in different departments probably do not know that System.currentTimeMillis is used and just use Schedulers.single for everything, which could cause very subtle bugs.

Regarding delegation and overriding: Yes, I could just wrap another Scheduler, override now for Worker and Scheduler, and use RxJavaPlugins in order to replace e.g. Schedulers.single, but I think this is actually more difficult than just replacing System.currentTimeMillis with an interface which returns a long as time-source.

Performance wise I would reckon, that C2/ ART would just inline the call to the time-source, if hot enough, but I would bench it first.

Do you have a better idea how to easily switch the time without requiring to re-implement schedulers (e.g. reuse Schedulers.single)?

@akarnokd
Member

Another idea would be to use the Java standard ScheduledExecutorService (provided it doesn't have the same timing problem) to drive your sensitive flows through Subjects and/or Observable.create emitting the ticks.
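A minimal sketch of that idea, using only the JDK: a `ScheduledExecutorService` emits an increasing tick counter at a fixed rate. `ExecutorTicker` and its `onTick` callback are hypothetical names. In an RxJava flow the callback would presumably be something like a Subject's `onNext`, which is left out here to keep the example free of dependencies.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

// Hypothetical helper: drives periodic ticks from a JDK executor.
final class ExecutorTicker implements AutoCloseable {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private final AtomicLong counter = new AtomicLong();

    // onTick stands in for the entry point of the sensitive flow.
    ScheduledFuture<?> start(long period, TimeUnit unit, LongConsumer onTick) {
        return executor.scheduleAtFixedRate(
                () -> onTick.accept(counter.getAndIncrement()),
                period, period, unit);
    }

    // Stops the timer thread; pending ticks are dropped.
    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

Whether this avoids the suspend problem depends on the executor's own clock, which is the caveat raised in the comment itself.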

Otherwise, yes, we would need to add indirection to the now calculation. The problem then is, should it be affecting any and all default Scheduler implementations or should it be more coarse grained?

@yuriykulikov

Another idea would be to use the Java standard ScheduledExecutorService (provided it doesn't have the same timing problem) to drive your sensitive flows through Subjects and/or Observable.create emitting the ticks.

Otherwise, yes, we would need to add indirection to the now calculation. The problem then is, should it be affecting any and all default Scheduler implementations or should it be more coarse grained?

Is this a bad idea to add a method to RxJavaPlugins to override the default time source?
Something like this:

    /**
     * Sets the specific hook function.
     * @param handler the hook function to set, null allowed
     */
    public static void setTimeSourceHandler(@Nullable Function<? super TimeSource, ? extends TimeSource> handler) {
        if (lockdown) {
            throw new IllegalStateException("Plugins can't be changed anymore");
        }
        onTimeSourceHandler = handler;
    }

@SergejIsbrecht RxJavaPlugins can also be used to override single, io and computational schedulers. This doesn't cover Schedulers.from(), but at least it covers computation().

@akarnokd
Member

Is this a bad idea to add a method to RxJavaPlugins to override the default time source?

I generally prefer to modify RxJavaPlugins as a last resort.

@SergejIsbrecht
Contributor Author

Dear @akarnokd,

I have been looking into RxJava's Scheduler implementation. I will sum up my findings as follows:

Scheduler

Scheduler {
    Default:
        Disposable scheduleDirect(@NonNull Runnable run)
        Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) // depends on Worker default-impl
        Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) // uses now(TimeUnit) transitively from Worker default-impl
        long now(@NonNull TimeUnit unit)
    Abstract:
        abstract Worker createWorker()
}

Worker

Scheduler.Worker {
    Default:
        Disposable schedule(@NonNull Runnable run)
        Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) // uses now(TimeUnit.NANOSECONDS)
        long now(@NonNull TimeUnit unit)
    Abstract:
        abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit)
}

Example implementation of SingleScheduler

SingleScheduler {
    Override:
        Disposable scheduleDirect(@NonNull Runnable run, long delay, TimeUnit unit)
        Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, TimeUnit unit)
        Worker createWorker()
}
SingleScheduler.Worker {
    Override:
        Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit)
}

Condition

  • SingleScheduler uses a ScheduledExecutorService internally
  • ScheduledExecutorService uses System.nanoTime and SingleScheduler.Worker#schedulePeriodically uses System.currentTimeMillis

Problem:
SingleScheduler.Worker uses the default impl of schedulePeriodically from Scheduler.Worker, but SingleScheduler overrides Scheduler#scheduleDirect (with delay) & Scheduler#schedulePeriodicallyDirect

Conclusion:
Even one scheduler, in this case SingleScheduler, might use two different time sources, which might cause subtle issues when returning from suspension (S2R). Any idea why SingleScheduler.Worker does not override SingleScheduler.Worker#schedulePeriodically, just like it is done with SingleScheduler#schedulePeriodicallyDirect? I did not find a way to create a scheduler which uses only one time-source in every case (e.g. ExecutorScheduler)

Regarding your suggestion:

You can override now when implementing Scheduler and Worker and delegate the other methods to an existing Scheduler and Worker.

I do see a problem with this approach:

  • In order to wrap a SingleScheduler, I would implement a new Scheduler, delegate all the work to the given SingleScheduler and override now with my time-source of choice. When looking into the implementation of Scheduler and Worker it becomes apparent that now is actually only used in one place: Scheduler.Worker#schedulePeriodically. This means that this method must not be delegated in the new implementation when SingleScheduler uses the default implementation. It cannot be delegated, because the delegate would call now on its own this and not the overridden now of the wrapper. This brings me to my point: I need internal knowledge of when and where now is used, which can change at any time. If this happens, my wrapper will probably not do what I want.
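The dispatch problem described above can be reproduced without RxJava. In the sketch below, `BaseWorker` is a simplified stand-in for a worker whose default method reads the clock through `this.now(...)`. It is not RxJava's actual class, and `firstPeriodicDeadline`, `delegatedDeadline` and `ownDeadline` are hypothetical names chosen for the illustration.

```java
import java.util.concurrent.TimeUnit;

// Simplified stand-in for a worker with a default, clock-reading method.
class BaseWorker {
    long now(TimeUnit unit) {
        return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    // Default implementation: resolves now(...) on whatever 'this' is.
    long firstPeriodicDeadline(long initialDelay, TimeUnit unit) {
        return now(TimeUnit.NANOSECONDS) + unit.toNanos(initialDelay);
    }
}

// Wrapper that overrides now(...) and wraps an existing worker.
final class NanoTimeWorker extends BaseWorker {
    private final BaseWorker delegate;

    NanoTimeWorker(BaseWorker delegate) {
        this.delegate = delegate;
    }

    @Override
    long now(TimeUnit unit) {
        return unit.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    // Delegated call: inside the delegate, 'this' is the delegate,
    // so the wall clock is used and the override above is bypassed.
    long delegatedDeadline(long initialDelay, TimeUnit unit) {
        return delegate.firstPeriodicDeadline(initialDelay, unit);
    }

    // Inherited call: 'this' is the wrapper, so System.nanoTime is used.
    long ownDeadline(long initialDelay, TimeUnit unit) {
        return firstPeriodicDeadline(initialDelay, unit);
    }
}
```

Here `delegatedDeadline` is computed from the wall clock and `ownDeadline` from `System.nanoTime`, so the wrapper author has to know which methods of the wrapped class consult `now` internally.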

Conclusion:
It might be possible to use a different time source, but in light of how RxJava implements/delegates to Executors, the only option is actually to use System.nanoTime in order to guarantee that all methods use the same time-source.

Resolution:
Maybe we do not need a global time-source interface, which is set via RxJavaPlugins but extend some factory methods to take a time-source as a parameter when creating. By default a singleton of System.currentTimeMillis will be used. What do you think about it?

Otherwise, yes, we would need to add indirection to the now calculation. The problem then is, should it be affecting any and all default Scheduler implementations or should it be more coarse grained?

I would probably not do it. There might be a use-case where you want to configure how each Scheduler should behave (e.g. fire directly after suspension, when the time was adjusted and time > timer_fires_time, or not fire, because the time was not adjusted and time < timer_fires_time), but this could be done with a simple overload when creating a new Scheduler with the factory provided by Schedulers. But in my opinion there is actually only one value which should be allowed, and this is System.nanoTime, because all work is already delegated to ExecutorService, which internally uses this time-source. This would make sure that all methods use the same time-source. As I see it, no one ever had such a problem or no one bothered, therefore time and resources are probably not well spent on extending RxJava to support fine-grained editing of the time-source.

SchedulerWrapper-Impl:
https://gist.github.com/SergejIsbrecht/2a9d71e781c6f35d7b74e9ae2bc0c6ef

@akarnokd
Member

ObservableInterval uses schedulePeriodicallyDirect. Would it work for your periodic needs?

Any idea why SingleScheduler.Worker does not overwrite SingleScheduler.Worker#schedulePeriodically, just like it is done with SingleScheduler#schedulePeriodicallyDirect

I have to think about this.

@SergejIsbrecht
Contributor Author

ObservableInterval uses schedulePeriodicallyDirect. Would it work for your periodic needs?

This is probably more of an esoteric problem than a real one. Our developers probably use every method in the Scheduler and Worker classes. This is why I cannot say in advance which operators will be used.

Scheduler.Worker#schedulePeriodically

  • FlowableInterval
  • FlowableBufferTimed
  • FlowableWindowTimed

Scheduler#schedulePeriodicallyDirect

  • FlowableInterval
  • FlowableSampleTimed
  • FlowableWindowTimed

For example, when I use a SingleScheduler and compose a stream which uses both Scheduler.Worker#schedulePeriodically and Scheduler#schedulePeriodicallyDirect, a developer might run into trouble, because one timer fires after suspension and another one does not.

Example

        // Scheduler#schedulePeriodicallyDirect
        Flowable.interval(10, TimeUnit.SECONDS, Schedulers.single())
            // Scheduler.Worker#schedulePeriodically
            .buffer(5, TimeUnit.SECONDS, Schedulers.single())

@akarnokd
Member

I see.

For a start, would the introduction of a system property rx3.scheduler.drift-use-nanotime and behavior help you?

@SergejIsbrecht
Contributor Author

SergejIsbrecht commented Jan 26, 2021

For a start, would the introduction of a system property rx3.scheduler.drift-use-nanotime and behavior help you?

Yes, of course. This would be enough for me. I would probably call it rx3.scheduler.now-use-nanotime, but either way is fine.

Edit: if this is the way, I would like to suggest that I backport this change to RxJava2, because we are currently using it and would then not need to migrate before RxJava2 reaches EOL on February 28, 2021

If you would like, I would create a PR for RxJava2 and RxJava3, adding said property.

@akarnokd
Member

PR for RxJava2 and RxJava3, adding said property

Sure, go ahead. Let's use rx3.scheduler.drift-use-nanotime because there could be an implicit assumption that now(TimeUnit.MILLISECONDS) is the current time.
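As a rough sketch of how such a switch might be wired, assuming the property is read as a plain boolean system property: `DriftClock` below is a hypothetical class, not the code that was merged, and it is meant to feed only the periodic drift calculation so that now(TimeUnit.MILLISECONDS) on the Scheduler can keep returning the current time.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical clock used only for the periodic drift calculation.
final class DriftClock {
    // Property name taken from the discussion; how it is read is an assumption.
    static final String PROPERTY = "rx3.scheduler.drift-use-nanotime";

    private final boolean useNanoTime;

    DriftClock(boolean useNanoTime) {
        this.useNanoTime = useNanoTime;
    }

    // Boolean.getBoolean returns true only if the property equals "true".
    static DriftClock fromSystemProperty() {
        return new DriftClock(Boolean.getBoolean(PROPERTY));
    }

    boolean usesNanoTime() {
        return useNanoTime;
    }

    long now(TimeUnit unit) {
        if (useNanoTime) {
            // Monotonic, so not affected by wall-clock adjustments.
            return unit.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
        }
        // Default keeps the existing wall-clock behavior.
        return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
}
```

Under this assumption the behavior would be opted into at JVM startup with -Drx3.scheduler.drift-use-nanotime=true, and left unchanged otherwise.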

akarnokd pushed a commit that referenced this issue Jan 28, 2021
Issue-Id: #7154

Co-authored-by: Sergej Isbrecht <sergej.isbrecht@gmail.com>
akarnokd pushed a commit that referenced this issue Jan 28, 2021
Co-authored-by: Sergej Isbrecht <sergej.isbrecht@gmail.com>