diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxFlattenIterable.java b/reactor-core/src/main/java/reactor/core/publisher/FluxFlattenIterable.java index d176f43490..ac195cd724 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxFlattenIterable.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxFlattenIterable.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,13 +16,15 @@ package reactor.core.publisher; -import java.util.Iterator; +import java.util.NoSuchElementException; import java.util.Objects; import java.util.Queue; +import java.util.Spliterator; import java.util.concurrent.Callable; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -81,12 +83,12 @@ public CoreSubscriber subscribeOrReturn(CoreSubscriber act return null; } - Iterator it; + Spliterator sp; boolean knownToBeFinite; try { Iterable iter = mapper.apply(v); - it = iter.iterator(); - knownToBeFinite = FluxIterable.checkFinite(iter); + sp = iter.spliterator(); + knownToBeFinite = FluxIterable.checkFinite(sp); } catch (Throwable ex) { Context ctx = actual.currentContext(); @@ -102,7 +104,7 @@ public CoreSubscriber subscribeOrReturn(CoreSubscriber act } // TODO return subscriber (tail-call optimization)? - FluxIterable.subscribe(actual, it, knownToBeFinite); + FluxIterable.subscribe(actual, sp, knownToBeFinite); return null; } return new FlattenIterableSubscriber<>(actual, @@ -118,7 +120,7 @@ public Object scanUnsafe(Attr key) { } static final class FlattenIterableSubscriber - implements InnerOperator, QueueSubscription { + implements InnerOperator, QueueSubscription, Consumer { final CoreSubscriber actual; @@ -159,9 +161,13 @@ static final class FlattenIterableSubscriber "error"); @Nullable - Iterator current; + Spliterator current; boolean currentKnownToBeFinite; + boolean valueReady = false; + + R nextElement; + int consumed; int fusionMode; @@ -267,6 +273,29 @@ public void onComplete() { drain(null); } + @Override + public void accept(R t) { + valueReady = true; + nextElement = t; + } + + boolean hasNext(Spliterator spliterator) { + if (!valueReady) + spliterator.tryAdvance(this); + return valueReady; + } + + R next(Spliterator spliterator) { + if (!valueReady && !hasNext(spliterator)) + throw new NoSuchElementException(); + else { + valueReady = false; + R t = nextElement; + nextElement = null; + return t; + } + } + @Override public void request(long n) { if (Operators.validate(n)) { @@ -285,6 +314,7 @@ public void cancel() { if (WIP.getAndIncrement(this) == 0) { Context context = actual.currentContext(); Operators.onDiscardQueueWithClear(queue, context, null); + Operators.onDiscard(nextElement, context); Operators.onDiscardMultiple(current, currentKnownToBeFinite, context); } } @@ -301,12 +331,12 @@ void drainAsync() { final Queue q = queue; int missed = 1; - Iterator it = current; + Spliterator sp = current; boolean itFinite = currentKnownToBeFinite; for (; ; ) { - if (it == null) { + if (sp == null) { if (cancelled) { Operators.onDiscardQueueWithClear(q, actual.currentContext(), null); @@ -345,17 +375,17 @@ void drainAsync() { if (!empty) { Iterable iterable; - boolean b; + boolean isEmpty; try { iterable = mapper.apply(t); - it = iterable.iterator(); - itFinite = FluxIterable.checkFinite(iterable); + sp = iterable.spliterator(); + itFinite = FluxIterable.checkFinite(sp); - b = it.hasNext(); + isEmpty = itFinite && sp.estimateSize() == 0; } catch (Throwable exc) { - it = null; + sp = null; itFinite = false; //reset explicitly Context ctx = actual.currentContext(); Throwable e_ = Operators.onNextError(t, exc, ctx, s); @@ -366,8 +396,8 @@ void drainAsync() { continue; } - if (!b) { - it = null; + if (isEmpty) { + sp = null; itFinite = false; //reset explicitly int c = consumed + 1; if (c == limit) { @@ -382,7 +412,7 @@ void drainAsync() { } } - if (it != null) { + if (sp != null) { long r = requested; long e = 0L; @@ -391,7 +421,8 @@ void drainAsync() { resetCurrent(); final Context context = actual.currentContext(); Operators.onDiscardQueueWithClear(q, context, null); - Operators.onDiscardMultiple(it, itFinite, context); + Operators.onDiscard(nextElement, context); + Operators.onDiscardMultiple(sp, itFinite, context); return; } @@ -401,7 +432,8 @@ void drainAsync() { resetCurrent(); final Context context = actual.currentContext(); Operators.onDiscardQueueWithClear(q, context, null); - Operators.onDiscardMultiple(it, itFinite, context); + Operators.onDiscard(nextElement, context); + Operators.onDiscardMultiple(sp, itFinite, context); a.onError(ex); return; } @@ -409,7 +441,7 @@ void drainAsync() { R v; try { - v = Objects.requireNonNull(it.next(), + v = Objects.requireNonNull(next(sp), "iterator returned null"); } catch (Throwable exc) { @@ -424,7 +456,8 @@ void drainAsync() { resetCurrent(); final Context context = actual.currentContext(); Operators.onDiscardQueueWithClear(q, context, null); - Operators.onDiscardMultiple(it, itFinite, context); + Operators.onDiscard(nextElement, context); + Operators.onDiscardMultiple(sp, itFinite, context); return; } @@ -433,7 +466,7 @@ void drainAsync() { boolean b; try { - b = it.hasNext(); + b = hasNext(sp); } catch (Throwable exc) { onError(Operators.onOperatorError(s, exc, @@ -450,7 +483,7 @@ void drainAsync() { else { consumed = c; } - it = null; + sp = null; itFinite = false; resetCurrent(); break; @@ -462,7 +495,8 @@ void drainAsync() { resetCurrent(); final Context context = actual.currentContext(); Operators.onDiscardQueueWithClear(q, context, null); - Operators.onDiscardMultiple(it, itFinite, context); + Operators.onDiscard(nextElement, context); + Operators.onDiscardMultiple(sp, itFinite, context); return; } @@ -472,13 +506,14 @@ void drainAsync() { resetCurrent(); final Context context = actual.currentContext(); Operators.onDiscardQueueWithClear(q, context, null); - Operators.onDiscardMultiple(it, itFinite, context); + Operators.onDiscard(nextElement, context); + Operators.onDiscardMultiple(sp, itFinite, context); a.onError(ex); return; } boolean d = done; - boolean empty = q.isEmpty() && it == null; + boolean empty = q.isEmpty() && sp == null; if (d && empty) { resetCurrent(); @@ -493,12 +528,12 @@ void drainAsync() { } } - if (it == null) { + if (sp == null) { continue; } } - current = it; + current = sp; currentKnownToBeFinite = itFinite; missed = WIP.addAndGet(this, -missed); if (missed == 0) { @@ -511,11 +546,11 @@ void drainSync() { final Subscriber a = actual; int missed = 1; - Iterator it = current; + Spliterator sp = current; boolean itFinite = currentKnownToBeFinite; for (; ; ) { - if (it == null) { + if (sp == null) { if (cancelled) { Operators.onDiscardQueueWithClear(queue, actual.currentContext(), null); @@ -546,14 +581,14 @@ void drainSync() { if (!empty) { Iterable iterable; - boolean b; + boolean isEmpty; try { iterable = mapper.apply(t); - it = iterable.iterator(); - itFinite = FluxIterable.checkFinite(iterable); + sp = iterable.spliterator(); + itFinite = FluxIterable.checkFinite(sp); - b = it.hasNext(); + isEmpty = itFinite && sp.estimateSize() == 0; } catch (Throwable exc) { resetCurrent(); @@ -569,15 +604,15 @@ void drainSync() { continue; } - if (!b) { - it = null; + if (isEmpty) { + sp = null; itFinite = false; continue; } } } - if (it != null) { + if (sp != null) { long r = requested; long e = 0L; @@ -586,14 +621,15 @@ void drainSync() { resetCurrent(); final Context context = actual.currentContext(); Operators.onDiscardQueueWithClear(queue, context, null); - Operators.onDiscardMultiple(it, itFinite, context); + Operators.onDiscard(nextElement, context); + Operators.onDiscardMultiple(sp, itFinite, context); return; } R v; try { - v = Objects.requireNonNull(it.next(), "iterator returned null"); + v = Objects.requireNonNull(next(sp), "iterator returned null"); } catch (Throwable exc) { resetCurrent(); @@ -607,7 +643,8 @@ void drainSync() { resetCurrent(); final Context context = actual.currentContext(); Operators.onDiscardQueueWithClear(queue, context, null); - Operators.onDiscardMultiple(it, itFinite, context); + Operators.onDiscard(nextElement, context); + Operators.onDiscardMultiple(sp, itFinite, context); return; } @@ -616,7 +653,7 @@ void drainSync() { boolean b; try { - b = it.hasNext(); + b = hasNext(sp); } catch (Throwable exc) { resetCurrent(); @@ -625,7 +662,7 @@ void drainSync() { } if (!b) { - it = null; + sp = null; itFinite = false; resetCurrent(); break; @@ -637,12 +674,13 @@ void drainSync() { resetCurrent(); final Context context = actual.currentContext(); Operators.onDiscardQueueWithClear(queue, context, null); - Operators.onDiscardMultiple(it, itFinite, context); + Operators.onDiscard(nextElement, context); + Operators.onDiscardMultiple(sp, itFinite, context); return; } boolean d = done; - boolean empty = queue.isEmpty() && it == null; + boolean empty = queue.isEmpty() && sp == null; if (d && empty) { resetCurrent(); @@ -657,12 +695,12 @@ void drainSync() { } } - if (it == null) { + if (sp == null) { continue; } } - current = it; + current = sp; currentKnownToBeFinite = itFinite; missed = WIP.addAndGet(this, -missed); if (missed == 0) { @@ -690,6 +728,7 @@ void drain(@Nullable T dataSignal) { @Override public void clear() { final Context context = actual.currentContext(); + Operators.onDiscard(nextElement, context); Operators.onDiscardMultiple(current, currentKnownToBeFinite, context); resetCurrent(); Operators.onDiscardQueueWithClear(queue, context, null); @@ -697,9 +736,9 @@ public void clear() { @Override public boolean isEmpty() { - Iterator it = current; - if (it != null) { - return !it.hasNext(); + Spliterator sp = current; + if (sp != null) { + return !hasNext(sp); } return queue.isEmpty(); // estimate } @@ -707,10 +746,10 @@ public boolean isEmpty() { @Override @Nullable public R poll() { - Iterator it = current; + Spliterator sp = current; boolean itFinite; for (; ; ) { - if (it == null) { + if (sp == null) { T v = queue.poll(); if (v == null) { return null; @@ -719,28 +758,28 @@ public R poll() { Iterable iterable; try { iterable = mapper.apply(v); - it = iterable.iterator(); - itFinite = FluxIterable.checkFinite(iterable); + sp = iterable.spliterator(); + itFinite = FluxIterable.checkFinite(sp); } catch (Throwable error) { Operators.onDiscard(v, actual.currentContext()); throw error; } - if (!it.hasNext()) { + if (!hasNext(sp)) { continue; } - current = it; + current = sp; currentKnownToBeFinite = itFinite; } - else if (!it.hasNext()) { - it = null; + else if (!hasNext(sp)) { + sp = null; continue; } - R r = Objects.requireNonNull(it.next(), "iterator returned null"); + R r = Objects.requireNonNull(next(sp), "iterator returned null"); - if (!it.hasNext()) { + if (!hasNext(sp)) { resetCurrent(); } diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxIterable.java b/reactor-core/src/main/java/reactor/core/publisher/FluxIterable.java index ebfd1050c6..49e714f21d 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxIterable.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxIterable.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,10 +17,11 @@ package reactor.core.publisher; import java.util.Collection; -import java.util.Iterator; +import java.util.NoSuchElementException; import java.util.Objects; import java.util.Spliterator; import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.function.Consumer; import org.reactivestreams.Subscriber; @@ -31,8 +32,8 @@ import reactor.util.function.Tuple2; /** - * Emits the contents of an Iterable source. Attempt to discard remainder of a source - * in case of error / cancellation, but uses the {@link Spliterator} API to try and detect + * Emits the contents of an Iterable source via its {@link Spliterator} successor. Attempt to discard remainder of a source + * in case of error / cancellation, uses the {@link Spliterator#characteristics()} API to determine * infinite sources (so that said discarding doesn't loop infinitely). * * @param the value type @@ -46,15 +47,14 @@ final class FluxIterable extends Flux implements Fuseable, SourceProducer< * finite, which implies forEachRemaining type of iteration can be done to discard unemitted * values (in case of cancellation or error). *

- * A {@link Collection} is assumed to be finite, and for other iterables the {@link Spliterator} - * {@link Spliterator#SIZED} characteristic is looked for. + * The {@link Spliterator#SIZED} characteristic is looked for. * - * @param iterable the {@link Iterable} to check. - * @param - * @return true if the {@link Iterable} can confidently classified as finite, false if not finite/unsure + * @param spliterator the {@link Spliterator} to check. + * @param values type + * @return true if the {@link Spliterator} can confidently classified as finite, false if not finite/unsure */ - static boolean checkFinite(Iterable iterable) { - return iterable instanceof Collection || iterable.spliterator().hasCharacteristics(Spliterator.SIZED); + static boolean checkFinite(Spliterator spliterator) { + return spliterator.hasCharacteristics(Spliterator.SIZED); } final Iterable iterable; @@ -73,18 +73,18 @@ static boolean checkFinite(Iterable iterable) { @Override public void subscribe(CoreSubscriber actual) { boolean knownToBeFinite; - Iterator it; + Spliterator sp; try { - knownToBeFinite = FluxIterable.checkFinite(iterable); - it = iterable.iterator(); + sp = this.iterable.spliterator(); + knownToBeFinite = FluxIterable.checkFinite(sp); } catch (Throwable e) { Operators.error(actual, Operators.onOperatorError(e, actual.currentContext())); return; } - subscribe(actual, it, knownToBeFinite, onClose); + subscribe(actual, sp, knownToBeFinite, onClose); } @Override @@ -100,37 +100,37 @@ public Object scanUnsafe(Attr key) { } /** - * Common method to take an {@link Iterator} as a source of values. + * Common method to take an {@link Spliterator} as a source of values. * * @param s the subscriber to feed this iterator to - * @param it the {@link Iterator} to use as a predictable source of values + * @param sp the {@link Spliterator} to use as a predictable source of values */ - static void subscribe(CoreSubscriber s, Iterator it, boolean knownToBeFinite) { - subscribe(s, it, knownToBeFinite, null); + static void subscribe(CoreSubscriber s, Spliterator sp, boolean knownToBeFinite) { + subscribe(s, sp, knownToBeFinite, null); } /** - * Common method to take an {@link Iterator} as a source of values. + * Common method to take an {@link Spliterator} as a source of values. * * @param s the subscriber to feed this iterator to - * @param it the {@link Iterator} to use as a source of values + * @param sp the {@link Spliterator} to use as a source of values * @param onClose close handler to call once we're done with the iterator (provided it * is not null, this includes when the iteration errors or complete or the subscriber * is cancelled). Null to ignore. */ @SuppressWarnings("unchecked") - static void subscribe(CoreSubscriber s, Iterator it, + static void subscribe(CoreSubscriber s, Spliterator sp, boolean knownToBeFinite, @Nullable Runnable onClose) { //noinspection ConstantConditions - if (it == null) { + if (sp == null) { Operators.error(s, new NullPointerException("The iterator is null")); return; } - boolean b; + boolean isEmpty; try { - b = it.hasNext(); + isEmpty = knownToBeFinite && sp.estimateSize() == 0; } catch (Throwable e) { Operators.error(s, Operators.onOperatorError(e, s.currentContext())); @@ -144,7 +144,7 @@ static void subscribe(CoreSubscriber s, Iterator it, } return; } - if (!b) { + if (isEmpty) { Operators.complete(s); if (onClose != null) { try { @@ -159,21 +159,21 @@ static void subscribe(CoreSubscriber s, Iterator it, if (s instanceof ConditionalSubscriber) { s.onSubscribe(new IterableSubscriptionConditional<>((ConditionalSubscriber) s, - it, knownToBeFinite, onClose)); + sp, knownToBeFinite, onClose)); } else { - s.onSubscribe(new IterableSubscription<>(s, it, knownToBeFinite, onClose)); + s.onSubscribe(new IterableSubscription<>(s, sp, knownToBeFinite, onClose)); } } static final class IterableSubscription - implements InnerProducer, SynchronousSubscription { + implements InnerProducer, SynchronousSubscription, Consumer { final CoreSubscriber actual; - final Iterator iterator; - final boolean knownToBeFinite; - final Runnable onClose; + final Spliterator spliterator; + final boolean knownToBeFinite; + final Runnable onClose; volatile boolean cancelled; @@ -204,19 +204,47 @@ static final class IterableSubscription static final int STATE_CALL_HAS_NEXT = 3; T current; + + boolean valueReady = false; + + T nextElement; + Throwable hasNextFailure; IterableSubscription(CoreSubscriber actual, - Iterator iterator, boolean knownToBeFinite, @Nullable Runnable onClose) { + Spliterator spliterator, boolean knownToBeFinite, @Nullable Runnable onClose) { this.actual = actual; - this.iterator = iterator; + this.spliterator = spliterator; this.knownToBeFinite = knownToBeFinite; this.onClose = onClose; } IterableSubscription(CoreSubscriber actual, - Iterator iterator, boolean knownToBeFinite) { - this(actual, iterator, knownToBeFinite, null); + Spliterator spliterator, boolean knownToBeFinite) { + this(actual, spliterator, knownToBeFinite, null); + } + + @Override + public void accept(T t) { + valueReady = true; + nextElement = t; + } + + boolean hasNext() { + if (!valueReady) + spliterator.tryAdvance(this); + return valueReady; + } + + T next() { + if (!valueReady && !hasNext()) + throw new NoSuchElementException(); + else { + valueReady = false; + T t = nextElement; + nextElement = null; + return t; + } } @Override @@ -245,7 +273,6 @@ private void onCloseWithDropError() { } void slowPath(long n) { - final Iterator a = iterator; final Subscriber s = actual; long e = 0L; @@ -256,7 +283,7 @@ void slowPath(long n) { T t; try { - t = Objects.requireNonNull(a.next(), + t = Objects.requireNonNull(next(), "The iterator returned a null value"); } catch (Throwable ex) { @@ -278,7 +305,7 @@ void slowPath(long n) { boolean b; try { - b = a.hasNext(); + b = hasNext(); } catch (Throwable ex) { s.onError(ex); @@ -312,7 +339,6 @@ void slowPath(long n) { } void fastPath() { - final Iterator a = iterator; final Subscriber s = actual; for (; ; ) { @@ -324,7 +350,7 @@ void fastPath() { T t; try { - t = Objects.requireNonNull(a.next(), + t = Objects.requireNonNull(next(), "The iterator returned a null value"); } catch (Exception ex) { @@ -346,7 +372,7 @@ void fastPath() { boolean b; try { - b = a.hasNext(); + b = hasNext(); } catch (Exception ex) { s.onError(ex); @@ -370,7 +396,8 @@ void fastPath() { public void cancel() { onCloseWithDropError(); cancelled = true; - Operators.onDiscardMultiple(this.iterator, this.knownToBeFinite, actual.currentContext()); + Operators.onDiscard(nextElement, actual.currentContext()); + Operators.onDiscardMultiple(this.spliterator, this.knownToBeFinite, actual.currentContext()); } @Override @@ -391,7 +418,8 @@ public Object scanUnsafe(Attr key) { @Override public void clear() { - Operators.onDiscardMultiple(this.iterator, this.knownToBeFinite, actual.currentContext()); + Operators.onDiscard(nextElement, actual.currentContext()); + Operators.onDiscardMultiple(this.spliterator, this.knownToBeFinite, actual.currentContext()); state = STATE_NO_NEXT; } @@ -410,7 +438,7 @@ else if (s == STATE_HAS_NEXT_HAS_VALUE || s == STATE_HAS_NEXT_NO_VALUE) { else { boolean hasNext; try { - hasNext = iterator.hasNext(); + hasNext = hasNext(); } catch (Throwable t) { //this is a corner case, most Iterators are not expected to throw in hasNext. @@ -441,7 +469,7 @@ public T poll() { if (!isEmpty()) { T c; if (state == STATE_HAS_NEXT_NO_VALUE) { - c = iterator.next(); + c = next(); } else { c = current; @@ -468,11 +496,11 @@ public int size() { } static final class IterableSubscriptionConditional - implements InnerProducer, SynchronousSubscription { + implements InnerProducer, SynchronousSubscription, Consumer { final ConditionalSubscriber actual; - final Iterator iterator; + final Spliterator spliterator; final boolean knownToBeFinite; final Runnable onClose; @@ -506,19 +534,46 @@ static final class IterableSubscriptionConditional T current; + boolean valueReady = false; + + T nextElement; + Throwable hasNextFailure; IterableSubscriptionConditional(ConditionalSubscriber actual, - Iterator iterator, boolean knownToBeFinite, @Nullable Runnable onClose) { + Spliterator spliterator, boolean knownToBeFinite, @Nullable Runnable onClose) { this.actual = actual; - this.iterator = iterator; + this.spliterator = spliterator; this.knownToBeFinite = knownToBeFinite; this.onClose = onClose; } IterableSubscriptionConditional(ConditionalSubscriber actual, - Iterator iterator, boolean knownToBeFinite) { - this(actual, iterator, knownToBeFinite, null); + Spliterator spliterator, boolean knownToBeFinite) { + this(actual, spliterator, knownToBeFinite, null); + } + + @Override + public void accept(T t) { + valueReady = true; + nextElement = t; + } + + boolean hasNext() { + if (!valueReady) + spliterator.tryAdvance(this); + return valueReady; + } + + T next() { + if (!valueReady && !hasNext()) + throw new NoSuchElementException(); + else { + valueReady = false; + T t = nextElement; + nextElement = null; + return t; + } } @Override @@ -547,7 +602,6 @@ private void onCloseWithDropError() { } void slowPath(long n) { - final Iterator a = iterator; final ConditionalSubscriber s = actual; long e = 0L; @@ -558,7 +612,7 @@ void slowPath(long n) { T t; try { - t = Objects.requireNonNull(a.next(), + t = Objects.requireNonNull(next(), "The iterator returned a null value"); } catch (Throwable ex) { @@ -580,7 +634,7 @@ void slowPath(long n) { boolean b; try { - b = a.hasNext(); + b = hasNext(); } catch (Throwable ex) { s.onError(ex); @@ -616,7 +670,6 @@ void slowPath(long n) { } void fastPath() { - final Iterator a = iterator; final ConditionalSubscriber s = actual; for (; ; ) { @@ -628,7 +681,7 @@ void fastPath() { T t; try { - t = Objects.requireNonNull(a.next(), + t = Objects.requireNonNull(next(), "The iterator returned a null value"); } catch (Exception ex) { @@ -650,7 +703,7 @@ void fastPath() { boolean b; try { - b = a.hasNext(); + b = hasNext(); } catch (Exception ex) { s.onError(ex); @@ -674,7 +727,8 @@ void fastPath() { public void cancel() { onCloseWithDropError(); cancelled = true; - Operators.onDiscardMultiple(this.iterator, this.knownToBeFinite, actual.currentContext()); + Operators.onDiscard(this.nextElement, actual.currentContext()); + Operators.onDiscardMultiple(this.spliterator, this.knownToBeFinite, actual.currentContext()); } @Override @@ -695,7 +749,8 @@ public Object scanUnsafe(Attr key) { @Override public void clear() { - Operators.onDiscardMultiple(this.iterator, this.knownToBeFinite, actual.currentContext()); + Operators.onDiscard(this.nextElement, actual.currentContext()); + Operators.onDiscardMultiple(this.spliterator, this.knownToBeFinite, actual.currentContext()); state = STATE_NO_NEXT; } @@ -714,7 +769,7 @@ else if (s == STATE_HAS_NEXT_HAS_VALUE || s == STATE_HAS_NEXT_NO_VALUE) { else { boolean hasNext; try { - hasNext = iterator.hasNext(); + hasNext = hasNext(); } catch (Throwable t) { //this is a corner case, most Iterators are not expected to throw in hasNext. @@ -745,7 +800,7 @@ public T poll() { if (!isEmpty()) { T c; if (state == STATE_HAS_NEXT_NO_VALUE) { - c = iterator.next(); + c = next(); } else { c = current; diff --git a/reactor-core/src/main/java/reactor/core/publisher/FluxStream.java b/reactor-core/src/main/java/reactor/core/publisher/FluxStream.java index 58bf5620cf..2680bafcef 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/FluxStream.java +++ b/reactor-core/src/main/java/reactor/core/publisher/FluxStream.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,13 +53,13 @@ public void subscribe(CoreSubscriber actual) { return; } - Iterator it; + Spliterator sp; boolean knownToBeFinite; try { Spliterator spliterator = Objects.requireNonNull(stream.spliterator(), "The stream returned a null Spliterator"); knownToBeFinite = spliterator.hasCharacteristics(Spliterator.SIZED); - it = Spliterators.iterator(spliterator); //this is the default for BaseStream#iterator() anyway + sp = spliterator; //this is the default for BaseStream#iterator() anyway } catch (Throwable e) { Operators.error(actual, Operators.onOperatorError(e, actual.currentContext())); @@ -68,7 +68,7 @@ public void subscribe(CoreSubscriber actual) { //although not required by AutoCloseable, Stream::close SHOULD be idempotent //(at least the default AbstractPipeline implementation is) - FluxIterable.subscribe(actual, it, knownToBeFinite, stream::close); + FluxIterable.subscribe(actual, sp, knownToBeFinite, stream::close); } @Override diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoFlattenIterable.java b/reactor-core/src/main/java/reactor/core/publisher/MonoFlattenIterable.java index cc8afcef04..cdd2a6d5e8 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/MonoFlattenIterable.java +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoFlattenIterable.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,9 +16,9 @@ package reactor.core.publisher; -import java.util.Iterator; import java.util.Objects; import java.util.Queue; +import java.util.Spliterator; import java.util.concurrent.Callable; import java.util.function.Function; import java.util.function.Supplier; @@ -73,10 +73,10 @@ public CoreSubscriber subscribeOrReturn(CoreSubscriber act } Iterable iter = mapper.apply(v); - Iterator it = iter.iterator(); - boolean itFinite = FluxIterable.checkFinite(iter); + Spliterator sp = iter.spliterator(); + boolean itFinite = FluxIterable.checkFinite(sp); - FluxIterable.subscribe(actual, it, itFinite); + FluxIterable.subscribe(actual, sp, itFinite); return null; } diff --git a/reactor-core/src/main/java/reactor/core/publisher/Operators.java b/reactor-core/src/main/java/reactor/core/publisher/Operators.java index 5e18c9f3b5..61fefd6d03 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Operators.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Operators.java @@ -618,6 +618,43 @@ public static void onDiscardMultiple(@Nullable Iterator multiple, boolean kno } } + + /** + * Invoke a (local or global) hook that processes elements that remains in an {@link java.util.Spliterator}. + * Since spliterators can be infinite, this method requires that you explicitly ensure the spliterator is + * {@code knownToBeFinite}. Typically, one can get such a guarantee by looking at the {@link Spliterator#getExactSizeIfKnown()}. + * + * @param multiple the {@link Spliterator} whose remainder to discard + * @param knownToBeFinite is the caller guaranteeing that the iterator is finite and can be iterated over + * @param context the {@link Context} in which to look for local hook + * @see #onDiscard(Object, Context) + * @see #onDiscardMultiple(Collection, Context) + * @see #onDiscardQueueWithClear(Queue, Context, Function) + */ + public static void onDiscardMultiple(@Nullable Spliterator multiple, boolean knownToBeFinite, Context context) { + if (multiple == null) return; + if (!knownToBeFinite) return; + + Consumer hook = context.getOrDefault(Hooks.KEY_ON_DISCARD, null); + if (hook != null) { + try { + multiple.forEachRemaining(o -> { + if (o != null) { + try { + hook.accept(o); + } + catch (Throwable t) { + log.warn("Error while discarding element from an Spliterator, continuing with next element", t); + } + } + }); + } + catch (Throwable t) { + log.warn("Error while discarding Spliterator, stopping", t); + } + } + } + /** * An unexpected exception is about to be dropped. *

diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxFlattenIterableTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxFlattenIterableTest.java index 6236735601..e0d745756b 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxFlattenIterableTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxFlattenIterableTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -23,10 +23,13 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; +import org.assertj.core.api.Assertions; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Test; import org.reactivestreams.Subscription; import reactor.core.CoreSubscriber; @@ -139,6 +142,27 @@ protected List> scenarios_errorFromUpstreamFailure() { ); } + @Test + //https://github.com/reactor/reactor-core/issues/3295 + public void useIterableOncePerSubscriber() { + AtomicInteger calls = new AtomicInteger(); + + Flux.range(1, 5) + .concatMapIterable(v -> new Iterable() { + @NotNull + @Override + public Iterator iterator() { + calls.incrementAndGet(); + return Arrays.asList("hello " + v).iterator(); + } + }) + .as(StepVerifier::create) + .expectNext("hello 1", "hello 2", "hello 3", "hello 4", "hello 5") + .verifyComplete(); + + Assertions.assertThat(calls).hasValue(5); + } + @Test public void failPrefetch() { assertThatExceptionOfType(IllegalArgumentException.class).isThrownBy(() -> { diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxIterableTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxIterableTest.java index fbc9146ddb..38cd4fbd64 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxIterableTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxIterableTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,18 +17,15 @@ package reactor.core.publisher; import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Iterator; -import java.util.List; -import java.util.Spliterator; +import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.assertj.core.api.Assertions; import org.assertj.core.api.InstanceOfAssertFactories; +import org.jetbrains.annotations.NotNull; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.mockito.Mockito; @@ -53,6 +50,25 @@ public class FluxIterableTest { final Iterable source = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + @Test + //https://github.com/reactor/reactor-core/issues/3295 + public void useIterableOncePerSubscriber() { + AtomicInteger calls = new AtomicInteger(); + Iterable strings = new Iterable() { + @NotNull + @Override + public Iterator iterator() { + calls.incrementAndGet(); + return Arrays.asList("hello").iterator(); + } + }; + StepVerifier.create(Flux.fromIterable(strings)) + .expectNext("hello") + .verifyComplete(); + + Assertions.assertThat(calls).hasValue(1); + } + @Test public void emptyIterable() { StepVerifier.create(Flux.never().zipWithIterable(new ArrayList<>())) @@ -223,7 +239,7 @@ public void scanOperator() { public void scanSubscription() { CoreSubscriber actual = new LambdaSubscriber<>(null, e -> {}, null, sub -> sub.request(100)); FluxIterable.IterableSubscription test = - new FluxIterable.IterableSubscription<>(actual, Collections.singleton("test").iterator(), true); + new FluxIterable.IterableSubscription<>(actual, Collections.singleton("test").spliterator(), true); assertThat(test.scan(Scannable.Attr.ACTUAL)).isSameAs(actual); test.request(123); @@ -245,7 +261,7 @@ public void scanConditionalSubscription() { Fuseable.ConditionalSubscriber actual = Mockito.mock(MockUtils.TestScannableConditionalSubscriber.class); Mockito.when(actual.currentContext()).thenReturn(Context.empty()); FluxIterable.IterableSubscriptionConditional test = - new FluxIterable.IterableSubscriptionConditional<>(actual, Collections.singleton("test").iterator(), true); + new FluxIterable.IterableSubscriptionConditional<>(actual, Collections.singleton("test").spliterator(), true); assertThat(test.scan(Scannable.Attr.ACTUAL)).isSameAs(actual); test.request(123); @@ -318,8 +334,9 @@ void smokeTestIterableConditionalSubscriptionWithInfiniteIterable() { @SuppressWarnings("unchecked") Fuseable.ConditionalSubscriber testSubscriber = Mockito.mock(Fuseable.ConditionalSubscriber.class); + Mockito.when(testSubscriber.currentContext()).thenReturn(discardingContext); - Iterator iterator = new Iterator() { + Spliterator iterator = Spliterators.spliteratorUnknownSize(new Iterator() { @Override public boolean hasNext() { //approximate infinite source with a large upper bound instead @@ -330,7 +347,8 @@ public boolean hasNext() { public Integer next() { return backingAtomic.incrementAndGet(); } - }; + }, 0); + FluxIterable.IterableSubscriptionConditional subscription = new FluxIterable.IterableSubscriptionConditional<>( testSubscriber,