/
BufferingFlowControlStrategy.java
216 lines (195 loc) · 8.48 KB
/
BufferingFlowControlStrategy.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.util.Atomics;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
/**
* <p>A flow control strategy that accumulates updates and emits window control
* frames when the accumulated value reaches a threshold.</p>
* <p>The sender flow control window is represented in the receiver as two
* buckets: a bigger bucket, initially full, that is drained when data is
* received, and a smaller bucket, initially empty, that is filled when data is
* consumed. Only the smaller bucket can refill the bigger bucket.</p>
* <p>The smaller bucket is defined as a fraction of the bigger bucket.</p>
* <p>For a more visual representation, see the
* <a href="http://en.wikipedia.org/wiki/Shishi-odoshi">rocking bamboo fountain</a>.</p>
* <p>The algorithm works in this way.</p>
* <p>The initial bigger bucket (BB) capacity is 100, and let's imagine the smaller
* bucket (SB) being 40% of the bigger bucket: 40.</p>
* <p>The receiver receives a data frame of 60, so now BB=40; the data frame is
* passed to the application that consumes 25, so now SB=25. Since SB is not full,
* no window control frames are emitted.</p>
* <p>The application consumes other 20, so now SB=45. Since SB is full, its 45
* are transferred to BB, which is now BB=85, and a window control frame is sent
* with delta=45.</p>
* <p>The application consumes the remaining 15, so now SB=15, and no window
* control frame is emitted.</p>
*/
@ManagedObject
public class BufferingFlowControlStrategy extends AbstractFlowControlStrategy
{
private final AtomicInteger maxSessionRecvWindow = new AtomicInteger(DEFAULT_WINDOW_SIZE);
private final AtomicInteger sessionLevel = new AtomicInteger();
private final Map<IStream, AtomicInteger> streamLevels = new ConcurrentHashMap<>();
private float bufferRatio;
public BufferingFlowControlStrategy(float bufferRatio)
{
this(DEFAULT_WINDOW_SIZE, bufferRatio);
}
public BufferingFlowControlStrategy(int initialStreamSendWindow, float bufferRatio)
{
super(initialStreamSendWindow);
this.bufferRatio = bufferRatio;
}
@ManagedAttribute("The ratio between the receive buffer and the consume buffer")
public float getBufferRatio()
{
return bufferRatio;
}
public void setBufferRatio(float bufferRatio)
{
this.bufferRatio = bufferRatio;
}
@Override
public void onStreamCreated(IStream stream)
{
super.onStreamCreated(stream);
streamLevels.put(stream, new AtomicInteger());
}
@Override
public void onStreamDestroyed(IStream stream)
{
streamLevels.remove(stream);
super.onStreamDestroyed(stream);
}
@Override
public void onDataConsumed(ISession session, IStream stream, int length)
{
if (length <= 0)
return;
float ratio = bufferRatio;
int level = sessionLevel.addAndGet(length);
int maxLevel = (int)(maxSessionRecvWindow.get() * ratio);
if (level > maxLevel)
{
if (sessionLevel.compareAndSet(level, 0))
{
session.updateRecvWindow(level);
if (LOG.isDebugEnabled())
LOG.debug("Data consumed, {} bytes, updated session recv window by {}/{} for {}", length, level, maxLevel, session);
sendWindowUpdate(null, session, new WindowUpdateFrame(0, level));
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Data consumed, {} bytes, concurrent session recv window level {}/{} for {}", length, sessionLevel, maxLevel, session);
}
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Data consumed, {} bytes, session recv window level {}/{} for {}", length, level, maxLevel, session);
}
if (stream != null)
{
if (stream.isRemotelyClosed())
{
if (LOG.isDebugEnabled())
LOG.debug("Data consumed, {} bytes, ignoring update stream recv window for remotely closed {}", length, stream);
}
else
{
AtomicInteger streamLevel = streamLevels.get(stream);
if (streamLevel != null)
{
level = streamLevel.addAndGet(length);
maxLevel = (int)(getInitialStreamRecvWindow() * ratio);
if (level > maxLevel)
{
level = streamLevel.getAndSet(0);
stream.updateRecvWindow(level);
if (LOG.isDebugEnabled())
LOG.debug("Data consumed, {} bytes, updated stream recv window by {}/{} for {}", length, level, maxLevel, stream);
sendWindowUpdate(stream, session, new WindowUpdateFrame(stream.getId(), level));
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Data consumed, {} bytes, stream recv window level {}/{} for {}", length, level, maxLevel, stream);
}
}
}
}
}
protected void sendWindowUpdate(IStream stream, ISession session, WindowUpdateFrame frame)
{
session.frames(stream, Collections.singletonList(frame), Callback.NOOP);
}
@Override
public void windowUpdate(ISession session, IStream stream, WindowUpdateFrame frame)
{
super.windowUpdate(session, stream, frame);
// Window updates cannot be negative.
// The SettingsFrame.INITIAL_WINDOW_SIZE setting
// only influences the *stream* window size.
// Therefore the session window can only be enlarged,
// and here we keep track of its max value.
// Updating the max session recv window is done here
// so that if a peer decides to send a unilateral
// window update to enlarge the session window,
// without the corresponding data consumption, here
// we can track it.
// Note that it is not perfect, since there is a time
// window between the session recv window being updated
// before the window update frame is sent, and the
// invocation of this method: in between data may arrive
// and reduce the session recv window size.
// But eventually the max value will be seen.
// Note that we cannot avoid the time window described
// above by updating the session recv window from here
// because there is a race between the sender and the
// receiver: the sender may receive a window update and
// send more data, while this method has not yet been
// invoked; when the data is received the session recv
// window may become negative and the connection will
// be closed (per specification).
if (frame.getStreamId() == 0)
{
int sessionWindow = session.updateRecvWindow(0);
Atomics.updateMax(maxSessionRecvWindow, sessionWindow);
}
}
@Override
public String toString()
{
return String.format("%s@%x[ratio=%.2f,sessionLevel=%s,sessionStallTime=%dms,streamsStallTime=%dms]",
getClass().getSimpleName(),
hashCode(),
bufferRatio,
sessionLevel,
getSessionStallTime(),
getStreamsStallTime());
}
}