Skip to content

Commit

Permalink
Align MonoCollect with MonoList
Browse files Browse the repository at this point in the history
This change aligns the implementation of MonoCollect to be comparable
to MonoList. Effectively they're the same, a MonoList is but a
MonoCollect with an ArrayList as the container.

Fix #2186
  • Loading branch information
rstoyanchev committed Jun 16, 2020
1 parent f6fcae1 commit 85feec9
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 40 deletions.
95 changes: 61 additions & 34 deletions reactor-core/src/main/java/reactor/core/publisher/MonoCollect.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ static final class CollectSubscriber<T, R> extends Operators.MonoSubscriber<T, R

final BiConsumer<? super R, ? super T> action;

R container;

Subscription s;

boolean done;
Expand All @@ -81,35 +83,17 @@ static final class CollectSubscriber<T, R> extends Operators.MonoSubscriber<T, R
R container) {
super(actual);
this.action = action;
this.value = container;
this.container = container;
}

@Override
@Nullable
public Object scanUnsafe(Attr key) {
if (key == Attr.TERMINATED) return done;
if (key == Attr.PARENT) return s;

if (key == Attr.TERMINATED) return done;
return super.scanUnsafe(key);
}

@Override
public void cancel() {
super.cancel();
s.cancel();
}

@Override
protected void discard(R v) {
if (v instanceof Collection) {
Collection<?> c = (Collection<?>) v;
Operators.onDiscardMultiple(c, actual.currentContext());
}
else {
super.discard(v);
}
}

@Override
public void onSubscribe(Subscription s) {
if (Operators.validate(this.s, s)) {
Expand All @@ -127,15 +111,22 @@ public void onNext(T t) {
Operators.onNextDropped(t, actual.currentContext());
return;
}

try {
action.accept(value, t);
}
catch (Throwable e) {
Context ctx = actual.currentContext();
Operators.onDiscard(t, ctx);
onError(Operators.onOperatorError(this, e, t, ctx));
R c;
synchronized (this) {
c = container;
if (c != null) {
try {
action.accept(c, t);
}
catch (Throwable e) {
Context ctx = actual.currentContext();
Operators.onDiscard(t, ctx);
onError(Operators.onOperatorError(this, e, t, ctx));
}
return;
}
}
Operators.onDiscard(t, actual.currentContext());
}

@Override
Expand All @@ -145,9 +136,12 @@ public void onError(Throwable t) {
return;
}
done = true;
R v = value;
discard(v);
value = null;
R c;
synchronized (this) {
c = container;
container = null;
}
discard(c);
actual.onError(t);
}

Expand All @@ -157,13 +151,46 @@ public void onComplete() {
return;
}
done = true;
complete(value);
R c;
synchronized (this) {
c = container;
container = null;
}
if (c != null) {
complete(c);
}
}

