-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement variant of windowTimeout with fairBackpressure #3054
Conversation
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
This reverts commit 9a76d65.
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice and huge work ! my main concern is around discarding of window queued elements rather than the window itself.
@@ -177,7 +179,12 @@ public static void race(int timeoutSeconds, Scheduler s, final Runnable... rs) { | |||
|
|||
try { | |||
if (!cdl.await(timeoutSeconds, TimeUnit.SECONDS)) { | |||
throw new AssertionError("RaceTestUtils.race wait timed out after " + timeoutSeconds + "s"); | |||
throw new AssertionError("RaceTestUtils.race wait timed out after " + timeoutSeconds + "s" + Thread.getAllStackTraces() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this looks potentially very verbose for the general case? maybe add an overload that does the allStackTraces appending?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd suggest adding an overload with a boolean collectAllStackTraces
parameter
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
import reactor.util.context.Context; | ||
|
||
/** | ||
* @author David Karnok |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can add yourself here too @OlegDokuka 😆
reactor-core/src/main/java/reactor/core/publisher/FluxWindowTimeout.java
Outdated
Show resolved
Hide resolved
int received = this.received + 1 ; | ||
this.received = received; | ||
|
||
this.queue.offer(t); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we guard against an already cancelled window here and directly discard each element before they get a chance to be added to the queue?
reactor-core/src/main/java/reactor/core/publisher/FluxWindowTimeout.java
Outdated
Show resolved
Hide resolved
reactor-core/src/main/java/reactor/core/publisher/FluxWindowTimeout.java
Outdated
Show resolved
Hide resolved
reactor-core/src/main/java/reactor/core/publisher/FluxWindowTimeout.java
Outdated
Show resolved
Hide resolved
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
@simonbasle this PR seems to have been merged on a maintenance branch, please ensure the change is merge-forwarded to intermediate maintenance branches and up to |
This commit improves the InnerWindow.sendNext method to guard against attempts at sending more than the window's maxSize elements, earlier in the codepath. It also removes an unused method that was left-in during iterations on the backpressure-aware new implementation from #3054.
No description provided.