Fix early cancellation handling and improve switchOnFirst #2019

Merged: 4 commits merged into reactor:master on Mar 6, 2020

OlegDokuka
Contributor

This PR provides a few non-breaking changes to the behavior of the SwitchOnFirst operator:

  1. Applies naming changes to internal classes
  2. Provides an additional parameter that allows either canceling the source Publisher or letting it continue to work regardless of the derived Publisher's completion
  3. Provides the ability to cancel the source if there are no elements yet but downstream has already been canceled. This was actually a bug: with Flux.never, or a Flux that takes a long time to produce an element, it was impossible to cancel execution in a timely manner. Imagine the case Flux.just(1).delayElements(Duration.ofHours(1)).switchOnFirst(...).timeout(Duration.ofSeconds(1)).
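
The early-cancellation case in item 3 can be illustrated without Reactor, using only the JDK's java.util.concurrent.Flow interfaces (Java 9+). This is a minimal sketch, not the FluxSwitchOnFirst implementation: NeverPublisher, FirstElementGate and cancelBeforeFirstElement are hypothetical names invented for the illustration. It shows a downstream cancel reaching a source that has not emitted its first element.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

final class EarlyCancelDemo {

    // A source that never emits, in the spirit of Flux.never(); it only records cancellation.
    static final class NeverPublisher implements Flow.Publisher<Integer> {
        final AtomicBoolean cancelled = new AtomicBoolean();

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { /* never emits anything */ }
                @Override public void cancel() { cancelled.set(true); }
            });
        }
    }

    // Hypothetical, heavily simplified stand-in for an operator that waits for the first element.
    static final class FirstElementGate implements Flow.Subscriber<Integer>, Flow.Subscription {
        private volatile Flow.Subscription upstream;
        private final AtomicBoolean cancelled = new AtomicBoolean();

        @Override
        public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            if (cancelled.get()) {
                s.cancel();   // downstream gave up before the subscription arrived
            } else {
                s.request(1); // ask for the first element only
            }
        }

        @Override public void onNext(Integer item) { /* the transformer would be applied here */ }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }

        // Downstream-facing side of the gate.
        @Override public void request(long n) { /* demand handling omitted in this sketch */ }

        @Override
        public void cancel() {
            // Propagate the cancel upstream even though no element has been seen yet.
            if (cancelled.compareAndSet(false, true)) {
                Flow.Subscription s = upstream;
                if (s != null) {
                    s.cancel();
                }
            }
        }
    }

    static boolean cancelBeforeFirstElement() {
        NeverPublisher source = new NeverPublisher();
        FirstElementGate gate = new FirstElementGate();
        source.subscribe(gate);
        gate.cancel(); // e.g. a downstream timeout firing
        return source.cancelled.get();
    }

    public static void main(String[] args) {
        System.out.println(cancelBeforeFirstElement()); // prints "true"
    }
}
```

If cancel() ignored the upstream subscription until a first element arrived, a source like this would stay subscribed indefinitely.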

Signed-off-by: Oleh Dokuka <shadowgun@i.ua>

@OlegDokuka
Contributor Author

FYI: the extended API is needed for RSocket-Java in order to follow this specified behavior:

In absence of ERROR or CANCEL, the stream is terminated after both Requester and Responder have sent and received COMPLETE.

@codecov-io
codecov-io commented Jan 22, 2020

Codecov Report

Merging #2019 into master will increase coverage by 0.1%.
The diff coverage is 88.61%.


@@             Coverage Diff             @@
##             master    #2019     +/-   ##
===========================================
+ Coverage     81.83%   81.94%   +0.1%     
- Complexity     4027     4028      +1     
===========================================
  Files           376      376             
  Lines         30981    31050     +69     
  Branches       5766     5781     +15     
===========================================
+ Hits          25353    25443     +90     
+ Misses         4053     4036     -17     
+ Partials       1575     1571      -4
Impacted Files                                         Coverage Δ                Complexity Δ
...ore/src/main/java/reactor/core/publisher/Flux.java  97.71% <100%> (+0.12%)    529 <2> (+2)    ⬆️
...rc/main/java/reactor/core/publisher/Operators.java  81.87% <25%> (+0.2%)      136 <1> (+1)    ⬆️
...java/reactor/core/publisher/FluxSwitchOnFirst.java  90.79% <89.79%> (+7.59%)  3 <0> (ø)       ⬇️
...ain/java/reactor/core/scheduler/SchedulerTask.java  73.21% <0%> (-3.58%)      15% <0%> (-2%)
...c/main/java/reactor/core/publisher/MonoCreate.java  73.48% <0%> (-2.28%)      3% <0%> (ø)
.../java/reactor/core/publisher/UnicastProcessor.java  87.15% <0%> (-1.12%)      68% <0%> (-1%)
.../java/reactor/core/publisher/BlockingIterable.java  78.35% <0%> (-1.04%)      7% <0%> (ø)
...c/main/java/reactor/core/publisher/FluxReplay.java  85% <0%> (+0.14%)         29% <0%> (ø)    ⬇️
... and 3 more

Continue to review full report at Codecov.

Legend:
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 1856429...422e5ce. Read the comment docs.

@simonbasle simonbasle self-assigned this Jan 28, 2020
Member

@simonbasle simonbasle left a comment

I'm not totally sure I understood the deal with tryRequest and the REQUESTED state. @OlegDokuka can you explain these in a little more detail?


void tryRequest() {
    final Subscription s = this.s;
    long r = REQUESTED.getAndSet(this, -1);
Member

you should use the constant STATE_REQUESTED here too

return;
}
else {
r = requested;
Member

just for clarity's sake, use this.requested here

}
u = Operators.addCap(r, n);
if (REQUESTED.compareAndSet(this, r, u)) {
    return;
Member

shouldn't it break from the for(;;) loop rather than completely exiting?
I get the return in the case where we detect we're running in unbounded mode, but here we're aggregating requests and it is not clear to me how they will be propagated upstream...

Contributor Author

If it is Long.MAX_VALUE, it means that demand was requested and not consumed yet, so we simply return without checking anything. I can add tests that check the correctness of that path in case you are worried about it.

Member

what guarantees that the n you just added to r in addCap is actually propagated upstream? since you're returning, and r > STATE_REQUESTED, there's no chance that tryRequest will swap the new r for -1 and perform s.request(r), so that amount n is lost. Am I missing something?

Contributor Author

@OlegDokuka OlegDokuka Feb 6, 2020

I'm not sure.

Let's consider the following:

State 1: There is no subscription yet; actual makes a request, so we enter the request method:

Line 576: we read the current requested amount

Line 578: we check that r is higher than -1 and enter the for-loop

Line 585: we perform the CAS operation, which either (1) succeeds or (2) fails:

  1. If it succeeds, we have won the race, and the last observed request was higher than -1, so there is no upstream subscription yet and we can exit the request method.
  2. If it fails, we re-read the requested field at Line 589 and check whether it is less than 0. If it is, either the subscription was canceled or we lost the race, so we have to break out of the loop, go straight to reading the subscription s and perform a direct request(n).

Member

thanks for the explanations, comments and face-to-face review, this is clear to me now 👍

return;
}

s.request(n);
Member

this is only done when this.requested == STATE_REQUESTED, ie in tryRequest, in turn only invoked in onSubscribe, correct?

Contributor Author

Correct. Since there is a guard for that at line 578, I'm not double-checking it again here; it is assumed that if execution got there, then the state is requested.

Member

what I meant is: what if the state was NOT requested, but a second batch of requests arrives (e.g. this operator is called with request(3) then request(4))? Wouldn't the second request call be "lost"?

Contributor Author

@OlegDokuka OlegDokuka Mar 4, 2020

It is not going to be lost. Consider the following:

State 1 [r: 0]

request(3) -> State 2 [r: 3]

State 2 [r: 3] race with [onSubscribe]
Case 1:
tryRequest( getAndSet(-1) ) Happens Before request( REQUESTED.addAndGet(4) )

Result:
request(3) followed by
request(4) with a happens-before guarantee

Case 2:
request ( REQUESTED.addAndGet(4)) happens before tryRequest( getAndSet(-1) )

Result:
request(7)
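
The two orderings above can be replayed deterministically with a small model. This is a sketch under assumptions, not the code from this PR: DemandGate, replay and upstreamCalls are hypothetical names, the upstream is reduced to a list recording the amounts it would receive, and only the -1 sentinel and the addCap behavior are borrowed from the discussion. The demo is single-threaded, so the recording list is not synchronized.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the operator's demand bookkeeping.
final class DemandGate {
    static final long STATE_REQUESTED = -1L; // backlog already flushed to the upstream

    private final AtomicLong requested = new AtomicLong();      // accumulated demand, or the sentinel
    private final List<Long> upstreamCalls = new ArrayList<>(); // what "upstream" receives, in order

    // Capped addition: saturates at Long.MAX_VALUE instead of overflowing.
    static long addCap(long a, long b) {
        long u = a + b;
        return u < 0 ? Long.MAX_VALUE : u;
    }

    // Downstream demand: accumulated until the subscription arrives, forwarded directly afterwards.
    void request(long n) {
        for (;;) {
            long r = requested.get();
            if (r == STATE_REQUESTED) {
                upstreamCalls.add(n); // subscription present: direct request(n)
                return;
            }
            if (requested.compareAndSet(r, addCap(r, n))) {
                return; // accumulated; tryRequest() will flush it
            }
            // CAS lost: re-read the state and retry
        }
    }

    // Invoked when the upstream subscription arrives: swap in the sentinel, flush the backlog.
    void tryRequest() {
        long r = requested.getAndSet(STATE_REQUESTED);
        if (r > 0) {
            upstreamCalls.add(r);
        }
    }

    // Replays request(3), then either ordering of tryRequest() and request(4).
    static List<Long> replay(boolean subscribeBeforeSecondRequest) {
        DemandGate gate = new DemandGate();
        gate.request(3);
        if (subscribeBeforeSecondRequest) {
            gate.tryRequest();
            gate.request(4);
        } else {
            gate.request(4);
            gate.tryRequest();
        }
        return gate.upstreamCalls;
    }

    public static void main(String[] args) {
        System.out.println(replay(true));  // Case 1: [3, 4]
        System.out.println(replay(false)); // Case 2: [7]
    }
}
```

In both orderings the upstream receives a total of 7; only the number of request calls differs.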

1) Applies naming changes to internal classes
2) Provides an additional parameter that allows either cancelling the source Publisher or letting it continue to work regardless of the derived Publisher's completion
3) Provides the ability to cancel the source if there are no elements yet but downstream has already been cancelled

Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
if (!cancelled) {
cancelled = true;
s.cancel();
if (INNER.getAndSet(this, Operators.emptySubscriber()) != Operators.emptySubscriber()) {
Contributor Author

Using the empty subscriber instance (which I assume is for internal usage only) in order to indicate the canceled state and

Member

I guess to be extra sure to be shielded from Operators.emptySubscriber() ever returning a new instance, you can directly use Operators.EMPTY_SUBSCRIBER ? (fine for usage internal to the publisher package)
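
The concern about identity comparison can be sketched in isolation. The model below is an assumption-laden illustration, not Reactor code: SentinelCancel, CANCELLED and freshSentinel are hypothetical names, with a Runnable standing in for the subscriber type. It shows why a getAndSet against a sentinel makes cancellation run once, and why the sentinel has to be a single shared instance rather than the result of a factory that might allocate.

```java
import java.util.concurrent.atomic.AtomicReference;

final class SentinelCancel {
    // One shared instance, standing in for a constant such as EMPTY_SUBSCRIBER (hypothetical model).
    static final Runnable CANCELLED = new Runnable() {
        @Override public void run() { }
    };

    // A factory that allocates on every call; identity checks against its result are unreliable.
    static Runnable freshSentinel() {
        return new Runnable() {
            @Override public void run() { }
        };
    }

    private final AtomicReference<Runnable> inner = new AtomicReference<>();
    int upstreamCancels; // incremented only by the caller that wins the swap

    void cancel() {
        // getAndSet returns the previous value, so exactly one caller observes a non-sentinel.
        if (inner.getAndSet(CANCELLED) != CANCELLED) {
            upstreamCancels++;
        }
    }

    boolean isCancelled() {
        return inner.get() == CANCELLED;
    }

    public static void main(String[] args) {
        SentinelCancel c = new SentinelCancel();
        c.cancel();
        c.cancel(); // second call is a no-op
        System.out.println(c.upstreamCancels);                  // prints 1
        System.out.println(freshSentinel() == freshSentinel()); // prints false
    }
}
```

With a constant, the `!=` check in cancel() stays correct no matter how the factory method is implemented later.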

@@ -147,36 +141,38 @@ public void cancel() {
public void onSubscribe(Subscription s) {
if (Operators.validate(this.s, s)) {
this.s = s;
s.request(1);
this.outer.sendSubscription();
if (this.inner != Operators.emptySubscriber()) {
Contributor Author

Double-check here to ensure that we really need to request the first element. If the subscription has already been canceled, then no operation is required.

Member

again, you can use Operators.EMPTY_SUBSCRIBER to be 100% shielded from API implementation details


if (cancelled) {
Operators.onDiscard(f, a.currentContext());
if (a == Operators.emptySubscriber()) {
Contributor Author

I made emptySubscriber an instance of ConditionalSubscriber specifically for that case

Member

@simonbasle simonbasle Mar 6, 2020

I'll evaluate that change separately in a comment below (the conditional one). ok for conditional ✅ Otherwise, same as elsewhere, can use EMPTY_SUBSCRIBER constant.

Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
Member

@simonbasle simonbasle left a comment

small adjustment suggestions, but this is almost there 👍

@@ -186,29 +182,29 @@ public void onNext(T t) {

@Override
public void onError(Throwable t) {
if (done) {
final CoreSubscriber<? super T> i = this.inner;
if (this.done || i == Operators.emptySubscriber()) {
Member

same as above

@@ -223,27 +219,27 @@ public void onError(Throwable t) {

@Override
public void onComplete() {
if (done) {
final CoreSubscriber<? super T> i = this.inner;
if (this.done || i == Operators.emptySubscriber()) {
Member

same as above

if (cancelled) {
Operators.onDiscard(f, a.currentContext());
return;
if (a == Operators.emptySubscriber()) {
Member

can use EMPTY_SUBSCRIBER constant here too


Signed-off-by: Oleh Dokuka <shadowgun@i.ua>
@simonbasle simonbasle changed the title from "[bugfix/refactoring] SwitchOnFirst" to "Fix early cancellation handling and improve switchOnFirst" Mar 6, 2020
@simonbasle simonbasle merged commit 34df2b0 into reactor:master Mar 6, 2020
@simonbasle simonbasle added the type/bug (A general bug) and type/enhancement (A general enhancement) labels Mar 6, 2020
@simonbasle simonbasle added this to the 3.3.4.RELEASE milestone Mar 6, 2020
simonbasle pushed a commit that referenced this pull request Mar 10, 2020
Signed-off-by: Oleh Dokuka <shadowgun@i.ua>