@Override
public void setValue(R value) {
// value is constant
protected void discard(R v) {
if (v instanceof Collection) {
Collection<?> c = (Collection<?>) v;
Operators.onDiscardMultiple(c, actual.currentContext());
}
else {
super.discard(v);
}
}

@Override
public void cancel() {
int state;
R c;
synchronized (this) {
state = STATE.getAndSet(this, CANCELLED);
if (state <= HAS_REQUEST_NO_VALUE) {
c = container;
value = null;
container = null;
}
else {
c = null;
}
}
if (c != null) {
s.cancel();
discard(c);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ public void subscribe(CoreSubscriber<? super List<T>> actual) {

static final class MonoCollectListSubscriber<T> extends Operators.MonoSubscriber<T, List<T>> {

Subscription s;

List<T> list;

Subscription s;

boolean done;

MonoCollectListSubscriber(CoreSubscriber<? super List<T>> actual) {
Expand Down Expand Up @@ -92,7 +92,7 @@ public void onNext(T t) {

@Override
public void onError(Throwable t) {
if(done) {
if (done) {
Operators.onErrorDropped(t, actual.currentContext());
return;
}
Expand All @@ -102,7 +102,7 @@ public void onError(Throwable t) {
l = list;
list = null;
}
Operators.onDiscardMultiple(l, actual.currentContext());
discard(l);
actual.onError(t);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,21 @@
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.publisher.MonoCollect.CollectSubscriber;
import reactor.test.StepVerifier;
import reactor.test.subscriber.AssertSubscriber;
import reactor.test.util.RaceTestUtils;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.context.Context;

import static org.assertj.core.api.Assertions.assertThat;

public class MonoCollectTest {

static final Logger LOGGER = Loggers.getLogger(MonoCollectListTest.class);


@Test(expected = NullPointerException.class)
public void nullSource() {
new MonoCollect<>(null, () -> 1, (a, b) -> {
Expand All @@ -56,7 +64,7 @@ public void nullAction() {
public void normal() {
AssertSubscriber<ArrayList<Integer>> ts = AssertSubscriber.create();

Flux.range(1, 10).collect(ArrayList<Integer>::new, (a, b) -> a.add(b)).subscribe(ts);
Flux.range(1, 10).collect(ArrayList<Integer>::new, ArrayList::add).subscribe(ts);

ts.assertValues(new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)))
.assertNoError()
Expand Down Expand Up @@ -125,7 +133,7 @@ public void actionThrows() {
@Test
public void scanSubscriber() {
CoreSubscriber<List<String>> actual = new LambdaMonoSubscriber<>(null, e -> {}, null, null);
MonoCollect.CollectSubscriber<String, List<String>> test = new MonoCollect.CollectSubscriber<>(
CollectSubscriber<String, List<String>> test = new CollectSubscriber<>(
actual, (l, v) -> l.add(v), new ArrayList<>());
Subscription parent = Operators.emptySubscription();
test.onSubscribe(parent);
Expand Down Expand Up @@ -269,4 +277,62 @@ public void discardWholeArrayOnCancel() {
assertThat((Object[]) discarded.get(0)).containsExactly(0L, 1L, null, null);
}

@Test
public void discardCancelNextRace() {
AtomicInteger doubleDiscardCounter = new AtomicInteger();
Context discardingContext = Operators.enableOnDiscard(null, o -> {
AtomicBoolean ab = (AtomicBoolean) o;
if (ab.getAndSet(true)) {
doubleDiscardCounter.incrementAndGet();
throw new RuntimeException("test");
}
});
for (int i = 0; i < 100_000; i++) {
AssertSubscriber<List<AtomicBoolean>> testSubscriber = new AssertSubscriber<>(discardingContext);
CollectSubscriber<AtomicBoolean, List<AtomicBoolean>> subscriber =
new CollectSubscriber<>(testSubscriber, List::add, new ArrayList<>());
subscriber.onSubscribe(Operators.emptySubscription());

AtomicBoolean extraneous = new AtomicBoolean(false);

RaceTestUtils.race(subscriber::cancel,
() -> subscriber.onNext(extraneous));

testSubscriber.assertNoValues();
if (!extraneous.get()) {
LOGGER.info(""+subscriber.container);
}
assertThat(extraneous).as("released " + i).isTrue();
}
LOGGER.info("discarded twice or more: {}", doubleDiscardCounter.get());
}

@Test
public void discardCancelCompleteRace() {
AtomicInteger doubleDiscardCounter = new AtomicInteger();
Context discardingContext = Operators.enableOnDiscard(null, o -> {
AtomicBoolean ab = (AtomicBoolean) o;
if (ab.getAndSet(true)) {
doubleDiscardCounter.incrementAndGet();
throw new RuntimeException("test");
}
});
for (int i = 0; i < 100_000; i++) {
AssertSubscriber<List<AtomicBoolean>> testSubscriber = new AssertSubscriber<>(discardingContext);
CollectSubscriber<AtomicBoolean, List<AtomicBoolean>> subscriber =
new CollectSubscriber<>(testSubscriber, List::add, new ArrayList<>());
subscriber.onSubscribe(Operators.emptySubscription());

AtomicBoolean resource = new AtomicBoolean(false);
subscriber.onNext(resource);

RaceTestUtils.race(subscriber::cancel, subscriber::onComplete);

if (testSubscriber.values().isEmpty()) {
assertThat(resource).as("not completed and released " + i).isTrue();
}
}
LOGGER.info("discarded twice or more: {}", doubleDiscardCounter.get());
}

}

0 comments on commit 85feec9

Please sign in to comment.