From 3e6aea346d7592b79a4516a14f73819f4f75b135 Mon Sep 17 00:00:00 2001 From: Rossen Stoyanchev Date: Tue, 16 Jun 2020 07:36:41 +0100 Subject: [PATCH] Align MonoCollect with MonoList This change aligns the implementation of MonoCollect to be comparable to MonoList. Effectively they're the same, a MonoList is but a MonoCollect with an ArrayList as the container. Fix #2186 --- .../reactor/core/publisher/MonoCollect.java | 95 ++++++++++++------- .../core/publisher/MonoCollectList.java | 8 +- .../core/publisher/MonoCollectTest.java | 1 - 3 files changed, 65 insertions(+), 39 deletions(-) diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoCollect.java b/reactor-core/src/main/java/reactor/core/publisher/MonoCollect.java index b66d7d8380..f8439f1cc2 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoCollect.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoCollect.java @@ -72,6 +72,8 @@ static final class CollectSubscriber extends Operators.MonoSubscriber action; + R container; + Subscription s; boolean done; @@ -81,35 +83,17 @@ static final class CollectSubscriber extends Operators.MonoSubscriber c = (Collection) v; - Operators.onDiscardMultiple(c, actual.currentContext()); - } - else { - super.discard(v); - } - } - @Override public void onSubscribe(Subscription s) { if (Operators.validate(this.s, s)) { @@ -127,15 +111,22 @@ public void onNext(T t) { Operators.onNextDropped(t, actual.currentContext()); return; } - - try { - action.accept(value, t); - } - catch (Throwable e) { - Context ctx = actual.currentContext(); - Operators.onDiscard(t, ctx); - onError(Operators.onOperatorError(this, e, t, ctx)); + R c; + synchronized (this) { + c = container; + if (c != null) { + try { + action.accept(c, t); + } + catch (Throwable e) { + Context ctx = actual.currentContext(); + Operators.onDiscard(t, ctx); + onError(Operators.onOperatorError(this, e, t, ctx)); + } + return; + } } + Operators.onDiscard(t, actual.currentContext()); } @Override @@ -145,9 +136,12 @@ public void onError(Throwable t) { return; } done = true; - R v = value; - discard(v); - value = null; + R c; + synchronized (this) { + c = container; + container = null; + } + discard(c); actual.onError(t); } @@ -157,13 +151,46 @@ public void onComplete() { return; } done = true; - complete(value); + R c; + synchronized (this) { + c = container; + container = null; + } + if (c != null) { + complete(c); + } } @Override - public void setValue(R value) { - // value is constant + protected void discard(R v) { + if (v instanceof Collection) { + Collection c = (Collection) v; + Operators.onDiscardMultiple(c, actual.currentContext()); + } + else { + super.discard(v); + } } + @Override + public void cancel() { + int state; + R c; + synchronized (this) { + state = STATE.getAndSet(this, CANCELLED); + if (state <= HAS_REQUEST_NO_VALUE) { + c = container; + value = null; + container = null; + } + else { + c = null; + } + } + if (c != null) { + s.cancel(); + discard(c); + } + } } } diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoCollectList.java b/reactor-core/src/main/java/reactor/core/publisher/MonoCollectList.java index b02b253cff..70dff4a8ef 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoCollectList.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoCollectList.java @@ -42,10 +42,10 @@ public void subscribe(CoreSubscriber> actual) { static final class MonoCollectListSubscriber extends Operators.MonoSubscriber> { - Subscription s; - List list; + Subscription s; + boolean done; MonoCollectListSubscriber(CoreSubscriber> actual) { @@ -92,7 +92,7 @@ public void onNext(T t) { @Override public void onError(Throwable t) { - if(done) { + if (done) { Operators.onErrorDropped(t, actual.currentContext()); return; } @@ -102,7 +102,7 @@ public void onError(Throwable t) { l = list; list = null; } - Operators.onDiscardMultiple(l, actual.currentContext()); + discard(l); actual.onError(t); } diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoCollectTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoCollectTest.java index 2327a44279..4bd6698b50 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoCollectTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoCollectTest.java @@ -20,7 +20,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.junit.Assert;