forked from elastic/elasticsearch
/
Netty4HttpServerTransport.java
353 lines (306 loc) · 19 KB
/
Netty4HttpServerTransport.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.http.netty4;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.FixedRecvByteBufAllocator;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.AttributeKey;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.network.NetworkService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.core.internal.net.NetUtils;
import org.elasticsearch.http.AbstractHttpServerTransport;
import org.elasticsearch.http.HttpChannel;
import org.elasticsearch.http.HttpHandlingSettings;
import org.elasticsearch.http.HttpReadTimeoutException;
import org.elasticsearch.http.HttpServerChannel;
import org.elasticsearch.http.netty4.cors.Netty4CorsHandler;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.CopyBytesServerSocketChannel;
import org.elasticsearch.transport.SharedGroupFactory;
import org.elasticsearch.transport.netty4.Netty4Utils;
import java.net.InetSocketAddress;
import java.net.SocketOption;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CHUNK_SIZE;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_CONTENT_LENGTH;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_HEADER_SIZE;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_MAX_INITIAL_LINE_LENGTH;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_READ_TIMEOUT;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_KEEP_ALIVE;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_KEEP_COUNT;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_KEEP_IDLE;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_KEEP_INTERVAL;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_NO_DELAY;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_REUSE_ADDRESS;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_HTTP_TCP_SEND_BUFFER_SIZE;
import static org.elasticsearch.http.HttpTransportSettings.SETTING_PIPELINING_MAX_EVENTS;
public class Netty4HttpServerTransport extends AbstractHttpServerTransport {
// Class-wide logger; also handed to the pipelining handler created for each accepted channel.
private static final Logger logger = LogManager.getLogger(Netty4HttpServerTransport.class);
/*
* Size in bytes of an individual message received by io.netty.handler.codec.MessageAggregator which accumulates the content for an
* HTTP request. This number is used for estimating the maximum number of allowed buffers before the MessageAggregator's internal
* collection of buffers is resized.
*
* By default we assume the Ethernet MTU (1500 bytes) but users can override it with the "es.net.mtu" system property. The
* property is parsed with Long.parseLong when this class is initialized, so a non-numeric value fails class initialization.
*/
private static final ByteSizeValue MTU = new ByteSizeValue(Long.parseLong(System.getProperty("es.net.mtu", "1500")));
private static final String SETTING_KEY_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS = "http.netty.max_composite_buffer_components";
/**
 * Node-scoped setting that controls the maximum number of components of the composite buffer used while aggregating an HTTP
 * request. Explicit values are parsed as integers in the range [2, Integer.MAX_VALUE]. When the setting is absent, the default
 * is derived from the configured maximum content length divided by the assumed MTU, clamped to the same range.
 */
public static Setting<Integer> SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS =
    new Setting<>(SETTING_KEY_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS, (s) -> {
        ByteSizeValue maxContentLength = SETTING_HTTP_MAX_CONTENT_LENGTH.get(s);
        /*
         * Netty accumulates buffers containing data from all incoming network packets that make up one HTTP request in an instance of
         * io.netty.buffer.CompositeByteBuf (think of it as a buffer of buffers). Once its capacity is reached, the buffer will iterate
         * over its individual entries and put them into larger buffers (see io.netty.buffer.CompositeByteBuf#consolidateIfNeeded()
         * for implementation details). We want to avoid resizing that buffer because this leads to additional garbage on the heap and
         * also increases the application's native memory footprint (as direct byte buffers hold their contents off-heap).
         *
         * With this setting we control the CompositeByteBuf's capacity (which is by default 1024, see
         * io.netty.handler.codec.MessageAggregator#DEFAULT_MAX_COMPOSITEBUFFER_COMPONENTS). To determine a proper default capacity for
         * that buffer, we need to consider that the upper bound for the size of HTTP requests is determined by `maxContentLength`. The
         * number of buffers that are needed depend on how often Netty reads network packets which depends on the network type (MTU).
         * We assume here that Elasticsearch receives HTTP requests via an Ethernet connection which has a MTU of 1500 bytes.
         *
         * Note that we are *not* pre-allocating any memory based on this setting but rather determine the CompositeByteBuf's capacity.
         * The tradeoff is between less (but larger) buffers that are contained in the CompositeByteBuf and more (but smaller) buffers.
         * With the default max content length of 100MB and a MTU of 1500 bytes we would allow 69905 entries.
         */
        // Divide in floating point: casting only after an integer division would truncate the quotient and turn Math.round
        // into a no-op (and an MTU of zero would throw ArithmeticException instead of being clamped below).
        long maxBufferComponentsEstimate = Math.round((double) maxContentLength.getBytes() / MTU.getBytes());
        // Clamp the value to the allowed range. Netty's CompositeByteBuf implementation does not allow less than two components.
        long maxBufferComponents = Math.max(2, Math.min(maxBufferComponentsEstimate, Integer.MAX_VALUE));
        return String.valueOf(maxBufferComponents);
    }, s -> Setting.parseInt(s, 2, Integer.MAX_VALUE, SETTING_KEY_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS), Property.NodeScope);
/**
 * Node-scoped integer setting {@code http.netty.worker_count} with a default of {@code 0}. The value 0 is passed to
 * Setting.parseInt as the bound (presumably the minimum accepted value). The setting is not read anywhere else in this class.
 */
public static final Setting<Integer> SETTING_HTTP_WORKER_COUNT = new Setting<>("http.netty.worker_count", "0",
(s) -> Setting.parseInt(s, 0, "http.netty.worker_count"), Property.NodeScope);
/**
 * Node-scoped byte size setting {@code http.netty.receive_predictor_size} with a default of 64kb. The constructor uses the
 * configured value as the buffer size of the FixedRecvByteBufAllocator it creates.
 */
public static final Setting<ByteSizeValue> SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE =
Setting.byteSizeSetting("http.netty.receive_predictor_size", new ByteSizeValue(64, ByteSizeUnit.KB), Property.NodeScope);
// Decoder limits read from the HTTP transport settings. Within this class they are only reported in the constructor's
// debug log; the Netty request decoder itself is configured from handlingSettings.
private final ByteSizeValue maxInitialLineLength;
private final ByteSizeValue maxHeaderSize;
private final ByteSizeValue maxChunkSize;
// Passed to the Netty4HttpPipeliningHandler that is added to every accepted channel.
private final int pipeliningMaxEvents;
// Factory from which doStart() obtains the HTTP shared group.
private final SharedGroupFactory sharedGroupFactory;
// Fixed-size receive buffer allocator set on both the server channel and its child channels.
private final RecvByteBufAllocator recvByteBufAllocator;
// HTTP read timeout in milliseconds; configures the per-channel ReadTimeoutHandler and is passed to HttpReadTimeoutException.
private final int readTimeoutMillis;
// Value handed to HttpObjectAggregator#setMaxCumulationBufferComponents for every accepted channel.
private final int maxCompositeBufferComponents;
// Both fields are assigned in doStart(); sharedGroup is reset to null by stopInternal().
private volatile ServerBootstrap serverBootstrap;
private volatile SharedGroupFactory.SharedGroup sharedGroup;
/**
 * Creates the Netty 4 HTTP server transport and captures every Netty related value from the given settings.
 * Nothing is bound here; the server bootstrap is only created once {@code doStart()} runs.
 *
 * @param settings           node settings all HTTP and Netty values are read from
 * @param networkService     forwarded to the parent transport
 * @param bigArrays          forwarded to the parent transport
 * @param threadPool         forwarded to the parent transport
 * @param xContentRegistry   forwarded to the parent transport
 * @param dispatcher         forwarded to the parent transport
 * @param sharedGroupFactory factory used later to obtain the HTTP shared group
 */
public Netty4HttpServerTransport(Settings settings, NetworkService networkService, BigArrays bigArrays, ThreadPool threadPool,
                                 NamedXContentRegistry xContentRegistry, Dispatcher dispatcher,
                                 SharedGroupFactory sharedGroupFactory) {
    super(settings, networkService, bigArrays, threadPool, xContentRegistry, dispatcher);
    Netty4Utils.setAvailableProcessors(EsExecutors.NODE_PROCESSORS_SETTING.get(settings));
    this.sharedGroupFactory = sharedGroupFactory;

    // Request size limits and pipelining configuration.
    this.maxChunkSize = SETTING_HTTP_MAX_CHUNK_SIZE.get(settings);
    this.maxHeaderSize = SETTING_HTTP_MAX_HEADER_SIZE.get(settings);
    this.maxInitialLineLength = SETTING_HTTP_MAX_INITIAL_LINE_LENGTH.get(settings);
    this.pipeliningMaxEvents = SETTING_PIPELINING_MAX_EVENTS.get(settings);

    // Netty specific tuning values.
    this.maxCompositeBufferComponents = SETTING_HTTP_NETTY_MAX_COMPOSITE_BUFFER_COMPONENTS.get(settings);
    this.readTimeoutMillis = Math.toIntExact(SETTING_HTTP_READ_TIMEOUT.get(settings).getMillis());
    final ByteSizeValue receivePredictorSize = SETTING_HTTP_NETTY_RECEIVE_PREDICTOR_SIZE.get(settings);
    this.recvByteBufAllocator = new FixedRecvByteBufAllocator(receivePredictorSize.bytesAsInt());

    logger.debug("using max_chunk_size[{}], max_header_size[{}], max_initial_line_length[{}], max_content_length[{}], " +
            "receive_predictor[{}], max_composite_buffer_components[{}], pipelining_max_events[{}]",
        this.maxChunkSize, this.maxHeaderSize, this.maxInitialLineLength, maxContentLength, receivePredictorSize,
        this.maxCompositeBufferComponents, this.pipeliningMaxEvents);
}
/**
 * Returns the settings this transport holds.
 *
 * @return the {@code settings} field of this transport
 */
public Settings settings() {
    return settings;
}
/**
 * Creates and configures the Netty server bootstrap from the settings and then binds the server.
 * If anything fails before binding has completed, {@code doStop()} is invoked to release what was already acquired.
 */
@Override
protected void doStart() {
    boolean started = false;
    try {
        sharedGroup = sharedGroupFactory.getHttpGroup();
        final ServerBootstrap bootstrap = new ServerBootstrap();
        serverBootstrap = bootstrap;
        bootstrap.group(sharedGroup.getLowLevelGroup());

        // If direct buffer pooling is disabled, use the CopyBytesServerSocketChannel which will create child
        // channels of type CopyBytesSocketChannel. CopyBytesSocketChannel pool a single direct buffer
        // per-event-loop thread to be used for IO operations.
        if (ByteBufAllocator.DEFAULT.isDirectBufferPooled()) {
            bootstrap.channel(NioServerSocketChannel.class);
        } else {
            bootstrap.channel(CopyBytesServerSocketChannel.class);
        }

        bootstrap.childHandler(configureServerChannelHandler());
        bootstrap.handler(new ServerChannelExceptionHandler(this));

        bootstrap.childOption(ChannelOption.TCP_NODELAY, SETTING_HTTP_TCP_NO_DELAY.get(settings));
        final boolean keepAlive = SETTING_HTTP_TCP_KEEP_ALIVE.get(settings);
        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, keepAlive);

        // Netty logs a warning if it can't set the option, so try this only on supported platforms
        if (keepAlive && (IOUtils.LINUX || IOUtils.MAC_OS_X)) {
            final int keepIdle = SETTING_HTTP_TCP_KEEP_IDLE.get(settings);
            if (keepIdle >= 0) {
                final SocketOption<Integer> idleOption = NetUtils.getTcpKeepIdleSocketOptionOrNull();
                if (idleOption != null) {
                    bootstrap.childOption(NioChannelOption.of(idleOption), keepIdle);
                }
            }
            final int keepInterval = SETTING_HTTP_TCP_KEEP_INTERVAL.get(settings);
            if (keepInterval >= 0) {
                final SocketOption<Integer> intervalOption = NetUtils.getTcpKeepIntervalSocketOptionOrNull();
                if (intervalOption != null) {
                    bootstrap.childOption(NioChannelOption.of(intervalOption), keepInterval);
                }
            }
            final int keepCount = SETTING_HTTP_TCP_KEEP_COUNT.get(settings);
            if (keepCount >= 0) {
                final SocketOption<Integer> countOption = NetUtils.getTcpKeepCountSocketOptionOrNull();
                if (countOption != null) {
                    bootstrap.childOption(NioChannelOption.of(countOption), keepCount);
                }
            }
        }

        // Socket buffer sizes are only applied when a positive size is configured.
        final long sendBufferBytes = SETTING_HTTP_TCP_SEND_BUFFER_SIZE.get(settings).getBytes();
        if (sendBufferBytes > 0) {
            bootstrap.childOption(ChannelOption.SO_SNDBUF, Math.toIntExact(sendBufferBytes));
        }
        final long receiveBufferBytes = SETTING_HTTP_TCP_RECEIVE_BUFFER_SIZE.get(settings).getBytes();
        if (receiveBufferBytes > 0) {
            bootstrap.childOption(ChannelOption.SO_RCVBUF, Math.toIntExact(receiveBufferBytes));
        }

        bootstrap.option(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);
        bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, recvByteBufAllocator);

