-
Notifications
You must be signed in to change notification settings - Fork 124
/
MultiOnFailureInvoke.java
55 lines (44 loc) · 1.88 KB
/
MultiOnFailureInvoke.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package io.smallrye.mutiny.operators.multi;
import static io.smallrye.mutiny.helpers.ParameterValidation.nonNull;
import static io.smallrye.mutiny.helpers.Subscriptions.CANCELLED;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.reactivestreams.Subscription;
import io.smallrye.mutiny.CompositeException;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.subscription.MultiSubscriber;
/**
 * Operator that runs a callback when the upstream signals a failure accepted by a predicate,
 * and then forwards a failure to the downstream subscriber.
 * <p>
 * The failure forwarded downstream is the original one, unless the predicate or the callback
 * throws; in that case a {@link CompositeException} built from the original failure and the
 * thrown exception is forwarded instead.
 *
 * @param <T> the type of item flowing through this operator
 */
public class MultiOnFailureInvoke<T> extends AbstractMultiOperator<T, T> {

    /** Action executed with the upstream failure when {@link #predicate} accepts it. */
    private final Consumer<Throwable> callback;

    /** Filter deciding whether {@link #callback} is executed for a given failure. */
    private final Predicate<? super Throwable> predicate;

    /**
     * Creates the operator.
     *
     * @param upstream the upstream {@link Multi}, handed to the parent operator
     * @param callback the action to run with the failure when the predicate accepts it
     * @param predicate the test applied to the failure before running the callback
     */
    public MultiOnFailureInvoke(Multi<? extends T> upstream, Consumer<Throwable> callback,
            Predicate<? super Throwable> predicate) {
        super(upstream);
        this.predicate = predicate;
        this.callback = callback;
    }

    /**
     * Subscribes to the upstream through a new {@link MultiOnFailureInvokeProcessor} wrapping
     * the given subscriber.
     *
     * @param downstream the subscriber receiving the events; checked with {@code nonNull}
     */
    @Override
    public void subscribe(MultiSubscriber<? super T> downstream) {
        upstream.subscribe()
                .withSubscriber(new MultiOnFailureInvokeProcessor(nonNull(downstream, "downstream")));
    }

    /**
     * Processor sitting between the upstream and the downstream subscriber; only the failure
     * signal is customized here.
     */
    class MultiOnFailureInvokeProcessor extends MultiOperatorProcessor<T, T> {

        /**
         * @param downstream the subscriber to which events are forwarded
         */
        public MultiOnFailureInvokeProcessor(MultiSubscriber<? super T> downstream) {
            super(downstream);
        }

        /**
         * Handles an upstream failure: replaces the upstream subscription with the
         * {@code CANCELLED} marker and, unless that marker was already in place, runs the
         * predicate/callback pair and forwards the resulting failure downstream.
         *
         * @param failure the failure signalled by the upstream
         */
        @Override
        public void onFailure(Throwable failure) {
            Subscription previous = getAndSetUpstreamSubscription(CANCELLED);
            // Capture the subscriber before running any user code.
            MultiSubscriber<? super T> target = downstream;
            if (previous == Subscriptions.CANCELLED) {
                // Already marked as cancelled: the failure is not propagated.
                return;
            }
            target.onFailure(failureAfterCallback(failure));
        }

        /**
         * Applies the predicate and, when it accepts the failure, the callback.
         *
         * @param failure the failure signalled by the upstream
         * @return {@code failure} itself when neither the predicate nor the callback throws,
         *         otherwise a {@link CompositeException} of {@code failure} and the thrown exception
         */
        private Throwable failureAfterCallback(Throwable failure) {
            try {
                if (predicate.test(failure)) {
                    callback.accept(failure);
                }
                return failure;
            } catch (Throwable thrown) {
                return new CompositeException(failure, thrown);
            }
        }
    }
}