/
FallbackExecutor.java
122 lines (108 loc) · 4.24 KB
/
FallbackExecutor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
/*
* Copyright 2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.jodah.failsafe;
import net.jodah.failsafe.internal.EventListener;
import net.jodah.failsafe.util.concurrent.Scheduler;
import java.util.concurrent.*;
import java.util.function.Supplier;
/**
 * A PolicyExecutor that handles failures according to a {@link Fallback}.
 * <p>
 * When the wrapped supplier yields a result that {@code isFailure} classifies as a failure, the optional failed
 * attempt listener is notified and the result is replaced by the outcome of the fallback.
 *
 * @param <R> result type
 */
class FallbackExecutor<R> extends PolicyExecutor<R, Fallback<R>> {
  /** Notified with the failed result before the fallback is applied; may be {@code null}. */
  private final EventListener failedAttemptListener;

  /**
   * Creates an executor for the {@code fallback} policy.
   *
   * @param fallback the fallback policy applied to failed results
   * @param execution the execution this executor operates on
   * @param failedAttemptListener listener notified when a failed result is handled, or {@code null} for none
   */
  FallbackExecutor(Fallback<R> fallback, AbstractExecution<R> execution, EventListener failedAttemptListener) {
    super(fallback, execution);
    this.failedAttemptListener = failedAttemptListener;
  }

  /**
   * Performs an execution by calling the supplier, applying the fallback if the result is a failure, and calling
   * post-execute.
   * <p>
   * If {@code executionCancelled()} is true after the supplier returns, the supplier's result is returned as is: no
   * fallback is applied and post-execute is skipped. Any {@link Throwable} thrown while applying the fallback
   * becomes the new failure result.
   *
   * @param supplier provides the result to handle
   * @param scheduler not used by this synchronous path
   * @return a supplier of the handled result
   */
  @Override
  @SuppressWarnings("unchecked")
  protected Supplier<ExecutionResult> supply(Supplier<ExecutionResult> supplier, Scheduler scheduler) {
    return () -> {
      ExecutionResult result = supplier.get();
      if (executionCancelled())
        return result;

      if (isFailure(result)) {
        if (failedAttemptListener != null)
          failedAttemptListener.handle(result, execution);

        try {
          // The shared VOID fallback produces no value, so the result is only marked as a non-result
          result = policy == Fallback.VOID ?
            result.withNonResult() :
            result.withResult(policy.apply((R) result.getResult(), result.getFailure(), execution.copy()));
        } catch (Throwable t) {
          // A failing fallback replaces the original failure
          result = ExecutionResult.failure(t);
        }
      }

      return postExecute(result);
    };
  }

  /**
   * Performs an async execution by calling the supplier, applying the fallback stage if the result is a failure, and
   * doing an async post-execute.
   * <p>
   * Short-circuits, in order: a {@code null} result or an already done {@code future} yields
   * {@code ExecutionResult.NULL_FUTURE}; a cancelled execution yields the supplier's result without post-execute; a
   * non-failure result goes straight to async post-execute.
   * <p>
   * For a failure, the fallback is invoked inline unless {@code policy.isAsync()}, in which case it is handed to the
   * {@code scheduler} and a cancel function is injected into {@code future} so that cancellation reaches the
   * scheduled task.
   *
   * @param supplier provides the future result to handle
   * @param scheduler used to run the fallback when the policy is async, and passed on to async post-execute
   * @param future the future that cancellation is observed on and propagated from
   * @return a supplier of the future handled result
   */
  @Override
  @SuppressWarnings("unchecked")
  protected Supplier<CompletableFuture<ExecutionResult>> supplyAsync(
    Supplier<CompletableFuture<ExecutionResult>> supplier, Scheduler scheduler, FailsafeFuture<R> future) {
    return () -> supplier.get().thenCompose(result -> {
      if (result == null || future.isDone())
        return ExecutionResult.NULL_FUTURE;
      if (executionCancelled())
        return CompletableFuture.completedFuture(result);
      if (!isFailure(result))
        return postExecuteAsync(result, scheduler, future);

      if (failedAttemptListener != null)
        failedAttemptListener.handle(result, execution);

      // Completed with the fallback's outcome, or with the cancellation result if cancelled first
      CompletableFuture<ExecutionResult> promise = new CompletableFuture<>();
      Callable<R> callable = () -> {
        try {
          CompletableFuture<R> fallback = policy.applyStage((R) result.getResult(), result.getFailure(),
            execution.copy());
          fallback.whenComplete((innerResult, failure) -> {
            // Unwrap a CompletionException only when it carries a cause. Unwrapping a cause-less one would turn
            // the failure into null and wrongly report the failed fallback stage as a success.
            Throwable cause = failure instanceof CompletionException && failure.getCause() != null ?
              failure.getCause() :
              failure;
            ExecutionResult r = cause == null ? result.withResult(innerResult) : ExecutionResult.failure(cause);
            promise.complete(r);
          });
        } catch (Throwable t) {
          // Covers a fallback that throws, or returns a null stage, before any stage could complete
          promise.complete(ExecutionResult.failure(t));
        }
        return null;
      };

      try {
        if (!policy.isAsync())
          callable.call();
        else {
          Future<?> scheduledFallback = scheduler.schedule(callable, 0, TimeUnit.NANOSECONDS);

          // Propagate cancellation to the scheduled fallback and its promise
          future.injectCancelFn((mayInterrupt, promiseResult) -> {
            scheduledFallback.cancel(mayInterrupt);
            if (executionCancelled())
              promise.complete(promiseResult);
          });
        }
      } catch (Throwable t) {
        // Hard scheduling failure
        promise.completeExceptionally(t);
      }

      return promise.thenCompose(ss -> postExecuteAsync(ss, scheduler, future));
    });
  }
}