Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework FluxReplay to avoid hanging, but reject 0 size #2741

Merged
merged 9 commits into from
Sep 9, 2021
154 changes: 85 additions & 69 deletions reactor-core/src/main/java/reactor/core/publisher/FluxReplay.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,15 @@
* @param <T>
* @see <a href="https://github.com/reactor/reactive-streams-commons">Reactive-Streams-Commons</a>
*/
final class FluxReplay<T> extends ConnectableFlux<T> implements Scannable, Fuseable,
OptimizableOperator<T, T> {
final class FluxReplay<T> extends ConnectableFlux<T>
implements Scannable, Fuseable, OptimizableOperator<T, T> {

final CorePublisher<T> source;
final int history;
final long ttl;
final Scheduler scheduler;
final CorePublisher<T> source;
final int history;
final long ttl;
final Scheduler scheduler;

volatile ReplaySubscriber<T> connection;
volatile ReplaySubscriber<T> connection;
@SuppressWarnings("rawtypes")
static final AtomicReferenceFieldUpdater<FluxReplay, ReplaySubscriber> CONNECTION =
AtomicReferenceFieldUpdater.newUpdater(FluxReplay.class,
Expand Down Expand Up @@ -140,9 +140,9 @@ static final class TimedNode<T> extends AtomicReference<TimedNode<T>> {
}
}

final int limit;
final int indexUpdateLimit;
final long maxAge;
final int limit;
final int indexUpdateLimit;
final long maxAge;
final Scheduler scheduler;
int size;

Expand Down Expand Up @@ -423,7 +423,9 @@ public int capacity() {
@Override
public void add(T value) {
final TimedNode<T> tail = this.tail;
final TimedNode<T> n = new TimedNode<>(tail.index + 1, value, scheduler.now(TimeUnit.NANOSECONDS));
final TimedNode<T> n = new TimedNode<>(tail.index + 1,
value,
scheduler.now(TimeUnit.NANOSECONDS));
tail.set(n);
this.tail = n;
int s = size;
Expand Down Expand Up @@ -718,7 +720,8 @@ public T poll(ReplaySubscription<T> rs) {

if ((index + 1) % indexUpdateLimit == 0) {
rs.requestMore(index + 1);
} else {
}
else {
rs.index(index + 1);
}

Expand Down Expand Up @@ -762,7 +765,7 @@ static final class SizeBoundReplayBuffer<T> implements ReplayBuffer<T> {
Throwable error;

SizeBoundReplayBuffer(int limit) {
if(limit < 0){
if (limit < 0) {
throw new IllegalArgumentException("Limit cannot be negative");
}
this.limit = limit;
Expand Down Expand Up @@ -962,7 +965,7 @@ static final class Node<T> extends AtomicReference<Node<T>> {
private static final long serialVersionUID = 3713592843205853725L;

final int index;
final T value;
final T value;

Node(int index, @Nullable T value) {
this.index = index;
Expand Down Expand Up @@ -1057,10 +1060,13 @@ public int size() {
else {
this.optimizableOperator = null;
}
this.history = history;
if(history < 0){
throw new IllegalArgumentException("History cannot be negative : " + history);

if (history <= 0) {
throw new IllegalArgumentException("History cannot be zero or negative : " + history);
}

this.history = history;

if (scheduler != null && ttl < 0) {
throw new IllegalArgumentException("TTL cannot be negative : " + ttl);
}
Expand All @@ -1077,15 +1083,16 @@ ReplaySubscriber<T> newState() {
if (scheduler != null) {
return new ReplaySubscriber<>(new SizeAndTimeBoundReplayBuffer<>(history,
ttl,
scheduler),
this);
scheduler), this, history);
}
if (history != Integer.MAX_VALUE) {
return new ReplaySubscriber<>(new SizeBoundReplayBuffer<>(history),
this);
this,
history);
}
return new ReplaySubscriber<>(new UnboundedReplayBuffer<>(Queues.SMALL_BUFFER_SIZE),
this);
this,
Queues.SMALL_BUFFER_SIZE);
}

@Override
Expand Down Expand Up @@ -1134,7 +1141,8 @@ public void subscribe(CoreSubscriber<? super T> actual) {
}

@Override
public final CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) throws Throwable {
public final CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual)
throws Throwable {
boolean expired;
for (; ; ) {
ReplaySubscriber<T> c = connection;
Expand Down Expand Up @@ -1189,16 +1197,16 @@ public Object scanUnsafe(Scannable.Attr key) {
return null;
}

static final class ReplaySubscriber<T>
implements InnerConsumer<T>, Disposable {
static final class ReplaySubscriber<T> implements InnerConsumer<T>, Disposable {

final FluxReplay<T> parent;
final ReplayBuffer<T> buffer;
final int limit;
final long prefetch;
final int limit;

Subscription s;
int produced;
int nextPrefetchIndex;
int produced;
int nextPrefetchIndex;

volatile ReplaySubscription<T>[] subscribers;

Expand All @@ -1213,12 +1221,12 @@ static final class ReplaySubscriber<T>
static final ReplaySubscription[] TERMINATED = new ReplaySubscription[0];

@SuppressWarnings("unchecked")
ReplaySubscriber(ReplayBuffer<T> buffer,
FluxReplay<T> parent) {
ReplaySubscriber(ReplayBuffer<T> buffer, FluxReplay<T> parent, int prefetch) {
this.buffer = buffer;
this.parent = parent;
this.subscribers = EMPTY;
this.limit = Operators.unboundedOrLimit(parent.history);
this.prefetch = Operators.unboundedOrPrefetch(prefetch);
this.limit = Operators.unboundedOrLimit(prefetch);
this.nextPrefetchIndex = this.limit;
}

Expand All @@ -1238,27 +1246,29 @@ public void onSubscribe(Subscription s) {
return;
}

s.request(this.parent.history);
s.request(this.prefetch);
}
}

void manageRequest(long currentState) {
final Subscription p = this.s;
for (;;) {
for (; ; ) {

int nextPrefetchIndex = this.nextPrefetchIndex;
boolean shouldPrefetch = true;
boolean shouldPrefetch;

// find out if we need to make another prefetch
final ReplaySubscription<T>[] subscribers = this.subscribers;
if (subscribers.length > 0) {
shouldPrefetch = true;
for (ReplaySubscription<T> rp : subscribers) {
if (rp.index() < nextPrefetchIndex) {
shouldPrefetch = false;
break;
}
}
} else {
}
else {
shouldPrefetch = this.produced >= nextPrefetchIndex;
}

Expand Down Expand Up @@ -1477,25 +1487,23 @@ public boolean isDisposed() {
return isDisposed(this.state);
}


static final long CONNECTED_FLAG =
static final long CONNECTED_FLAG =
0b0001_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L;
static final long SUBSCRIBED_FLAG =
static final long SUBSCRIBED_FLAG =
0b0010_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L;
static final long DISPOSED_FLAG =
static final long DISPOSED_FLAG =
0b1000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000_0000L;
static final long WORK_IN_PROGRESS_MAX_VALUE =
0b0000_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111_1111L;

/**
* Set {@link #CONNECTED_FLAG} and increments amount of work. Fails if states
* has the flag {@link #DISPOSED_FLAG}
* Adds {@link #CONNECTED_FLAG} to the state. Fails if the flag is already set
*
* @param instance to operate on
* @return true if flag was set
*/
static boolean markConnected(ReplaySubscriber<?> instance) {
for (;;) {
for (; ; ) {
final long state = instance.state;

if (isConnected(state)) {
Expand All @@ -1509,14 +1517,14 @@ static boolean markConnected(ReplaySubscriber<?> instance) {
}

/**
* Set {@link #SUBSCRIBED_FLAG}. Fails if states has the flag
* {@link #DISPOSED_FLAG}
* Adds {@link #SUBSCRIBED_FLAG} to the state. Fails if states has the flag {@link
* #DISPOSED_FLAG}
*
* @param instance to operate on
* @return previous observed state
*/
static long markSubscribed(ReplaySubscriber<?> instance) {
for (;;) {
for (; ; ) {
final long state = instance.state;

if (isDisposed(state)) {
Expand All @@ -1530,14 +1538,14 @@ static long markSubscribed(ReplaySubscriber<?> instance) {
}

/**
* Set {@link #DISPOSED_FLAG}. Fails if states
* has already had the {@link #DISPOSED_FLAG} flag
* Increments the work in progress part of the state, up to its max value. Fails
* if states has already had the {@link #DISPOSED_FLAG} flag
*
* @param instance to operate on
* @return previous observed state
*/
static long markWorkAdded(ReplaySubscriber<?> instance) {
for (;;) {
for (; ; ) {
final long state = instance.state;

if (isDisposed(state)) {
Expand All @@ -1555,14 +1563,14 @@ static long markWorkAdded(ReplaySubscriber<?> instance) {
}

/**
* Set {@link #DISPOSED_FLAG}. Fails if given states
* not equal to the actual state
* Sets work in progress to zero. Fails if given states not equal to the actual
* state.
*
* @param instance to operate on
* @return previous observed state
*/
static long markWorkDone(ReplaySubscriber<?> instance, long currentState) {
for (;;) {
for (; ; ) {
final long state = instance.state;

if (currentState != state) {
Expand All @@ -1577,14 +1585,14 @@ static long markWorkDone(ReplaySubscriber<?> instance, long currentState) {
}

/**
* Set {@link #DISPOSED_FLAG}. Fails if states
* has already had the {@link #DISPOSED_FLAG} flag
* Adds {@link #DISPOSED_FLAG} to the state. Fails if states has already had
* the flag
*
* @param instance to operate on
* @return previous observed state
*/
static long markDisposed(ReplaySubscriber<?> instance) {
for (;;) {
for (; ; ) {
final long state = instance.state;

if (isDisposed(state)) {
Expand All @@ -1598,7 +1606,7 @@ static long markDisposed(ReplaySubscriber<?> instance) {
}

/**
* Check if state has subscribed flag indicating subscription reception
* Check if state has {@link #CONNECTED_FLAG} flag indicating subscription reception
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which of SUBSCRIBED_FLAG and CONNECTED_FLAG indicates subscription reception? both this javadoc and the one below now correctly link to the right constant, but they otherwise use the exact same phrasing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

*
* @param state to check flag presence
* @return true if flag is set
Expand All @@ -1608,7 +1616,7 @@ static boolean isConnected(long state) {
}

/**
* Check if state has subscribed flag indicating subscription reception
* Check if state has {@link #SUBSCRIBED_FLAG} flag indicating subscription reception
*
* @param state to check flag presence
* @return true if flag is set
Expand All @@ -1628,7 +1636,7 @@ static boolean isWorkInProgress(long state) {
}

/**
* Check if state has disposed flag
* Check if state has {@link #DISPOSED_FLAG} flag
*
* @param state to check flag presence
* @return true if flag is set
Expand All @@ -1639,8 +1647,7 @@ static boolean isDisposed(long state) {

}

static final class ReplayInner<T>
implements ReplaySubscription<T> {
static final class ReplayInner<T> implements ReplaySubscription<T> {

final CoreSubscriber<? super T> actual;
final ReplaySubscriber<T> parent;
Expand All @@ -1655,18 +1662,16 @@ static final class ReplayInner<T>

long totalRequested;

volatile int wip;
volatile int wip;
@SuppressWarnings("rawtypes")
static final AtomicIntegerFieldUpdater<ReplayInner> WIP =
AtomicIntegerFieldUpdater.newUpdater(ReplayInner.class, "wip");


volatile long requested;
volatile long requested;
@SuppressWarnings("rawtypes")
static final AtomicLongFieldUpdater<ReplayInner> REQUESTED =
AtomicLongFieldUpdater.newUpdater(ReplayInner.class, "requested");


ReplayInner(CoreSubscriber<? super T> actual, ReplaySubscriber<T> parent) {
this.actual = actual;
this.parent = parent;
Expand All @@ -1687,12 +1692,24 @@ public void request(long n) {
@Override
@Nullable
public Object scanUnsafe(Attr key) {
if (key == Attr.PARENT) return parent;
if (key == Attr.TERMINATED) return parent.isTerminated();
if (key == Attr.BUFFERED) return size();
if (key == Attr.CANCELLED) return isCancelled();
if (key == Attr.REQUESTED_FROM_DOWNSTREAM) return Math.max(0L, requested);
if (key == Attr.RUN_ON) return parent.parent.scheduler;
if (key == Attr.PARENT) {
return parent;
}
if (key == Attr.TERMINATED) {
return parent.isTerminated();
}
if (key == Attr.BUFFERED) {
return size();
}
if (key == Attr.CANCELLED) {
return isCancelled();
}
if (key == Attr.REQUESTED_FROM_DOWNSTREAM) {
return Math.max(0L, requested);
}
if (key == Attr.RUN_ON) {
return parent.parent.scheduler;
}

return ReplaySubscription.super.scanUnsafe(key);
}
Expand Down Expand Up @@ -1795,7 +1812,6 @@ public void requestMore(int index) {
this.parent.manageRequest(previousState + 1);
}


@Override
public int tailIndex() {
return tailIndex;
Expand Down