Skip to content

Commit

Permalink
Document that windows (and groupBy groups) are unicast
Browse files Browse the repository at this point in the history
This commit clarifies that window operators produce inner Flux (windows)
that are unicast. This is also true of `GroupedFlux` produced by the
`groupBy` operator.

Fixes #2568.
  • Loading branch information
simonbasle committed Sep 10, 2021
1 parent 3d620dc commit 39f481a
Showing 1 changed file with 151 additions and 0 deletions.
151 changes: 151 additions & 0 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
Original file line number Diff line number Diff line change
Expand Up @@ -5406,6 +5406,13 @@ public int getPrefetch() {
* if the groups are not suitably consumed downstream (eg. due to a {@code flatMap}
* with a {@code maxConcurrency} parameter that is set too low).
*
* <p>
* Note that groups are a live view of part of the underlying source publisher,
* and as such their lifecycle is tied to that source. As a result, it is not possible
* to subscribe to a specific group more than once: groups are unicast.
* This is most noticeable when trying to {@link #retry()} or {@link #repeat()} a window,
* as these operators are based on re-subscription.
*
* @param keyMapper the key mapping {@link Function} that evaluates an incoming data and returns a key.
* @param <K> the key type extracted from each value of this sequence
*
Expand All @@ -5430,6 +5437,13 @@ public final <K> Flux<GroupedFlux<K, T>> groupBy(Function<? super T, ? extends K
* if the groups are not suitably consumed downstream (eg. due to a {@code flatMap}
* with a {@code maxConcurrency} parameter that is set too low).
*
* <p>
* Note that groups are a live view of part of the underlying source publisher,
* and as such their lifecycle is tied to that source. As a result, it is not possible
* to subscribe to a specific group more than once: groups are unicast.
* This is most noticeable when trying to {@link #retry()} or {@link #repeat()} a window,
* as these operators are based on re-subscription.
*
* @param keyMapper the key mapping {@link Function} that evaluates an incoming data and returns a key.
* @param prefetch the number of values to prefetch from the source
* @param <K> the key type extracted from each value of this sequence
Expand All @@ -5456,6 +5470,13 @@ public final <K> Flux<GroupedFlux<K, T>> groupBy(Function<? super T, ? extends K
* if the groups are not suitably consumed downstream (eg. due to a {@code flatMap}
* with a {@code maxConcurrency} parameter that is set too low).
*
* <p>
* Note that groups are a live view of part of the underlying source publisher,
* and as such their lifecycle is tied to that source. As a result, it is not possible
* to subscribe to a specific group more than once: groups are unicast.
* This is most noticeable when trying to {@link #retry()} or {@link #repeat()} a window,
* as these operators are based on re-subscription.
*
* @param keyMapper the key mapping function that evaluates an incoming data and returns a key.
* @param valueMapper the value mapping function that evaluates which data to extract for re-routing.
* @param <K> the key type extracted from each value of this sequence
Expand Down Expand Up @@ -5485,6 +5506,13 @@ public final <K, V> Flux<GroupedFlux<K, V>> groupBy(Function<? super T, ? extend
* if the groups are not suitably consumed downstream (eg. due to a {@code flatMap}
* with a {@code maxConcurrency} parameter that is set too low).
*
* <p>
* Note that groups are a live view of part of the underlying source publisher,
* and as such their lifecycle is tied to that source. As a result, it is not possible
* to subscribe to a specific group more than once: groups are unicast.
* This is most noticeable when trying to {@link #retry()} or {@link #repeat()} a window,
* as these operators are based on re-subscription.
*
* @param keyMapper the key mapping function that evaluates an incoming data and returns a key.
* @param valueMapper the value mapping function that evaluates which data to extract for re-routing.
* @param prefetch the number of values to prefetch from the source
Expand Down Expand Up @@ -9263,6 +9291,13 @@ public final <V> Flux<V> transformDeferred(Function<? super Flux<T>, ? extends P
* <p>
* <img class="marble" src="doc-files/marbles/windowWithMaxSize.svg" alt="">
*
* <p>
* Note that windows are a live view of part of the underlying source publisher,
* and as such their lifecycle is tied to that source. As a result, it is not possible
* to subscribe to a window more than once: they are unicast.
* This is most noticeable when trying to {@link #retry()} or {@link #repeat()} a window,
* as these operators are based on re-subscription.
*
* @reactor.discard This operator discards elements it internally queued for backpressure
* upon cancellation or error triggered by a data signal.
*
Expand Down Expand Up @@ -9291,6 +9326,13 @@ public final Flux<Flux<T>> window(int maxSize) {
* <p>
* <img class="marble" src="doc-files/marbles/windowWithMaxSizeEqualsSkipSize.svg" alt="">
*
* <p>
* Note that windows are a live view of part of the underlying source publisher,
* and as such their lifecycle is tied to that source. As a result, it is not possible
* to subscribe to a window more than once: they are unicast.
* This is most noticeable when trying to {@link #retry()} or {@link #repeat()} a window,
* as these operators are based on re-subscription.
*
* @reactor.discard The overlapping variant DOES NOT discard elements, as they might be part of another still valid window.
* The exact window and dropping window variants bot discard elements they internally queued for backpressure
* upon cancellation or error triggered by a data signal. The dropping window variant also discards elements in between windows.
Expand All @@ -9315,6 +9357,13 @@ public final Flux<Flux<T>> window(int maxSize, int skip) {
* <p>
* <img class="marble" src="doc-files/marbles/windowWithBoundary.svg" alt="">
*
* <p>
* Note that windows are a live view of part of the underlying source publisher,
* and as such their lifecycle is tied to that source. As a result, it is not possible
* to subscribe to a window more than once: they are unicast.
* This is most noticeable when trying to {@link #retry()} or {@link #repeat()} a window,
* as these operators are based on re-subscription.
*
* @reactor.discard This operator discards elements it internally queued for backpressure
* upon cancellation or error triggered by a data signal.
*
Expand All @@ -9335,6 +9384,13 @@ public final Flux<Flux<T>> window(Publisher<?> boundary) {
* <p>
* <img class="marble" src="doc-files/marbles/windowWithTimespan.svg" alt="">
*
* <p>
* Note that windows are a live view of part of the underlying source publisher,
* and as such their lifecycle is tied to that source. As a result, it is not possible
* to subscribe to a window more than once: they are unicast.
* This is most noticeable when trying to {@link #retry()} or {@link #repeat()} a window,
* as these operators are based on re-subscription.
*
* @reactor.discard This operator discards elements it internally queued for backpressure
* upon cancellation or error triggered by a data signal.
*
Expand Down Expand Up @@ -9366,6 +9422,13 @@ public final Flux<Flux<T>> window(Duration windowingTimespan) {
* <p>
* <img class="marble" src="doc-files/marbles/windowWithTimespanEqualsOpenWindowEvery.svg" alt="">
*
* <p>
* Note that windows are a live view of part of the underlying source publisher,
* and as such their lifecycle is tied to that source. As a result, it is not possible
* to subscribe to a window more than once: they are unicast.
* This is most noticeable when trying to {@link #retry()} or {@link #repeat()} a window,
* as these operators are based on re-subscription.
*
* @reactor.discard The overlapping variant DOES NOT discard elements, as they might be part of another still valid window.
* The exact window and dropping window variants bot discard elements they internally queued for backpressure
* upon cancellation or error triggered by a data signal. The dropping window variant also discards elements in between windows.
Expand All @@ -9388,6 +9451,13 @@ public final Flux<Flux<T>> window(Duration windowingTimespan, Duration openWindo
* <p>
* <img class="marble" src="doc-files/marbles/windowWithTimespan.svg" alt="">
*
* <p>
* Note that windows are a live view of part of the underlying source publisher,
* and as such their lifecycle is tied to that source. As a result, it is not possible
* to subscribe to a window more than once: they are unicast.
* This is most noticeable when trying to {@link #retry()} or {@link #repeat()} a window,
* as these operators are based on re-subscription.
*
* @reactor.discard This operator discards elements it internally queued for backpressure
* upon cancellation or error triggered by a data signal.
*
Expand Down Expand Up @@ -9420,6 +9490,13 @@ public final Flux<Flux<T>> window(Duration windowingTimespan, Scheduler timer) {
* <p>
* <img class="marble" src="doc-files/marbles/windowWithTimespanEqualsOpenWindowEvery.svg" alt="">
*
* <p>
* Note that windows are a live view of part of the underlying source publisher,
* and as such their lifecycle is tied to that source. As a result, it is not possible
* to subscribe to a window more than once: they are unicast.
* This is most noticeable when trying to {@link #retry()} or {@link #repeat()} a window,
* as these operators are based on re-subscription.
*
* @reactor.discard The overlapping variant DOES NOT discard elements, as they might be part of another still valid window.
* The exact window and dropping window variants bot discard elements they internally queued for backpressure
* upon cancellation or error triggered by a data signal. The dropping window variant also discards elements in between windows.
Expand Down Expand Up @@ -9448,6 +9525,13 @@ public final Flux<Flux<T>> window(Duration windowingTimespan, Duration openWindo
* <p>
* <img class="marble" src="doc-files/marbles/windowTimeout.svg" alt="">
*
* <p>
* Note that windows are a live view of part of the underlying source publisher,
* and as such their lifecycle is tied to that source. As a result, it is not possible
* to subscribe to a window more than once: they are unicast.
* This is most noticeable when trying to {@link #retry()} or {@link #repeat()} a window,
* as these operators are based on re-subscription.
*
* @reactor.discard This operator discards elements it internally queued for backpressure
* upon cancellation or error triggered by a data signal.
*
Expand All @@ -9470,6 +9554,13 @@ public final Flux<Flux<T>> windowTimeout(int maxSize, Duration maxTime) {
* <p>
* <img class="marble" src="doc-files/marbles/windowTimeout.svg" alt="">
*
* <p>
* Note that windows are a live view of part of the underlying source publisher,
* and as such their lifecycle is tied to that source. As a result, it is not possible
* to subscribe to a window more than once: they are unicast.
* This is most noticeable when trying to {@link #retry()} or {@link #repeat()} a window,
* as these operators are based on re-subscription.
*
* @reactor.discard This operator discards elements it internally queued for backpressure
* upon cancellation or error triggered by a data signal.
*
Expand All @@ -9496,6 +9587,13 @@ public final Flux<Flux<T>> windowTimeout(int maxSize, Duration maxTime, Schedule
* <p>
* <img class="marble" src="doc-files/marbles/windowUntil.svg" alt="">
*
* <p>
* Note that windows are a live view of part of the underlying source publisher,
* and as such their lifecycle is tied to that source. As a result, it is not possible
* to subscribe to a window more than once: they are unicast.
* This is most noticeable when trying to {@link #retry()} or {@link #repeat()} a window,
* as these operators are based on re-subscription.
*
* @reactor.discard This operator discards elements it internally queued for backpressure
* upon cancellation or error triggered by a data signal. Upon cancellation of the current window,
* it also discards the remaining elements that were bound for it until the main sequence completes
Expand Down Expand Up @@ -9526,6 +9624,13 @@ public final Flux<Flux<T>> windowUntil(Predicate<T> boundaryTrigger) {
* <p>
* <img class="marble" src="doc-files/marbles/windowUntilWithCutBefore.svg" alt="">
*
* <p>
* Note that windows are a live view of part of the underlying source publisher,
* and as such their lifecycle is tied to that source. As a result, it is not possible
* to subscribe to a window more than once: they are unicast.
* This is most noticeable when trying to {@link #retry()} or {@link #repeat()} a window,
* as these operators are based on re-subscription.
*
* @reactor.discard This operator discards elements it internally queued for backpressure
* upon cancellation or error triggered by a data signal. Upon cancellation of the current window,
* it also discards the remaining elements that were bound for it until the main sequence completes
Expand Down Expand Up @@ -9558,6 +9663,13 @@ public final Flux<Flux<T>> windowUntil(Predicate<T> boundaryTrigger, boolean cut
* <p>
* <img class="marble" src="doc-files/marbles/windowUntilWithCutBefore.svg" alt="">
*
* <p>
* Note that windows are a live view of part of the underlying source publisher,
* and as such their lifecycle is tied to that source. As a result, it is not possible
* to subscribe to a window more than once: they are unicast.
* This is most noticeable when trying to {@link #retry()} or {@link #repeat()} a window,
* as these operators are based on re-subscription.
*
* @reactor.discard This operator discards elements it internally queued for backpressure
* upon cancellation or error triggered by a data signal. Upon cancellation of the current window,
* it also discards the remaining elements that were bound for it until the main sequence completes
Expand All @@ -9584,7 +9696,13 @@ public final Flux<Flux<T>> windowUntil(Predicate<T> boundaryTrigger, boolean cut
*
* <p>
* <img class="marble" src="doc-files/marbles/windowUntilChanged.svg" alt="">
*
* <p>
* Note that windows are a live view of part of the underlying source publisher,
* and as such their lifecycle is tied to that source. As a result, it is not possible
* to subscribe to a window more than once: they are unicast.
* This is most noticeable when trying to {@link #retry()} or {@link #repeat()} a window,
* as these operators are based on re-subscription.
*
* @reactor.discard This operator discards elements it internally queued for backpressure
* upon cancellation or error triggered by a data signal. Upon cancellation of the current window,
Expand All @@ -9604,7 +9722,13 @@ public final <V> Flux<Flux<T>> windowUntilChanged() {
*
* <p>
* <img class="marble" src="doc-files/marbles/windowUntilChangedWithKeySelector.svg" alt="">
*
* <p>
* Note that windows are a live view of part of the underlying source publisher,
* and as such their lifecycle is tied to that source. As a result, it is not possible
* to subscribe to a window more than once: they are unicast.
* This is most noticeable when trying to {@link #retry()} or {@link #repeat()} a window,
* as these operators are based on re-subscription.
*
* @reactor.discard This operator discards elements it internally queued for backpressure
* upon cancellation or error triggered by a data signal. Upon cancellation of the current window,
Expand All @@ -9626,7 +9750,13 @@ public final <V> Flux<Flux<T>> windowUntilChanged(Function<? super T, ? super V>
*
* <p>
* <img class="marble" src="doc-files/marbles/windowUntilChangedWithKeySelector.svg" alt="">
*
* <p>
* Note that windows are a live view of part of the underlying source publisher,
* and as such their lifecycle is tied to that source. As a result, it is not possible
* to subscribe to a window more than once: they are unicast.
* This is most noticeable when trying to {@link #retry()} or {@link #repeat()} a window,
* as these operators are based on re-subscription.
*
* @reactor.discard This operator discards elements it internally queued for backpressure
* upon cancellation or error triggered by a data signal. Upon cancellation of the current window,
Expand Down Expand Up @@ -9656,6 +9786,13 @@ public final <V> Flux<Flux<T>> windowUntilChanged(Function<? super T, ? extends
* <p>
* <img class="marble" src="doc-files/marbles/windowWhile.svg" alt="">
*
* <p>
* Note that windows are a live view of part of the underlying source publisher,
* and as such their lifecycle is tied to that source. As a result, it is not possible
* to subscribe to a window more than once: they are unicast.
* This is most noticeable when trying to {@link #retry()} or {@link #repeat()} a window,
* as these operators are based on re-subscription.
*
* @reactor.discard This operator discards elements it internally queued for backpressure
* upon cancellation or error triggered by a data signal, as well as the triggering element(s) (that doesn't match
* the predicate). Upon cancellation of the current window, it also discards the remaining elements
Expand All @@ -9682,6 +9819,13 @@ public final Flux<Flux<T>> windowWhile(Predicate<T> inclusionPredicate) {
* <p>
* <img class="marble" src="doc-files/marbles/windowWhile.svg" alt="">
*
* <p>
* Note that windows are a live view of part of the underlying source publisher,
* and as such their lifecycle is tied to that source. As a result, it is not possible
* to subscribe to a window more than once: they are unicast.
* This is most noticeable when trying to {@link #retry()} or {@link #repeat()} a window,
* as these operators are based on re-subscription.
*
* @reactor.discard This operator discards elements it internally queued for backpressure
* upon cancellation or error triggered by a data signal, as well as the triggering element(s) (that doesn't match
* the predicate). Upon cancellation of the current window, it also discards the remaining elements
Expand Down Expand Up @@ -9714,6 +9858,13 @@ public final Flux<Flux<T>> windowWhile(Predicate<T> inclusionPredicate, int pref
* <p>
* <img class="marble" src="doc-files/marbles/windowWhen.svg" alt="">
*
* <p>
* Note that windows are a live view of part of the underlying source publisher,
* and as such their lifecycle is tied to that source. As a result, it is not possible
* to subscribe to a window more than once: they are unicast.
* This is most noticeable when trying to {@link #retry()} or {@link #repeat()} a window,
* as these operators are based on re-subscription.
*
* @reactor.discard This operator DOES NOT discard elements.
*
* @param bucketOpening a {@link Publisher} that opens a new window when it emits any item
Expand Down

0 comments on commit 39f481a

Please sign in to comment.