From 518d204e82efa2f83dbe0d8270163a1c2b00e92b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Basl=C3=A9?= Date: Thu, 19 Mar 2020 14:04:03 +0100 Subject: [PATCH 1/7] Rework of #1979 to avoid compiler ambiguity, polish docs --- docs/asciidoc/apdx-operatorChoice.adoc | 2 +- docs/asciidoc/coreFeatures.adoc | 20 ++-- docs/asciidoc/faq.adoc | 2 +- docs/asciidoc/snippetRetryWhenRetry.adoc | 6 +- .../java/reactor/core/publisher/Flux.java | 14 ++- .../java/reactor/core/publisher/Mono.java | 13 ++- .../main/java/reactor/util/retry/Retry.java | 91 +++++++++++++++--- .../reactor/util/retry/RetryBackoffSpec.java | 15 ++- .../java/reactor/util/retry/RetrySpec.java | 14 +-- .../core/publisher/FluxRetryWhenTest.java | 94 +++++++++++++++---- .../test/java/reactor/guide/GuideTests.java | 9 +- .../reactor/util/retry/RetrySpecTest.java | 24 ----- 12 files changed, 201 insertions(+), 103 deletions(-) diff --git a/docs/asciidoc/apdx-operatorChoice.adoc b/docs/asciidoc/apdx-operatorChoice.adoc index 23176066fc..8319f93075 100644 --- a/docs/asciidoc/apdx-operatorChoice.adoc +++ b/docs/asciidoc/apdx-operatorChoice.adoc @@ -230,7 +230,7 @@ I want to deal with: ** by retrying... *** ...with a simple policy (max number of attempts): `retry()`, `retry(long)` *** ...triggered by a companion control Flux: `retryWhen` -*** ...using a standard backoff strategy (exponential backoff with jitter): `retryWhen(Retry.backoff(...))` +*** ...using a standard backoff strategy (exponential backoff with jitter): `retryWhen(Retry.backoff(...))` (see also other factory methods in `Retry`) * I want to deal with backpressure "errors" (request max from upstream and apply the strategy when downstream does not produce enough request)... ** by throwing a special `IllegalStateException`: `Flux#onBackpressureError` diff --git a/docs/asciidoc/coreFeatures.adoc b/docs/asciidoc/coreFeatures.adoc index 814a5da84b..27f758b5f5 100644 --- a/docs/asciidoc/coreFeatures.adoc +++ b/docs/asciidoc/coreFeatures.adoc @@ -881,11 +881,15 @@ condition. The companion `Flux` is a `Flux` that gets passed to a `Retry` strategy/function, supplied as the sole parameter of `retryWhen`. As the user, you define that function and make it return a new -`Publisher`. Retry cycles go as follows: +`Publisher`. The `Retry` class is an abstract class, but it offers a factory method if you +want to transform the companion with a simple lambda (`Retry.fromFunction(Function)`). -. Each time an error happens (giving potential for a retry), the error is emitted into the +Retry cycles go as follows: + +. Each time an error happens (giving potential for a retry), a `RetrySignal` is emitted into the companion `Flux`, which has been decorated by your function. Having a `Flux` here -gives a bird eye's view of all the attempts so far. +gives a bird eye's view of all the attempts so far. The `RetrySignal` gives access to the error +as well as metadata around it. . If the companion `Flux` emits a value, a retry happens. . If the companion `Flux` completes, the error is swallowed, the retry cycle stops, and the resulting sequence completes, too. @@ -902,12 +906,12 @@ companion would effectively swallow an error. Consider the following way of emul Flux flux = Flux .error(new IllegalArgumentException()) // <1> .doOnError(System.out::println) // <2> - .retryWhen(() -> // <3> - companion -> companion.take(3)); // <4> + .retryWhen(Retry.fromFunction(companion -> // <3> + companion.take(3))); // <4> ---- <1> This continuously produces errors, calling for retry attempts. <2> `doOnError` before the retry lets us log and see all failures. -<3> The `Retry` function is passed as a `Supplier` +<3> The `Retry` is adapted from a very simple `Function` lambda <4> Here, we consider the first three errors as retry-able (`take(3)`) and then give up. ==== @@ -919,12 +923,12 @@ Getting to the same behavior involves a few additional tricks: include::snippetRetryWhenRetry.adoc[] TIP: One can use the builders exposed in `Retry` to achieve the same in a more fluent manner, as -well as more finely tuned retry strategies: `errorFlux.retryWhen(Retry.max(3));`. +well as more finely tuned retry strategies. For example: `errorFlux.retryWhen(Retry.max(3));`. TIP: You can use similar code to implement an "`exponential backoff and retry`" pattern, as shown in the <>. -The core-provided `Retry` builders, `RetrySpec` and `RetryBackoffSpec`, both allow advanced customizations like: +The core-provided `Retry` helpers, `RetrySpec` and `RetryBackoffSpec`, both allow advanced customizations like: - setting the `filter(Predicate)` for the exceptions that can trigger a retry - modifying such a previously set filter through `modifyErrorFilter(Function)` diff --git a/docs/asciidoc/faq.adoc b/docs/asciidoc/faq.adoc index c5c931d4f8..cf7aa13588 100644 --- a/docs/asciidoc/faq.adoc +++ b/docs/asciidoc/faq.adoc @@ -176,7 +176,7 @@ an unstable state and is not likely to immediately recover from it. So blindly retrying immediately is likely to produce yet another error and add to the instability. -Since `3.3.4.RELEASE`, Reactor comes with a builder for such a retry baked in: `Retry.backoff`. +Since `3.3.4.RELEASE`, Reactor comes with a builder for such a retry, to be used with `Flux#retryWhen`: `Retry.backoff`. The following example showcases a simple use of the builder, with hooks logging message right before and after the retry attempt delays. diff --git a/docs/asciidoc/snippetRetryWhenRetry.adoc b/docs/asciidoc/snippetRetryWhenRetry.adoc index e1260b8d03..6c4ec7e70b 100644 --- a/docs/asciidoc/snippetRetryWhenRetry.adoc +++ b/docs/asciidoc/snippetRetryWhenRetry.adoc @@ -5,14 +5,14 @@ AtomicInteger errorCount = new AtomicInteger(); Flux flux = Flux.error(new IllegalArgumentException()) .doOnError(e -> errorCount.incrementAndGet()) - .retryWhen(() -> companion -> // <1> + .retryWhen(Retry.fromFunction(companion -> // <1> companion.map(rs -> { // <2> if (rs.totalRetries() < 3) return rs.totalRetries(); // <3> else throw Exceptions.propagate(rs.failure()); // <4> }) - ); + )); ---- -<1> `retryWhen` expects a `Supplier` +<1> We customize `Retry` by adapting from a `Function` lambda rather than providing a concrete class <2> The companion emits `RetrySignal` objects, which bear number of retries so far and last failure <3> To allow for three retries, we consider indexes < 3 and return a value to emit (here we simply return the index). <4> In order to terminate the sequence in error, we throw the original exception after diff --git a/reactor-core/src/main/java/reactor/core/publisher/Flux.java b/reactor-core/src/main/java/reactor/core/publisher/Flux.java index 5e58238573..9199c34856 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Flux.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Flux.java @@ -7240,14 +7240,18 @@ public final Flux retry(long numRetries, Predicate retryMa * * @return a {@link Flux} that retries on onError when the companion {@link Publisher} produces an * onNext signal - * @deprecated use {@link #retryWhen(Retry)} instead, to be removed in 3.4 + * @deprecated use {@link #retryWhen(Retry)} instead, to be removed in 3.4. Lambda Functions that don't make + * use of the error can simply be converted by wrapping via {@link Retry#fromFunction(Function)}. + * Functions that do use the error will additionally need to map the {@link reactor.util.retry.Retry.RetrySignal} + * emitted by the companion to its {@link Retry.RetrySignal#failure()}. */ @Deprecated public final Flux retryWhen(Function, ? extends Publisher> whenFactory) { Objects.requireNonNull(whenFactory, "whenFactory"); - return onAssembly(new FluxRetryWhen<>(this, fluxRetryWhenState -> fluxRetryWhenState + return onAssembly(new FluxRetryWhen<>(this, Retry.fromFunction(fluxRetryWhenState -> fluxRetryWhenState .map(Retry.RetrySignal::failure) - .as(whenFactory))); + .as(whenFactory))) + ); } /** @@ -7276,7 +7280,7 @@ public final Flux retryWhen(Function, ? extends Publisher> * the previous Context: *
 	 * {@code
-	 * Retry customStrategy = companion -> companion.handle((retrySignal, sink) -> {
+	 * Retry customStrategy = Retry.fromFunction(companion -> companion.handle((retrySignal, sink) -> {
 	 * 	    Context ctx = sink.currentContext();
 	 * 	    int rl = ctx.getOrDefault("retriesLeft", 0);
 	 * 	    if (rl > 0) {
@@ -7287,7 +7291,7 @@ public final Flux retryWhen(Function, ? extends Publisher>
 	 * 	    } else {
 	 * 	        sink.error(Exceptions.retryExhausted("retries exhausted", retrySignal.failure()));
 	 * 	    }
-	 * });
+	 * }));
 	 * Flux retried = originalFlux.retryWhen(customStrategy);
 	 * }
*
diff --git a/reactor-core/src/main/java/reactor/core/publisher/Mono.java b/reactor-core/src/main/java/reactor/core/publisher/Mono.java index 6d0b9ed3af..190b339676 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Mono.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Mono.java @@ -3716,13 +3716,16 @@ public final Mono retry(long numRetries, Predicate retryMa * * @return a {@link Mono} that retries on onError when the companion {@link Publisher} produces an * onNext signal - * @deprecated use {@link #retryWhen(Retry)} instead, to be removed in 3.4 + * @deprecated use {@link #retryWhen(Retry)} instead, to be removed in 3.4. Lambda Functions that don't make + * use of the error can simply be converted by wrapping via {@link Retry#fromFunction(Function)}. + * Functions that do use the error will additionally need to map the {@link reactor.util.retry.Retry.RetrySignal} + * emitted by the companion to its {@link Retry.RetrySignal#failure()}. */ @Deprecated public final Mono retryWhen(Function, ? extends Publisher> whenFactory) { Objects.requireNonNull(whenFactory, "whenFactory"); - return onAssembly(new MonoRetryWhen<>(this, (Flux rws) -> whenFactory.apply(rws.map( - Retry.RetrySignal::failure)))); + return onAssembly(new MonoRetryWhen<>(this, Retry.fromFunction(rws -> whenFactory.apply(rws.map( + Retry.RetrySignal::failure))))); } /** @@ -3752,7 +3755,7 @@ public final Mono retryWhen(Function, ? extends Publisher> *
*
 	 * {@code
-	 * Retry customStrategy = companion -> companion.handle((retrySignal, sink) -> {
+	 * Retry customStrategy = Retry.fromFunction(companion -> companion.handle((retrySignal, sink) -> {
 	 * 	    Context ctx = sink.currentContext();
 	 * 	    int rl = ctx.getOrDefault("retriesLeft", 0);
 	 * 	    if (rl > 0) {
@@ -3763,7 +3766,7 @@ public final Mono retryWhen(Function, ? extends Publisher>
 	 * 	    } else {
 	 * 	        sink.error(Exceptions.retryExhausted("retries exhausted", retrySignal.failure()));
 	 * 	    }
-	 * });
+	 * }));
 	 * Mono retried = originalMono.retryWhen(customStrategy);
 	 * }
*
diff --git a/reactor-core/src/main/java/reactor/util/retry/Retry.java b/reactor-core/src/main/java/reactor/util/retry/Retry.java index 0cdd3f2ead..b770ff3df1 100644 --- a/reactor-core/src/main/java/reactor/util/retry/Retry.java +++ b/reactor-core/src/main/java/reactor/util/retry/Retry.java @@ -17,6 +17,7 @@ package reactor.util.retry; import java.time.Duration; +import java.util.function.Function; import org.reactivestreams.Publisher; @@ -26,13 +27,25 @@ import static reactor.util.retry.RetrySpec.*; /** - * Functional interface to configure retries depending on a companion {@link Flux} of {@link RetrySignal}, - * as well as builders for such {@link Flux#retryWhen(Retry)} retries} companions. + * Base abstract class for a strategy to decide when to retry given a companion {@link Flux} of {@link RetrySignal}, + * for use with {@link Flux#retryWhen(Retry)} and {@link reactor.core.publisher.Mono#retryWhen(Retry)}. + * Also provides access to configurable built-in strategies via static factory methods: + *
    + *
  • {@link #indefinitely()}
  • + *
  • {@link #max(long)}
  • + *
  • {@link #maxInARow(long)}
  • + *
  • {@link #fixedDelays(long, Duration)}
  • + *
  • {@link #backoff(long, Duration)}
  • + *
+ *

+ * Users are encouraged to provide either concrete custom {@link Retry} strategies or builders that produce + * such concrete {@link Retry}. The {@link RetrySpec} returned by eg. {@link #max(long)} is a good inspiration + * for a fluent approach that generates a {@link Retry} at each step and uses immutability/copy-on-write to enable + * sharing of intermediate steps (that can thus be considered templates). * * @author Simon Baslé */ -@FunctionalInterface -public interface Retry { +public abstract class Retry { /** * The intent of the functional {@link Retry} class is to let users configure how to react to {@link RetrySignal} @@ -41,11 +54,11 @@ public interface Retry { * the attempt is delayed as well. This method generates the companion, out of a {@link Flux} of {@link RetrySignal}, * which itself can serve as the simplest form of retry companion (indefinitely and immediately retry on any error). * - * @param retrySignalCompanion the original {@link Flux} of {@link RetrySignal}, notifying of each source error that - * _might_ result in a retry attempt, with context around the error and current retry cycle. + * @param retrySignals the original {@link Flux} of {@link RetrySignal}, notifying of each source error that + * might result in a retry attempt, with context around the error and current retry cycle. * @return the actual companion to use, which might delay or limit retry attempts */ - Publisher generateCompanion(Flux retrySignalCompanion); + public abstract Publisher generateCompanion(Flux retrySignals); /** * State for a {@link Flux#retryWhen(Retry)} Flux retry} or {@link reactor.core.publisher.Mono#retryWhen(Retry) Mono retry}. @@ -53,7 +66,7 @@ public interface Retry { * a retry as well as two indexes: the number of errors that happened so far (and were retried) and the same number, * but only taking into account subsequent errors (see {@link #totalRetriesInARow()}). */ - interface RetrySignal { + public interface RetrySignal { /** * The ZERO BASED index number of this error (can also be read as how many retries have occurred @@ -98,24 +111,47 @@ default RetrySignal copy() { * * @param maxAttempts the maximum number of retry attempts to allow * @param minBackoff the minimum {@link Duration} for the first backoff - * @return the builder for further configuration + * @return the exponential backoff spec for further configuration * @see RetryBackoffSpec#maxAttempts(long) * @see RetryBackoffSpec#minBackoff(Duration) */ - static RetryBackoffSpec backoff(long maxAttempts, Duration minBackoff) { + //FIXME marble diagram + public static RetryBackoffSpec backoff(long maxAttempts, Duration minBackoff) { return new RetryBackoffSpec(maxAttempts, t -> true, false, minBackoff, MAX_BACKOFF, 0.5d, Schedulers.parallel(), NO_OP_CONSUMER, NO_OP_CONSUMER, NO_OP_BIFUNCTION, NO_OP_BIFUNCTION, RetryBackoffSpec.BACKOFF_EXCEPTION_GENERATOR); } + /** + * A {@link RetryBackoffSpec} preconfigured for fixed delays (min backoff equals max backoff, no jitter), given a maximum number of retry attempts + * and the fixed {@link Duration} for the backoff. + *

+ * Note that calling {@link RetryBackoffSpec#minBackoff(Duration)} or {@link RetryBackoffSpec#maxBackoff(Duration)} would switch + * back to an exponential backoff strategy. + * + * @param maxAttempts the maximum number of retry attempts to allow + * @param fixedDelay the {@link Duration} of the fixed delays + * @return the fixed delays spec for further configuration + * @see RetryBackoffSpec#maxAttempts(long) + * @see RetryBackoffSpec#minBackoff(Duration) + * @see RetryBackoffSpec#maxBackoff(Duration) + */ + //FIXME marble diagram + public static RetryBackoffSpec fixedDelays(long maxAttempts, Duration fixedDelay) { + return new RetryBackoffSpec(maxAttempts, t -> true, false, fixedDelay, fixedDelay, 0d, Schedulers.parallel(), + NO_OP_CONSUMER, NO_OP_CONSUMER, NO_OP_BIFUNCTION, NO_OP_BIFUNCTION, + RetryBackoffSpec.BACKOFF_EXCEPTION_GENERATOR); + } + /** * A {@link RetrySpec} preconfigured for a simple strategy with maximum number of retry attempts. * * @param max the maximum number of retry attempts to allow - * @return the builder for further configuration + * @return the max attempt spec for further configuration * @see RetrySpec#maxAttempts(long) */ - static RetrySpec max(long max) { + //FIXME marble diagram + public static RetrySpec max(long max) { return new RetrySpec(max, t -> true, false, NO_OP_CONSUMER, NO_OP_CONSUMER, NO_OP_BIFUNCTION, NO_OP_BIFUNCTION, RetrySpec.RETRY_EXCEPTION_GENERATOR); } @@ -126,13 +162,40 @@ static RetrySpec max(long max) { * errors resets the counter (see {@link RetrySpec#transientErrors(boolean)}). * * @param maxInARow the maximum number of retry attempts to allow in a row, reset by successful onNext - * @return the builder for further configuration + * @return the max in a row spec for further configuration * @see RetrySpec#maxAttempts(long) * @see RetrySpec#transientErrors(boolean) */ - static RetrySpec maxInARow(long maxInARow) { + //FIXME marble diagram, point to it in RetrySpec#transientErrors javadoc + public static RetrySpec maxInARow(long maxInARow) { return new RetrySpec(maxInARow, t -> true, true, NO_OP_CONSUMER, NO_OP_CONSUMER, NO_OP_BIFUNCTION, NO_OP_BIFUNCTION, RETRY_EXCEPTION_GENERATOR); } + /** + * A {@link RetrySpec} preconfigured for the most simplistic retry strategy: retry immediately and indefinitely + * (similar to {@link Flux#retry()}). + * + * @return the retry indefinitely spec for further configuration + */ + public static RetrySpec indefinitely() { + return new RetrySpec(Long.MAX_VALUE, t -> true, false, NO_OP_CONSUMER, NO_OP_CONSUMER, NO_OP_BIFUNCTION, NO_OP_BIFUNCTION, + RetrySpec.RETRY_EXCEPTION_GENERATOR); + } + + /** + * A wrapper around {@link Function} to provide {@link Retry} by using lambda expressions. + * + * @param function the {@link Function} representing the desired {@link Retry} strategy as a lambda + * @return the {@link Retry} strategy adapted from the {@link Function} + */ + public static final Retry fromFunction(Function, Publisher> function) { + return new Retry() { + @Override + public Publisher generateCompanion(Flux retrySignalCompanion) { + return function.apply(retrySignalCompanion); + } + }; + } + } diff --git a/reactor-core/src/main/java/reactor/util/retry/RetryBackoffSpec.java b/reactor-core/src/main/java/reactor/util/retry/RetryBackoffSpec.java index 9517732eb3..405b69bb4a 100644 --- a/reactor-core/src/main/java/reactor/util/retry/RetryBackoffSpec.java +++ b/reactor-core/src/main/java/reactor/util/retry/RetryBackoffSpec.java @@ -23,7 +23,6 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; -import java.util.function.Supplier; import reactor.core.Exceptions; import reactor.core.publisher.Flux; @@ -32,12 +31,15 @@ import reactor.core.scheduler.Schedulers; /** - * A builder for a retry strategy based on exponential backoffs, with fine grained options. + * A {@link Retry} strategy based on exponential backoffs, with configurable features. Use {@link Retry#backoff(long, Duration)} + * to obtain a preconfigured instance to start with. + *

* Retry delays are randomized with a user-provided {@link #jitter(double)} factor between {@code 0.d} (no jitter) * and {@code 1.0} (default is {@code 0.5}). * Even with the jitter, the effective backoff delay cannot be less than {@link #minBackoff(Duration)} * nor more than {@link #maxBackoff(Duration)}. The delays and subsequent attempts are executed on the - * provided backoff {@link #scheduler(Scheduler)}. + * provided backoff {@link #scheduler(Scheduler)}. Alternatively, {@link Retry#fixedDelays(long, Duration)} provides + * a strategy where the min and max backoffs are the same and jitters are deactivated. *

* Only errors that match the {@link #filter(Predicate)} are retried (by default all), * and the number of attempts can also limited with {@link #maxAttempts(long)}. @@ -56,7 +58,7 @@ * * @author Simon Baslé */ -public final class RetryBackoffSpec implements Retry, Supplier { +public final class RetryBackoffSpec extends Retry { static final BiFunction BACKOFF_EXCEPTION_GENERATOR = (builder, rs) -> Exceptions.retryExhausted("Retries exhausted: " + ( @@ -558,9 +560,4 @@ public Flux generateCompanion(Flux t) { syncPreRetry, syncPostRetry, asyncPreRetry, asyncPostRetry); }); } - - @Override - public Retry get() { - return this; - } } diff --git a/reactor-core/src/main/java/reactor/util/retry/RetrySpec.java b/reactor-core/src/main/java/reactor/util/retry/RetrySpec.java index ade4efa99a..fd46e8b568 100644 --- a/reactor-core/src/main/java/reactor/util/retry/RetrySpec.java +++ b/reactor-core/src/main/java/reactor/util/retry/RetrySpec.java @@ -22,15 +22,16 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; -import java.util.function.Supplier; import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** - * A builder for a simple count-based retry strategy with fine grained options: errors that match - * the {@link #filter(Predicate)} are retried (by default all), up to {@link #maxAttempts(long)} times. + * A simple count-based {@link Retry} strategy with configurable features. Use {@link Retry#max(long)}, + * {@link Retry#maxInARow(long)} or {@link Retry#indefinitely()} to obtain a preconfigured instance to start with. + *

+ * Only errors that match the {@link #filter(Predicate)} are retried (by default all), up to {@link #maxAttempts(long)} times. *

* When the maximum attempt of retries is reached, a runtime exception is propagated downstream which * can be pinpointed with {@link reactor.core.Exceptions#isRetryExhausted(Throwable)}. The cause of @@ -47,7 +48,7 @@ * * @author Simon Baslé */ -public final class RetrySpec implements Retry, Supplier { +public final class RetrySpec extends Retry { static final Duration MAX_BACKOFF = Duration.ofMillis(Long.MAX_VALUE); static final Consumer NO_OP_CONSUMER = rs -> {}; @@ -373,9 +374,4 @@ static Mono applyHooks(RetrySignal copyOfSignal, return preRetryMono.then(originalCompanion).flatMap(postRetryMono::thenReturn); } - - @Override - public Retry get() { - return this; - } } diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java index e7a652fe9a..fc5921480a 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java @@ -58,7 +58,7 @@ public class FluxRetryWhenTest { @Test(expected = NullPointerException.class) public void sourceNull() { - new FluxRetryWhen<>(null, v -> v); + new FluxRetryWhen<>(null, Retry.fromFunction(v -> v)); } @SuppressWarnings("deprecation") @@ -80,7 +80,7 @@ public void cancelsOther() { Flux when = Flux.range(1, 10) .doOnCancel(() -> cancelled.set(true)); - StepVerifier.create(justError.retryWhen((Retry) other -> when)) + StepVerifier.create(justError.retryWhen(Retry.fromFunction(other -> when))) .thenCancel() .verify(); @@ -93,7 +93,7 @@ public void cancelTwiceCancelsOtherOnce() { Flux when = Flux.range(1, 10) .doOnCancel(cancelled::incrementAndGet); - justError.retryWhen((Retry) other -> when) + justError.retryWhen(Retry.fromFunction(other -> when)) .subscribe(new BaseSubscriber() { @Override protected void hookOnSubscribe(Subscription subscription) { @@ -114,7 +114,7 @@ public void directOtherErrorPreventsSubscribe() { .doOnSubscribe(sub -> sourceSubscribed.set(true)) .doOnCancel(() -> sourceCancelled.set(true)); - Flux retry = source.retryWhen((Retry) other -> Mono.error(new IllegalStateException("boom"))); + Flux retry = source.retryWhen(Retry.fromFunction(other -> Mono.error(new IllegalStateException("boom")))); StepVerifier.create(retry) .expectSubscription() @@ -134,8 +134,8 @@ public void lateOtherErrorCancelsSource() { .doOnCancel(() -> sourceCancelled.set(true)); - Flux retry = source.retryWhen((Retry) other -> other.flatMap(l -> - count.getAndIncrement() == 0 ? Mono.just(l) : Mono.error(new IllegalStateException("boom")))); + Flux retry = source.retryWhen(Retry.fromFunction(other -> other.flatMap(l -> + count.getAndIncrement() == 0 ? Mono.just(l) : Mono.error(new IllegalStateException("boom"))))); StepVerifier.create(retry) .expectSubscription() @@ -155,7 +155,7 @@ public void directOtherEmptyPreventsSubscribeAndCompletes() { .doOnSubscribe(sub -> sourceSubscribed.set(true)) .doOnCancel(() -> sourceCancelled.set(true)); - Flux retry = source.retryWhen((Retry) other -> Flux.empty()); + Flux retry = source.retryWhen(Retry.fromFunction(other -> Flux.empty())); StepVerifier.create(retry) .expectSubscription() @@ -173,7 +173,7 @@ public void lateOtherEmptyCancelsSourceAndCompletes() { .doOnSubscribe(sub -> sourceSubscribed.set(true)) .doOnCancel(() -> sourceCancelled.set(true)); - Flux retry = source.retryWhen((Retry) other -> other.take(1)); + Flux retry = source.retryWhen(Retry.fromFunction(other -> other.take(1))); StepVerifier.create(retry) .expectSubscription() @@ -189,7 +189,7 @@ public void lateOtherEmptyCancelsSourceAndCompletes() { public void coldRepeater() { AssertSubscriber ts = AssertSubscriber.create(); - justError.retryWhen((Retry) other -> Flux.range(1, 10)) + justError.retryWhen(Retry.fromFunction(other -> Flux.range(1, 10))) .subscribe(ts); ts.assertValues(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1) @@ -201,7 +201,7 @@ public void coldRepeater() { public void coldRepeaterBackpressured() { AssertSubscriber ts = AssertSubscriber.create(0); - rangeError.retryWhen((Retry) other -> Flux.range(1, 5)) + rangeError.retryWhen(Retry.fromFunction(other -> Flux.range(1, 5))) .subscribe(ts); ts.assertNoValues() @@ -237,7 +237,7 @@ public void coldRepeaterBackpressured() { public void coldEmpty() { AssertSubscriber ts = AssertSubscriber.create(0); - rangeError.retryWhen((Retry) other -> Flux.empty()) + rangeError.retryWhen(Retry.fromFunction(other -> Flux.empty())) .subscribe(ts); ts.assertNoValues() @@ -249,7 +249,7 @@ public void coldEmpty() { public void coldError() { AssertSubscriber ts = AssertSubscriber.create(0); - rangeError.retryWhen((Retry) other -> Flux.error(new RuntimeException("forced failure"))) + rangeError.retryWhen(Retry.fromFunction(other -> Flux.error(new RuntimeException("forced failure")))) .subscribe(ts); ts.assertNoValues() @@ -262,9 +262,9 @@ public void coldError() { public void whenFactoryThrows() { AssertSubscriber ts = AssertSubscriber.create(); - rangeError.retryWhen((Retry) other -> { + rangeError.retryWhen(Retry.fromFunction(other -> { throw new RuntimeException("forced failure"); - }) + })) .subscribe(ts); ts.assertNoValues() @@ -277,7 +277,7 @@ public void whenFactoryThrows() { public void whenFactoryReturnsNull() { AssertSubscriber ts = AssertSubscriber.create(); - rangeError.retryWhen((Retry) other -> null) + rangeError.retryWhen(Retry.fromFunction(other -> null)) .subscribe(ts); ts.assertNoValues() @@ -289,9 +289,9 @@ public void whenFactoryReturnsNull() { public void retryErrorsInResponse() { AssertSubscriber ts = AssertSubscriber.create(); - rangeError.retryWhen((Retry) v -> v.map(a -> { + rangeError.retryWhen(Retry.fromFunction(v -> v.map(a -> { throw new RuntimeException("forced failure"); - })) + }))) .subscribe(ts); ts.assertValues(1, 2) @@ -305,7 +305,7 @@ public void retryErrorsInResponse() { public void retryAlways() { AssertSubscriber ts = AssertSubscriber.create(0); - rangeError.retryWhen((Retry) v -> v) + rangeError.retryWhen(Retry.fromFunction(v -> v)) .subscribe(ts); ts.request(8); @@ -332,6 +332,21 @@ Flux linearRetryScenario() { ); } + Flux fixedDelaysRetryScenario() { + AtomicInteger i = new AtomicInteger(); + return Flux.create(s -> { + if (i.incrementAndGet() == 4) { + s.next("hey"); + s.complete(); + } + else { + s.error(new RuntimeException("test " + i)); + } + }).retryWhen(Retry.fixedDelays(3, Duration.ofSeconds(3)) + .doBeforeRetry(rs -> System.out.println(rs.copy())) + ); + } + @Test public void linearRetry() { StepVerifier.withVirtualTime(this::linearRetryScenario) @@ -341,6 +356,16 @@ public void linearRetry() { .verify(); } + @Test + public void fixedDelaysRetry() { + StepVerifier.withVirtualTime(this::fixedDelaysRetryScenario) + .expectSubscription() + .expectNoEvent(Duration.ofSeconds(3 * 3)) + .expectNext("hey") + .expectComplete() + .verify(); + } + @Test public void scanMainSubscriber() { CoreSubscriber actual = new LambdaSubscriber<>(null, e -> {}, null, null); @@ -399,7 +424,7 @@ public void retryWhenContextTrigger_MergesOriginalContext() { contextPerRetry.add(sig.getContext()); } }) - .retryWhen((Retry) retrySignalFlux -> retrySignalFlux.handle((rs, sink) -> { + .retryWhen(Retry.fromFunction(retrySignalFlux -> retrySignalFlux.handle((rs, sink) -> { Context ctx = sink.currentContext(); int rl = ctx.getOrDefault("retriesLeft", 0); if (rl > 0) { @@ -408,7 +433,7 @@ public void retryWhenContextTrigger_MergesOriginalContext() { else { sink.error(Exceptions.retryExhausted("retries exhausted", rs.failure())); } - })) + }))) .subscriberContext(Context.of("retriesLeft", RETRY_COUNT)) .subscriberContext(Context.of("thirdPartyContext", "present")); @@ -838,4 +863,33 @@ public static Flux transientErrorSource() { }); } + @Test + public void retryWhenThrowableCompanionIsComparableToRetryWhenRetryFromFunction() { + AtomicInteger sourceHelper = new AtomicInteger(); + Flux source = Flux.create(sink -> { + if (sourceHelper.getAndIncrement() == 3) { + sink.next(1).next(2).next(3).complete(); + } + else { + sink.error(new IllegalStateException("boom")); + } + }); + + //noinspection deprecation + StepVerifier.withVirtualTime(() -> source.retryWhen(companion -> companion.delayElements(Duration.ofSeconds(3)))) + .expectSubscription() + .expectNoEvent(Duration.ofSeconds(3 * 3)) + .expectNext(1, 2, 3) + .verifyComplete(); + + sourceHelper.set(0); + StepVerifier.withVirtualTime(() -> source.retryWhen( + Retry.fromFunction(companion -> companion.delayElements(Duration.ofSeconds(3)))) + ) + .expectSubscription() + .expectNoEvent(Duration.ofSeconds(3 * 3)) + .expectNext(1, 2, 3) + .verifyComplete(); + } + } diff --git a/reactor-core/src/test/java/reactor/guide/GuideTests.java b/reactor-core/src/test/java/reactor/guide/GuideTests.java index f805ef73b2..dfa524a16c 100644 --- a/reactor-core/src/test/java/reactor/guide/GuideTests.java +++ b/reactor-core/src/test/java/reactor/guide/GuideTests.java @@ -658,7 +658,8 @@ public void errorHandlingRetryWhenApproximateRetry() { Flux flux = Flux.error(new IllegalArgumentException()) // <1> .doOnError(System.out::println) // <2> - .retryWhen((Function, Publisher>) companion -> companion.take(3)); // <3> + .retryWhen(Retry.fromFunction(companion -> // <3> + companion.take(3))); // <4> StepVerifier.create(flux) .verifyComplete(); @@ -673,12 +674,12 @@ public void errorHandlingRetryWhenEquatesRetry() { Flux flux = Flux.error(new IllegalArgumentException()) .doOnError(e -> errorCount.incrementAndGet()) - .retryWhen((Retry) companion -> // <1> + .retryWhen(Retry.fromFunction(companion -> // <1> companion.map(rs -> { // <2> if (rs.totalRetries() < 3) return rs.totalRetries(); // <3> else throw Exceptions.propagate(rs.failure()); // <4> }) - ); + )); StepVerifier.create(flux) .verifyError(IllegalArgumentException.class); @@ -1047,7 +1048,7 @@ private void printAndAssert(Throwable t, boolean checkForAssemblySuppressed) { assertThat(withSuppressed.getSuppressed()).hasSize(1); assertThat(withSuppressed.getSuppressed()[0]) .hasMessageStartingWith("\nAssembly trace from producer [reactor.core.publisher.MonoSingle] :") - .hasMessageContaining("Flux.single ⇢ at reactor.guide.GuideTests.scatterAndGather(GuideTests.java:1011)\n"); + .hasMessageContaining("Flux.single ⇢ at reactor.guide.GuideTests.scatterAndGather(GuideTests.java:1012)\n"); }); } } diff --git a/reactor-core/src/test/java/reactor/util/retry/RetrySpecTest.java b/reactor-core/src/test/java/reactor/util/retry/RetrySpecTest.java index 0da140112a..81dfefe627 100644 --- a/reactor-core/src/test/java/reactor/util/retry/RetrySpecTest.java +++ b/reactor-core/src/test/java/reactor/util/retry/RetrySpecTest.java @@ -377,28 +377,4 @@ public void cumulatedRetryHooksWithTransient() { "AsyncAfter D" ); } - - @SuppressWarnings("deprecation") - @Test - public void smokeTestLambdaAmbiguity() { - //the following should just compile - - Function, Publisher> functionBased = companion -> companion.take(3); - - Flux.range(1, 10) - .retryWhen(functionBased) - .blockLast(); - - Flux.range(1, 10) - .retryWhen(Retry.max(1).get()) - .blockLast(); - - Mono.just(1) - .retryWhen(functionBased) - .block(); - - Mono.just(1) - .retryWhen(Retry.max(1)) - .block(); - } } \ No newline at end of file From d1cde9c8ffcbb597e48d095698ade93d55310ef0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Basl=C3=A9?= Date: Thu, 19 Mar 2020 15:38:27 +0100 Subject: [PATCH 2/7] Internally store backoff Scheduler as Supplier, lazily evaluate it, allow reset to default --- .../main/java/reactor/util/retry/Retry.java | 4 +- .../reactor/util/retry/RetryBackoffSpec.java | 49 ++++++++++--------- .../util/retry/RetryBackoffSpecTest.java | 43 +++++++++++++++- 3 files changed, 68 insertions(+), 28 deletions(-) diff --git a/reactor-core/src/main/java/reactor/util/retry/Retry.java b/reactor-core/src/main/java/reactor/util/retry/Retry.java index b770ff3df1..bd4bc56112 100644 --- a/reactor-core/src/main/java/reactor/util/retry/Retry.java +++ b/reactor-core/src/main/java/reactor/util/retry/Retry.java @@ -117,7 +117,7 @@ default RetrySignal copy() { */ //FIXME marble diagram public static RetryBackoffSpec backoff(long maxAttempts, Duration minBackoff) { - return new RetryBackoffSpec(maxAttempts, t -> true, false, minBackoff, MAX_BACKOFF, 0.5d, Schedulers.parallel(), + return new RetryBackoffSpec(maxAttempts, t -> true, false, minBackoff, MAX_BACKOFF, 0.5d, Schedulers::parallel, NO_OP_CONSUMER, NO_OP_CONSUMER, NO_OP_BIFUNCTION, NO_OP_BIFUNCTION, RetryBackoffSpec.BACKOFF_EXCEPTION_GENERATOR); } @@ -138,7 +138,7 @@ public static RetryBackoffSpec backoff(long maxAttempts, Duration minBackoff) { */ //FIXME marble diagram public static RetryBackoffSpec fixedDelays(long maxAttempts, Duration fixedDelay) { - return new RetryBackoffSpec(maxAttempts, t -> true, false, fixedDelay, fixedDelay, 0d, Schedulers.parallel(), + return new RetryBackoffSpec(maxAttempts, t -> true, false, fixedDelay, fixedDelay, 0d, Schedulers::parallel, NO_OP_CONSUMER, NO_OP_CONSUMER, NO_OP_BIFUNCTION, NO_OP_BIFUNCTION, RetryBackoffSpec.BACKOFF_EXCEPTION_GENERATOR); } diff --git a/reactor-core/src/main/java/reactor/util/retry/RetryBackoffSpec.java b/reactor-core/src/main/java/reactor/util/retry/RetryBackoffSpec.java index 405b69bb4a..0c31f4511e 100644 --- a/reactor-core/src/main/java/reactor/util/retry/RetryBackoffSpec.java +++ b/reactor-core/src/main/java/reactor/util/retry/RetryBackoffSpec.java @@ -23,12 +23,14 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; +import java.util.function.Supplier; import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; +import reactor.util.annotation.Nullable; /** * A {@link Retry} strategy based on exponential backoffs, with configurable features. Use {@link Retry#backoff(long, Duration)} @@ -86,10 +88,10 @@ public final class RetryBackoffSpec extends Retry { public final double jitterFactor; /** - * The configured {@link Scheduler} on which to execute backoffs. + * The configured {@link Supplier} of {@link Scheduler} on which to execute backoffs. * @see #scheduler(Scheduler) */ - public final Scheduler backoffScheduler; + public final Supplier backoffSchedulerSupplier; /** * The configured maximum for retry attempts. @@ -125,7 +127,7 @@ public final class RetryBackoffSpec extends Retry { Predicate aThrowablePredicate, boolean isTransientErrors, Duration minBackoff, Duration maxBackoff, double jitterFactor, - Scheduler backoffScheduler, + Supplier backoffSchedulerSupplier, Consumer doPreRetry, Consumer doPostRetry, BiFunction, Mono> asyncPreRetry, @@ -137,7 +139,7 @@ public final class RetryBackoffSpec extends Retry { this.minBackoff = minBackoff; this.maxBackoff = maxBackoff; this.jitterFactor = jitterFactor; - this.backoffScheduler = backoffScheduler; + this.backoffSchedulerSupplier = backoffSchedulerSupplier; this.syncPreRetry = doPreRetry; this.syncPostRetry = doPostRetry; this.asyncPreRetry = asyncPreRetry; @@ -161,7 +163,7 @@ public RetryBackoffSpec maxAttempts(long maxAttempts) { this.minBackoff, this.maxBackoff, this.jitterFactor, - this.backoffScheduler, + this.backoffSchedulerSupplier, this.syncPreRetry, this.syncPostRetry, this.asyncPreRetry, @@ -185,7 +187,7 @@ public RetryBackoffSpec filter(Predicate errorFilter) { this.minBackoff, this.maxBackoff, this.jitterFactor, - this.backoffScheduler, + this.backoffSchedulerSupplier, this.syncPreRetry, this.syncPostRetry, this.asyncPreRetry, @@ -224,7 +226,7 @@ public RetryBackoffSpec modifyErrorFilter( this.minBackoff, this.maxBackoff, this.jitterFactor, - this.backoffScheduler, + this.backoffSchedulerSupplier, this.syncPreRetry, this.syncPostRetry, this.asyncPreRetry, @@ -250,7 +252,7 @@ public RetryBackoffSpec doBeforeRetry( this.minBackoff, this.maxBackoff, this.jitterFactor, - this.backoffScheduler, + this.backoffSchedulerSupplier, this.syncPreRetry.andThen(doBeforeRetry), this.syncPostRetry, this.asyncPreRetry, @@ -275,7 +277,7 @@ public RetryBackoffSpec doAfterRetry(Consumer doAfterRetry) { this.minBackoff, this.maxBackoff, this.jitterFactor, - this.backoffScheduler, + this.backoffSchedulerSupplier, this.syncPreRetry, this.syncPostRetry.andThen(doAfterRetry), this.asyncPreRetry, @@ -299,7 +301,7 @@ public RetryBackoffSpec doBeforeRetryAsync( this.minBackoff, this.maxBackoff, this.jitterFactor, - this.backoffScheduler, + this.backoffSchedulerSupplier, this.syncPreRetry, this.syncPostRetry, (rs, m) -> asyncPreRetry.apply(rs, m).then(doAsyncBeforeRetry.apply(rs)), @@ -323,7 +325,7 @@ public RetryBackoffSpec doAfterRetryAsync( this.minBackoff, this.maxBackoff, this.jitterFactor, - this.backoffScheduler, + this.backoffSchedulerSupplier, this.syncPreRetry, this.syncPostRetry, this.asyncPreRetry, @@ -350,7 +352,7 @@ public RetryBackoffSpec onRetryExhaustedThrow(BiFunction backoffScheduler, this.syncPreRetry, this.syncPostRetry, this.asyncPreRetry, @@ -490,7 +491,6 @@ public RetryBackoffSpec scheduler(Scheduler backoffScheduler) { protected void validateArguments() { if (jitterFactor < 0 || jitterFactor > 1) throw new IllegalArgumentException("jitterFactor must be between 0 and 1 (default 0.5)"); - Objects.requireNonNull(this.backoffScheduler, "backoffScheduler must not be null (default Schedulers.parallel())"); } @Override @@ -556,7 +556,8 @@ public Flux generateCompanion(Flux t) { jitter = random.nextLong(lowBound, highBound); } Duration effectiveBackoff = nextBackoff.plusMillis(jitter); - return RetrySpec.applyHooks(copy, Mono.delay(effectiveBackoff, backoffScheduler), + return RetrySpec.applyHooks(copy, Mono.delay(effectiveBackoff, + backoffSchedulerSupplier.get()), syncPreRetry, syncPostRetry, asyncPreRetry, asyncPostRetry); }); } diff --git a/reactor-core/src/test/java/reactor/util/retry/RetryBackoffSpecTest.java b/reactor-core/src/test/java/reactor/util/retry/RetryBackoffSpecTest.java index f36ddfe16c..1f92adbbfa 100644 --- a/reactor-core/src/test/java/reactor/util/retry/RetryBackoffSpecTest.java +++ b/reactor-core/src/test/java/reactor/util/retry/RetryBackoffSpecTest.java @@ -19,6 +19,7 @@ import java.time.Duration; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; @@ -28,12 +29,13 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.FluxRetryWhenTest; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import reactor.test.StepVerifierOptions; +import reactor.test.scheduler.VirtualTimeScheduler; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatNullPointerException; +import static org.assertj.core.api.Assertions.*; public class RetryBackoffSpecTest { @@ -387,4 +389,41 @@ public void cumulatedRetryHooksWithTransient() { ); } + @Test + public void backoffSchedulerDefaultIsLazilyResolved() { + RetryBackoffSpec spec = Retry.backoff(3, Duration.ofMillis(10)); + + StepVerifier.withVirtualTime(() -> Flux.error(new IllegalStateException("boom")) + .retryWhen(spec) + ) + .expectSubscription() + .thenAwait(Duration.ofHours(1)) + .expectErrorMatches(Exceptions::isRetryExhausted) + .verify(Duration.ofSeconds(1)); + } + + @Test + public void backoffSchedulerIsEagerlyCaptured() { + RetryBackoffSpec spec = Retry.backoff(3, Duration.ofMillis(10)) + .scheduler(Schedulers.parallel()); + + StepVerifier.withVirtualTime(() -> Flux.error(new IllegalStateException("boom")) + .retryWhen(spec) + ) + .expectSubscription() + .thenAwait(Duration.ofHours(1)) + .expectErrorSatisfies(e -> assertThat(e).isInstanceOf(RejectedExecutionException.class)) + .verify(Duration.ofSeconds(1)); + } + + @Test + public void backoffSchedulerNullResetToDefaultSupplier() { + RetryBackoffSpec specTemplate = Retry.backoff(3, Duration.ofMillis(10)) + .scheduler(Schedulers.single()); + + RetryBackoffSpec spec = specTemplate.scheduler(null); + + assertThat(spec.backoffSchedulerSupplier.get()).isSameAs(Schedulers.parallel()); + } + } \ No newline at end of file From 01b59276fc237d434ab22cfb8578b9701bdd045b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Basl=C3=A9?= Date: Thu, 19 Mar 2020 15:38:49 +0100 Subject: [PATCH 3/7] rename fixedDelays to fixedDelay --- reactor-core/src/main/java/reactor/util/retry/Retry.java | 4 ++-- .../src/main/java/reactor/util/retry/RetryBackoffSpec.java | 2 +- .../test/java/reactor/core/publisher/FluxRetryWhenTest.java | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/reactor-core/src/main/java/reactor/util/retry/Retry.java b/reactor-core/src/main/java/reactor/util/retry/Retry.java index bd4bc56112..0d07e59b5d 100644 --- a/reactor-core/src/main/java/reactor/util/retry/Retry.java +++ b/reactor-core/src/main/java/reactor/util/retry/Retry.java @@ -34,7 +34,7 @@ *

  • {@link #indefinitely()}
  • *
  • {@link #max(long)}
  • *
  • {@link #maxInARow(long)}
  • - *
  • {@link #fixedDelays(long, Duration)}
  • + *
  • {@link #fixedDelay(long, Duration)}
  • *
  • {@link #backoff(long, Duration)}
  • * *

    @@ -137,7 +137,7 @@ public static RetryBackoffSpec backoff(long maxAttempts, Duration minBackoff) { * @see RetryBackoffSpec#maxBackoff(Duration) */ //FIXME marble diagram - public static RetryBackoffSpec fixedDelays(long maxAttempts, Duration fixedDelay) { + public static RetryBackoffSpec fixedDelay(long maxAttempts, Duration fixedDelay) { return new RetryBackoffSpec(maxAttempts, t -> true, false, fixedDelay, fixedDelay, 0d, Schedulers::parallel, NO_OP_CONSUMER, NO_OP_CONSUMER, NO_OP_BIFUNCTION, NO_OP_BIFUNCTION, RetryBackoffSpec.BACKOFF_EXCEPTION_GENERATOR); diff --git a/reactor-core/src/main/java/reactor/util/retry/RetryBackoffSpec.java b/reactor-core/src/main/java/reactor/util/retry/RetryBackoffSpec.java index 0c31f4511e..b69cbd4241 100644 --- a/reactor-core/src/main/java/reactor/util/retry/RetryBackoffSpec.java +++ b/reactor-core/src/main/java/reactor/util/retry/RetryBackoffSpec.java @@ -40,7 +40,7 @@ * and {@code 1.0} (default is {@code 0.5}). * Even with the jitter, the effective backoff delay cannot be less than {@link #minBackoff(Duration)} * nor more than {@link #maxBackoff(Duration)}. The delays and subsequent attempts are executed on the - * provided backoff {@link #scheduler(Scheduler)}. Alternatively, {@link Retry#fixedDelays(long, Duration)} provides + * provided backoff {@link #scheduler(Scheduler)}. Alternatively, {@link Retry#fixedDelay(long, Duration)} provides * a strategy where the min and max backoffs are the same and jitters are deactivated. *

    * Only errors that match the {@link #filter(Predicate)} are retried (by default all), diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java index fc5921480a..0469ec365c 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java @@ -342,7 +342,7 @@ Flux fixedDelaysRetryScenario() { else { s.error(new RuntimeException("test " + i)); } - }).retryWhen(Retry.fixedDelays(3, Duration.ofSeconds(3)) + }).retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(3)) .doBeforeRetry(rs -> System.out.println(rs.copy())) ); } From 3e90adb1b409dcbdbad98cb71465f77132e61be0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Basl=C3=A9?= Date: Thu, 19 Mar 2020 15:42:33 +0100 Subject: [PATCH 4/7] further remove builder word from javadocs --- .../reactor/util/retry/RetryBackoffSpec.java | 26 +++++++++---------- .../java/reactor/util/retry/RetrySpec.java | 18 ++++++------- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/reactor-core/src/main/java/reactor/util/retry/RetryBackoffSpec.java b/reactor-core/src/main/java/reactor/util/retry/RetryBackoffSpec.java index b69cbd4241..15045a7374 100644 --- a/reactor-core/src/main/java/reactor/util/retry/RetryBackoffSpec.java +++ b/reactor-core/src/main/java/reactor/util/retry/RetryBackoffSpec.java @@ -153,7 +153,7 @@ public final class RetryBackoffSpec extends Retry { * no more. * * @param maxAttempts the new retry attempt limit - * @return a new copy of the builder which can either be further configured or used as {@link Retry} + * @return a new copy of the {@link RetryBackoffSpec} which can either be further configured or used as {@link Retry} */ public RetryBackoffSpec maxAttempts(long maxAttempts) { return new RetryBackoffSpec( @@ -177,7 +177,7 @@ public RetryBackoffSpec maxAttempts(long maxAttempts) { * sequence. Defaults to allowing retries for all exceptions. * * @param errorFilter the predicate to filter which exceptions can be retried - * @return a new copy of the builder which can either be further configured or used as {@link Retry} + * @return a new copy of the {@link RetryBackoffSpec} which can either be further configured or used as {@link Retry} */ public RetryBackoffSpec filter(Predicate errorFilter) { return new RetryBackoffSpec( @@ -212,7 +212,7 @@ public RetryBackoffSpec filter(Predicate errorFilter) { * * @param predicateAdjuster a {@link Function} that returns a new {@link Predicate} given the * currently in place {@link Predicate} (usually deriving from the old predicate). - * @return a new copy of the builder which can either be further configured or used as {@link Retry} + * @return a new copy of the {@link RetryBackoffSpec} which can either be further configured or used as {@link Retry} */ public RetryBackoffSpec modifyErrorFilter( Function, Predicate> predicateAdjuster) { @@ -240,7 +240,7 @@ public RetryBackoffSpec modifyErrorFilter( * might be executing in a shared thread. * * @param doBeforeRetry the synchronous hook to execute before retry trigger is emitted - * @return a new copy of the builder which can either be further configured or used as {@link Retry} + * @return a new copy of the {@link RetryBackoffSpec} which can either be further configured or used as {@link Retry} * @see #doBeforeRetryAsync(Function) andDelayRetryWith for an asynchronous version */ public RetryBackoffSpec doBeforeRetry( @@ -266,7 +266,7 @@ public RetryBackoffSpec doBeforeRetry( * might be publishing events in a shared thread. * * @param doAfterRetry the synchronous hook to execute after retry trigger is started - * @return a new copy of the builder which can either be further configured or used as {@link Retry} + * @return a new copy of the {@link RetryBackoffSpec} which can either be further configured or used as {@link Retry} * @see #doAfterRetryAsync(Function) andRetryThen for an asynchronous version */ public RetryBackoffSpec doAfterRetry(Consumer doAfterRetry) { @@ -290,7 +290,7 @@ public RetryBackoffSpec doAfterRetry(Consumer doAfterRetry) { * thus delaying the resulting retry trigger with the additional {@link Mono}. * * @param doAsyncBeforeRetry the asynchronous hook to execute before original retry trigger is emitted - * @return a new copy of the builder which can either be further configured or used as {@link Retry} + * @return a new copy of the {@link RetryBackoffSpec} which can either be further configured or used as {@link Retry} */ public RetryBackoffSpec doBeforeRetryAsync( Function> doAsyncBeforeRetry) { @@ -314,7 +314,7 @@ public RetryBackoffSpec doBeforeRetryAsync( * thus delaying the resulting retry trigger with the additional {@link Mono}. * * @param doAsyncAfterRetry the asynchronous hook to execute after original retry trigger is emitted - * @return a new copy of the builder which can either be further configured or used as {@link Retry} + * @return a new copy of the {@link RetryBackoffSpec} which can either be further configured or used as {@link Retry} */ public RetryBackoffSpec doAfterRetryAsync( Function> doAsyncAfterRetry) { @@ -342,7 +342,7 @@ public RetryBackoffSpec doAfterRetryAsync( * * @param retryExhaustedGenerator the {@link Function} that generates the {@link Throwable} for the last * {@link RetrySignal} - * @return a new copy of the builder which can either be further configured or used as {@link Retry} + * @return a new copy of the {@link RetryBackoffSpec} which can either be further configured or used as {@link Retry} */ public RetryBackoffSpec onRetryExhaustedThrow(BiFunction retryExhaustedGenerator) { return new RetryBackoffSpec( @@ -370,7 +370,7 @@ public RetryBackoffSpec onRetryExhaustedThrow(BiFunction errorFilter) { return new RetrySpec( @@ -168,7 +168,7 @@ public RetrySpec filter(Predicate errorFilter) { * * @param predicateAdjuster a {@link Function} that returns a new {@link Predicate} given the * currently in place {@link Predicate} (usually deriving from the old predicate). - * @return a new copy of the builder which can either be further configured or used as {@link Retry} + * @return a new copy of the {@link RetrySpec} which can either be further configured or used as {@link Retry} */ public RetrySpec modifyErrorFilter( Function, Predicate> predicateAdjuster) { @@ -192,7 +192,7 @@ public RetrySpec modifyErrorFilter( * might be executing in a shared thread. * * @param doBeforeRetry the synchronous hook to execute before retry trigger is emitted - * @return a new copy of the builder which can either be further configured or used as {@link Retry} + * @return a new copy of the {@link RetrySpec} which can either be further configured or used as {@link Retry} * @see #doBeforeRetryAsync(Function) andDelayRetryWith for an asynchronous version */ public RetrySpec doBeforeRetry( @@ -214,7 +214,7 @@ public RetrySpec doBeforeRetry( * might be publishing events in a shared thread. * * @param doAfterRetry the synchronous hook to execute after retry trigger is started - * @return a new copy of the builder which can either be further configured or used as {@link Retry} + * @return a new copy of the {@link RetrySpec} which can either be further configured or used as {@link Retry} * @see #doAfterRetryAsync(Function) andRetryThen for an asynchronous version */ public RetrySpec doAfterRetry(Consumer doAfterRetry) { @@ -234,7 +234,7 @@ public RetrySpec doAfterRetry(Consumer doAfterRetry) { * thus delaying the resulting retry trigger with the additional {@link Mono}. * * @param doAsyncBeforeRetry the asynchronous hook to execute before original retry trigger is emitted - * @return a new copy of the builder which can either be further configured or used as {@link Retry} + * @return a new copy of the {@link RetrySpec} which can either be further configured or used as {@link Retry} */ public RetrySpec doBeforeRetryAsync( Function> doAsyncBeforeRetry) { @@ -254,7 +254,7 @@ public RetrySpec doBeforeRetryAsync( * thus delaying the resulting retry trigger with the additional {@link Mono}. * * @param doAsyncAfterRetry the asynchronous hook to execute after original retry trigger is emitted - * @return a new copy of the builder which can either be further configured or used as {@link Retry} + * @return a new copy of the {@link RetrySpec} which can either be further configured or used as {@link Retry} */ public RetrySpec doAfterRetryAsync( Function> doAsyncAfterRetry) { @@ -277,7 +277,7 @@ public RetrySpec doAfterRetryAsync( * * @param retryExhaustedGenerator the {@link Function} that generates the {@link Throwable} for the last * {@link RetrySignal} - * @return a new copy of the builder which can either be further configured or used as {@link Retry} + * @return a new copy of the {@link RetrySpec} which can either be further configured or used as {@link Retry} */ public RetrySpec onRetryExhaustedThrow(BiFunction retryExhaustedGenerator) { return new RetrySpec( @@ -301,7 +301,7 @@ public RetrySpec onRetryExhaustedThrow(BiFunction Date: Thu, 19 Mar 2020 17:39:17 +0100 Subject: [PATCH 5/7] Add marble diagrams for Retry spec factories --- .../main/java/reactor/util/retry/Retry.java | 12 +- .../doc-files/marbles/retrySpecAttempts.svg | 4006 +++++++++++++++++ .../doc-files/marbles/retrySpecBackoff.svg | 1756 ++++++++ .../doc-files/marbles/retrySpecFixed.svg | 3399 ++++++++++++++ .../doc-files/marbles/retrySpecInARow.svg | 2875 ++++++++++++ 5 files changed, 12044 insertions(+), 4 deletions(-) create mode 100644 reactor-core/src/main/java/reactor/util/retry/doc-files/marbles/retrySpecAttempts.svg create mode 100644 reactor-core/src/main/java/reactor/util/retry/doc-files/marbles/retrySpecBackoff.svg create mode 100644 reactor-core/src/main/java/reactor/util/retry/doc-files/marbles/retrySpecFixed.svg create mode 100644 reactor-core/src/main/java/reactor/util/retry/doc-files/marbles/retrySpecInARow.svg diff --git a/reactor-core/src/main/java/reactor/util/retry/Retry.java b/reactor-core/src/main/java/reactor/util/retry/Retry.java index 0d07e59b5d..8979f7b68f 100644 --- a/reactor-core/src/main/java/reactor/util/retry/Retry.java +++ b/reactor-core/src/main/java/reactor/util/retry/Retry.java @@ -108,6 +108,8 @@ default RetrySignal copy() { /** * A {@link RetryBackoffSpec} preconfigured for exponential backoff strategy with jitter, given a maximum number of retry attempts * and a minimum {@link Duration} for the backoff. + *

    + * * * @param maxAttempts the maximum number of retry attempts to allow * @param minBackoff the minimum {@link Duration} for the first backoff @@ -115,7 +117,6 @@ default RetrySignal copy() { * @see RetryBackoffSpec#maxAttempts(long) * @see RetryBackoffSpec#minBackoff(Duration) */ - //FIXME marble diagram public static RetryBackoffSpec backoff(long maxAttempts, Duration minBackoff) { return new RetryBackoffSpec(maxAttempts, t -> true, false, minBackoff, MAX_BACKOFF, 0.5d, Schedulers::parallel, NO_OP_CONSUMER, NO_OP_CONSUMER, NO_OP_BIFUNCTION, NO_OP_BIFUNCTION, @@ -126,6 +127,8 @@ public static RetryBackoffSpec backoff(long maxAttempts, Duration minBackoff) { * A {@link RetryBackoffSpec} preconfigured for fixed delays (min backoff equals max backoff, no jitter), given a maximum number of retry attempts * and the fixed {@link Duration} for the backoff. *

    + * + *

    * Note that calling {@link RetryBackoffSpec#minBackoff(Duration)} or {@link RetryBackoffSpec#maxBackoff(Duration)} would switch * back to an exponential backoff strategy. * @@ -136,7 +139,6 @@ public static RetryBackoffSpec backoff(long maxAttempts, Duration minBackoff) { * @see RetryBackoffSpec#minBackoff(Duration) * @see RetryBackoffSpec#maxBackoff(Duration) */ - //FIXME marble diagram public static RetryBackoffSpec fixedDelay(long maxAttempts, Duration fixedDelay) { return new RetryBackoffSpec(maxAttempts, t -> true, false, fixedDelay, fixedDelay, 0d, Schedulers::parallel, NO_OP_CONSUMER, NO_OP_CONSUMER, NO_OP_BIFUNCTION, NO_OP_BIFUNCTION, @@ -145,12 +147,13 @@ public static RetryBackoffSpec fixedDelay(long maxAttempts, Duration fixedDelay) /** * A {@link RetrySpec} preconfigured for a simple strategy with maximum number of retry attempts. + *

    + * * * @param max the maximum number of retry attempts to allow * @return the max attempt spec for further configuration * @see RetrySpec#maxAttempts(long) */ - //FIXME marble diagram public static RetrySpec max(long max) { return new RetrySpec(max, t -> true, false, NO_OP_CONSUMER, NO_OP_CONSUMER, NO_OP_BIFUNCTION, NO_OP_BIFUNCTION, RetrySpec.RETRY_EXCEPTION_GENERATOR); @@ -160,13 +163,14 @@ public static RetrySpec max(long max) { * A {@link RetrySpec} preconfigured for a simple strategy with maximum number of retry attempts over * subsequent transient errors. An {@link org.reactivestreams.Subscriber#onNext(Object)} between * errors resets the counter (see {@link RetrySpec#transientErrors(boolean)}). + *

    + * * * @param maxInARow the maximum number of retry attempts to allow in a row, reset by successful onNext * @return the max in a row spec for further configuration * @see RetrySpec#maxAttempts(long) * @see RetrySpec#transientErrors(boolean) */ - //FIXME marble diagram, point to it in RetrySpec#transientErrors javadoc public static RetrySpec maxInARow(long maxInARow) { return new RetrySpec(maxInARow, t -> true, true, NO_OP_CONSUMER, NO_OP_CONSUMER, NO_OP_BIFUNCTION, NO_OP_BIFUNCTION, RETRY_EXCEPTION_GENERATOR); diff --git a/reactor-core/src/main/java/reactor/util/retry/doc-files/marbles/retrySpecAttempts.svg b/reactor-core/src/main/java/reactor/util/retry/doc-files/marbles/retrySpecAttempts.svg new file mode 100644 index 0000000000..d807fc3404 --- /dev/null +++ b/reactor-core/src/main/java/reactor/util/retry/doc-files/marbles/retrySpecAttempts.svg @@ -0,0 +1,4006 @@ + + + + + + image/svg+xml + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + retryWhen(spec ) + + + + + subscribe() + + + + subscribe() + + + + + subscribe() + + + + + + + + + + + + + + + + + + + + + + + + retryWhen(spec ) + + + + subscribe() + + subscribe() + + + + + subscribe() + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Retry.max(1) + ) + + + + + + + + + + + + + + + ! + + 0 + + 1 + spec = + + + + + + + + + diff --git a/reactor-core/src/main/java/reactor/util/retry/doc-files/marbles/retrySpecBackoff.svg b/reactor-core/src/main/java/reactor/util/retry/doc-files/marbles/retrySpecBackoff.svg new file mode 100644 index 0000000000..7513f3b16a --- /dev/null +++ b/reactor-core/src/main/java/reactor/util/retry/doc-files/marbles/retrySpecBackoff.svg @@ -0,0 +1,1756 @@ + + + + + + image/svg+xml + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + subscribe() + + + + + + + + + subscribe() + + + + + subscribe() + + + + subscribe() + + + + + + + + + + + + + + + + + + subscribe() + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + ? + + + + ? + retryWhen(spec) + + + + + + jitter + + + + Retry.backoff(3,Duration.ofMillis(100)) + ) + + + + + + + + + + + + + + + + + + + + + + + + + ! + + ! + + 0 + + 1 + + 2 + spec = + + + + + + + + + + + diff --git a/reactor-core/src/main/java/reactor/util/retry/doc-files/marbles/retrySpecFixed.svg b/reactor-core/src/main/java/reactor/util/retry/doc-files/marbles/retrySpecFixed.svg new file mode 100644 index 0000000000..481400ab8c --- /dev/null +++ b/reactor-core/src/main/java/reactor/util/retry/doc-files/marbles/retrySpecFixed.svg @@ -0,0 +1,3399 @@ + + + + + + image/svg+xml + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + subscribe() + + + + + + + subscribe() + + + + subscribe() + + + + + + retryWhen(spec) + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + ! + + + Retry.fixedDelay(2,Duration.ofMillis(100)) + ) + + + + + + + + + + + + + + + + + + + + + + + + + ! + + ! + + 0 + + 1 + + 2 + spec = + + + + + + + + + + + + + + + + subscribe() + + + + ! + + + + + + + + + + diff --git a/reactor-core/src/main/java/reactor/util/retry/doc-files/marbles/retrySpecInARow.svg b/reactor-core/src/main/java/reactor/util/retry/doc-files/marbles/retrySpecInARow.svg new file mode 100644 index 0000000000..d26b9b4c61 --- /dev/null +++ b/reactor-core/src/main/java/reactor/util/retry/doc-files/marbles/retrySpecInARow.svg @@ -0,0 +1,2875 @@ + + + + + + image/svg+xml + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + subscribe() + + + + subscribe() + + subscribe() + + + + + + + + retryWhen(spec) + + + + + + + + + + + + + + + Retry.maxInARow(1) + ) + + + + + + + + + + + + + + + ! + + 0 + + 1 + spec = + + + + + + + + + + + + + + + + subscribe() + + + + + + + subscribe() + + + + + 0 + -1 + 0 + -1 + 0 + 1 + From 761977575210f1a6a770f74b8674a153835f65e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Basl=C3=A9?= Date: Thu, 19 Mar 2020 18:58:36 +0100 Subject: [PATCH 6/7] remove outdated test --- .../java/reactor/util/retry/RetryBackoffSpecTest.java | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/reactor-core/src/test/java/reactor/util/retry/RetryBackoffSpecTest.java b/reactor-core/src/test/java/reactor/util/retry/RetryBackoffSpecTest.java index 1f92adbbfa..c3d252179a 100644 --- a/reactor-core/src/test/java/reactor/util/retry/RetryBackoffSpecTest.java +++ b/reactor-core/src/test/java/reactor/util/retry/RetryBackoffSpecTest.java @@ -29,22 +29,15 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.FluxRetryWhenTest; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import reactor.test.StepVerifierOptions; -import reactor.test.scheduler.VirtualTimeScheduler; -import static org.assertj.core.api.Assertions.*; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNullPointerException; public class RetryBackoffSpecTest { - @Test - public void suppressingSchedulerFails() { - assertThatNullPointerException().isThrownBy(() -> Retry.backoff(1, Duration.ZERO).scheduler(null)) - .withMessage("backoffScheduler"); - } - @Test public void builderMethodsProduceNewInstances() { RetryBackoffSpec init = Retry.backoff(1, Duration.ZERO); From f304598274d80c4f195a753e54d3f4a2e042a6b0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Basl=C3=A9?= Date: Thu, 19 Mar 2020 19:05:15 +0100 Subject: [PATCH 7/7] Rename Retry#fromFunction to Retry#from --- docs/asciidoc/coreFeatures.adoc | 4 +-- docs/asciidoc/snippetRetryWhenRetry.adoc | 2 +- .../java/reactor/core/publisher/Flux.java | 4 +-- .../java/reactor/core/publisher/Mono.java | 4 +-- .../main/java/reactor/util/retry/Retry.java | 2 +- .../core/publisher/FluxRetryWhenTest.java | 34 +++++++++---------- .../test/java/reactor/guide/GuideTests.java | 4 +-- 7 files changed, 27 insertions(+), 27 deletions(-) diff --git a/docs/asciidoc/coreFeatures.adoc b/docs/asciidoc/coreFeatures.adoc index 27f758b5f5..1a600e12dc 100644 --- a/docs/asciidoc/coreFeatures.adoc +++ b/docs/asciidoc/coreFeatures.adoc @@ -882,7 +882,7 @@ condition. The companion `Flux` is a `Flux` that gets passed to a `Retry` strategy/function, supplied as the sole parameter of `retryWhen`. As the user, you define that function and make it return a new `Publisher`. The `Retry` class is an abstract class, but it offers a factory method if you -want to transform the companion with a simple lambda (`Retry.fromFunction(Function)`). +want to transform the companion with a simple lambda (`Retry.from(Function)`). Retry cycles go as follows: @@ -906,7 +906,7 @@ companion would effectively swallow an error. Consider the following way of emul Flux flux = Flux .error(new IllegalArgumentException()) // <1> .doOnError(System.out::println) // <2> - .retryWhen(Retry.fromFunction(companion -> // <3> + .retryWhen(Retry.from(companion -> // <3> companion.take(3))); // <4> ---- <1> This continuously produces errors, calling for retry attempts. diff --git a/docs/asciidoc/snippetRetryWhenRetry.adoc b/docs/asciidoc/snippetRetryWhenRetry.adoc index 6c4ec7e70b..75aad1989e 100644 --- a/docs/asciidoc/snippetRetryWhenRetry.adoc +++ b/docs/asciidoc/snippetRetryWhenRetry.adoc @@ -5,7 +5,7 @@ AtomicInteger errorCount = new AtomicInteger(); Flux flux = Flux.error(new IllegalArgumentException()) .doOnError(e -> errorCount.incrementAndGet()) - .retryWhen(Retry.fromFunction(companion -> // <1> + .retryWhen(Retry.from(companion -> // <1> companion.map(rs -> { // <2> if (rs.totalRetries() < 3) return rs.totalRetries(); // <3> else throw Exceptions.propagate(rs.failure()); // <4> diff --git a/reactor-core/src/main/java/reactor/core/publisher/Flux.java b/reactor-core/src/main/java/reactor/core/publisher/Flux.java index 9199c34856..2b31edb817 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Flux.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Flux.java @@ -7241,14 +7241,14 @@ public final Flux retry(long numRetries, Predicate retryMa * @return a {@link Flux} that retries on onError when the companion {@link Publisher} produces an * onNext signal * @deprecated use {@link #retryWhen(Retry)} instead, to be removed in 3.4. Lambda Functions that don't make - * use of the error can simply be converted by wrapping via {@link Retry#fromFunction(Function)}. + * use of the error can simply be converted by wrapping via {@link Retry#from(Function)}. * Functions that do use the error will additionally need to map the {@link reactor.util.retry.Retry.RetrySignal} * emitted by the companion to its {@link Retry.RetrySignal#failure()}. */ @Deprecated public final Flux retryWhen(Function, ? extends Publisher> whenFactory) { Objects.requireNonNull(whenFactory, "whenFactory"); - return onAssembly(new FluxRetryWhen<>(this, Retry.fromFunction(fluxRetryWhenState -> fluxRetryWhenState + return onAssembly(new FluxRetryWhen<>(this, Retry.from(fluxRetryWhenState -> fluxRetryWhenState .map(Retry.RetrySignal::failure) .as(whenFactory))) ); diff --git a/reactor-core/src/main/java/reactor/core/publisher/Mono.java b/reactor-core/src/main/java/reactor/core/publisher/Mono.java index 190b339676..f5f51fde47 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Mono.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Mono.java @@ -3717,14 +3717,14 @@ public final Mono retry(long numRetries, Predicate retryMa * @return a {@link Mono} that retries on onError when the companion {@link Publisher} produces an * onNext signal * @deprecated use {@link #retryWhen(Retry)} instead, to be removed in 3.4. Lambda Functions that don't make - * use of the error can simply be converted by wrapping via {@link Retry#fromFunction(Function)}. + * use of the error can simply be converted by wrapping via {@link Retry#from(Function)}. * Functions that do use the error will additionally need to map the {@link reactor.util.retry.Retry.RetrySignal} * emitted by the companion to its {@link Retry.RetrySignal#failure()}. */ @Deprecated public final Mono retryWhen(Function, ? extends Publisher> whenFactory) { Objects.requireNonNull(whenFactory, "whenFactory"); - return onAssembly(new MonoRetryWhen<>(this, Retry.fromFunction(rws -> whenFactory.apply(rws.map( + return onAssembly(new MonoRetryWhen<>(this, Retry.from(rws -> whenFactory.apply(rws.map( Retry.RetrySignal::failure))))); } diff --git a/reactor-core/src/main/java/reactor/util/retry/Retry.java b/reactor-core/src/main/java/reactor/util/retry/Retry.java index 8979f7b68f..dcda1b3974 100644 --- a/reactor-core/src/main/java/reactor/util/retry/Retry.java +++ b/reactor-core/src/main/java/reactor/util/retry/Retry.java @@ -193,7 +193,7 @@ public static RetrySpec indefinitely() { * @param function the {@link Function} representing the desired {@link Retry} strategy as a lambda * @return the {@link Retry} strategy adapted from the {@link Function} */ - public static final Retry fromFunction(Function, Publisher> function) { + public static final Retry from(Function, Publisher> function) { return new Retry() { @Override public Publisher generateCompanion(Flux retrySignalCompanion) { diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java index 0469ec365c..4a1642cbc6 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java @@ -58,7 +58,7 @@ public class FluxRetryWhenTest { @Test(expected = NullPointerException.class) public void sourceNull() { - new FluxRetryWhen<>(null, Retry.fromFunction(v -> v)); + new FluxRetryWhen<>(null, Retry.from(v -> v)); } @SuppressWarnings("deprecation") @@ -80,7 +80,7 @@ public void cancelsOther() { Flux when = Flux.range(1, 10) .doOnCancel(() -> cancelled.set(true)); - StepVerifier.create(justError.retryWhen(Retry.fromFunction(other -> when))) + StepVerifier.create(justError.retryWhen(Retry.from(other -> when))) .thenCancel() .verify(); @@ -93,7 +93,7 @@ public void cancelTwiceCancelsOtherOnce() { Flux when = Flux.range(1, 10) .doOnCancel(cancelled::incrementAndGet); - justError.retryWhen(Retry.fromFunction(other -> when)) + justError.retryWhen(Retry.from(other -> when)) .subscribe(new BaseSubscriber() { @Override protected void hookOnSubscribe(Subscription subscription) { @@ -114,7 +114,7 @@ public void directOtherErrorPreventsSubscribe() { .doOnSubscribe(sub -> sourceSubscribed.set(true)) .doOnCancel(() -> sourceCancelled.set(true)); - Flux retry = source.retryWhen(Retry.fromFunction(other -> Mono.error(new IllegalStateException("boom")))); + Flux retry = source.retryWhen(Retry.from(other -> Mono.error(new IllegalStateException("boom")))); StepVerifier.create(retry) .expectSubscription() @@ -134,7 +134,7 @@ public void lateOtherErrorCancelsSource() { .doOnCancel(() -> sourceCancelled.set(true)); - Flux retry = source.retryWhen(Retry.fromFunction(other -> other.flatMap(l -> + Flux retry = source.retryWhen(Retry.from(other -> other.flatMap(l -> count.getAndIncrement() == 0 ? Mono.just(l) : Mono.error(new IllegalStateException("boom"))))); StepVerifier.create(retry) @@ -155,7 +155,7 @@ public void directOtherEmptyPreventsSubscribeAndCompletes() { .doOnSubscribe(sub -> sourceSubscribed.set(true)) .doOnCancel(() -> sourceCancelled.set(true)); - Flux retry = source.retryWhen(Retry.fromFunction(other -> Flux.empty())); + Flux retry = source.retryWhen(Retry.from(other -> Flux.empty())); StepVerifier.create(retry) .expectSubscription() @@ -173,7 +173,7 @@ public void lateOtherEmptyCancelsSourceAndCompletes() { .doOnSubscribe(sub -> sourceSubscribed.set(true)) .doOnCancel(() -> sourceCancelled.set(true)); - Flux retry = source.retryWhen(Retry.fromFunction(other -> other.take(1))); + Flux retry = source.retryWhen(Retry.from(other -> other.take(1))); StepVerifier.create(retry) .expectSubscription() @@ -189,7 +189,7 @@ public void lateOtherEmptyCancelsSourceAndCompletes() { public void coldRepeater() { AssertSubscriber ts = AssertSubscriber.create(); - justError.retryWhen(Retry.fromFunction(other -> Flux.range(1, 10))) + justError.retryWhen(Retry.from(other -> Flux.range(1, 10))) .subscribe(ts); ts.assertValues(1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1) @@ -201,7 +201,7 @@ public void coldRepeater() { public void coldRepeaterBackpressured() { AssertSubscriber ts = AssertSubscriber.create(0); - rangeError.retryWhen(Retry.fromFunction(other -> Flux.range(1, 5))) + rangeError.retryWhen(Retry.from(other -> Flux.range(1, 5))) .subscribe(ts); ts.assertNoValues() @@ -237,7 +237,7 @@ public void coldRepeaterBackpressured() { public void coldEmpty() { AssertSubscriber ts = AssertSubscriber.create(0); - rangeError.retryWhen(Retry.fromFunction(other -> Flux.empty())) + rangeError.retryWhen(Retry.from(other -> Flux.empty())) .subscribe(ts); ts.assertNoValues() @@ -249,7 +249,7 @@ public void coldEmpty() { public void coldError() { AssertSubscriber ts = AssertSubscriber.create(0); - rangeError.retryWhen(Retry.fromFunction(other -> Flux.error(new RuntimeException("forced failure")))) + rangeError.retryWhen(Retry.from(other -> Flux.error(new RuntimeException("forced failure")))) .subscribe(ts); ts.assertNoValues() @@ -262,7 +262,7 @@ public void coldError() { public void whenFactoryThrows() { AssertSubscriber ts = AssertSubscriber.create(); - rangeError.retryWhen(Retry.fromFunction(other -> { + rangeError.retryWhen(Retry.from(other -> { throw new RuntimeException("forced failure"); })) .subscribe(ts); @@ -277,7 +277,7 @@ public void whenFactoryThrows() { public void whenFactoryReturnsNull() { AssertSubscriber ts = AssertSubscriber.create(); - rangeError.retryWhen(Retry.fromFunction(other -> null)) + rangeError.retryWhen(Retry.from(other -> null)) .subscribe(ts); ts.assertNoValues() @@ -289,7 +289,7 @@ public void whenFactoryReturnsNull() { public void retryErrorsInResponse() { AssertSubscriber ts = AssertSubscriber.create(); - rangeError.retryWhen(Retry.fromFunction(v -> v.map(a -> { + rangeError.retryWhen(Retry.from(v -> v.map(a -> { throw new RuntimeException("forced failure"); }))) .subscribe(ts); @@ -305,7 +305,7 @@ public void retryErrorsInResponse() { public void retryAlways() { AssertSubscriber ts = AssertSubscriber.create(0); - rangeError.retryWhen(Retry.fromFunction(v -> v)) + rangeError.retryWhen(Retry.from(v -> v)) .subscribe(ts); ts.request(8); @@ -424,7 +424,7 @@ public void retryWhenContextTrigger_MergesOriginalContext() { contextPerRetry.add(sig.getContext()); } }) - .retryWhen(Retry.fromFunction(retrySignalFlux -> retrySignalFlux.handle((rs, sink) -> { + .retryWhen(Retry.from(retrySignalFlux -> retrySignalFlux.handle((rs, sink) -> { Context ctx = sink.currentContext(); int rl = ctx.getOrDefault("retriesLeft", 0); if (rl > 0) { @@ -884,7 +884,7 @@ public void retryWhenThrowableCompanionIsComparableToRetryWhenRetryFromFunction( sourceHelper.set(0); StepVerifier.withVirtualTime(() -> source.retryWhen( - Retry.fromFunction(companion -> companion.delayElements(Duration.ofSeconds(3)))) + Retry.from(companion -> companion.delayElements(Duration.ofSeconds(3)))) ) .expectSubscription() .expectNoEvent(Duration.ofSeconds(3 * 3)) diff --git a/reactor-core/src/test/java/reactor/guide/GuideTests.java b/reactor-core/src/test/java/reactor/guide/GuideTests.java index dfa524a16c..3ac712f1d3 100644 --- a/reactor-core/src/test/java/reactor/guide/GuideTests.java +++ b/reactor-core/src/test/java/reactor/guide/GuideTests.java @@ -658,7 +658,7 @@ public void errorHandlingRetryWhenApproximateRetry() { Flux flux = Flux.error(new IllegalArgumentException()) // <1> .doOnError(System.out::println) // <2> - .retryWhen(Retry.fromFunction(companion -> // <3> + .retryWhen(Retry.from(companion -> // <3> companion.take(3))); // <4> StepVerifier.create(flux) @@ -674,7 +674,7 @@ public void errorHandlingRetryWhenEquatesRetry() { Flux flux = Flux.error(new IllegalArgumentException()) .doOnError(e -> errorCount.incrementAndGet()) - .retryWhen(Retry.fromFunction(companion -> // <1> + .retryWhen(Retry.from(companion -> // <1> companion.map(rs -> { // <2> if (rs.totalRetries() < 3) return rs.totalRetries(); // <3> else throw Exceptions.propagate(rs.failure()); // <4>