Skip to content

Commit

Permalink
WIP #2063 Add pre/post attempt hooks
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle committed Mar 16, 2020
1 parent c7c97f3 commit 7dfab58
Show file tree
Hide file tree
Showing 5 changed files with 312 additions and 35 deletions.
Expand Up @@ -47,8 +47,9 @@ class ExponentialBackoffFunction extends SimpleRetryFunction {
public Publisher<?> apply(Flux<RetrySignal> t) {
return t.flatMap(retryWhenState -> {
//capture the state immediately
Throwable currentFailure = retryWhenState.failure();
long iteration = isTransientErrors ? retryWhenState.failureSubsequentIndex() : retryWhenState.failureTotalIndex();
RetrySignal copy = retryWhenState.retain();
Throwable currentFailure = copy.failure();
long iteration = isTransientErrors ? copy.failureSubsequentIndex() : copy.failureTotalIndex();

if (currentFailure == null) {
return Mono.error(new IllegalStateException("Retry.RetrySignal#failure() not expected to be null"));
Expand All @@ -75,7 +76,7 @@ public Publisher<?> apply(Flux<RetrySignal> t) {

//short-circuit delay == 0 case
if (nextBackoff.isZero()) {
return Mono.just(iteration);
return applyHooks(copy, Mono.just(iteration));
}

ThreadLocalRandom random = ThreadLocalRandom.current();
Expand Down Expand Up @@ -103,7 +104,7 @@ public Publisher<?> apply(Flux<RetrySignal> t) {
jitter = random.nextLong(lowBound, highBound);
}
Duration effectiveBackoff = nextBackoff.plusMillis(jitter);
return Mono.delay(effectiveBackoff, backoffScheduler);
return applyHooks(copy, Mono.delay(effectiveBackoff, backoffScheduler));
});
}
}
161 changes: 141 additions & 20 deletions reactor-core/src/main/java/reactor/util/retry/Retry.java
Expand Up @@ -18,13 +18,16 @@

import java.time.Duration;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

import org.reactivestreams.Publisher;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.util.annotation.Nullable;
Expand All @@ -39,6 +42,9 @@ public class Retry {

static final Duration MAX_BACKOFF = Duration.ofMillis(Long.MAX_VALUE);

static final Consumer<RetrySignal> NO_OP_CONSUMER = rs -> {};
static final BiFunction<RetrySignal, Mono<Void>, Mono<Void>> NO_OP_BIFUNCTION = (rs, m) -> m;

/**
* State for a {@link Flux#retry(Supplier) Flux retry} or {@link reactor.core.publisher.Mono#retry(Supplier) Mono retry}.
* The state is passed to the retry function inside a publisher and gives information about the
Expand Down Expand Up @@ -95,7 +101,8 @@ default RetrySignal retain() {
* @see Builder#minBackoff(Duration)
*/
public static Builder backoff(long maxAttempts, Duration minBackoff) {
return new Builder(true, maxAttempts, t -> true, false, minBackoff, MAX_BACKOFF, 0.5d, Schedulers.parallel());
return new Builder(true, maxAttempts, t -> true, false, minBackoff, MAX_BACKOFF, 0.5d, Schedulers.parallel(),
NO_OP_CONSUMER, NO_OP_CONSUMER, NO_OP_BIFUNCTION, NO_OP_BIFUNCTION);
}

/**
Expand All @@ -106,7 +113,8 @@ public static Builder backoff(long maxAttempts, Duration minBackoff) {
* @see Builder#maxAttempts(long)
*/
public static Builder max(long max) {
return new Builder(false, max, t -> true, false, Duration.ZERO, MAX_BACKOFF, 0d, null);
return new Builder(false, max, t -> true, false, Duration.ZERO, MAX_BACKOFF, 0d,null,
NO_OP_CONSUMER, NO_OP_CONSUMER, NO_OP_BIFUNCTION, NO_OP_BIFUNCTION);
}

/**
Expand Down Expand Up @@ -148,26 +156,34 @@ public static Builder maxInARow(long maxInARow) {
*/
public static class Builder implements Supplier<Function<Flux<RetrySignal>, Publisher<?>>> {

final Duration minBackoff;
final Duration maxBackoff;
final double jitterFactor;
final Duration minBackoff;
final Duration maxBackoff;
final double jitterFactor;
@Nullable
final Scheduler backoffScheduler;

final long maxAttempts;
final Predicate<Throwable> throwablePredicate;
final boolean isTransientErrors;
final boolean isConfiguredForBackoff;
final long maxAttempts;
final Predicate<Throwable> throwablePredicate;
final boolean isTransientErrors;
final boolean isConfiguredForBackoff;

final Consumer<RetrySignal> doPreRetry;
final Consumer<RetrySignal> doPostRetry;
final BiFunction<RetrySignal, Mono<Void>, Mono<Void>> asyncPreRetry;
final BiFunction<RetrySignal, Mono<Void>, Mono<Void>> asyncPostRetry;

/**
* Copy constructor.
*/
Builder(boolean isConfiguredForBackoff,
long max,
Builder(boolean isConfiguredForBackoff, long max,
Predicate<? super Throwable> aThrowablePredicate,
boolean isTransientErrors,
Duration minBackoff, Duration maxBackoff, double jitterFactor,
@Nullable Scheduler backoffScheduler) {
@Nullable Scheduler backoffScheduler,
Consumer<RetrySignal> doPreRetry,
Consumer<RetrySignal> doPostRetry,
BiFunction<RetrySignal, Mono<Void>, Mono<Void>> asyncPreRetry,
BiFunction<RetrySignal, Mono<Void>, Mono<Void>> asyncPostRetry) {
this.isConfiguredForBackoff = isConfiguredForBackoff;
this.maxAttempts = max;
this.throwablePredicate = aThrowablePredicate::test; //massaging type
Expand All @@ -176,6 +192,10 @@ public static class Builder implements Supplier<Function<Flux<RetrySignal>, Publ
this.maxBackoff = maxBackoff;
this.jitterFactor = jitterFactor;
this.backoffScheduler = backoffScheduler;
this.doPreRetry = doPreRetry;
this.doPostRetry = doPostRetry;
this.asyncPreRetry = asyncPreRetry;
this.asyncPostRetry = asyncPostRetry;
}

/**
Expand Down Expand Up @@ -206,7 +226,11 @@ public Builder maxAttempts(long maxAttempts) {
this.minBackoff,
this.maxBackoff,
this.jitterFactor,
this.backoffScheduler);
this.backoffScheduler,
this.doPreRetry,
this.doPostRetry,
this.asyncPreRetry,
this.asyncPostRetry);
}

/**
Expand All @@ -226,7 +250,11 @@ public Builder throwablePredicate(Predicate<? super Throwable> predicate) {
this.minBackoff,
this.maxBackoff,
this.jitterFactor,
this.backoffScheduler);
this.backoffScheduler,
this.doPreRetry,
this.doPostRetry,
this.asyncPreRetry,
this.asyncPostRetry);
}

/**
Expand Down Expand Up @@ -261,7 +289,75 @@ public Builder throwablePredicateModifiedWith(
this.minBackoff,
this.maxBackoff,
this.jitterFactor,
this.backoffScheduler);
this.backoffScheduler,
this.doPreRetry,
this.doPostRetry,
this.asyncPreRetry,
this.asyncPostRetry);
}

public Builder andDoBeforeRetry(Consumer<RetrySignal> doBeforeRetry) {
return new Builder(
this.isConfiguredForBackoff,
this.maxAttempts,
this.throwablePredicate,
this.isTransientErrors,
this.minBackoff,
this.maxBackoff,
this.jitterFactor,
this.backoffScheduler,
this.doPreRetry.andThen(doBeforeRetry),
this.doPostRetry,
this.asyncPreRetry,
this.asyncPostRetry);
}

public Builder andDoAfterRetry(Consumer<RetrySignal> doAfterRetry) {
return new Builder(
this.isConfiguredForBackoff,
this.maxAttempts,
this.throwablePredicate,
this.isTransientErrors,
this.minBackoff,
this.maxBackoff,
this.jitterFactor,
this.backoffScheduler,
this.doPreRetry,
this.doPostRetry.andThen(doAfterRetry),
this.asyncPreRetry,
this.asyncPostRetry);
}

public Builder andDelayRetryWith(Function<RetrySignal, Mono<Void>> doAsyncBeforeRetry) {
return new Builder(
this.isConfiguredForBackoff,
this.maxAttempts,
this.throwablePredicate,
this.isTransientErrors,
this.minBackoff,
this.maxBackoff,
this.jitterFactor,
this.backoffScheduler,
this.doPreRetry,
this.doPostRetry,
(rs, m) -> asyncPreRetry.apply(rs, m).then(doAsyncBeforeRetry.apply(rs)),
this.asyncPostRetry);
}

public Builder andRetryThen(Function<RetrySignal, Mono<Void>> doAsyncAfterRetry) {
return new Builder(
this.isConfiguredForBackoff,
this.maxAttempts,
this.throwablePredicate,
this.isTransientErrors,
this.minBackoff,
this.maxBackoff,
this.jitterFactor,
this.backoffScheduler,
this.doPreRetry,
this.doPostRetry,
this.asyncPreRetry,
(rs, m) -> asyncPostRetry.apply(rs, m).then(doAsyncAfterRetry.apply(rs)));
}

/**
Expand All @@ -287,7 +383,11 @@ public Builder transientErrors(boolean isTransientErrors) {
this.minBackoff,
this.maxBackoff,
this.jitterFactor,
this.backoffScheduler);
this.backoffScheduler,
this.doPreRetry,
this.doPostRetry,
this.asyncPreRetry,
this.asyncPostRetry);
}

/**
Expand All @@ -307,7 +407,11 @@ public Builder minBackoff(Duration minBackoff) {
Objects.requireNonNull(minBackoff, "minBackoff"),
this.maxBackoff,
this.jitterFactor,
this.backoffScheduler);
this.backoffScheduler,
this.doPreRetry,
this.doPostRetry,
this.asyncPreRetry,
this.asyncPostRetry);
}

/**
Expand All @@ -327,7 +431,11 @@ public Builder maxBackoff(Duration maxBackoff) {
this.minBackoff,
Objects.requireNonNull(maxBackoff, "maxBackoff"),
this.jitterFactor,
this.backoffScheduler);
this.backoffScheduler,
this.doPreRetry,
this.doPostRetry,
this.asyncPreRetry,
this.asyncPostRetry);
}

/**
Expand All @@ -348,7 +456,11 @@ public Builder jitter(double jitterFactor) {
this.minBackoff,
this.maxBackoff,
jitterFactor,
this.backoffScheduler);
this.backoffScheduler,
this.doPreRetry,
this.doPostRetry,
this.asyncPreRetry,
this.asyncPostRetry);
}

/**
Expand All @@ -369,7 +481,11 @@ public Builder scheduler(Scheduler backoffScheduler) {
this.minBackoff,
this.maxBackoff,
this.jitterFactor,
Objects.requireNonNull(backoffScheduler, "backoffScheduler"));
Objects.requireNonNull(backoffScheduler, "backoffScheduler"),
this.doPreRetry,
this.doPostRetry,
this.asyncPreRetry,
this.asyncPostRetry);
}

/**
Expand Down Expand Up @@ -418,5 +534,10 @@ public Throwable failure() {
public RetrySignal retain() {
return this;
}

@Override
public String toString() {
return "attempt #" + (failureTotalIndex + 1) + " (" + (failureSubsequentIndex + 1) + " in a row), last failure={" + failure + '}';
}
}
}

0 comments on commit 7dfab58

Please sign in to comment.