-
Notifications
You must be signed in to change notification settings - Fork 123
/
MultiOperatorProcessor.java
131 lines (107 loc) · 4.44 KB
/
MultiOperatorProcessor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package io.smallrye.mutiny.operators.multi;
import static io.smallrye.mutiny.helpers.Subscriptions.CANCELLED;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.reactivestreams.Subscription;
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.subscription.MultiSubscriber;
/**
 * Skeleton for a processing stage that subscribes to an upstream as a {@link MultiSubscriber} and is handed to its
 * downstream as that downstream's {@link Subscription}.
 * <p>
 * The default implementation is a pass-through: items, failures and completion are forwarded to the downstream,
 * and demand is forwarded to the upstream. Subclasses override the callbacks they need to customise.
 * <p>
 * The upstream reference is {@code null} until {@link #onSubscribe(Subscription)} succeeds, and is swapped for the
 * {@code CANCELLED} sentinel once a terminal event or a cancellation has been processed. The sentinel is compared
 * by identity to decide whether events may still flow.
 *
 * @param <I> the type of items received from upstream
 * @param <O> the type of items emitted downstream
 */
public abstract class MultiOperatorProcessor<I, O> implements MultiSubscriber<I>, Subscription {

    /*
     * The downstream reference is deliberately never cleared.
     *
     * An earlier reading of the Reactive Streams TCK led us to set this field to null on cancellation in order to
     * release the subscriber. That is not actually necessary (and is NPE-prone): operators are instantiated
     * per-subscription, so the *publisher* does not keep references to cancelled subscribers.
     */
    protected volatile MultiSubscriber<? super O> downstream;

    /** Upstream subscription: {@code null} before subscription, {@code CANCELLED} once terminated or cancelled. */
    protected volatile Subscription upstream = null;

    /** {@code 1} once a cancellation has been requested, {@code 0} otherwise. */
    private volatile int cancellationRequested = 0;

    // Raw types are required here: field updaters are created from the class literal of a generic class.
    private static final AtomicReferenceFieldUpdater<MultiOperatorProcessor, Subscription> UPSTREAM_UPDATER = AtomicReferenceFieldUpdater
            .newUpdater(MultiOperatorProcessor.class, Subscription.class, "upstream");

    private static final AtomicIntegerFieldUpdater<MultiOperatorProcessor> CANCELLATION_REQUESTED_UPDATER = AtomicIntegerFieldUpdater
            .newUpdater(MultiOperatorProcessor.class, "cancellationRequested");

    /**
     * Creates a processor that forwards its events to the given downstream.
     *
     * @param downstream the subscriber receiving this processor's events; validated with
     *        {@code ParameterValidation.nonNull}
     */
    public MultiOperatorProcessor(MultiSubscriber<? super O> downstream) {
        this.downstream = ParameterValidation.nonNull(downstream, "downstream");
    }

    /**
     * Cancels the current upstream subscription, if one is set, and then reports the failure through
     * {@link #onFailure(Throwable)}.
     *
     * @param throwable the failure to report
     */
    void failAndCancel(Throwable throwable) {
        Subscription subscription = getUpstreamSubscription();
        if (subscription != null) {
            subscription.cancel();
        }
        onFailure(throwable);
    }

    /**
     * @return the current upstream reference: {@code null} before subscription, the {@code CANCELLED} sentinel after
     *         termination or cancellation, the actual upstream subscription otherwise
     */
    protected Subscription getUpstreamSubscription() {
        return upstream;
    }

    /**
     * Atomically replaces the upstream reference if it currently holds the expected value.
     *
     * @param expectedValue the value the reference is expected to hold
     * @param newValue the value to store
     * @return {@code true} if the reference was updated
     */
    protected boolean compareAndSetUpstreamSubscription(Subscription expectedValue, Subscription newValue) {
        return UPSTREAM_UPDATER.compareAndSet(this, expectedValue, newValue);
    }

    /**
     * Atomically replaces the upstream reference.
     *
     * @param newValue the value to store
     * @return the previous value
     */
    protected Subscription getAndSetUpstreamSubscription(Subscription newValue) {
        return UPSTREAM_UPDATER.getAndSet(this, newValue);
    }