        final boolean reuseAddress = SETTING_HTTP_TCP_REUSE_ADDRESS.get(settings);
        bootstrap.option(ChannelOption.SO_REUSEADDR, reuseAddress);
        bootstrap.childOption(ChannelOption.SO_REUSEADDR, reuseAddress);

        bindServer();
        started = true;
    } finally {
        if (started == false) {
            // otherwise we leak threads since we never moved to started
            doStop();
        }
    }
}
/**
 * Binds the server bootstrap to the given address, waiting until the bind has completed, and wraps the resulting channel.
 * The wrapper is also stored on the channel under {@code HTTP_SERVER_CHANNEL_KEY}.
 *
 * @param socketAddress the address to bind to
 * @return the wrapper around the bound Netty channel
 * @throws Exception if binding fails or waiting for it is interrupted
 */
@Override
protected HttpServerChannel bind(InetSocketAddress socketAddress) throws Exception {
    final ChannelFuture bindFuture = serverBootstrap.bind(socketAddress).sync();
    final Channel boundChannel = bindFuture.channel();
    final Netty4HttpServerChannel serverChannel = new Netty4HttpServerChannel(boundChannel);
    boundChannel.attr(HTTP_SERVER_CHANNEL_KEY).set(serverChannel);
    return serverChannel;
}
/**
 * Shuts down the shared group acquired in {@code doStart()} and clears the reference to it.
 * Does nothing when no group is currently held.
 */
