Skip to content

Commit

Permalink
Rework FluxReplay to avoid hanging, but reject 0 size (#2741)
Browse files Browse the repository at this point in the history
This commit reworks the FluxReplay implementation to use a prefetch
strategy, as well as state-machine-like state to better keep track of
demand, avoiding hanging.

The operator is driven by active subscribers so the next prefetch
happens only when all the subscribers have consumed a particular
element. To enable that, now every implementation has an index actively
utilized to identify if specific element is a trigger for the next
prefetch.

In case there are no subscribers, the prefetch strategy is self-driven
and requests more when there are produced N elements into the cache

The downside is that historySize 0 is not supported anymore. It is
believed that it wasn't really relevant, but on the slight possibility
that this is actually used out there, the FluxReplay is replaced by a
FluxPublish transparently when 0 is passed to the operator method.

On the `Sinks` side, since part of the implementation is shared, the
same limitation is present, but we are more proactive in rejecting a
zero limit / historySize there. The bet is that for this younger API,
such incoherent usage hasn't emerged yet.

Fixes #2711.

Co-authored-by: Simon Baslé <sbasle@vmware.com>
  • Loading branch information
OlegDokuka and simonbasle committed Sep 9, 2021
1 parent a72bb0d commit a596764
Show file tree
Hide file tree
Showing 7 changed files with 544 additions and 193 deletions.
8 changes: 8 additions & 0 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -7500,6 +7500,10 @@ public final ConnectableFlux<T> replay() {
*
*/
public final ConnectableFlux<T> replay(int history) {
if (history == 0) {
//TODO Flux.replay with history == 0 doesn't make much sense. This was replaced by Flux.publish, but such calls will be rejected in a future version
return onAssembly(new FluxPublish<>(this, Queues.SMALL_BUFFER_SIZE, Queues.get(Queues.SMALL_BUFFER_SIZE)));
}
return onAssembly(new FluxReplay<>(this, history, 0L, null));
}

Expand Down Expand Up @@ -7579,6 +7583,10 @@ public final ConnectableFlux<T> replay(Duration ttl, Scheduler timer) {
*/
public final ConnectableFlux<T> replay(int history, Duration ttl, Scheduler timer) {
Objects.requireNonNull(timer, "timer");
if (history == 0) {
//TODO Flux.replay with history == 0 doesn't make much sense. This was replaced by Flux.publish, but such calls will be rejected in a future version
return onAssembly(new FluxPublish<>(this, Queues.SMALL_BUFFER_SIZE, Queues.get(Queues.SMALL_BUFFER_SIZE)));
}
return onAssembly(new FluxReplay<>(this, history, ttl.toNanos(), timer));
}

Expand Down

0 comments on commit a596764

Please sign in to comment.