Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #2186 Align MonoCollect and MonoCollectList #2193

Merged
merged 1 commit into from
Jun 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ static final class CollectSubscriber<T, R> extends Operators.MonoSubscriber<T, R

final BiConsumer<? super R, ? super T> action;

R container;

Subscription s;

boolean done;
Expand All @@ -81,35 +83,17 @@ static final class CollectSubscriber<T, R> extends Operators.MonoSubscriber<T, R
R container) {
super(actual);
this.action = action;
this.value = container;
this.container = container;
}

@Override
@Nullable
public Object scanUnsafe(Attr key) {
if (key == Attr.TERMINATED) return done;
if (key == Attr.PARENT) return s;

if (key == Attr.TERMINATED) return done;
return super.scanUnsafe(key);
}

@Override
public void cancel() {
super.cancel();
s.cancel();
}

@Override
protected void discard(R v) {
if (v instanceof Collection) {
Collection<?> c = (Collection<?>) v;
Operators.onDiscardMultiple(c, actual.currentContext());
}
else {
super.discard(v);
}
}

@Override
public void onSubscribe(Subscription s) {
if (Operators.validate(this.s, s)) {
Expand All @@ -127,15 +111,22 @@ public void onNext(T t) {
Operators.onNextDropped(t, actual.currentContext());
return;
}

try {
action.accept(value, t);
}
catch (Throwable e) {
Context ctx = actual.currentContext();
Operators.onDiscard(t, ctx);
onError(Operators.onOperatorError(this, e, t, ctx));
R c;
synchronized (this) {
c = container;
if (c != null) {
try {
action.accept(c, t);
}
catch (Throwable e) {
Context ctx = actual.currentContext();
Operators.onDiscard(t, ctx);
onError(Operators.onOperatorError(this, e, t, ctx));
}
return;
}
}
Operators.onDiscard(t, actual.currentContext());
}

@Override
Expand All @@ -145,9 +136,12 @@ public void onError(Throwable t) {
return;
}
done = true;
R v = value;
discard(v);
value = null;
R c;
synchronized (this) {
c = container;
container = null;
}
discard(c);
actual.onError(t);
}

Expand All @@ -157,13 +151,46 @@ public void onComplete() {
return;
}
done = true;
complete(value);
R c;
synchronized (this) {
c = container;
container = null;
}
if (c != null) {
complete(c);
}
}

