-
Notifications
You must be signed in to change notification settings - Fork 124
/
UniOnCancellation.java
71 lines (56 loc) · 2.23 KB
/
UniOnCancellation.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package io.smallrye.mutiny.operators.uni;
import static io.smallrye.mutiny.helpers.EmptyUniSubscription.CANCELLED;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.AbstractUni;
import io.smallrye.mutiny.operators.UniOperator;
import io.smallrye.mutiny.subscription.UniSubscriber;
import io.smallrye.mutiny.subscription.UniSubscription;
/**
 * Operator that runs a {@link Runnable} when the downstream cancels its subscription
 * before an item or a failure has been forwarded. Items and failures coming from the
 * upstream are passed through to the downstream unchanged.
 *
 * @param <T> the type of item, identical for the upstream and the downstream
 */
public class UniOnCancellation<T> extends UniOperator<T, T> {

    /** Action executed on downstream cancellation. */
    private final Runnable callback;

    /**
     * Creates the operator.
     *
     * @param upstream the upstream {@link Uni}
     * @param callback the action to run on cancellation; it is invoked without a null
     *        check, so callers are expected to pass a non-{@code null} value
     */
    public UniOnCancellation(Uni<T> upstream, Runnable callback) {
        super(upstream);
        this.callback = callback;
    }

    /**
     * Subscribes to the upstream through a fresh processor wrapping {@code subscriber},
     * so that every subscription tracks its own cancellation state.
     *
     * @param subscriber the downstream subscriber
     */
    @Override
    public void subscribe(UniSubscriber<? super T> subscriber) {
        AbstractUni.subscribe(upstream(), new UniOnCancellationProcessor<T>(callback, subscriber));
    }

    /** Lifecycle of a single subscription; the only transitions are out of {@link #INIT}. */
    private enum State {
        /** No item, failure or cancellation has been handled yet. */
        INIT,
        /** An item or a failure has been forwarded to the downstream. */
        DONE,
        /** The downstream cancelled before any item or failure was forwarded. */
        CANCELLED
    }

    /**
     * Per-subscription processor. A compare-and-set on {@link #state} guarantees that at
     * most one of {@code onItem}, {@code onFailure} and {@code cancel} takes effect.
     *
     * @param <T> the type of item
     */
    private static class UniOnCancellationProcessor<T> extends UniOperatorProcessor<T, T> {

        private final Runnable callback;

        /** Current state; only modified through {@link #stateUpdater}. */
        private volatile State state = State.INIT;

        private static final AtomicReferenceFieldUpdater<UniOnCancellationProcessor, State> stateUpdater = AtomicReferenceFieldUpdater
                .newUpdater(UniOnCancellationProcessor.class, State.class, "state");

        /**
         * @param callback the action to run when the downstream cancels
         * @param downstream the subscriber receiving items and failures
         */
        public UniOnCancellationProcessor(Runnable callback, UniSubscriber<? super T> downstream) {
            super(downstream);
            this.callback = callback;
        }

        /**
         * Forwards the item to the downstream unless this processor has already completed
         * or been cancelled.
         *
         * @param item the item emitted by the upstream
         */
        @Override
        public void onItem(T item) {
            if (stateUpdater.compareAndSet(this, State.INIT, State.DONE)) {
                downstream.onItem(item);
            }
        }

        /**
         * Forwards the failure to the downstream unless this processor has already
         * completed or been cancelled.
         *
         * @param failure the failure emitted by the upstream
         */
        @Override
        public void onFailure(Throwable failure) {
            if (stateUpdater.compareAndSet(this, State.INIT, State.DONE)) {
                downstream.onFailure(failure);
            }
        }

        /**
         * Runs the callback and then cancels the upstream subscription, but only when no
         * item, failure or earlier cancellation has been handled. An exception thrown by
         * the callback propagates to the caller after the upstream has been cancelled.
         */
        @Override
        public void cancel() {
            if (stateUpdater.compareAndSet(this, State.INIT, State.CANCELLED)) {
                // Swap in the CANCELLED marker; presumably this returns the subscription
                // held so far (may be null) - confirm against UniOperatorProcessor.
                UniSubscription sub = getAndSetUpstreamSubscription(CANCELLED);
                try {
                    callback.run();
                } finally {
                    // The state is already CANCELLED and the subscription has been swapped
                    // out, so a later cancel() is a no-op: if the callback throws, this is
                    // the only chance to cancel the upstream.
                    if (sub != null) {
                        sub.cancel();
                    }
                }
            }
        }
    }
}