fix #1992 Reimplement boundedElasticScheduler with reentrancy but less efficient work stealing #2040
Conversation
Force-pushed from 4af677d to 6e0d168 (Compare)
note: for now

@rstoyanchev @bsideup this should now be in final state, except for the pending removal of
reactor-core/src/main/java/reactor/core/scheduler/BoundedElasticScheduler.java (outdated, resolved)
reactor-core/src/main/java/reactor/core/scheduler/BoundedElasticScheduler.java (resolved)
```java
final long expireMillis;

@Override
public boolean isDisposed() {
	return get() == -1;
}
```
nit: please extract the constant (to be used in `dispose()`)
done
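For illustration, the nit above amounts to something like the following sketch: the `-1` sentinel becomes a named constant shared by `isDisposed()` and `dispose()`. The class and field names here are hypothetical, not the actual `BoundedElasticScheduler` internals.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical holder of a disposal state, sketching the "extract the
// constant" suggestion. Extending AtomicInteger mirrors the quoted snippet's
// use of get().
class DisposableState extends AtomicInteger {

	// single source of truth for the "disposed" marker value
	static final int DISPOSED = -1;

	boolean isDisposed() {
		return get() == DISPOSED;
	}

	void dispose() {
		// reuses the same constant instead of a second -1 literal
		set(DISPOSED);
	}
}
```

This keeps the magic number in one place, so the two methods can never drift apart.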
Codecov Report
```diff
@@             Coverage Diff             @@
##             master    #2040     +/-  ##
============================================
- Coverage     81.85%   81.81%   -0.04%
+ Complexity     4033     4024       -9
============================================
  Files           376      376
  Lines         31070    30973      -97
  Branches       5814     5765      -49
============================================
- Hits          25431    25342      -89
- Misses         4049     4058       +9
+ Partials       1590     1573      -17
```
Continue to review full report at Codecov.
Force-pushed from 6aea16c to 0d02a07 (Compare)
```java
evictor.scheduleAtFixedRate(boundedServices::eviction,
		ttlSeconds,
		ttlSeconds,
		TimeUnit.SECONDS);
```
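The quoted call follows the standard `ScheduledExecutorService.scheduleAtFixedRate` pattern: a method reference invoked periodically, with the TTL used as both initial delay and period. A minimal runnable sketch (the `eviction` body and counter are placeholders, not the real eviction logic):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative eviction scheduling: a single-threaded evictor fires a
// method reference at a fixed rate, as in the quoted snippet.
class EvictionDemo {

	static final AtomicInteger EVICTIONS = new AtomicInteger();

	static void eviction() {
		// the real scheduler would dispose executors idle past their TTL here
		EVICTIONS.incrementAndGet();
	}

	public static void main(String[] args) throws InterruptedException {
		ScheduledExecutorService evictor = Executors.newSingleThreadScheduledExecutor();
		// same shape as the quoted call: initial delay == period == ttl
		evictor.scheduleAtFixedRate(EvictionDemo::eviction, 50, 50, TimeUnit.MILLISECONDS);
		Thread.sleep(250);
		evictor.shutdownNow();
	}
}
```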
nit: `this.boundedServices =`, `this.evictor =` and `scheduleAtFixedRate` can be replaced with a single `start()` since they share the implementation.
they don't really share the implementation, since the `BOUNDED_SERVICE` would need to be initialized to `SHUTDOWN` first, and in the constructor the looping is superfluous.
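The distinction being made can be sketched as follows: a restart path must CAS its way out of a `SHUTDOWN` sentinel in a loop (it races with other callers), while the constructor can assign directly with no loop. All names here are illustrative, not the actual reactor-core fields.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical lifecycle sketch: why the constructor and a restart path
// don't trivially collapse into one shared start() method.
class LifecycleSketch {

	static final String SHUTDOWN = "SHUTDOWN";

	final AtomicReference<String> boundedServices;

	LifecycleSketch() {
		// constructor: direct assignment, no SHUTDOWN pre-state, no loop
		this.boundedServices = new AtomicReference<>("SERVICES");
	}

	boolean restart() {
		// restart: only legal transition is out of SHUTDOWN, under a CAS loop
		for (;;) {
			String current = boundedServices.get();
			if (!SHUTDOWN.equals(current)) {
				return false; // already running
			}
			if (boundedServices.compareAndSet(SHUTDOWN, "SERVICES")) {
				return true;
			}
			// else another caller raced us; re-read and retry
		}
	}

	void shutdown() {
		boundedServices.set(SHUTDOWN);
	}
}
```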
reactor-core/src/main/java/reactor/core/scheduler/BoundedElasticScheduler.java (outdated, resolved)
reactor-core/src/main/java/reactor/core/scheduler/BoundedElasticScheduler.java (outdated, resolved)
```java
		exec.shutdownNow();
	}
}
//else optimistically retry
```
WDYT about using `continue` here (and also in the next two cases)? Otherwise, if we accidentally add some logic to the end of this loop, it will get executed when the inner `if` resolves to `false`.
avoiding that is the goal of the comment, but I'll augment the comment with "(implicit continue here)". Otherwise it triggers the inspection "continue is unnecessary as the last statement in a loop".
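The pattern under discussion looks roughly like this sketch: every branch of the CAS loop ends the iteration, so the trailing comment documents the retry without an explicit `continue` that IDE inspections would flag as redundant. The method and cap semantics are invented for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative CAS retry loop with the "(implicit continue here)" style
// discussed above.
class RetrySketch {

	static int incrementCapped(AtomicInteger state, int cap) {
		for (;;) {
			int current = state.get();
			if (current >= cap) {
				return current; // cap reached, give up
			}
			if (state.compareAndSet(current, current + 1)) {
				return current + 1; // we won the race
			}
			//else optimistically retry (implicit continue here)
		}
	}
}
```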
reactor-core/src/main/java/reactor/core/scheduler/BoundedElasticScheduler.java (outdated, resolved)
reactor-core/src/main/java/reactor/core/scheduler/BoundedElasticScheduler.java (resolved)
reactor-core/src/main/java/reactor/core/scheduler/BoundedElasticScheduler.java (outdated, resolved)
reactor-core/src/main/java/reactor/core/scheduler/BoundedElasticScheduler.java (outdated, resolved)
reactor-core/src/main/java/reactor/core/scheduler/BoundedElasticScheduler.java (outdated, resolved)
@simonbasle I enjoyed reviewing! Very clean implementation, easy to follow! 👍
Left a few comments but overall looks good :)
ok I'm now preparing this for merging, by removing the old code and squashing the commits that need squashing (if not all), cc @bsideup
The general idea is to abandon the facade Worker and instead always submit tasks to an executor-backed worker. In order of preference, when an operator requests a Worker:

- if the thread cap is not reached, create and pick a new worker
- else if there are idle workers, pick an idle worker
- else pick a busy worker

This implies a behavior under contention that is closer to parallel(), but with a pool that is expected to be quite a bit larger than the typical parallel pool.

The drawback is that once we get to pick a busy worker, there's no telling when its tasks (typically blocking tasks for a BoundedElasticScheduler) will finish. So even though another executor might become idle in the meantime, the operator's tasks will be pinned to the (potentially still busy) executor initially picked. To try to counter that effect a bit, we use a priority queue for the busy executors, favoring executors that are tied to fewer Workers (and thus fewer operators). We don't yet go as far as factoring in the task queue of each executor.

Finally, one noticeable change is that the second int parameter in the API, maxPendingTask, now influences EACH executor's queue instead of being a shared counter. It should be safe in the sense that a number set with the previous version in mind is bound to be over-dimensioned for the new version, but users would be well advised to reconsider that number.
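The three-way preference order above can be sketched as follows. This is a deliberately simplified model, not the real `BoundedElasticScheduler` code: the executor type, field names, and bookkeeping are invented, and real executors would wrap actual thread pools.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.PriorityQueue;

// Illustrative worker-picking order: new executor while under the cap,
// else an idle one, else the busy executor serving the fewest Workers
// (via a priority queue, as described in the commit message).
class WorkerPickerSketch {

	static final class BoundedExecutor {
		int workerCount; // how many Workers currently pin tasks here
	}

	final int threadCap;
	int created;
	final Deque<BoundedExecutor> idle = new ArrayDeque<>();
	// busy executors ordered by how many Workers they already serve
	final PriorityQueue<BoundedExecutor> busy =
			new PriorityQueue<>((a, b) -> Integer.compare(a.workerCount, b.workerCount));

	WorkerPickerSketch(int threadCap) {
		this.threadCap = threadCap;
	}

	BoundedExecutor pick() {
		BoundedExecutor chosen;
		if (created < threadCap) {
			chosen = new BoundedExecutor(); // 1. cap not reached: new executor
			created++;
		}
		else if (!idle.isEmpty()) {
			chosen = idle.pop();            // 2. reuse an idle executor
		}
		else {
			chosen = busy.poll();           // 3. least-loaded busy executor
		}
		chosen.workerCount++;
		busy.offer(chosen); // re-enqueue with its updated load
		return chosen;
	}
}
```

Note how step 3 is what pins an operator to a potentially still-busy executor: the priority queue only minimizes how many Workers share it at pick time.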
Force-pushed from 997554b to cd2beff (Compare)
This should fix #1973 and fix #1992.

The general idea is to abandon the facade `Worker` and instead always submit tasks to an executor-backed worker. In order of preference, when an operator requests a `Worker`:

- if the thread cap is not reached, create and pick a new worker
- else if there are idle workers, pick an idle worker
- else pick a busy worker

This implies a behavior under contention that is closer to `parallel()`, with a pool that is expected to be quite a bit larger than the typical parallel pool. Livelocks can still happen, but they seem to be far more frequent with the current implementation anyway.

The drawback is that once we get to pick a busy worker, there's no telling when its tasks (typically blocking tasks for a `BoundedElasticScheduler`) will finish. So even though another executor might become idle in the meantime, the operator's tasks will be pinned to the (potentially still busy) executor initially picked. To try to counter that effect a bit, we use a priority queue for the busy executors, favoring executors that are tied to fewer `Worker`s (and thus fewer operators). We don't yet go as far as factoring in the task queue of each executor.

Finally, one noticeable change is that the second `int` parameter in the API, `maxPendingTask`, now influences EACH executor's queue, and is not a shared counter. It should be safe in the sense that a number set with the previous version in mind is bound to be over-dimensioned for the new version, but users would be well advised to scale that number back.
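The `maxPendingTask` change can be made concrete with a small sketch: each executor enforces its own pending-task cap, so N executors can collectively hold up to N × `maxPendingTask` tasks where the old semantics capped the total. The class and method names are illustrative, not the real reactor-core API.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;

// Illustrative per-executor task queue: the cap applies to THIS executor
// only, instead of a counter shared across all executors.
class PerExecutorQueueSketch {

	final LinkedBlockingQueue<Runnable> pending;

	PerExecutorQueueSketch(int maxPendingTask) {
		// bounded queue: offer() fails once maxPendingTask tasks are queued
		this.pending = new LinkedBlockingQueue<>(maxPendingTask);
	}

	void submit(Runnable task) {
		if (!pending.offer(task)) {
			// sketch of the rejection when this executor's cap is hit
			throw new RejectedExecutionException("maxPendingTask reached for this executor");
		}
	}

	int pendingCount() {
		return pending.size();
	}
}
```

Under this model, a value tuned for the old shared counter is over-dimensioned per executor, which is why the description suggests scaling it back.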