Follow up to #1979 to avoid retryWhen ambiguity #2079

Merged: 7 commits, Mar 19, 2020
2 changes: 1 addition & 1 deletion docs/asciidoc/apdx-operatorChoice.adoc
@@ -230,7 +230,7 @@ I want to deal with:
** by retrying...
*** ...with a simple policy (max number of attempts): `retry()`, `retry(long)`
*** ...triggered by a companion control Flux: `retryWhen`
*** ...using a standard backoff strategy (exponential backoff with jitter): `retryWhen(Retry.backoff(...))`
*** ...using a standard backoff strategy (exponential backoff with jitter): `retryWhen(Retry.backoff(...))` (see also other factory methods in `Retry`)

* I want to deal with backpressure "errors" (request max from upstream and apply the strategy when downstream does not produce enough request)...
** by throwing a special `IllegalStateException`: `Flux#onBackpressureError`
20 changes: 12 additions & 8 deletions docs/asciidoc/coreFeatures.adoc
@@ -881,11 +881,15 @@ condition.

The companion `Flux` is a `Flux<RetrySignal>` that gets passed to a `Retry` strategy/function,
supplied as the sole parameter of `retryWhen`. As the user, you define that function and make it return a new
`Publisher<?>`. Retry cycles go as follows:
`Publisher<?>`. The `Retry` class is an abstract class, but it offers a factory method if you
want to transform the companion with a simple lambda (`Retry.fromFunction(Function)`).

. Each time an error happens (giving potential for a retry), the error is emitted into the
Retry cycles go as follows:

. Each time an error happens (giving potential for a retry), a `RetrySignal` is emitted into the
companion `Flux`, which has been decorated by your function. Having a `Flux` here
gives a bird's-eye view of all the attempts so far.
gives a bird's-eye view of all the attempts so far. The `RetrySignal` gives access to the error
as well as metadata around it.
. If the companion `Flux` emits a value, a retry happens.
. If the companion `Flux` completes, the error is swallowed, the retry cycle stops,
and the resulting sequence completes, too.
@@ -902,12 +906,12 @@ companion would effectively swallow an error. Consider the following way of emulating `retry(3)`:
Flux<String> flux = Flux
.<String>error(new IllegalArgumentException()) // <1>
.doOnError(System.out::println) // <2>
.retryWhen(() -> // <3>
companion -> companion.take(3)); // <4>
.retryWhen(Retry.fromFunction(companion -> // <3>
companion.take(3))); // <4>
----
<1> This continuously produces errors, calling for retry attempts.
<2> `doOnError` before the retry lets us log and see all failures.
<3> The `Retry` function is passed as a `Supplier`
<3> The `Retry` is adapted from a very simple `Function` lambda
<4> Here, we consider the first three errors as retry-able (`take(3)`) and then give up.
====

Expand All @@ -919,12 +923,12 @@ Getting to the same behavior involves a few additional tricks:
include::snippetRetryWhenRetry.adoc[]

TIP: One can use the builders exposed in `Retry` to achieve the same in a more fluent manner, as
well as more finely tuned retry strategies: `errorFlux.retryWhen(Retry.max(3));`.
well as more finely tuned retry strategies. For example: `errorFlux.retryWhen(Retry.max(3));`.

TIP: You can use similar code to implement an "`exponential backoff and retry`" pattern,
as shown in the <<faq.exponentialBackoff,FAQ>>.

The core-provided `Retry` builders, `RetrySpec` and `RetryBackoffSpec`, both allow advanced customizations like:
The core-provided `Retry` helpers, `RetrySpec` and `RetryBackoffSpec`, both allow advanced customizations like:

- setting the `filter(Predicate)` for the exceptions that can trigger a retry
- modifying such a previously set filter through `modifyErrorFilter(Function)`
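A minimal sketch combining those two customizations (the `errorFlux` source is assumed, as in the earlier TIP):

[source,java]
----
Retry strategy = Retry.max(5)
        .filter(t -> t instanceof IOException)                              // only these errors trigger a retry
        .modifyErrorFilter(p -> p.or(t -> t instanceof TimeoutException));  // widen the previously set filter

Flux<String> retried = errorFlux.retryWhen(strategy);
----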
2 changes: 1 addition & 1 deletion docs/asciidoc/faq.adoc
@@ -176,7 +176,7 @@ an unstable state and is not likely to immediately recover from it. So blindly
retrying immediately is likely to produce yet another error and add to the
instability.

Since `3.3.4.RELEASE`, Reactor comes with a builder for such a retry baked in: `Retry.backoff`.
Since `3.3.4.RELEASE`, Reactor comes with a builder for such a retry, to be used with `Flux#retryWhen`: `Retry.backoff`.

The following example showcases a simple use of the builder, with hooks logging a message right before
and after the retry attempt delays.
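A sketch of what such a usage might look like (the `transientFlux` source and the printed messages here are illustrative assumptions, not the snippet from the FAQ itself):

[source,java]
----
Flux<String> retried = transientFlux
        .retryWhen(Retry.backoff(3, Duration.ofMillis(100))
                        .doBeforeRetry(rs -> System.out.println("error before backoff delay: " + rs.failure()))
                        .doAfterRetry(rs -> System.out.println("retrying now, attempt " + (rs.totalRetries() + 1))));
----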
6 changes: 3 additions & 3 deletions docs/asciidoc/snippetRetryWhenRetry.adoc
@@ -5,14 +5,14 @@ AtomicInteger errorCount = new AtomicInteger();
Flux<String> flux =
Flux.<String>error(new IllegalArgumentException())
.doOnError(e -> errorCount.incrementAndGet())
.retryWhen(() -> companion -> // <1>
.retryWhen(Retry.fromFunction(companion -> // <1>
companion.map(rs -> { // <2>
if (rs.totalRetries() < 3) return rs.totalRetries(); // <3>
else throw Exceptions.propagate(rs.failure()); // <4>
})
);
));
----
<1> `retryWhen` expects a `Supplier<Retry>`
<1> We customize `Retry` by adapting from a `Function` lambda rather than providing a concrete class
<2> The companion emits `RetrySignal` objects, which bear the number of retries so far and the last failure
<3> To allow for three retries, we consider indexes < 3 and return a value to emit (here we simply return the index).
<4> In order to terminate the sequence in error, we throw the original exception after
14 changes: 9 additions & 5 deletions reactor-core/src/main/java/reactor/core/publisher/Flux.java
@@ -7240,14 +7240,18 @@ public final Flux<T> retry(long numRetries, Predicate<? super Throwable> retryMa
*
* @return a {@link Flux} that retries on onError when the companion {@link Publisher} produces an
* onNext signal
* @deprecated use {@link #retryWhen(Retry)} instead, to be removed in 3.4
* @deprecated use {@link #retryWhen(Retry)} instead, to be removed in 3.4. Lambda Functions that don't make
* use of the error can simply be converted by wrapping via {@link Retry#fromFunction(Function)}.
* Functions that do use the error will additionally need to map the {@link reactor.util.retry.Retry.RetrySignal}
* emitted by the companion to its {@link Retry.RetrySignal#failure()}.
*/
@Deprecated
public final Flux<T> retryWhen(Function<Flux<Throwable>, ? extends Publisher<?>> whenFactory) {
Objects.requireNonNull(whenFactory, "whenFactory");
return onAssembly(new FluxRetryWhen<>(this, fluxRetryWhenState -> fluxRetryWhenState
return onAssembly(new FluxRetryWhen<>(this, Retry.fromFunction(fluxRetryWhenState -> fluxRetryWhenState
.map(Retry.RetrySignal::failure)
.as(whenFactory)));
.as(whenFactory)))
);
}
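For illustration, a migration sketch for existing callers (`decide` is a hypothetical `Function<Throwable, ?>` owned by the caller):

flux.retryWhen(errors -> errors.take(3));                            // deprecated lambda, ignores the error
flux.retryWhen(Retry.fromFunction(companion -> companion.take(3)));  // equivalent wrapped form

flux.retryWhen(errors -> errors.map(decide));                        // deprecated lambda that uses the error
flux.retryWhen(Retry.fromFunction(companion ->
        companion.map(Retry.RetrySignal::failure).map(decide)));     // map each RetrySignal to its failure() first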

/**
Expand Down Expand Up @@ -7276,7 +7280,7 @@ public final Flux<T> retryWhen(Function<Flux<Throwable>, ? extends Publisher<?>>
* the previous Context:
* <blockquote><pre>
* {@code
* Retry customStrategy = companion -> companion.handle((retrySignal, sink) -> {
* Retry customStrategy = Retry.fromFunction(companion -> companion.handle((retrySignal, sink) -> {
* Context ctx = sink.currentContext();
* int rl = ctx.getOrDefault("retriesLeft", 0);
* if (rl > 0) {
@@ -7287,7 +7291,7 @@ public final Flux<T> retryWhen(Function<Flux<Throwable>, ? extends Publisher<?>>
* } else {
* sink.error(Exceptions.retryExhausted("retries exhausted", retrySignal.failure()));
* }
* });
* }));
* Flux<T> retried = originalFlux.retryWhen(customStrategy);
* }</pre>
* </blockquote>
13 changes: 8 additions & 5 deletions reactor-core/src/main/java/reactor/core/publisher/Mono.java
@@ -3716,13 +3716,16 @@ public final Mono<T> retry(long numRetries, Predicate<? super Throwable> retryMa
*
* @return a {@link Mono} that retries on onError when the companion {@link Publisher} produces an
* onNext signal
* @deprecated use {@link #retryWhen(Retry)} instead, to be removed in 3.4
* @deprecated use {@link #retryWhen(Retry)} instead, to be removed in 3.4. Lambda Functions that don't make
* use of the error can simply be converted by wrapping via {@link Retry#fromFunction(Function)}.
* Functions that do use the error will additionally need to map the {@link reactor.util.retry.Retry.RetrySignal}
* emitted by the companion to its {@link Retry.RetrySignal#failure()}.
*/
@Deprecated
public final Mono<T> retryWhen(Function<Flux<Throwable>, ? extends Publisher<?>> whenFactory) {
Objects.requireNonNull(whenFactory, "whenFactory");
return onAssembly(new MonoRetryWhen<>(this, (Flux<Retry.RetrySignal> rws) -> whenFactory.apply(rws.map(
Retry.RetrySignal::failure))));
return onAssembly(new MonoRetryWhen<>(this, Retry.fromFunction(rws -> whenFactory.apply(rws.map(
Retry.RetrySignal::failure)))));
}

/**
Expand Down Expand Up @@ -3752,7 +3755,7 @@ public final Mono<T> retryWhen(Function<Flux<Throwable>, ? extends Publisher<?>>
* <blockquote>
* <pre>
* {@code
* Retry customStrategy = companion -> companion.handle((retrySignal, sink) -> {
* Retry customStrategy = Retry.fromFunction(companion -> companion.handle((retrySignal, sink) -> {
* Context ctx = sink.currentContext();
* int rl = ctx.getOrDefault("retriesLeft", 0);
* if (rl > 0) {
@@ -3763,7 +3766,7 @@ public final Mono<T> retryWhen(Function<Flux<Throwable>, ? extends Publisher<?>>
* } else {
* sink.error(Exceptions.retryExhausted("retries exhausted", retrySignal.failure()));
* }
* });
* }));
* Mono<T> retried = originalMono.retryWhen(customStrategy);
* }</pre>
* </blockquote>
93 changes: 78 additions & 15 deletions reactor-core/src/main/java/reactor/util/retry/Retry.java
@@ -17,6 +17,7 @@
package reactor.util.retry;

import java.time.Duration;
import java.util.function.Function;

import org.reactivestreams.Publisher;

@@ -26,13 +27,25 @@
import static reactor.util.retry.RetrySpec.*;

/**
* Functional interface to configure retries depending on a companion {@link Flux} of {@link RetrySignal},
* as well as builders for such {@link Flux#retryWhen(Retry)} retries} companions.
* Base abstract class for a strategy to decide when to retry given a companion {@link Flux} of {@link RetrySignal},
* for use with {@link Flux#retryWhen(Retry)} and {@link reactor.core.publisher.Mono#retryWhen(Retry)}.
* Also provides access to configurable built-in strategies via static factory methods:
* <ul>
* <li>{@link #indefinitely()}</li>
* <li>{@link #max(long)}</li>
* <li>{@link #maxInARow(long)}</li>
* <li>{@link #fixedDelay(long, Duration)}</li>
* <li>{@link #backoff(long, Duration)}</li>
* </ul>
* <p>
* Users are encouraged to provide either concrete custom {@link Retry} strategies or builders that produce
 * such concrete {@link Retry}. The {@link RetrySpec} returned by e.g. {@link #max(long)} is a good inspiration
* for a fluent approach that generates a {@link Retry} at each step and uses immutability/copy-on-write to enable
* sharing of intermediate steps (that can thus be considered templates).
*
* @author Simon Baslé
*/
@FunctionalInterface
public interface Retry {
public abstract class Retry {
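As a sketch of that template idea (the names below are illustrative): each configuration step returns a new immutable spec, so a partially configured spec can be shared and further specialized without affecting other call sites.

RetryBackoffSpec template = Retry.backoff(5, Duration.ofMillis(100));         // shared immutable template
Retry quick   = template.maxBackoff(Duration.ofSeconds(1));                   // copy; template unchanged
Retry patient = template.maxAttempts(10).maxBackoff(Duration.ofMinutes(1));   // another independent copy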

/**
* The intent of the functional {@link Retry} class is to let users configure how to react to {@link RetrySignal}
@@ -41,19 +54,19 @@ public interface Retry {
* the attempt is delayed as well. This method generates the companion, out of a {@link Flux} of {@link RetrySignal},
* which itself can serve as the simplest form of retry companion (indefinitely and immediately retry on any error).
*
* @param retrySignalCompanion the original {@link Flux} of {@link RetrySignal}, notifying of each source error that
* _might_ result in a retry attempt, with context around the error and current retry cycle.
* @param retrySignals the original {@link Flux} of {@link RetrySignal}, notifying of each source error that
* <i>might</i> result in a retry attempt, with context around the error and current retry cycle.
* @return the actual companion to use, which might delay or limit retry attempts
*/
Publisher<?> generateCompanion(Flux<RetrySignal> retrySignalCompanion);
public abstract Publisher<?> generateCompanion(Flux<RetrySignal> retrySignals);
Review comment (Contributor):
In an abstract class, you may want to have a final method that wraps the user-provided one, so that we keep some control. That usually makes it easier to make changes in the future.
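A sketch of such a concrete strategy, mirroring the snippet from the reference documentation above (retry at most three times, then propagate the original failure):

Retry threeAttempts = new Retry() {
    @Override
    public Publisher<?> generateCompanion(Flux<RetrySignal> retrySignals) {
        return retrySignals.map(rs -> {
            if (rs.totalRetries() < 3) return rs.totalRetries();   // emitting a value triggers a retry
            throw Exceptions.propagate(rs.failure());              // erroring the companion stops retrying
        });
    }
};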


/**
* State for a {@link Flux#retryWhen(Retry)} Flux retry} or {@link reactor.core.publisher.Mono#retryWhen(Retry) Mono retry}.
* A flux of states is passed to the user, which gives information about the {@link #failure()} that potentially triggers
* a retry as well as two indexes: the number of errors that happened so far (and were retried) and the same number,
* but only taking into account <strong>subsequent</strong> errors (see {@link #totalRetriesInARow()}).
*/
interface RetrySignal {
public interface RetrySignal {

/**
* The ZERO BASED index number of this error (can also be read as how many retries have occurred
@@ -98,12 +111,34 @@ default RetrySignal copy() {
*
* @param maxAttempts the maximum number of retry attempts to allow
* @param minBackoff the minimum {@link Duration} for the first backoff
* @return the builder for further configuration
* @return the exponential backoff spec for further configuration
* @see RetryBackoffSpec#maxAttempts(long)
* @see RetryBackoffSpec#minBackoff(Duration)
*/
static RetryBackoffSpec backoff(long maxAttempts, Duration minBackoff) {
return new RetryBackoffSpec(maxAttempts, t -> true, false, minBackoff, MAX_BACKOFF, 0.5d, Schedulers.parallel(),
//FIXME marble diagram
public static RetryBackoffSpec backoff(long maxAttempts, Duration minBackoff) {
return new RetryBackoffSpec(maxAttempts, t -> true, false, minBackoff, MAX_BACKOFF, 0.5d, Schedulers::parallel,
NO_OP_CONSUMER, NO_OP_CONSUMER, NO_OP_BIFUNCTION, NO_OP_BIFUNCTION,
RetryBackoffSpec.BACKOFF_EXCEPTION_GENERATOR);
}

/**
* A {@link RetryBackoffSpec} preconfigured for fixed delays (min backoff equals max backoff, no jitter), given a maximum number of retry attempts
* and the fixed {@link Duration} for the backoff.
* <p>
* Note that calling {@link RetryBackoffSpec#minBackoff(Duration)} or {@link RetryBackoffSpec#maxBackoff(Duration)} would switch
* back to an exponential backoff strategy.
*
* @param maxAttempts the maximum number of retry attempts to allow
* @param fixedDelay the {@link Duration} of the fixed delays
* @return the fixed delays spec for further configuration
* @see RetryBackoffSpec#maxAttempts(long)
* @see RetryBackoffSpec#minBackoff(Duration)
* @see RetryBackoffSpec#maxBackoff(Duration)
*/
//FIXME marble diagram
public static RetryBackoffSpec fixedDelay(long maxAttempts, Duration fixedDelay) {
return new RetryBackoffSpec(maxAttempts, t -> true, false, fixedDelay, fixedDelay, 0d, Schedulers::parallel,
NO_OP_CONSUMER, NO_OP_CONSUMER, NO_OP_BIFUNCTION, NO_OP_BIFUNCTION,
RetryBackoffSpec.BACKOFF_EXCEPTION_GENERATOR);
}
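Usage sketch (the `source` Flux is assumed): three attempts, each separated by a constant two-second delay.

Flux<String> retried = source.retryWhen(Retry.fixedDelay(3, Duration.ofSeconds(2)));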
@@ -112,10 +147,11 @@ static RetryBackoffSpec backoff(long maxAttempts, Duration minBackoff) {
* A {@link RetrySpec} preconfigured for a simple strategy with maximum number of retry attempts.
*
* @param max the maximum number of retry attempts to allow
* @return the builder for further configuration
* @return the max attempt spec for further configuration
* @see RetrySpec#maxAttempts(long)
*/
static RetrySpec max(long max) {
//FIXME marble diagram
public static RetrySpec max(long max) {
return new RetrySpec(max, t -> true, false, NO_OP_CONSUMER, NO_OP_CONSUMER, NO_OP_BIFUNCTION, NO_OP_BIFUNCTION,
RetrySpec.RETRY_EXCEPTION_GENERATOR);
}
@@ -126,13 +162,40 @@ static RetrySpec max(long max) {
* errors resets the counter (see {@link RetrySpec#transientErrors(boolean)}).
*
* @param maxInARow the maximum number of retry attempts to allow in a row, reset by successful onNext
* @return the builder for further configuration
* @return the max in a row spec for further configuration
* @see RetrySpec#maxAttempts(long)
* @see RetrySpec#transientErrors(boolean)
*/
static RetrySpec maxInARow(long maxInARow) {
//FIXME marble diagram, point to it in RetrySpec#transientErrors javadoc
public static RetrySpec maxInARow(long maxInARow) {
return new RetrySpec(maxInARow, t -> true, true, NO_OP_CONSUMER, NO_OP_CONSUMER, NO_OP_BIFUNCTION, NO_OP_BIFUNCTION,
RETRY_EXCEPTION_GENERATOR);
}
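Usage sketch (the `source` Flux is assumed): up to two consecutive failures are retried, and any successful onNext resets that counter.

Flux<String> resilient = source.retryWhen(Retry.maxInARow(2));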

/**
* A {@link RetrySpec} preconfigured for the most simplistic retry strategy: retry immediately and indefinitely
* (similar to {@link Flux#retry()}).
*
* @return the retry indefinitely spec for further configuration
*/
public static RetrySpec indefinitely() {
return new RetrySpec(Long.MAX_VALUE, t -> true, false, NO_OP_CONSUMER, NO_OP_CONSUMER, NO_OP_BIFUNCTION, NO_OP_BIFUNCTION,
RetrySpec.RETRY_EXCEPTION_GENERATOR);
}

/**
* A wrapper around {@link Function} to provide {@link Retry} by using lambda expressions.
*
* @param function the {@link Function} representing the desired {@link Retry} strategy as a lambda
* @return the {@link Retry} strategy adapted from the {@link Function}
*/
public static final Retry fromFunction(Function<Flux<RetrySignal>, Publisher<?>> function) {
return new Retry() {
@Override
public Publisher<?> generateCompanion(Flux<RetrySignal> retrySignalCompanion) {
return function.apply(retrySignalCompanion);
}
};
}

}