    /**
     * @return {@code true} if the upstream reference has been replaced by the {@code CANCELLED} sentinel, which
     *         happens on failure, completion and upstream cancellation
     */
    protected boolean isDone() {
        return getUpstreamSubscription() == CANCELLED;
    }

    /**
     * @return {@code true} if a cancellation has been requested through {@link #cancel()} or
     *         {@link #cancelUpstream()}
     */
    protected boolean isCancelled() {
        return cancellationRequested == 1;
    }

    /**
     * Records the upstream subscription and hands this processor to the downstream as its subscription.
     * <p>
     * If an upstream reference is already set (including the {@code CANCELLED} sentinel), the incoming subscription
     * is cancelled instead.
     *
     * @param subscription the subscription offered by the upstream
     */
    @Override
    public void onSubscribe(Subscription subscription) {
        if (compareAndSetUpstreamSubscription(null, subscription)) {
            // Propagate subscription to downstream.
            downstream.onSubscribe(this);
        } else {
            subscription.cancel();
        }
    }

    /**
     * Marks this processor as terminated and forwards the failure downstream.
     * <p>
     * If the processor was already terminated or cancelled, the failure cannot be delivered and is handed to
     * {@code Infrastructure.handleDroppedException} instead. This method does not cancel the upstream subscription.
     *
     * @param throwable the failure
     */
    @Override
    public void onFailure(Throwable throwable) {
        Subscription subscription = getAndSetUpstreamSubscription(CANCELLED);
        if (subscription != CANCELLED) {
            downstream.onFailure(throwable);
        } else {
            Infrastructure.handleDroppedException(throwable);
        }
    }

    /**
     * Forwards the item downstream unless this processor is terminated or cancelled.
     * <p>
     * The item is passed through with an unchecked cast to {@code O}, so this default is only meaningful when the
     * input and output types are compatible; presumably subclasses that transform items override this method.
     *
     * @param item the item received from upstream
     */
    @SuppressWarnings("unchecked")
    @Override
    public void onItem(I item) {
        Subscription subscription = getUpstreamSubscription();
        if (subscription != CANCELLED) {
            downstream.onItem((O) item);
        }
    }

    /**
     * Marks this processor as terminated and forwards the completion downstream, unless it was already terminated
     * or cancelled.
     */
    @Override
    public void onCompletion() {
        Subscription subscription = getAndSetUpstreamSubscription(CANCELLED);
        if (subscription != CANCELLED) {
            downstream.onCompletion();
        }
    }

    /**
     * Forwards the demand to the upstream subscription. Requests made after termination or cancellation are ignored.
     * <p>
     * A non-positive demand cancels the upstream subscription and signals an {@link IllegalArgumentException}
     * through {@link #onFailure(Throwable)}.
     *
     * @param numberOfItems the number of items requested, must be greater than 0
     */
    @Override
    public void request(long numberOfItems) {
        Subscription subscription = getUpstreamSubscription();
        if (subscription != CANCELLED) {
            if (numberOfItems <= 0) {
                // onFailure alone would replace the upstream reference with CANCELLED without ever calling cancel()
                // on the actual subscription, and a later cancel() could no longer reach it. Cancel it first so the
                // upstream is released.
                failAndCancel(new IllegalArgumentException("Invalid number of request, must be greater than 0"));
                return;
            }
            subscription.request(numberOfItems);
        }
    }

    /**
     * Cancels the upstream subscription. Only the first call has an effect; later calls are ignored.
     */
    @Override
    public void cancel() {
        if (compareAndSwapDownstreamCancellationRequest()) {
            cancelUpstream();
        }
    }

    /**
     * Atomically flips the cancellation flag from "not requested" to "requested".
     *
     * @return {@code true} if this call is the one that flipped the flag
     */
    protected final boolean compareAndSwapDownstreamCancellationRequest() {
        return CANCELLATION_REQUESTED_UPDATER.compareAndSet(this, 0, 1);
    }

    /**
     * Sets the cancellation flag, replaces the upstream reference with the {@code CANCELLED} sentinel and cancels
     * the previous upstream subscription if there was one that had not already been replaced by the sentinel.
     */
    protected void cancelUpstream() {
        this.cancellationRequested = 1;
        Subscription actual = UPSTREAM_UPDATER.getAndSet(this, CANCELLED);
        if (actual != null && actual != CANCELLED) {
            actual.cancel();
        }
    }
}