@Override
protected void stopInternal() {
    if (sharedGroup == null) {
        return;
    }
    sharedGroup.shutdownGracefully();
    sharedGroup = null;
}
/**
 * Forwards a channel exception to the parent implementation. A Netty {@link ReadTimeoutException} is first wrapped in an
 * {@link HttpReadTimeoutException} that carries the configured read timeout; any other exception is passed on unchanged.
 *
 * @param channel the channel the exception occurred on
 * @param cause   the exception that was raised
 */
@Override
public void onException(HttpChannel channel, Exception cause) {
    final Exception reported = cause instanceof ReadTimeoutException
        ? new HttpReadTimeoutException(readTimeoutMillis, cause)
        : cause;
    super.onException(channel, reported);
}
/**
 * Creates the channel initializer that {@code doStart()} installs as the child handler of the server bootstrap.
 *
 * @return a new {@link HttpChannelHandler} bound to this transport and its handling settings
 */
public ChannelHandler configureServerChannelHandler() {
    final ChannelHandler channelInitializer = new HttpChannelHandler(this, handlingSettings);
    return channelInitializer;
}
// Channel attribute under which HttpChannelHandler#initChannel stores the Netty4HttpChannel wrapper of an accepted channel.
static final AttributeKey<Netty4HttpChannel> HTTP_CHANNEL_KEY = AttributeKey.newInstance("es-http-channel");
// Channel attribute under which bind() stores the Netty4HttpServerChannel wrapper; ServerChannelExceptionHandler reads it back.
static final AttributeKey<Netty4HttpServerChannel> HTTP_SERVER_CHANNEL_KEY = AttributeKey.newInstance("es-http-server-channel");
/**
 * Channel initializer that assembles the HTTP pipeline of every accepted channel: read timeout, request decoding and
 * decompression, response encoding, request aggregation, optional response compression and CORS handling, pipelining and
 * finally the request handler shared by all channels.
 */
