Skip to content

Commit

Permalink
Merge #2186 into 3.3.7.RELEASE
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle committed Jun 16, 2020
2 parents 8de5bf3 + cf63b5e commit 123475a
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 40 deletions.
95 changes: 61 additions & 34 deletions reactor-core/src/main/java/reactor/core/publisher/MonoCollect.java
Expand Up @@ -64,6 +64,8 @@ static final class CollectSubscriber<T, R> extends Operators.MonoSubscriber<T, R

final BiConsumer<? super R, ? super T> action;

R container;

Subscription s;

boolean done;
Expand All @@ -73,35 +75,17 @@ static final class CollectSubscriber<T, R> extends Operators.MonoSubscriber<T, R
R container) {
super(actual);
this.action = action;
this.value = container;
this.container = container;
}

@Override
@Nullable
public Object scanUnsafe(Attr key) {
if (key == Attr.TERMINATED) return done;
if (key == Attr.PARENT) return s;

if (key == Attr.TERMINATED) return done;
return super.scanUnsafe(key);
}

@Override
public void cancel() {
super.cancel();
s.cancel();
}

@Override
protected void discard(R v) {
if (v instanceof Collection) {
Collection<?> c = (Collection<?>) v;
Operators.onDiscardMultiple(c, actual.currentContext());
}
else {
super.discard(v);
}
}

@Override
public void onSubscribe(Subscription s) {
if (Operators.validate(this.s, s)) {
Expand All @@ -119,15 +103,22 @@ public void onNext(T t) {
Operators.onNextDropped(t, actual.currentContext());
return;
}

try {
action.accept(value, t);
}
catch (Throwable e) {
Context ctx = actual.currentContext();
Operators.onDiscard(t, ctx);
onError(Operators.onOperatorError(this, e, t, ctx));
R c;
synchronized (this) {
c = container;
if (c != null) {
try {
action.accept(c, t);
}
catch (Throwable e) {
Context ctx = actual.currentContext();
Operators.onDiscard(t, ctx);
onError(Operators.onOperatorError(this, e, t, ctx));
}
return;
}
}
Operators.onDiscard(t, actual.currentContext());
}

@Override
Expand All @@ -137,9 +128,12 @@ public void onError(Throwable t) {
return;
}
done = true;
R v = value;
discard(v);
value = null;
R c;
synchronized (this) {
c = container;
container = null;
}
discard(c);
actual.onError(t);
}

Expand All @@ -149,13 +143,46 @@ public void onComplete() {
return;
}
done = true;
complete(value);
R c;
synchronized (this) {
c = container;
container = null;
}
if (c != null) {
complete(c);
}
}

@Override
public void setValue(R value) {
// value is constant
protected void discard(R v) {
if (v instanceof Collection) {
Collection<?> c = (Collection<?>) v;
Operators.onDiscardMultiple(c, actual.currentContext());
}
else {
super.discard(v);
}
}

@Override
public void cancel() {
int state;
R c;
synchronized (this) {
state = STATE.getAndSet(this, CANCELLED);
if (state <= HAS_REQUEST_NO_VALUE) {
c = container;
value = null;
container = null;
}
else {
c = null;
}
}
if (c != null) {
s.cancel();
discard(c);
}
}
}
}
Expand Up @@ -42,10 +42,10 @@ public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super List<T

