-
Notifications
You must be signed in to change notification settings - Fork 123
/
UniRetryAtMost.java
92 lines (79 loc) · 3.29 KB
/
UniRetryAtMost.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package io.smallrye.mutiny.operators.uni;
import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull;
import static io.smallrye.mutiny.helpers.ParameterValidation.positive;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Predicate;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.operators.AbstractUni;
import io.smallrye.mutiny.operators.UniOperator;
import io.smallrye.mutiny.subscription.UniSubscriber;
import io.smallrye.mutiny.subscription.UniSubscription;
/**
 * {@link Uni} operator that subscribes to its upstream again when the upstream fails, as long as the
 * failure is accepted by a predicate and the subscription counter has not gone past
 * {@code maxAttempts}.
 *
 * <p>
 * A new processor (and therefore a new counter) is created for every call to
 * {@link #subscribe(UniSubscriber)}.
 *
 * @param <T> the item type, shared by the upstream and the downstream
 */
public class UniRetryAtMost<T> extends UniOperator<T, T> {

    /** Decides whether a given failure is eligible for a retry. */
    private final Predicate<? super Throwable> predicate;

    /** Upper bound compared against the processor's subscription counter. */
    private final long maxAttempts;

    /**
     * Creates the operator.
     *
     * @param upstream the upstream {@link Uni}; checked with {@code ParameterValidation.nonNull}
     * @param predicate the failure predicate; checked with {@code ParameterValidation.nonNull}
     * @param maxAttempts the retry bound; checked with {@code ParameterValidation.positive}
     */
    public UniRetryAtMost(Uni<T> upstream, Predicate<? super Throwable> predicate, long maxAttempts) {
        super(nonNull(upstream, "upstream"));
        this.predicate = nonNull(predicate, "predicate");
        this.maxAttempts = positive(maxAttempts, "maxAttempts");
    }

    /**
     * Subscribes a fresh {@link UniRetryAtMostProcessor} wrapping {@code subscriber} to the upstream.
     *
     * @param subscriber the downstream subscriber
     */
    @Override
    public void subscribe(UniSubscriber<? super T> subscriber) {
        AbstractUni.subscribe(upstream(), new UniRetryAtMostProcessor<T>(this, subscriber));
    }

    /**
     * Processor sitting between the upstream and the downstream subscriber. It counts the
     * subscriptions it receives and re-subscribes to the upstream on eligible failures.
     */
    private static class UniRetryAtMostProcessor<T> extends UniOperatorProcessor<T, T> {

        private final UniRetryAtMost<T> uniRetryAtMost;

        /** Number of {@link #onSubscribe(UniSubscription)} calls seen so far; updated via {@link #counterUpdater}. */
        private volatile int counter = 0;

        private static final AtomicIntegerFieldUpdater<UniRetryAtMostProcessor> counterUpdater = AtomicIntegerFieldUpdater
                .newUpdater(UniRetryAtMostProcessor.class, "counter");

        /**
         * @param uniRetryAtMost the owning operator, source of the predicate, bound and upstream
         * @param downstream the subscriber that receives the final outcome
         */
        public UniRetryAtMostProcessor(UniRetryAtMost<T> uniRetryAtMost, UniSubscriber<? super T> downstream) {
            super(downstream);
            this.uniRetryAtMost = uniRetryAtMost;
        }

        /**
         * Counts the incoming subscription and tries to install it as the current upstream
         * subscription. The downstream's {@code onSubscribe} is only invoked for the first counted
         * subscription; a subscription that cannot be installed is cancelled.
         *
         * @param subscription the subscription handed over by the upstream
         */
        @Override
        public void onSubscribe(UniSubscription subscription) {
            int count = counterUpdater.incrementAndGet(this);
            if (compareAndSetUpstreamSubscription(null, subscription)) {
                // Retries reuse this processor, so the downstream must be notified only once.
                if (count == 1) {
                    downstream.onSubscribe(this);
                }
            } else {
                subscription.cancel();
            }
        }

        /**
         * Handles an upstream failure: either forwards it downstream or subscribes to the upstream
         * again.
         *
         * <ol>
         * <li>If this processor is cancelled, the failure goes to
         * {@code Infrastructure.handleDroppedException}.</li>
         * <li>If the predicate rejects the failure (or throws), the downstream has already been
         * notified by {@link #testPredicate(Throwable)} and nothing else happens.</li>
         * <li>If the counter is greater than {@code maxAttempts}, the failure is forwarded
         * downstream.</li>
         * <li>Otherwise the stored upstream subscription is cleared and cancelled, and this
         * processor subscribes to the upstream again.</li>
         * </ol>
         *
         * @param failure the failure emitted by the upstream
         */
        @Override
        public void onFailure(Throwable failure) {
            if (isCancelled()) {
                Infrastructure.handleDroppedException(failure);
                return;
            }
            if (!testPredicate(failure)) {
                return;
            }
            if (counter > uniRetryAtMost.maxAttempts) {
                downstream.onFailure(failure);
                return;
            }
            // Reset the slot to null so that the next onSubscribe can install the new subscription.
            UniSubscription previousSubscription = getAndSetUpstreamSubscription(null);
            if (previousSubscription != null) {
                previousSubscription.cancel();
            }
            AbstractUni.subscribe(uniRetryAtMost.upstream(), this);
        }

        /**
         * Applies the operator's predicate to {@code failure}.
         *
         * <p>
         * When the result is {@code false} the downstream has already received a failure: the
         * original one if the predicate rejected it, or the predicate's own exception if it threw.
         * In the latter case the original failure is attached as a suppressed exception so that it
         * is not lost.
         *
         * @param failure the failure emitted by the upstream
         * @return {@code true} if the predicate accepted the failure, {@code false} otherwise
         */
        private boolean testPredicate(Throwable failure) {
            boolean passes;
            try {
                passes = uniRetryAtMost.predicate.test(failure);
            } catch (Throwable e) {
                // Throwable.addSuppressed rejects null and self-suppression, hence the guards
                // (the predicate may simply rethrow the failure it was given).
                if (failure != null && e != failure) {
                    e.addSuppressed(failure);
                }
                downstream.onFailure(e);
                return false;
            }
            if (!passes) {
                downstream.onFailure(failure);
                return false;
            }
            return true;
        }
    }
}