Rework FluxReplay to avoid hanging, but reject 0 size #2741
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
081c8e3 to c6f78a7
@@ -148,9 +159,10 @@
 long maxAge,
 Scheduler scheduler) {
 this.limit = limit;
+this.indexUpdateLimit = Operators.unboundedOrLimit(limit);
As demonstrated by the (currently failing) reactor.core.publisher.SinksTest.MulticastReplayN#checkSemanticsSize0, limit can be zero here, which would lead to an ArithmeticException.
Removed the tests that check a zero history size and added protection against passing zero as a history size, since it is meaningless for the replay operator (in that scenario we can suggest the .publish operator as an alternative)
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
@simonbasle most of the things were fixed / improved. If all is good, I will add stress tests.
a small clarification on javadoc wording still, but otherwise looks good.
the commit message will need to be explicit that this commit removes support for historySize(0), just in case some users weirdly depend on it.
go ahead with stress tests
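To make the discussed rejection of a zero history size concrete, here is a minimal sketch of such a guard. It is not the actual FluxReplay code: the class HistorySizeGuard, the method requirePositiveHistory, and the exception message are hypothetical, and rejecting negative values as well is an assumption of this sketch.

```java
public final class HistorySizeGuard {

    // Hypothetical helper, not part of reactor-core: fails fast on a history
    // size that a replay buffer cannot honor. Rejecting negative values too
    // is an assumption made for this sketch.
    public static int requirePositiveHistory(int history) {
        if (history <= 0) {
            throw new IllegalArgumentException(
                "history must be > 0 but was " + history
                    + "; consider publish() when nothing has to be replayed");
        }
        return history;
    }

    public static void main(String[] args) {
        // A valid size is returned unchanged.
        System.out.println(requirePositiveHistory(16));
        try {
            requirePositiveHistory(0);
        } catch (IllegalArgumentException e) {
            // A zero size is rejected up front instead of failing later.
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing at construction time with a message that points to publish keeps the failure close to the call site, which is what an explicit commit message and release note would also be warning users about.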
@@ -1598,7 +1606,7 @@ static long markDisposed(ReplaySubscriber<?> instance) {
 }

 /**
- * Check if state has subscribed flag indicating subscription reception
+ * Check if state has {@link #CONNECTED_FLAG} flag indicating subscription reception
Which of SUBSCRIBED_FLAG and CONNECTED_FLAG indicates subscription reception? Both this javadoc and the one below now correctly link to the right constant, but they otherwise use the exact same phrasing.
fixed
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>
From the pragmatic point of view of not breaking anybody who depends on history size 0 in a patch release (probably by mistake, but still...), what do you think about the operator method delegating to Flux.cache when size is zero, @OlegDokuka?
@sbasle To be backward compatible, I would automatically replace that with Flux publish
Yeah, I think that's what I meant. There could also be a warning logged, or a mention in the release notes that the next major version will reject
This is not entirely ideal, but we're making the bet here that nobody actually uses Still marking as For
While I don't remember why I had this construct, I do have a usage of
within my tests which failed when ingesting the dependency update with this change. (I suspect I wanted a sink that would allow multiple subscriptions and replay terminal signals)
How can I create a sink that:
Using None of the
closes #1921
This PR adds a prefetch strategy driven by active subscribers, so that the next prefetch happens only once all subscribers have consumed a particular element. To enable that, every implementation now maintains an index that is used to identify whether a specific element is the trigger for the next prefetch.
When there are no subscribers, the prefetch strategy is self-driven and requests more once N elements have been produced into the cache.
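The strategy described above can be illustrated with a simplified, single-threaded model. This is only a sketch under stated assumptions, not the FluxReplay implementation: the class SlowestSubscriberPrefetch and all of its members are hypothetical, the trigger is placed at every full batch of N elements, and new subscribers are assumed to start consuming from the first element.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, single-threaded model of subscriber-driven prefetch.
public final class SlowestSubscriberPrefetch {

    private final int batch;                                    // N: elements requested per replenish
    private final Map<String, Long> consumed = new HashMap<>(); // consumed count per subscriber
    private long produced;                                      // elements stored in the cache so far
    private long nextTrigger;                                   // index that triggers the next request
    private long requested;                                     // total demand sent upstream

    public SlowestSubscriberPrefetch(int batch) {
        this.batch = batch;
        this.nextTrigger = batch;
        this.requested = batch; // initial prefetch
    }

    // Assumption: a new subscriber starts from the first element.
    public void addSubscriber(String id) {
        consumed.put(id, 0L);
    }

    public void onProduced() {
        produced++;
        maybeReplenish();
    }

    public void onConsumed(String id) {
        consumed.merge(id, 1L, Long::sum);
        maybeReplenish();
    }

    private void maybeReplenish() {
        // Without subscribers the strategy is self-driven by production;
        // otherwise progress is dictated by the slowest subscriber.
        long progress = consumed.isEmpty()
            ? produced
            : consumed.values().stream().mapToLong(Long::longValue).min().getAsLong();
        while (progress >= nextTrigger) {
            requested += batch;
            nextTrigger += batch;
        }
    }

    public long requested() {
        return requested;
    }

    public static void main(String[] args) {
        SlowestSubscriberPrefetch prefetch = new SlowestSubscriberPrefetch(4);
        for (int i = 0; i < 4; i++) {
            prefetch.onProduced(); // no subscribers yet: production alone triggers a request
        }
        System.out.println("requested upstream: " + prefetch.requested());
    }
}
```

In this model a fast subscriber cannot cause additional upstream demand on its own: the trigger index is only crossed when the minimum consumed count across all subscribers reaches it.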
Signed-off-by: Oleh Dokuka <odokuka@vmware.com>