static final class MonoCollectListSubscriber<T> extends Operators.MonoSubscriber<T, List<T>> {

Subscription s;

List<T> list;

Subscription s;

boolean done;

MonoCollectListSubscriber(CoreSubscriber<? super List<T>> actual) {
Expand Down Expand Up @@ -92,7 +92,7 @@ public void onNext(T t) {

@Override
public void onError(Throwable t) {
if(done) {
if (done) {
Operators.onErrorDropped(t, actual.currentContext());
return;
}
Expand All @@ -102,7 +102,7 @@ public void onError(Throwable t) {
l = list;
list = null;
}
Operators.onDiscardMultiple(l, actual.currentContext());
discard(l);
actual.onError(t);
}

Expand Down
Expand Up @@ -30,13 +30,21 @@
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.publisher.MonoCollect.CollectSubscriber;
import reactor.test.StepVerifier;
import reactor.test.subscriber.AssertSubscriber;
import reactor.test.util.RaceTestUtils;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.context.Context;

import static org.assertj.core.api.Assertions.assertThat;

public class MonoCollectTest {

static final Logger LOGGER = Loggers.getLogger(MonoCollectListTest.class);


@Test(expected = NullPointerException.class)
public void nullSource() {
new MonoCollect<>(null, () -> 1, (a, b) -> {
Expand All @@ -57,7 +65,7 @@ public void nullAction() {
public void normal() {
AssertSubscriber<ArrayList<Integer>> ts = AssertSubscriber.create();

Flux.range(1, 10).collect(ArrayList<Integer>::new, (a, b) -> a.add(b)).subscribe(ts);
Flux.range(1, 10).collect(ArrayList<Integer>::new, ArrayList::add).subscribe(ts);

ts.assertValues(new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)))
.assertNoError()
Expand Down Expand Up @@ -126,7 +134,7 @@ public void actionThrows() {
@Test
public void scanSubscriber() {
CoreSubscriber<List<String>> actual = new LambdaMonoSubscriber<>(null, e -> {}, null, null);
MonoCollect.CollectSubscriber<String, List<String>> test = new MonoCollect.CollectSubscriber<>(
CollectSubscriber<String, List<String>> test = new CollectSubscriber<>(
actual, (l, v) -> l.add(v), new ArrayList<>());
Subscription parent = Operators.emptySubscription();
test.onSubscribe(parent);
Expand Down Expand Up @@ -270,4 +278,62 @@ public void discardWholeArrayOnCancel() {
assertThat((Object[]) discarded.get(0)).containsExactly(0L, 1L, null, null);
}

@Test
public void discardCancelNextRace() {
AtomicInteger doubleDiscardCounter = new AtomicInteger();
Context discardingContext = Operators.enableOnDiscard(null, o -> {
AtomicBoolean ab = (AtomicBoolean) o;
if (ab.getAndSet(true)) {
doubleDiscardCounter.incrementAndGet();
throw new RuntimeException("test");
}
});
for (int i = 0; i < 100_000; i++) {
AssertSubscriber<List<AtomicBoolean>> testSubscriber = new AssertSubscriber<>(discardingContext);
CollectSubscriber<AtomicBoolean, List<AtomicBoolean>> subscriber =
new CollectSubscriber<>(testSubscriber, List::add, new ArrayList<>());
subscriber.onSubscribe(Operators.emptySubscription());

AtomicBoolean extraneous = new AtomicBoolean(false);

RaceTestUtils.race(subscriber::cancel,
() -> subscriber.onNext(extraneous));

testSubscriber.assertNoValues();
if (!extraneous.get()) {
LOGGER.info(""+subscriber.container);
}
assertThat(extraneous).as("released " + i).isTrue();
}
LOGGER.info("discarded twice or more: {}", doubleDiscardCounter.get());
}

@Test
public void discardCancelCompleteRace() {
AtomicInteger doubleDiscardCounter = new AtomicInteger();
Context discardingContext = Operators.enableOnDiscard(null, o -> {
AtomicBoolean ab = (AtomicBoolean) o;
if (ab.getAndSet(true)) {
doubleDiscardCounter.incrementAndGet();
throw new RuntimeException("test");
}
});
for (int i = 0; i < 100_000; i++) {
AssertSubscriber<List<AtomicBoolean>> testSubscriber = new AssertSubscriber<>(discardingContext);
CollectSubscriber<AtomicBoolean, List<AtomicBoolean>> subscriber =
new CollectSubscriber<>(testSubscriber, List::add, new ArrayList<>());
subscriber.onSubscribe(Operators.emptySubscription());

AtomicBoolean resource = new AtomicBoolean(false);
subscriber.onNext(resource);

RaceTestUtils.race(subscriber::cancel, subscriber::onComplete);

if (testSubscriber.values().isEmpty()) {
assertThat(resource).as("not completed and released " + i).isTrue();
}
}
LOGGER.info("discarded twice or more: {}", doubleDiscardCounter.get());
}

}

0 comments on commit 123475a

Please sign in to comment.