protected static class HttpChannelHandler extends ChannelInitializer<Channel> {

    private final Netty4HttpServerTransport transport;
    private final Netty4HttpRequestHandler requestHandler;
    private final HttpHandlingSettings handlingSettings;

    /**
     * @param transport        the transport accepted channels are registered with
     * @param handlingSettings the settings used to configure the decoder, aggregator, compression and CORS handlers
     */
    protected HttpChannelHandler(final Netty4HttpServerTransport transport, final HttpHandlingSettings handlingSettings) {
        this.transport = transport;
        this.handlingSettings = handlingSettings;
        this.requestHandler = new Netty4HttpRequestHandler(transport);
    }

    /**
     * Wraps the accepted channel, builds its handler pipeline and registers the wrapper with the transport.
     */
    @Override
    protected void initChannel(Channel ch) throws Exception {
        final Netty4HttpChannel httpChannel = new Netty4HttpChannel(ch);
        ch.attr(HTTP_CHANNEL_KEY).set(httpChannel);

        ch.pipeline().addLast("read_timeout", new ReadTimeoutHandler(transport.readTimeoutMillis, TimeUnit.MILLISECONDS));

        final HttpRequestDecoder requestDecoder = new HttpRequestDecoder(
            handlingSettings.getMaxInitialLineLength(),
            handlingSettings.getMaxHeaderSize(),
            handlingSettings.getMaxChunkSize());
        requestDecoder.setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
        ch.pipeline()
            .addLast("decoder", requestDecoder)
            .addLast("decoder_compress", new HttpContentDecompressor())
            .addLast("encoder", new HttpResponseEncoder());

        final HttpObjectAggregator requestAggregator = new HttpObjectAggregator(handlingSettings.getMaxContentLength());
        requestAggregator.setMaxCumulationBufferComponents(transport.maxCompositeBufferComponents);
        ch.pipeline().addLast("aggregator", requestAggregator);

        // Response compression and CORS handling are only installed when enabled in the handling settings.
        if (handlingSettings.isCompression()) {
            ch.pipeline().addLast("encoder_compress", new HttpContentCompressor(handlingSettings.getCompressionLevel()));
        }
        if (handlingSettings.isCorsEnabled()) {
            ch.pipeline().addLast("cors", new Netty4CorsHandler(transport.corsConfig));
        }

        ch.pipeline()
            .addLast("pipelining", new Netty4HttpPipeliningHandler(logger, transport.pipeliningMaxEvents))
            .addLast("handler", requestHandler);

        transport.serverAcceptedChannel(httpChannel);
    }

