From 7dfab58868acbbef7758aa5989427394c95e4689 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Simon=20Basl=C3=A9?= Date: Tue, 18 Feb 2020 18:46:38 +0100 Subject: [PATCH] WIP #2063 Add pre/post attempt hooks --- .../retry/ExponentialBackoffFunction.java | 9 +- .../main/java/reactor/util/retry/Retry.java | 161 +++++++++++++++--- .../util/retry/SimpleRetryFunction.java | 56 ++++-- .../core/publisher/FluxRetryWhenTest.java | 49 ++++++ .../reactor/util/retry/RetryBuilderTest.java | 72 +++++++- 5 files changed, 312 insertions(+), 35 deletions(-) diff --git a/reactor-core/src/main/java/reactor/util/retry/ExponentialBackoffFunction.java b/reactor-core/src/main/java/reactor/util/retry/ExponentialBackoffFunction.java index c9a0afa42f..c500882ed2 100644 --- a/reactor-core/src/main/java/reactor/util/retry/ExponentialBackoffFunction.java +++ b/reactor-core/src/main/java/reactor/util/retry/ExponentialBackoffFunction.java @@ -47,8 +47,9 @@ class ExponentialBackoffFunction extends SimpleRetryFunction { public Publisher apply(Flux t) { return t.flatMap(retryWhenState -> { //capture the state immediately - Throwable currentFailure = retryWhenState.failure(); - long iteration = isTransientErrors ? retryWhenState.failureSubsequentIndex() : retryWhenState.failureTotalIndex(); + RetrySignal copy = retryWhenState.retain(); + Throwable currentFailure = copy.failure(); + long iteration = isTransientErrors ? copy.failureSubsequentIndex() : copy.failureTotalIndex(); if (currentFailure == null) { return Mono.error(new IllegalStateException("Retry.RetrySignal#failure() not expected to be null")); @@ -75,7 +76,7 @@ public Publisher apply(Flux t) { //short-circuit delay == 0 case if (nextBackoff.isZero()) { - return Mono.just(iteration); + return applyHooks(copy, Mono.just(iteration)); } ThreadLocalRandom random = ThreadLocalRandom.current(); @@ -103,7 +104,7 @@ public Publisher apply(Flux t) { jitter = random.nextLong(lowBound, highBound); } Duration effectiveBackoff = nextBackoff.plusMillis(jitter); - return Mono.delay(effectiveBackoff, backoffScheduler); + return applyHooks(copy, Mono.delay(effectiveBackoff, backoffScheduler)); }); } } diff --git a/reactor-core/src/main/java/reactor/util/retry/Retry.java b/reactor-core/src/main/java/reactor/util/retry/Retry.java index d43bd3c9ae..321a2f6397 100644 --- a/reactor-core/src/main/java/reactor/util/retry/Retry.java +++ b/reactor-core/src/main/java/reactor/util/retry/Retry.java @@ -18,6 +18,8 @@ import java.time.Duration; import java.util.Objects; +import java.util.function.BiFunction; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -25,6 +27,7 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import reactor.util.annotation.Nullable; @@ -39,6 +42,9 @@ public class Retry { static final Duration MAX_BACKOFF = Duration.ofMillis(Long.MAX_VALUE); + static final Consumer NO_OP_CONSUMER = rs -> {}; + static final BiFunction, Mono> NO_OP_BIFUNCTION = (rs, m) -> m; + /** * State for a {@link Flux#retry(Supplier) Flux retry} or {@link reactor.core.publisher.Mono#retry(Supplier) Mono retry}. * The state is passed to the retry function inside a publisher and gives information about the @@ -95,7 +101,8 @@ default RetrySignal retain() { * @see Builder#minBackoff(Duration) */ public static Builder backoff(long maxAttempts, Duration minBackoff) { - return new Builder(true, maxAttempts, t -> true, false, minBackoff, MAX_BACKOFF, 0.5d, Schedulers.parallel()); + return new Builder(true, maxAttempts, t -> true, false, minBackoff, MAX_BACKOFF, 0.5d, Schedulers.parallel(), + NO_OP_CONSUMER, NO_OP_CONSUMER, NO_OP_BIFUNCTION, NO_OP_BIFUNCTION); } /** @@ -106,7 +113,8 @@ public static Builder backoff(long maxAttempts, Duration minBackoff) { * @see Builder#maxAttempts(long) */ public static Builder max(long max) { - return new Builder(false, max, t -> true, false, Duration.ZERO, MAX_BACKOFF, 0d, null); + return new Builder(false, max, t -> true, false, Duration.ZERO, MAX_BACKOFF, 0d,null, + NO_OP_CONSUMER, NO_OP_CONSUMER, NO_OP_BIFUNCTION, NO_OP_BIFUNCTION); } /** @@ -148,26 +156,34 @@ public static Builder maxInARow(long maxInARow) { */ public static class Builder implements Supplier, Publisher>> { - final Duration minBackoff; - final Duration maxBackoff; - final double jitterFactor; + final Duration minBackoff; + final Duration maxBackoff; + final double jitterFactor; @Nullable final Scheduler backoffScheduler; - final long maxAttempts; - final Predicate throwablePredicate; - final boolean isTransientErrors; - final boolean isConfiguredForBackoff; + final long maxAttempts; + final Predicate throwablePredicate; + final boolean isTransientErrors; + final boolean isConfiguredForBackoff; + + final Consumer doPreRetry; + final Consumer doPostRetry; + final BiFunction, Mono> asyncPreRetry; + final BiFunction, Mono> asyncPostRetry; /** * Copy constructor. */ - Builder(boolean isConfiguredForBackoff, - long max, + Builder(boolean isConfiguredForBackoff, long max, Predicate aThrowablePredicate, boolean isTransientErrors, Duration minBackoff, Duration maxBackoff, double jitterFactor, - @Nullable Scheduler backoffScheduler) { + @Nullable Scheduler backoffScheduler, + Consumer doPreRetry, + Consumer doPostRetry, + BiFunction, Mono> asyncPreRetry, + BiFunction, Mono> asyncPostRetry) { this.isConfiguredForBackoff = isConfiguredForBackoff; this.maxAttempts = max; this.throwablePredicate = aThrowablePredicate::test; //massaging type @@ -176,6 +192,10 @@ public static class Builder implements Supplier, Publ this.maxBackoff = maxBackoff; this.jitterFactor = jitterFactor; this.backoffScheduler = backoffScheduler; + this.doPreRetry = doPreRetry; + this.doPostRetry = doPostRetry; + this.asyncPreRetry = asyncPreRetry; + this.asyncPostRetry = asyncPostRetry; } /** @@ -206,7 +226,11 @@ public Builder maxAttempts(long maxAttempts) { this.minBackoff, this.maxBackoff, this.jitterFactor, - this.backoffScheduler); + this.backoffScheduler, + this.doPreRetry, + this.doPostRetry, + this.asyncPreRetry, + this.asyncPostRetry); } /** @@ -226,7 +250,11 @@ public Builder throwablePredicate(Predicate predicate) { this.minBackoff, this.maxBackoff, this.jitterFactor, - this.backoffScheduler); + this.backoffScheduler, + this.doPreRetry, + this.doPostRetry, + this.asyncPreRetry, + this.asyncPostRetry); } /** @@ -261,7 +289,75 @@ public Builder throwablePredicateModifiedWith( this.minBackoff, this.maxBackoff, this.jitterFactor, - this.backoffScheduler); + this.backoffScheduler, + this.doPreRetry, + this.doPostRetry, + this.asyncPreRetry, + this.asyncPostRetry); + } + + public Builder andDoBeforeRetry(Consumer doBeforeRetry) { + return new Builder( + this.isConfiguredForBackoff, + this.maxAttempts, + this.throwablePredicate, + this.isTransientErrors, + this.minBackoff, + this.maxBackoff, + this.jitterFactor, + this.backoffScheduler, + this.doPreRetry.andThen(doBeforeRetry), + this.doPostRetry, + this.asyncPreRetry, + this.asyncPostRetry); + } + + public Builder andDoAfterRetry(Consumer doAfterRetry) { + return new Builder( + this.isConfiguredForBackoff, + this.maxAttempts, + this.throwablePredicate, + this.isTransientErrors, + this.minBackoff, + this.maxBackoff, + this.jitterFactor, + this.backoffScheduler, + this.doPreRetry, + this.doPostRetry.andThen(doAfterRetry), + this.asyncPreRetry, + this.asyncPostRetry); + } + + public Builder andDelayRetryWith(Function> doAsyncBeforeRetry) { + return new Builder( + this.isConfiguredForBackoff, + this.maxAttempts, + this.throwablePredicate, + this.isTransientErrors, + this.minBackoff, + this.maxBackoff, + this.jitterFactor, + this.backoffScheduler, + this.doPreRetry, + this.doPostRetry, + (rs, m) -> asyncPreRetry.apply(rs, m).then(doAsyncBeforeRetry.apply(rs)), + this.asyncPostRetry); + } + + public Builder andRetryThen(Function> doAsyncAfterRetry) { + return new Builder( + this.isConfiguredForBackoff, + this.maxAttempts, + this.throwablePredicate, + this.isTransientErrors, + this.minBackoff, + this.maxBackoff, + this.jitterFactor, + this.backoffScheduler, + this.doPreRetry, + this.doPostRetry, + this.asyncPreRetry, + (rs, m) -> asyncPostRetry.apply(rs, m).then(doAsyncAfterRetry.apply(rs))); } /** @@ -287,7 +383,11 @@ public Builder transientErrors(boolean isTransientErrors) { this.minBackoff, this.maxBackoff, this.jitterFactor, - this.backoffScheduler); + this.backoffScheduler, + this.doPreRetry, + this.doPostRetry, + this.asyncPreRetry, + this.asyncPostRetry); } /** @@ -307,7 +407,11 @@ public Builder minBackoff(Duration minBackoff) { Objects.requireNonNull(minBackoff, "minBackoff"), this.maxBackoff, this.jitterFactor, - this.backoffScheduler); + this.backoffScheduler, + this.doPreRetry, + this.doPostRetry, + this.asyncPreRetry, + this.asyncPostRetry); } /** @@ -327,7 +431,11 @@ public Builder maxBackoff(Duration maxBackoff) { this.minBackoff, Objects.requireNonNull(maxBackoff, "maxBackoff"), this.jitterFactor, - this.backoffScheduler); + this.backoffScheduler, + this.doPreRetry, + this.doPostRetry, + this.asyncPreRetry, + this.asyncPostRetry); } /** @@ -348,7 +456,11 @@ public Builder jitter(double jitterFactor) { this.minBackoff, this.maxBackoff, jitterFactor, - this.backoffScheduler); + this.backoffScheduler, + this.doPreRetry, + this.doPostRetry, + this.asyncPreRetry, + this.asyncPostRetry); } /** @@ -369,7 +481,11 @@ public Builder scheduler(Scheduler backoffScheduler) { this.minBackoff, this.maxBackoff, this.jitterFactor, - Objects.requireNonNull(backoffScheduler, "backoffScheduler")); + Objects.requireNonNull(backoffScheduler, "backoffScheduler"), + this.doPreRetry, + this.doPostRetry, + this.asyncPreRetry, + this.asyncPostRetry); } /** @@ -418,5 +534,10 @@ public Throwable failure() { public RetrySignal retain() { return this; } + + @Override + public String toString() { + return "attempt #" + (failureTotalIndex + 1) + " (" + (failureSubsequentIndex + 1) + " in a row), last failure={" + failure + '}'; + } } } diff --git a/reactor-core/src/main/java/reactor/util/retry/SimpleRetryFunction.java b/reactor-core/src/main/java/reactor/util/retry/SimpleRetryFunction.java index 6627ee55f8..d55b406614 100644 --- a/reactor-core/src/main/java/reactor/util/retry/SimpleRetryFunction.java +++ b/reactor-core/src/main/java/reactor/util/retry/SimpleRetryFunction.java @@ -16,44 +16,80 @@ package reactor.util.retry; +import java.util.function.BiFunction; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; class SimpleRetryFunction implements Function, Publisher> { - final long maxAttempts; - final Predicate throwablePredicate; - final boolean isTransientErrors; + final long maxAttempts; + final Predicate throwablePredicate; + final boolean isTransientErrors; + final Consumer doPreRetry; + final Consumer doPostRetry; + final BiFunction, Mono> asyncPreRetry; + final BiFunction, Mono> asyncPostRetry; SimpleRetryFunction(Retry.Builder builder) { this.maxAttempts = builder.maxAttempts; this.throwablePredicate = builder.throwablePredicate; this.isTransientErrors = builder.isTransientErrors; + this.doPreRetry = builder.doPreRetry; + this.doPostRetry = builder.doPostRetry; + this.asyncPreRetry = builder.asyncPreRetry; + this.asyncPostRetry= builder.asyncPostRetry; } @Override public Publisher apply(Flux flux) { - return flux.handle((retryWhenState, sink) -> { + return flux.flatMap(retryWhenState -> { //capture the state immediately - Throwable currentFailure = retryWhenState.failure(); - long iteration = isTransientErrors ? retryWhenState.failureSubsequentIndex() : retryWhenState.failureTotalIndex(); + Retry.RetrySignal copy = retryWhenState.retain(); + Throwable currentFailure = copy.failure(); + long iteration = isTransientErrors ? copy.failureSubsequentIndex() : copy.failureTotalIndex(); if (currentFailure == null) { - sink.error(new IllegalStateException("RetryWhenState#failure() not expected to be null")); + return Mono.error(new IllegalStateException("RetryWhenState#failure() not expected to be null")); } else if (!throwablePredicate.test(currentFailure)) { - sink.error(currentFailure); + return Mono.error(currentFailure); } else if (iteration >= maxAttempts) { - sink.error(new IllegalStateException("Retries exhausted: " + iteration + "/" + maxAttempts, currentFailure)); + return Mono.error(new IllegalStateException("Retries exhausted: " + iteration + "/" + maxAttempts, currentFailure)); } else { - sink.next(iteration); + return applyHooks(copy, Mono.just(iteration)); } }); } + + protected Mono applyHooks(Retry.RetrySignal copyOfSignal, Mono originalCompanion) { + if (doPreRetry != Retry.NO_OP_CONSUMER) { + try { + doPreRetry.accept(copyOfSignal); + } + catch (Throwable e) { + return Mono.error(e); + } + } + + Mono postRetrySyncMono; + if (doPostRetry != Retry.NO_OP_CONSUMER) { + postRetrySyncMono = Mono.fromRunnable(() -> doPostRetry.accept(copyOfSignal)); + } + else { + postRetrySyncMono = Mono.empty(); + } + + Mono preRetryMono = asyncPreRetry == Retry.NO_OP_BIFUNCTION ? Mono.empty() : asyncPreRetry.apply(copyOfSignal, Mono.empty()); + Mono postRetryMono = asyncPostRetry != Retry.NO_OP_BIFUNCTION ? asyncPostRetry.apply(copyOfSignal, postRetrySyncMono) : postRetrySyncMono; + + return preRetryMono.then(originalCompanion).flatMap(postRetryMono::thenReturn); + } } diff --git a/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java b/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java index 37b3b09bfb..f5ac6a8c89 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/FluxRetryWhenTest.java @@ -821,4 +821,53 @@ public void gh1978() { assertThat(pauses).allMatch(p -> p == 1, "pause is constantly 1s"); } + + @Test + public void cumulatedRetryHooks() { + AtomicInteger beforeHookTracker = new AtomicInteger(); + AtomicInteger afterHookTracker = new AtomicInteger(); + + Retry.Builder builder = Retry + .max(1) + .andDoBeforeRetry(s -> System.out.println("About to retry: " + s)) + .andDoBeforeRetry(s -> System.out.println("About to retry, tracking " + beforeHookTracker.incrementAndGet())) + .andDoAfterRetry(s -> System.out.println("Did retry: " + s)) + .andDoAfterRetry(s -> System.out.println("Did to retry, tracking " + afterHookTracker.incrementAndGet())) + .andDelayRetryWith(s -> Mono.delay(Duration.ofSeconds(1)).then()) + .andDelayRetryWith(s -> Mono.fromRunnable(() -> beforeHookTracker.addAndGet(100))) + .andRetryThen(s -> Mono.delay(Duration.ofSeconds(1)).doOnNext(delayed -> System.out.println("Post retry async " + s)).then()) + .andRetryThen(s -> Mono.fromRunnable(() -> afterHookTracker.addAndGet(100))); + + Mono.error(new IllegalStateException("boom")) + .retry(builder) + .as(StepVerifier::create) + .verifyError(); + + assertThat(beforeHookTracker).hasValue(101); + assertThat(afterHookTracker).hasValue(101); + } + + @Test + public void cumulatedRetryHooksWithTransient() { + AtomicInteger beforeHookTracker = new AtomicInteger(); + AtomicInteger afterHookTracker = new AtomicInteger(); + + Retry.Builder builder = Retry + .maxInARow(2) + .andDoBeforeRetry(s -> System.out.println("\nAbout to retry: " + s)) + .andDoBeforeRetry(s -> System.out.println("About to retry, tracking " + beforeHookTracker.incrementAndGet())) + .andDoAfterRetry(s -> System.out.println("Did retry: " + s)) + .andDoAfterRetry(s -> System.out.println("Did retry, tracking " + afterHookTracker.incrementAndGet())) + .andDelayRetryWith(s -> Mono.delay(Duration.ofMillis(100)).doOnNext(delayed -> System.out.println("Pre retry delay: " + s)).then()) + .andDelayRetryWith(s -> Mono.fromRunnable(() -> System.out.println("Pre retry delay, tracking " + beforeHookTracker.addAndGet(100)))) + .andRetryThen(s -> Mono.delay(Duration.ofMillis(100)).doOnNext(delayed -> System.out.println("Did retry async: " + s)).then()) + .andRetryThen(s -> Mono.fromRunnable(() -> System.out.println("Did retry async, tracking " + afterHookTracker.addAndGet(100)))); + + transientErrorSource() + .retry(builder) + .blockLast(); + + assertThat(beforeHookTracker).as("before hooks cumulated").hasValue(606); + assertThat(afterHookTracker).as("after hooks cumulated").hasValue(606); + } } diff --git a/reactor-core/src/test/java/reactor/util/retry/RetryBuilderTest.java b/reactor-core/src/test/java/reactor/util/retry/RetryBuilderTest.java index 6306d64558..c644d2b50a 100644 --- a/reactor-core/src/test/java/reactor/util/retry/RetryBuilderTest.java +++ b/reactor-core/src/test/java/reactor/util/retry/RetryBuilderTest.java @@ -24,6 +24,7 @@ import org.junit.Test; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import reactor.test.StepVerifierOptions; @@ -120,7 +121,11 @@ public void builderMethodsProduceNewInstances() { .isNotSameAs(init.maxAttempts(10)) .isNotSameAs(init.throwablePredicate(t -> true)) .isNotSameAs(init.throwablePredicateModifiedWith(predicate -> predicate.and(t -> true))) - .isNotSameAs(init.transientErrors(true)); + .isNotSameAs(init.transientErrors(true)) + .isNotSameAs(init.andDoBeforeRetry(rs -> {})) + .isNotSameAs(init.andDoAfterRetry(rs -> {})) + .isNotSameAs(init.andDelayRetryWith(rs -> Mono.empty())) + .isNotSameAs(init.andRetryThen(rs -> Mono.empty())); } @Test @@ -208,4 +213,69 @@ public void throwablePredicateModifierRejectsNullFunction() { .withMessage("predicateAdjuster"); } + @Test + public void doBeforeRetryIsCumulative() { + AtomicInteger atomic = new AtomicInteger(); + Retry.Builder builder = Retry + .max(1) + .andDoBeforeRetry(rs -> atomic.incrementAndGet()) + .andDoBeforeRetry(rs -> atomic.addAndGet(100)); + + builder.doPreRetry.accept(null); + + assertThat(atomic).hasValue(101); + } + + @Test + public void doAfterRetryIsCumulative() { + AtomicInteger atomic = new AtomicInteger(); + Retry.Builder builder = Retry + .max(1) + .andDoAfterRetry(rs -> atomic.incrementAndGet()) + .andDoAfterRetry(rs -> atomic.addAndGet(100)); + + builder.doPostRetry.accept(null); + + assertThat(atomic).hasValue(101); + } + + @Test + public void delayRetryWithIsCumulative() { + AtomicInteger atomic = new AtomicInteger(); + Retry.Builder builder = Retry + .max(1) + .andDelayRetryWith(rs -> Mono.fromRunnable(atomic::incrementAndGet)) + .andDelayRetryWith(rs -> Mono.fromRunnable(() -> atomic.addAndGet(100))); + + builder.asyncPreRetry.apply(null, Mono.empty()).block(); + + assertThat(atomic).hasValue(101); + } + + @Test + public void retryThenIsCumulative() { + AtomicInteger atomic = new AtomicInteger(); + Retry.Builder builder = Retry + .max(1) + .andRetryThen(rs -> Mono.fromRunnable(atomic::incrementAndGet)) + .andRetryThen(rs -> Mono.fromRunnable(() -> atomic.addAndGet(100))); + + builder.asyncPostRetry.apply(null, Mono.empty()).block(); + + assertThat(atomic).hasValue(101); + } + + @Test + public void settingHooksDoesntInduceBackoff() { + final Retry.Builder builder = Retry + .max(1) + .andDoBeforeRetry(rs -> { }) + .andDoAfterRetry(rs -> { }) + .andDelayRetryWith(rs -> Mono.empty()) + .andRetryThen(rs -> Mono.empty()); + + assertThat(builder).isNot(PRODUCING_BACKOFF_FUNCTION) + .isNot(CONFIGURED_FOR_BACKOFF); + } + } \ No newline at end of file