@Override
public void setValue(R value) {
// value is constant
protected void discard(R v) {
if (v instanceof Collection) {
Collection<?> c = (Collection<?>) v;
Operators.onDiscardMultiple(c, actual.currentContext());
}
else {
super.discard(v);
}
}

@Override
public void cancel() {
int state;
R c;
synchronized (this) {
state = STATE.getAndSet(this, CANCELLED);
if (state <= HAS_REQUEST_NO_VALUE) {
c = container;
value = null;
container = null;
}
else {
c = null;
}
}
if (c != null) {
s.cancel();
discard(c);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ public void subscribe(CoreSubscriber<? super List<T>> actual) {

static final class MonoCollectListSubscriber<T> extends Operators.MonoSubscriber<T, List<T>> {

Subscription s;

List<T> list;

Subscription s;

boolean done;

MonoCollectListSubscriber(CoreSubscriber<? super List<T>> actual) {
Expand Down Expand Up @@ -92,7 +92,7 @@ public void onNext(T t) {

@Override
public void onError(Throwable t) {
if(done) {
if (done) {
Operators.onErrorDropped(t, actual.currentContext());
return;
}
Expand All @@ -102,7 +102,7 @@ public void onError(Throwable t) {
l = list;
list = null;
}
Operators.onDiscardMultiple(l, actual.currentContext());
discard(l);
actual.onError(t);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,21 @@
import reactor.core.CoreSubscriber;
import reactor.core.Fuseable;
import reactor.core.Scannable;
import reactor.core.publisher.MonoCollect.CollectSubscriber;
import reactor.test.StepVerifier;
import reactor.test.subscriber.AssertSubscriber;
import reactor.test.util.RaceTestUtils;
import reactor.util.Logger;
import reactor.util.Loggers;
import reactor.util.context.Context;

import static org.assertj.core.api.Assertions.assertThat;

public class MonoCollectTest {

static final Logger LOGGER = Loggers.getLogger(MonoCollectListTest.class);


@Test(expected = NullPointerException.class)
public void nullSource() {
new MonoCollect<>(null, () -> 1, (a, b) -> {
Expand All @@ -56,7 +64,7 @@ public void nullAction() {
public void normal() {
AssertSubscriber<ArrayList<Integer>> ts = AssertSubscriber.create();

Flux.range(1, 10).collect(ArrayList<Integer>::new, (a, b) -> a.add(b)).subscribe(ts);
Flux.range(1, 10).collect(ArrayList<Integer>::new, ArrayList::add).subscribe(ts);

ts.assertValues(new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)))
.assertNoError()
Expand Down Expand Up @@ -125,7 +133,7 @@ public void actionThrows() {
@Test
public void scanSubscriber() {
CoreSubscriber<List<String>> actual = new LambdaMonoSubscriber<>(null, e -> {}, null, null);
MonoCollect.CollectSubscriber<String, List<String>> test = new MonoCollect.CollectSubscriber<>(
CollectSubscriber<String, List<String>> test = new CollectSubscriber<>(
actual, (l, v) -> l.add(v), new ArrayList<>());
Subscription parent = Operators.emptySubscription();
test.onSubscribe(parent);
Expand Down Expand Up @@ -269,4 +277,62 @@ public void discardWholeArrayOnCancel() {
assertThat((Object[]) discarded.get(0)).containsExactly(0L, 1L, null, null);
}

@Test
public void discardCancelNextRace() {
AtomicInteger doubleDiscardCounter = new AtomicInteger();
Context discardingContext = Operators.enableOnDiscard(null, o -> {
AtomicBoolean ab = (AtomicBoolean) o;
if (ab.getAndSet(true)) {
doubleDiscardCounter.incrementAndGet();
throw new RuntimeException("test");
}
});
for (int i = 0; i < 100_000; i++) {
AssertSubscriber<List<AtomicBoolean>> testSubscriber = new AssertSubscriber<>(discardingContext);
CollectSubscriber<AtomicBoolean, List<AtomicBoolean>> subscriber =
new CollectSubscriber<>(testSubscriber, List::add, new ArrayList<>());
subscriber.onSubscribe(Operators.emptySubscription());

AtomicBoolean extraneous = new AtomicBoolean(false);

RaceTestUtils.race(subscriber::cancel,
() -> subscriber.onNext(extraneous));

testSubscriber.assertNoValues();
if (!extraneous.get()) {
LOGGER.info(""+subscriber.container);
}
assertThat(extraneous).as("released " + i).isTrue();
}
LOGGER.info("discarded twice or more: {}", doubleDiscardCounter.get());
}

@Test
public void discardCancelCompleteRace() {
AtomicInteger doubleDiscardCounter = new AtomicInteger();
Context discardingContext = Operators.enableOnDiscard(null, o -> {
AtomicBoolean ab = (AtomicBoolean) o;
if (ab.getAndSet(true)) {
doubleDiscardCounter.incrementAndGet();
throw new RuntimeException("test");
}
});
for (int i = 0; i < 100_000; i++) {
AssertSubscriber<List<AtomicBoolean>> testSubscriber = new AssertSubscriber<>(discardingContext);
CollectSubscriber<AtomicBoolean, List<AtomicBoolean>> subscriber =
new CollectSubscriber<>(testSubscriber, List::add, new ArrayList<>());
subscriber.onSubscribe(Operators.emptySubscription());

AtomicBoolean resource = new AtomicBoolean(false);
subscriber.onNext(resource);

RaceTestUtils.race(subscriber::cancel, subscriber::onComplete);

if (testSubscriber.values().isEmpty()) {
assertThat(resource).as("not completed and released " + i).isTrue();
}
}
LOGGER.info("discarded twice or more: {}", doubleDiscardCounter.get());
}

}