-
Notifications
You must be signed in to change notification settings - Fork 123
/
MultiOnSubscribeInvokeOp.java
63 lines (50 loc) · 1.97 KB
/
MultiOnSubscribeInvokeOp.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package io.smallrye.mutiny.operators.multi;
import static io.smallrye.mutiny.helpers.Subscriptions.CANCELLED;
import java.util.Objects;
import java.util.function.Consumer;
import org.reactivestreams.Subscription;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.subscription.MultiSubscriber;
/**
 * Operator that hands the upstream {@link Subscription} to a synchronous callback at subscription time.
 *
 * The callback is invoked from {@code onSubscribe}, before this operator's own subscription is passed to the
 * downstream subscriber. If the callback throws, the failure is reported to the downstream through
 * {@link Subscriptions#fail} and the upstream subscription is cancelled; in that case the downstream never
 * receives this operator's subscription.
 *
 * @param <T> the value type
 */
public final class MultiOnSubscribeInvokeOp<T> extends AbstractMultiOperator<T, T> {

    /** Callback receiving the upstream subscription once it has been accepted. */
    private final Consumer<? super Subscription> onSubscribe;

    /**
     * Creates the operator.
     *
     * @param upstream the upstream {@link Multi}
     * @param onSubscribe the callback invoked with the upstream subscription
     */
    public MultiOnSubscribeInvokeOp(Multi<? extends T> upstream,
            Consumer<? super Subscription> onSubscribe) {
        super(upstream);
        this.onSubscribe = onSubscribe;
    }

    /**
     * Subscribes to the upstream with a processor wrapping the given downstream subscriber.
     *
     * @param actual the downstream subscriber, must not be {@code null}
     * @throws NullPointerException if {@code actual} is {@code null}
     */
    @Override
    public void subscribe(MultiSubscriber<? super T> actual) {
        Objects.requireNonNull(actual, "Subscriber must not be `null`");
        OnSubscribeSubscriber processor = new OnSubscribeSubscriber(actual);
        upstream.subscribe().withSubscriber(processor);
    }

    /**
     * Processor sitting between upstream and downstream; only {@code onSubscribe} is customized here.
     */
    private final class OnSubscribeSubscriber extends MultiOperatorProcessor<T, T> {

        OnSubscribeSubscriber(MultiSubscriber<? super T> downstream) {
            super(downstream);
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            // An upstream subscription is already registered: reject the extra one.
            if (!compareAndSetUpstreamSubscription(null, subscription)) {
                subscription.cancel();
                return;
            }

            try {
                MultiOnSubscribeInvokeOp.this.onSubscribe.accept(subscription);
            } catch (Throwable failure) {
                // Report the callback failure downstream first, then release the upstream.
                Subscriptions.fail(downstream, failure);
                getAndSetUpstreamSubscription(CANCELLED).cancel();
                return;
            }

            // Callback returned normally: expose this processor as the downstream's subscription.
            downstream.onSubscribe(this);
        }
    }
}