Skip to content

Commit

Permalink
Align MonoCollect with MonoList
Browse files Browse the repository at this point in the history
This change aligns the implementation of MonoCollect to be comparable
to MonoList. Effectively they're the same, a MonoList is but a
MonoCollect with an ArrayList as the container.

Fix reactor#2186
  • Loading branch information
rstoyanchev committed Jun 16, 2020
1 parent f6fcae1 commit 3e6aea3
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 39 deletions.
95 changes: 61 additions & 34 deletions reactor-core/src/main/java/reactor/core/publisher/MonoCollect.java
Expand Up @@ -72,6 +72,8 @@ static final class CollectSubscriber<T, R> extends Operators.MonoSubscriber<T, R

final BiConsumer<? super R, ? super T> action;

R container;

Subscription s;

boolean done;
Expand All @@ -81,35 +83,17 @@ static final class CollectSubscriber<T, R> extends Operators.MonoSubscriber<T, R
R container) {
super(actual);
this.action = action;
this.value = container;
this.container = container;
}

@Override
@Nullable
public Object scanUnsafe(Attr key) {
if (key == Attr.TERMINATED) return done;
if (key == Attr.PARENT) return s;

if (key == Attr.TERMINATED) return done;
return super.scanUnsafe(key);
}

@Override
public void cancel() {
super.cancel();
s.cancel();
}

@Override
protected void discard(R v) {
if (v instanceof Collection) {
Collection<?> c = (Collection<?>) v;
Operators.onDiscardMultiple(c, actual.currentContext());
}
else {
super.discard(v);
}
}

@Override
public void onSubscribe(Subscription s) {
if (Operators.validate(this.s, s)) {
Expand All @@ -127,15 +111,22 @@ public void onNext(T t) {
Operators.onNextDropped(t, actual.currentContext());
return;
}

try {
action.accept(value, t);
}
catch (Throwable e) {
Context ctx = actual.currentContext();
Operators.onDiscard(t, ctx);
onError(Operators.onOperatorError(this, e, t, ctx));
R c;
synchronized (this) {
c = container;
if (c != null) {
try {
action.accept(c, t);
}
catch (Throwable e) {
Context ctx = actual.currentContext();
Operators.onDiscard(t, ctx);
onError(Operators.onOperatorError(this, e, t, ctx));
}
return;
}
}
Operators.onDiscard(t, actual.currentContext());
}

@Override
Expand All @@ -145,9 +136,12 @@ public void onError(Throwable t) {
return;
}
done = true;
R v = value;
discard(v);
value = null;
R c;
synchronized (this) {
c = container;
container = null;
}
discard(c);
actual.onError(t);
}

Expand All @@ -157,13 +151,46 @@ public void onComplete() {
return;
}
done = true;
complete(value);
R c;
synchronized (this) {
c = container;
container = null;
}
if (c != null) {
complete(c);
}
}

@Override
public void setValue(R value) {
// value is constant
protected void discard(R v) {
if (v instanceof Collection) {
Collection<?> c = (Collection<?>) v;
Operators.onDiscardMultiple(c, actual.currentContext());
}
else {
super.discard(v);
}
}

@Override
public void cancel() {
int state;
R c;
synchronized (this) {
state = STATE.getAndSet(this, CANCELLED);
if (state <= HAS_REQUEST_NO_VALUE) {
c = container;
value = null;
container = null;
}
else {
c = null;
}
}
if (c != null) {
s.cancel();
discard(c);
}
}
}
}
Expand Up @@ -42,10 +42,10 @@ public void subscribe(CoreSubscriber<? super List<T>> actual) {

static final class MonoCollectListSubscriber<T> extends Operators.MonoSubscriber<T, List<T>> {

Subscription s;

List<T> list;

Subscription s;

boolean done;

MonoCollectListSubscriber(CoreSubscriber<? super List<T>> actual) {
Expand Down Expand Up @@ -92,7 +92,7 @@ public void onNext(T t) {

@Override
public void onError(Throwable t) {
if(done) {
if (done) {
Operators.onErrorDropped(t, actual.currentContext());
return;
}
Expand All @@ -102,7 +102,7 @@ public void onError(Throwable t) {
l = list;
list = null;
}
Operators.onDiscardMultiple(l, actual.currentContext());
discard(l);
actual.onError(t);
}

Expand Down
Expand Up @@ -20,7 +20,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Assert;
Expand Down

0 comments on commit 3e6aea3

Please sign in to comment.