Skip to content

Commit

Permalink
Add better support for potentially blocked operations
Browse files Browse the repository at this point in the history
- Add better support for potentially blocked operations, such as when scheduled executions, fallbacks, or retries are blocked due to thread limitations. Ensure that promises still complete as expected.
- Add test coverage for blocked operation scenarios.
- Add test for issue 260
- Add test for issue 231
  • Loading branch information
jhalterman committed Aug 9, 2021
1 parent 65f4bb1 commit 449d741
Show file tree
Hide file tree
Showing 15 changed files with 144 additions and 79 deletions.
24 changes: 16 additions & 8 deletions src/main/java/net/jodah/failsafe/AbstractExecution.java
Expand Up @@ -34,11 +34,15 @@ public abstract class AbstractExecution extends ExecutionContext {
final FailsafeExecutor<Object> executor;
final List<PolicyExecutor<Policy<Object>>> policyExecutors;

enum Status {
NOT_RUNNING, RUNNING, TIMED_OUT
}

// Internally mutable state
/* The status of an execution */
volatile Status status = Status.NOT_RUNNING;
/* Whether the execution attempt has been recorded */
volatile boolean attemptRecorded;
/* Whether the execution result has been recorded */
volatile boolean executionRecorded;
/* Whether a result has been post-executed */
volatile boolean resultHandled;
/* Whether the execution can be interrupted */
Expand Down Expand Up @@ -73,15 +77,19 @@ public abstract class AbstractExecution extends ExecutionContext {
* @throws IllegalStateException if the execution is already complete
*/
void record(ExecutionResult result) {
record(result, false);
}

void record(ExecutionResult result, boolean timeout) {
Assert.state(!completed, "Execution has already been completed");
if (!interrupted) {
recordAttempt();
if (!executionRecorded) {
if (Status.RUNNING.equals(status)) {
lastResult = result.getResult();
lastFailure = result.getFailure();
executions.incrementAndGet();
executionRecorded = true;
status = timeout ? Status.TIMED_OUT : Status.NOT_RUNNING;
}
lastResult = result.getResult();
lastFailure = result.getFailure();
}
}

Expand All @@ -96,12 +104,12 @@ void recordAttempt() {
}
}

void preExecute() {
synchronized void preExecute() {
attemptStartTime = Duration.ofNanos(System.nanoTime());
if (startTime == Duration.ZERO)
startTime = attemptStartTime;
status = Status.RUNNING;
attemptRecorded = false;
executionRecorded = false;
resultHandled = false;
cancelledIndex = 0;
canInterrupt = true;
Expand Down
9 changes: 6 additions & 3 deletions src/main/java/net/jodah/failsafe/AsyncExecution.java
Expand Up @@ -21,6 +21,7 @@

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

Expand All @@ -46,7 +47,7 @@ <T> AsyncExecution(Scheduler scheduler, FailsafeFuture<T> future, FailsafeExecut

void inject(Supplier<CompletableFuture<ExecutionResult>> syncSupplier, boolean asyncExecution) {
if (!asyncExecution) {
outerExecutionSupplier = Functions.getPromiseAsync(syncSupplier, scheduler, future);
outerExecutionSupplier = Functions.getPromiseAsync(syncSupplier, scheduler, this);
} else {
outerExecutionSupplier = innerExecutionSupplier = Functions.toSettableSupplier(syncSupplier);
}
Expand Down Expand Up @@ -181,8 +182,10 @@ ExecutionResult postExecute(ExecutionResult result) {
void executeAsync(boolean asyncExecution) {
if (!asyncExecution)
outerExecutionSupplier.get().whenComplete(this::complete);
else
future.injectPolicy(scheduler.schedule(innerExecutionSupplier::get, 0, TimeUnit.NANOSECONDS));
else {
Future<?> scheduledSupply = scheduler.schedule(innerExecutionSupplier::get, 0, TimeUnit.NANOSECONDS);
future.injectCancelFn((mayInterrupt, result) -> scheduledSupply.cancel(mayInterrupt));
}
}

/**
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/net/jodah/failsafe/FailsafeExecutor.java
Expand Up @@ -31,7 +31,8 @@
/**
* <p>
* An executor that handles failures according to configured {@link FailurePolicy policies}. Can be created via {@link
* Failsafe#with(Policy[])}.
* Failsafe#with(Policy, Policy[])} to support policy based execution failure handling, or {@link Failsafe#none()} to
* support execution with no failure handling.
* <p>
* Async executions are run by default on the {@link ForkJoinPool#commonPool()}. Alternative executors can be configured
* via {@link #with(ScheduledExecutorService)} and similar methods. All async executions are cancellable and
Expand Down Expand Up @@ -402,7 +403,7 @@ private <T> T call(Function<Execution, CheckedSupplier<?>> supplierFn) {
* @throws NullPointerException if the {@code supplierFn} is null
* @throws RejectedExecutionException if the {@code supplierFn} cannot be scheduled for execution
*/
@SuppressWarnings("unchecked")
@SuppressWarnings({ "unchecked", "rawtypes" })
private <T> CompletableFuture<T> callAsync(
Function<AsyncExecution, Supplier<CompletableFuture<ExecutionResult>>> supplierFn, boolean asyncExecution) {
FailsafeFuture<T> future = new FailsafeFuture(this);
Expand Down
35 changes: 10 additions & 25 deletions src/main/java/net/jodah/failsafe/FailsafeFuture.java
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;

/**
* A CompletableFuture implementation that propagates cancellations and calls completion handlers.
Expand All @@ -34,9 +35,8 @@ public class FailsafeFuture<T> extends CompletableFuture<T> {
private AbstractExecution execution;

// Mutable state, guarded by "this"
private Future<T> policyExecFuture;
private Future<?> dependentStageFuture;
private Runnable cancelFn;
private BiConsumer<Boolean, ExecutionResult> cancelFn;
private List<Future<T>> timeoutFutures;
private boolean cancelWithInterrupt;

Expand Down Expand Up @@ -72,7 +72,7 @@ public synchronized boolean cancel(boolean mayInterruptIfRunning) {
this.cancelWithInterrupt = mayInterruptIfRunning;
execution.cancelledIndex = Integer.MAX_VALUE;
boolean cancelResult = super.cancel(mayInterruptIfRunning);
cancelResult = cancelDependencies(mayInterruptIfRunning, cancelResult);
cancelDependencies(mayInterruptIfRunning, null);
ExecutionResult result = ExecutionResult.failure(new CancellationException());
super.completeExceptionally(result.getFailure());
executor.handleComplete(result, execution);
Expand All @@ -98,46 +98,31 @@ synchronized boolean completeResult(ExecutionResult result) {
return completed;
}

synchronized Future<T> getDependency() {
return policyExecFuture;
}

synchronized List<Future<T>> getTimeoutDelegates() {
return timeoutFutures;
}

/**
* Cancels the dependency passing in the {@code interruptDelegate} flag, applies the retry cancel fn, and cancels all
* Cancels the dependency passing in the {@code mayInterrupt} flag, applies the retry cancel fn, and cancels all
* timeout dependencies.
*/
synchronized boolean cancelDependencies(boolean interruptDelegate, boolean result) {
execution.interrupted = interruptDelegate;
if (policyExecFuture != null)
result = policyExecFuture.cancel(interruptDelegate);
synchronized void cancelDependencies(boolean mayInterrupt, ExecutionResult cancelResult) {
execution.interrupted = mayInterrupt;
if (dependentStageFuture != null)
dependentStageFuture.cancel(interruptDelegate);
if (cancelFn != null)
cancelFn.run();
dependentStageFuture.cancel(mayInterrupt);
if (timeoutFutures != null) {
for (Future<T> timeoutDelegate : timeoutFutures)
timeoutDelegate.cancel(false);
timeoutFutures.clear();
}
return result;
if (cancelFn != null)
cancelFn.accept(mayInterrupt, cancelResult);
}

void inject(AbstractExecution execution) {
this.execution = execution;
}

/**
* Injects a {@code policyExecFuture} to be cancelled when this future is cancelled.
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
synchronized void injectPolicy(Future<?> policyExecFuture) {
this.policyExecFuture = (Future) policyExecFuture;
}

/**
* Injects a {@code dependentStageFuture} to be cancelled when this future is cancelled.
*/
Expand All @@ -152,7 +137,7 @@ synchronized void injectStage(Future<?> dependentStageFuture) {
/**
* Injects a {@code cancelFn} to be called when this future is cancelled.
*/
synchronized void injectCancelFn(Runnable cancelFn) {
synchronized void injectCancelFn(BiConsumer<Boolean, ExecutionResult> cancelFn) {
this.cancelFn = cancelFn;
}

Expand Down
9 changes: 4 additions & 5 deletions src/main/java/net/jodah/failsafe/FallbackExecutor.java
Expand Up @@ -96,12 +96,11 @@ protected Supplier<CompletableFuture<ExecutionResult>> supplyAsync(
else {
Future<?> scheduledFallback = scheduler.schedule(callable, 0, TimeUnit.NANOSECONDS);

// Propagate cancellation to the scheduled retry and promise
future.injectCancelFn(() -> {
System.out.println("cancelling scheduled fallback isdone: " + scheduledFallback.isDone());
scheduledFallback.cancel(false);
// Propagate cancellation to the scheduled fallback and its promise
future.injectCancelFn((mayInterrupt, promiseResult) -> {
scheduledFallback.cancel(mayInterrupt);
if (executionCancelled())
promise.complete(null);
promise.complete(promiseResult);
});
}
} catch (Throwable t) {
Expand Down
15 changes: 13 additions & 2 deletions src/main/java/net/jodah/failsafe/Functions.java
Expand Up @@ -15,6 +15,7 @@
*/
package net.jodah.failsafe;

import net.jodah.failsafe.AbstractExecution.Status;
import net.jodah.failsafe.function.*;
import net.jodah.failsafe.internal.util.Assert;
import net.jodah.failsafe.util.concurrent.Scheduler;
Expand Down Expand Up @@ -90,7 +91,8 @@ static <T> Supplier<CompletableFuture<ExecutionResult>> getPromise(ContextualSup
* calls, and returns a promise containing the result.
*/
static Supplier<CompletableFuture<ExecutionResult>> getPromiseAsync(
Supplier<CompletableFuture<ExecutionResult>> supplier, Scheduler scheduler, FailsafeFuture<Object> future) {
Supplier<CompletableFuture<ExecutionResult>> supplier, Scheduler scheduler, AsyncExecution execution) {

AtomicBoolean scheduled = new AtomicBoolean();
return () -> {
if (scheduled.get()) {
Expand All @@ -106,7 +108,16 @@ static Supplier<CompletableFuture<ExecutionResult>> getPromiseAsync(

try {
scheduled.set(true);
future.injectPolicy(scheduler.schedule(callable, 0, TimeUnit.NANOSECONDS));
Future<?> scheduledSupply = scheduler.schedule(callable, 0, TimeUnit.NANOSECONDS);

// Propagate cancellation to the scheduled supplier and its promise
execution.future.injectCancelFn((mayInterrupt, cancelResult) -> {
scheduledSupply.cancel(mayInterrupt);

// Cancel a pending promise if the execution is not yet running
if (Status.NOT_RUNNING.equals(execution.status))
promise.complete(cancelResult);
});
} catch (Throwable t) {
promise.completeExceptionally(t);
}
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/net/jodah/failsafe/PolicyExecutor.java
Expand Up @@ -31,7 +31,8 @@
public abstract class PolicyExecutor<P extends Policy> {
protected final P policy;
protected final AbstractExecution execution;
/* Index of the policy relative to other policies in a composition, inner-most first */ int policyIndex;
// Index of the policy relative to other policies in a composition, inner-most first
int policyIndex;

protected PolicyExecutor(P policy, AbstractExecution execution) {
this.policy = policy;
Expand Down
20 changes: 14 additions & 6 deletions src/main/java/net/jodah/failsafe/RetryPolicyExecutor.java
Expand Up @@ -21,6 +21,7 @@
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

Expand Down Expand Up @@ -114,14 +115,18 @@ public Object call() {
supplier.get().whenComplete((result, error) -> {
if (error != null)
promise.completeExceptionally(error);
else if (result != null) {
else if (result == null)
promise.complete(null);
else {
if (retriesExceeded || executionCancelled()) {
promise.complete(result);
} else {
postExecuteAsync(result, scheduler, future).whenComplete((postResult, postError) -> {
if (postError != null)
promise.completeExceptionally(postError);
else if (postResult != null) {
else if (postResult == null)
promise.complete(null);
else {
if (postResult.isComplete() || executionCancelled()) {
promise.complete(postResult);
} else {
Expand All @@ -133,11 +138,14 @@ else if (postResult != null) {
retryScheduledListener.handle(postResult, execution);

previousResult = postResult;
future.injectPolicy(scheduler.schedule(this, postResult.getWaitNanos(), TimeUnit.NANOSECONDS));
future.injectCancelFn(() -> {
// Ensure that the promise completes if a scheduled retry is cancelled
Future<?> scheduledRetry = scheduler.schedule(this, postResult.getWaitNanos(),
TimeUnit.NANOSECONDS);

// Propagate cancellation to the scheduled retry and its promise
future.injectCancelFn((mayInterrupt, cancelResult) -> {
scheduledRetry.cancel(mayInterrupt);
if (executionCancelled())
promise.complete(null);
promise.complete(cancelResult);
});
} catch (Throwable t) {
// Hard scheduling failure
Expand Down
12 changes: 6 additions & 6 deletions src/main/java/net/jodah/failsafe/TimeoutExecutor.java
Expand Up @@ -73,7 +73,7 @@ protected Supplier<ExecutionResult> supply(Supplier<ExecutionResult> supplier, S
// Guard against race with the execution completing
synchronized (execution) {
if (execution.canInterrupt) {
execution.record(result.get());
execution.record(result.get(), true);
execution.interrupted = true;
executionThread.interrupt();
}
Expand Down Expand Up @@ -115,17 +115,17 @@ protected Supplier<CompletableFuture<ExecutionResult>> supplyAsync(
try {
// Schedule timeout check
timeoutFuture.set(Scheduler.DEFAULT.schedule(() -> {
// Guard against race with execution completion
if (executionResult.compareAndSet(null,
ExecutionResult.failure(new TimeoutExceededException(policy)))) {
ExecutionResult cancelResult = ExecutionResult.failure(new TimeoutExceededException(policy));

// Guard against race with execution completion
if (executionResult.compareAndSet(null, cancelResult)) {
boolean canInterrupt = policy.canInterrupt();
if (canInterrupt)
execution.record(executionResult.get());
execution.record(executionResult.get(), true);

// Cancel and interrupt
execution.cancelledIndex = policyIndex;
future.cancelDependencies(canInterrupt, false);
future.cancelDependencies(canInterrupt, cancelResult);
}
return null;
}, policy.getTimeout().toNanos(), TimeUnit.NANOSECONDS));
Expand Down
2 changes: 1 addition & 1 deletion src/test/java/net/jodah/failsafe/AbstractFailsafeTest.java
Expand Up @@ -347,7 +347,7 @@ public void shouldTimeout() throws Throwable {
};

// When / Then
FailsafeExecutor<Object> failsafe = Failsafe.with(rp, timeout).onSuccess(e -> {
FailsafeExecutor<Object> failsafe = Failsafe.with(rp, timeout).onComplete(e -> {
waiter.assertEquals(e.getAttemptCount(), 3);
waiter.assertEquals(e.getExecutionCount(), 3);
waiter.assertEquals("foo2", e.getResult());
Expand Down

0 comments on commit 449d741

Please sign in to comment.