    /**
     * Passes the throwable to {@code ExceptionsHelper.maybeDieOnAnotherThread} before delegating to the default handling.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ExceptionsHelper.maybeDieOnAnotherThread(cause);
        super.exceptionCaught(ctx, cause);
    }
}
/**
 * Handler installed on the server channel by {@code doStart()} that reports throwables raised on that channel to the
 * transport via {@code onServerException}.
 */
@ChannelHandler.Sharable
private static class ServerChannelExceptionHandler extends ChannelInboundHandlerAdapter {

    private final Netty4HttpServerTransport transport;

    /**
     * @param transport the transport that server channel exceptions are reported to
     */
    private ServerChannelExceptionHandler(Netty4HttpServerTransport transport) {
        this.transport = transport;
    }

    /**
     * Reports the throwable to the transport together with the server channel wrapper stored on the Netty channel.
     *
     * @param ctx   the context of the server channel the throwable was raised on
     * @param cause the throwable that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ExceptionsHelper.maybeDieOnAnotherThread(cause);
        Netty4HttpServerChannel httpServerChannel = ctx.channel().attr(HTTP_SERVER_CHANNEL_KEY).get();
        // onServerException only accepts an Exception. Wrap everything that is not one (Errors as well as direct Throwable
        // subclasses) instead of casting, which would raise a ClassCastException for a Throwable that is neither.
        final Exception reported = cause instanceof Exception ? (Exception) cause : new Exception(cause);
        transport.onServerException(httpServerChannel, reported);
    }
}
}