// DemandingFlusher.java
//
// ========================================================================
// Copyright (c) 1995-2022 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.websocket.core.internal;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongConsumer;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.CountingCallback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.websocket.core.Extension;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.IncomingFrames;
import org.eclipse.jetty.websocket.core.exception.SentinelWebSocketCloseException;
/**
 * <p>This flusher can be used to mutate and fragment {@link Frame}s and forward them on towards the application using the
 * {@link IncomingFrames} provided in the constructor. This can split a single incoming frame into n {@link Frame}s which are
 * passed on to the {@link IncomingFrames} one at a time.</p>
 *
 * <p>The asynchronous operation performed by this {@link IteratingCallback} is demanding from the upper layer, after which
 * {@link #onFrame(Frame, Callback)} will be called with the new content.</p>
 *
 * <p>This flusher relies on the interception of demand, and because of this it can only be used in an {@link Extension} which
 * implements the {@link DemandChain} interface. The methods of {@link DemandChain} from the {@link Extension}
 * must be forwarded to this flusher.</p>
 */
public abstract class DemandingFlusher extends IteratingCallback implements DemandChain
{
    private static final Throwable SENTINEL_CLOSE_EXCEPTION = new SentinelWebSocketCloseException();

    private final IncomingFrames _emitFrame;
    private final AtomicLong _demand = new AtomicLong();
    private final AtomicReference<Throwable> _failure = new AtomicReference<>();
    private LongConsumer _nextDemand;

    // The frame currently being handled and the callback which releases its payload.
    private Frame _frame;
    private Callback _callback;
    // True when the current frame is finished and a new one must be demanded before calling handle().
    private boolean _needContent = true;
    // True until handle() has been invoked once for the current frame.
    private boolean _first = true;

    /**
     * @param emitFrame where frames generated by {@link #handle(Frame, Callback, boolean)} are forwarded.
     */
    public DemandingFlusher(IncomingFrames emitFrame)
    {
        _emitFrame = emitFrame;
    }

    /**
     * <p>Called when there is demand for a single frame to be produced. During this method a single call can be made
     * to {@link #emitFrame(Frame, Callback)} which will forward this frame towards the application. Returning true
     * from this method signals that you are done processing the current Frame, and the next invocation of this
     * method will have the next frame.</p>
     *
     * <p>Note that the callback supplied here is specially wrapped so that you can call
     * it multiple times and it will not be completed more than once. This simplifies the
     * handling of failure cases.</p>
     *
     * @param frame the original frame.
     * @param callback to succeed to release the frame payload.
     * @param first if this is the first time this method has been called for this frame.
     * @return false to continue processing this frame, true to complete processing and get a new frame.
     */
    protected abstract boolean handle(Frame frame, Callback callback, boolean first);

    /**
     * Adds {@code n} to the outstanding demand and triggers an iteration of this flusher.
     * The accumulated demand saturates at the bounds of {@code long} instead of overflowing, so
     * repeatedly demanding very large amounts (e.g. {@link Long#MAX_VALUE}) never throws.
     *
     * @param n the number of additional frames demanded.
     */
    @Override
    public void demand(long n)
    {
        _demand.getAndUpdate(d -> saturatingAdd(d, n));
        iterate();
    }

    @Override
    public void setNextDemand(LongConsumer nextDemand)
    {
        _nextDemand = nextDemand;
    }

    /**
     * Used to supply the flusher with a new frame. This frame should only arrive if demanded
     * through the {@link LongConsumer} provided by {@link #setNextDemand(LongConsumer)}.
     *
     * @param frame the WebSocket frame.
     * @param callback to release frame payload.
     * @throws IllegalStateException if a previously supplied frame is still being processed.
     */
    public void onFrame(Frame frame, Callback callback)
    {
        if (_frame != null || _callback != null)
            throw new IllegalStateException("Not expecting onFrame");
        _frame = frame;
        // Wrapped so that the callback handed to handle() may be completed more than once safely.
        _callback = new CountingCallback(callback, 1);
        // Completes the asynchronous "demand" step which process() reported as SCHEDULED.
        succeeded();
    }

    /**
     * Used to close this flusher when there is no explicit failure.
     */
    public void closeFlusher()
    {
        failOnce(SENTINEL_CLOSE_EXCEPTION);
    }

    /**
     * Used to fail this flusher possibly from an external event such as a callback.
     *
     * @param t the failure.
     */
    public void failFlusher(Throwable t)
    {
        failOnce(t);
    }

    /**
     * <p>This is used within an implementation of {@link #handle(Frame, Callback, boolean)}
     * to forward a frame onto the next layer of processing.</p>
     *
     * <p>This method should only be called ONCE within each invocation of {@link #handle(Frame, Callback, boolean)},
     * otherwise more frames may be forwarded than have been demanded; each call consumes one unit of demand.</p>
     *
     * @param frame the WebSocket frame.
     * @param callback to release frame payload.
     * @throws IllegalStateException if consuming one unit of demand makes the demand negative.
     */
    public void emitFrame(Frame frame, Callback callback)
    {
        if (_demand.decrementAndGet() < 0)
            throw new IllegalStateException("Negative Demand");
        _emitFrame.onFrame(frame, callback);
    }

    @Override
    protected Action process() throws Throwable
    {
        while (true)
        {
            // A recorded failure (or close) takes precedence over any further processing.
            Throwable failure = _failure.get();
            if (failure != null)
                throw failure;

            // Nothing to do until the application demands more frames.
            if (_demand.get() <= 0)
                break;

            if (_needContent)
            {
                // Ask for exactly one new frame; onFrame() calls succeeded() when it arrives.
                _needContent = false;
                _nextDemand.accept(1);
                return Action.SCHEDULED;
            }

            boolean first = _first;
            _first = false;
            boolean needContent = handle(_frame, _callback, first);
            if (needContent)
            {
                // The current frame is fully processed; reset state so the next iteration demands a new one.
                _needContent = true;
                _first = true;
                _frame = null;
                _callback = null;
            }
        }

        return Action.IDLE;
    }

    @Override
    protected void onCompleteFailure(Throwable cause)
    {
        // Record the final cause, keeping any different earlier failure as a suppressed exception.
        Throwable suppressed = _failure.getAndSet(cause);
        if (suppressed != null && suppressed != cause)
            cause.addSuppressed(suppressed);

        // This is wrapped with CountingCallback so protects against double succeed/failed.
        if (_callback != null)
            _callback.failed(cause);

        _frame = null;
        _callback = null;
    }

    /**
     * Records the given failure if none has been recorded yet, then fails this callback and
     * triggers an iteration. Subsequent calls are no-ops once a failure has been recorded.
     *
     * @param t the failure to record.
     */
    private void failOnce(Throwable t)
    {
        if (_failure.compareAndSet(null, t))
        {
            failed(t);
            iterate();
        }
    }

    /**
     * Adds two longs, clamping the result to the range of {@code long} instead of wrapping or throwing on overflow.
     *
     * @param a the first operand.
     * @param b the second operand.
     * @return the sum, or {@link Long#MAX_VALUE} / {@link Long#MIN_VALUE} if the exact sum is out of range.
     */
    private static long saturatingAdd(long a, long b)
    {
        long sum = a + b;
        // Overflow happened iff both operands share a sign which differs from the sign of the sum.
        if (((a ^ sum) & (b ^ sum)) < 0)
            return b > 0 ? Long.MAX_VALUE : Long.MIN_VALUE;
        return sum;
    }
}