/
ClientConnector.java
624 lines (561 loc) · 22.5 KB
/
ClientConnector.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License v. 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.io;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ProtocolFamily;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketOption;
import java.net.StandardProtocolFamily;
import java.net.StandardSocketOptions;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.JavaVersion;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>The client-side component that connects to server sockets.</p>
* <p>ClientConnector delegates the handling of {@link SocketChannel}s
* to a {@link SelectorManager}, and centralizes the configuration of
* necessary components such as the executor, the scheduler, etc.</p>
* <p>ClientConnector offers a low-level API that can be used to
* connect {@link SocketChannel}s to listening servers via the
* {@link #connect(SocketAddress, Map)} method.</p>
* <p>However, a ClientConnector instance is typically just configured
* and then passed to an HttpClient transport, so that applications
* can use high-level APIs to make HTTP requests to servers:</p>
* <pre>
* // Create a ClientConnector instance.
* ClientConnector connector = new ClientConnector();
*
* // Configure the ClientConnector.
* connector.setSelectors(1);
* connector.setSslContextFactory(new SslContextFactory.Client());
*
* // Pass it to the HttpClient transport.
* HttpClientTransport transport = new HttpClientTransportDynamic(connector);
* HttpClient httpClient = new HttpClient(transport);
* httpClient.start();
* </pre>
*/
@ManagedObject
public class ClientConnector extends ContainerLifeCycle
{
public static final String CLIENT_CONNECTOR_CONTEXT_KEY = "org.eclipse.jetty.client.connector";
public static final String REMOTE_SOCKET_ADDRESS_CONTEXT_KEY = CLIENT_CONNECTOR_CONTEXT_KEY + ".remoteSocketAddress";
public static final String CLIENT_CONNECTION_FACTORY_CONTEXT_KEY = CLIENT_CONNECTOR_CONTEXT_KEY + ".clientConnectionFactory";
public static final String CONNECTION_PROMISE_CONTEXT_KEY = CLIENT_CONNECTOR_CONTEXT_KEY + ".connectionPromise";
private static final Logger LOG = LoggerFactory.getLogger(ClientConnector.class);
/**
* <p>Creates a ClientConnector configured to connect via Unix-Domain sockets to the given Unix-Domain path</p>
*
* @param path the Unix-Domain path to connect to
* @return a ClientConnector that connects to the given Unix-Domain path
*/
public static ClientConnector forUnixDomain(Path path)
{
return new ClientConnector(SocketChannelWithAddress.Factory.forUnixDomain(path));
}
private final SocketChannelWithAddress.Factory factory;
private Executor executor;
private Scheduler scheduler;
private ByteBufferPool byteBufferPool;
private SslContextFactory.Client sslContextFactory;
private SelectorManager selectorManager;
private int selectors = 1;
private boolean connectBlocking;
private Duration connectTimeout = Duration.ofSeconds(5);
private Duration idleTimeout = Duration.ofSeconds(30);
private SocketAddress bindAddress;
private boolean tcpNoDelay = true;
private boolean reuseAddress = true;
private boolean reusePort;
private int receiveBufferSize = -1;
private int sendBufferSize = -1;
public ClientConnector()
{
this((address, context) -> new SocketChannelWithAddress(SocketChannel.open(), address));
}
private ClientConnector(SocketChannelWithAddress.Factory factory)
{
this.factory = Objects.requireNonNull(factory);
}
public Executor getExecutor()
{
return executor;
}
public void setExecutor(Executor executor)
{
if (isStarted())
throw new IllegalStateException();
updateBean(this.executor, executor);
this.executor = executor;
}
public Scheduler getScheduler()
{
return scheduler;
}
public void setScheduler(Scheduler scheduler)
{
if (isStarted())
throw new IllegalStateException();
updateBean(this.scheduler, scheduler);
this.scheduler = scheduler;
}
public ByteBufferPool getByteBufferPool()
{
return byteBufferPool;
}
public void setByteBufferPool(ByteBufferPool byteBufferPool)
{
if (isStarted())
throw new IllegalStateException();
updateBean(this.byteBufferPool, byteBufferPool);
this.byteBufferPool = byteBufferPool;
}
public SslContextFactory.Client getSslContextFactory()
{
return sslContextFactory;
}
public void setSslContextFactory(SslContextFactory.Client sslContextFactory)
{
if (isStarted())
throw new IllegalStateException();
updateBean(this.sslContextFactory, sslContextFactory);
this.sslContextFactory = sslContextFactory;
}
/**
* @return the number of NIO selectors
*/
@ManagedAttribute("The number of NIO selectors")
public int getSelectors()
{
return selectors;
}
public void setSelectors(int selectors)
{
if (isStarted())
throw new IllegalStateException();
this.selectors = selectors;
}
/**
* @return whether {@link #connect(SocketAddress, Map)} operations are performed in blocking mode
*/
@ManagedAttribute("Whether connect operations are performed in blocking mode")
public boolean isConnectBlocking()
{
return connectBlocking;
}
public void setConnectBlocking(boolean connectBlocking)
{
this.connectBlocking = connectBlocking;
}
/**
* @return the timeout of {@link #connect(SocketAddress, Map)} operations
*/
@ManagedAttribute("The timeout of connect operations")
public Duration getConnectTimeout()
{
return connectTimeout;
}
public void setConnectTimeout(Duration connectTimeout)
{
this.connectTimeout = connectTimeout;
if (selectorManager != null)
selectorManager.setConnectTimeout(connectTimeout.toMillis());
}
/**
* @return the max duration for which a connection can be idle (that is, without traffic of bytes in either direction)
*/
@ManagedAttribute("The duration for which a connection can be idle")
public Duration getIdleTimeout()
{
return idleTimeout;
}
public void setIdleTimeout(Duration idleTimeout)
{
this.idleTimeout = idleTimeout;
}
/**
* @return the address to bind a socket to before the connect operation
*/
@ManagedAttribute("The socket address to bind sockets to before the connect operation")
public SocketAddress getBindAddress()
{
return bindAddress;
}
/**
* <p>Sets the bind address of sockets before the connect operation.</p>
* <p>In multi-homed hosts, you may want to connect from a specific address:</p>
* <pre>
* clientConnector.setBindAddress(new InetSocketAddress("127.0.0.2", 0));
* </pre>
* <p>Note the use of the port {@code 0} to indicate that a different ephemeral port
* should be used for each different connection.</p>
* <p>In the rare cases where you want to use the same port for all connections,
* you must also call {@link #setReusePort(boolean) setReusePort(true)}.</p>
*
* @param bindAddress the socket address to bind to before the connect operation
*/
public void setBindAddress(SocketAddress bindAddress)
{
this.bindAddress = bindAddress;
}
/**
* @return whether small TCP packets are sent without delay
*/
@ManagedAttribute("Whether small TCP packets are sent without delay")
public boolean isTCPNoDelay()
{
return tcpNoDelay;
}
public void setTCPNoDelay(boolean tcpNoDelay)
{
this.tcpNoDelay = tcpNoDelay;
}
/**
* @return whether rebinding is allowed with sockets in tear-down states
*/
@ManagedAttribute("Whether rebinding is allowed with sockets in tear-down states")
public boolean getReuseAddress()
{
return reuseAddress;
}
/**
* <p>Sets whether it is allowed to bind a socket to a socket address
* that may be in use by another socket in tear-down state, for example
* in TIME_WAIT state.</p>
* <p>This is useful when ClientConnector is restarted: an existing connection
* may still be using a network address (same host and same port) that is also
* chosen for a new connection.</p>
*
* @param reuseAddress whether rebinding is allowed with sockets in tear-down states
* @see #setReusePort(boolean)
*/
public void setReuseAddress(boolean reuseAddress)
{
this.reuseAddress = reuseAddress;
}
/**
* @return whether binding to same host and port is allowed
*/
@ManagedAttribute("Whether binding to same host and port is allowed")
public boolean isReusePort()
{
return reusePort;
}
/**
* <p>Sets whether it is allowed to bind multiple sockets to the same
* socket address (same host and same port).</p>
*
* @param reusePort whether binding to same host and port is allowed
*/
public void setReusePort(boolean reusePort)
{
this.reusePort = reusePort;
}
/**
* @return the receive buffer size in bytes, or -1 for the default value
*/
@ManagedAttribute("The receive buffer size in bytes")
public int getReceiveBufferSize()
{
return receiveBufferSize;
}
public void setReceiveBufferSize(int receiveBufferSize)
{
this.receiveBufferSize = receiveBufferSize;
}
/**
* @return the send buffer size in bytes, or -1 for the default value
*/
@ManagedAttribute("The send buffer size in bytes")
public int getSendBufferSize()
{
return sendBufferSize;
}
public void setSendBufferSize(int sendBufferSize)
{
this.sendBufferSize = sendBufferSize;
}
@Override
protected void doStart() throws Exception
{
if (executor == null)
{
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName(String.format("client-pool@%x", hashCode()));
setExecutor(clientThreads);
}
if (scheduler == null)
setScheduler(new ScheduledExecutorScheduler(String.format("client-scheduler@%x", hashCode()), false));
if (byteBufferPool == null)
setByteBufferPool(new MappedByteBufferPool());
if (sslContextFactory == null)
setSslContextFactory(newSslContextFactory());
selectorManager = newSelectorManager();
selectorManager.setConnectTimeout(getConnectTimeout().toMillis());
addBean(selectorManager);
super.doStart();
}
@Override
protected void doStop() throws Exception
{
super.doStop();
removeBean(selectorManager);
}
protected SslContextFactory.Client newSslContextFactory()
{
SslContextFactory.Client sslContextFactory = new SslContextFactory.Client(false);
sslContextFactory.setEndpointIdentificationAlgorithm("HTTPS");
return sslContextFactory;
}
protected SelectorManager newSelectorManager()
{
return new ClientSelectorManager(getExecutor(), getScheduler(), getSelectors());
}
public void connect(SocketAddress address, Map<String, Object> context)
{
SocketChannel channel = null;
try
{
if (context == null)
context = new HashMap<>();
context.put(ClientConnector.CLIENT_CONNECTOR_CONTEXT_KEY, this);
context.putIfAbsent(REMOTE_SOCKET_ADDRESS_CONTEXT_KEY, address);
SocketChannelWithAddress channelWithAddress = factory.newSocketChannelWithAddress(address, context);
channel = channelWithAddress.getSocketChannel();
address = channelWithAddress.getSocketAddress();
configure(channel);
SocketAddress bindAddress = getBindAddress();
if (bindAddress != null)
bind(channel, bindAddress);
boolean connected = true;
boolean blocking = isConnectBlocking() && address instanceof InetSocketAddress;
if (LOG.isDebugEnabled())
LOG.debug("Connecting {} to {}", blocking ? "blocking" : "non-blocking", address);
if (blocking)
{
channel.socket().connect(address, (int)getConnectTimeout().toMillis());
channel.configureBlocking(false);
}
else
{
channel.configureBlocking(false);
connected = channel.connect(address);
}
if (connected)
selectorManager.accept(channel, context);
else
selectorManager.connect(channel, context);
}
// Must catch all exceptions, since some like
// UnresolvedAddressException are not IOExceptions.
catch (Throwable x)
{
// If IPv6 is not deployed, a generic SocketException "Network is unreachable"
// exception is being thrown, so we attempt to provide a better error message.
if (x.getClass() == SocketException.class)
x = new SocketException("Could not connect to " + address).initCause(x);
IO.close(channel);
connectFailed(x, context);
}
}
public void accept(SocketChannel channel, Map<String, Object> context)
{
try
{
context.put(ClientConnector.CLIENT_CONNECTOR_CONTEXT_KEY, this);
if (!channel.isConnected())
throw new IllegalStateException("SocketChannel must be connected");
configure(channel);
channel.configureBlocking(false);
selectorManager.accept(channel, context);
}
catch (Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not accept {}", channel);
IO.close(channel);
Promise<?> promise = (Promise<?>)context.get(CONNECTION_PROMISE_CONTEXT_KEY);
if (promise != null)
promise.failed(failure);
}
}
private void bind(SocketChannel channel, SocketAddress bindAddress) throws IOException
{
if (LOG.isDebugEnabled())
LOG.debug("Binding {} to {}", channel, bindAddress);
channel.bind(bindAddress);
}
protected void configure(SocketChannel channel) throws IOException
{
setSocketOption(channel, StandardSocketOptions.TCP_NODELAY, isTCPNoDelay());
setSocketOption(channel, StandardSocketOptions.SO_REUSEADDR, getReuseAddress());
setSocketOption(channel, StandardSocketOptions.SO_REUSEPORT, isReusePort());
int receiveBufferSize = getReceiveBufferSize();
if (receiveBufferSize >= 0)
setSocketOption(channel, StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
int sendBufferSize = getSendBufferSize();
if (sendBufferSize >= 0)
setSocketOption(channel, StandardSocketOptions.SO_SNDBUF, sendBufferSize);
}
private <T> void setSocketOption(SocketChannel channel, SocketOption<T> option, T value)
{
try
{
channel.setOption(option, value);
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not configure {} to {} on {}", option, value, channel, x);
}
}
protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey)
{
return new SocketChannelEndPoint(channel, selector, selectionKey, getScheduler());
}
protected void connectFailed(Throwable failure, Map<String, Object> context)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not connect to {}", context.get(REMOTE_SOCKET_ADDRESS_CONTEXT_KEY));
Promise<?> promise = (Promise<?>)context.get(CONNECTION_PROMISE_CONTEXT_KEY);
if (promise != null)
promise.failed(failure);
}
protected class ClientSelectorManager extends SelectorManager
{
public ClientSelectorManager(Executor executor, Scheduler scheduler, int selectors)
{
super(executor, scheduler, selectors);
}
@Override
protected EndPoint newEndPoint(SelectableChannel channel, ManagedSelector selector, SelectionKey selectionKey)
{
EndPoint endPoint = ClientConnector.this.newEndPoint((SocketChannel)channel, selector, selectionKey);
endPoint.setIdleTimeout(getIdleTimeout().toMillis());
return endPoint;
}
@Override
public Connection newConnection(SelectableChannel channel, EndPoint endPoint, Object attachment) throws IOException
{
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;
ClientConnectionFactory factory = (ClientConnectionFactory)context.get(CLIENT_CONNECTION_FACTORY_CONTEXT_KEY);
return factory.newConnection(endPoint, context);
}
@Override
public void connectionOpened(Connection connection, Object context)
{
super.connectionOpened(connection, context);
@SuppressWarnings("unchecked")
Map<String, Object> contextMap = (Map<String, Object>)context;
@SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)contextMap.get(CONNECTION_PROMISE_CONTEXT_KEY);
if (promise != null)
promise.succeeded(connection);
}
@Override
protected void connectionFailed(SelectableChannel channel, Throwable failure, Object attachment)
{
@SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment;
connectFailed(failure, context);
}
}
/**
* <p>A pair/record holding a {@link SocketChannel} and a {@link SocketAddress} to connect to.</p>
*/
private static class SocketChannelWithAddress
{
private final SocketChannel channel;
private final SocketAddress address;
private SocketChannelWithAddress(SocketChannel channel, SocketAddress address)
{
this.channel = channel;
this.address = address;
}
private SocketChannel getSocketChannel()
{
return channel;
}
private SocketAddress getSocketAddress()
{
return address;
}
/**
* <p>A factory for {@link SocketChannelWithAddress} instances.</p>
*/
private interface Factory
{
private static Factory forUnixDomain(Path path)
{
return (address, context) ->
{
try
{
ProtocolFamily family = Enum.valueOf(StandardProtocolFamily.class, "UNIX");
SocketChannel socketChannel = (SocketChannel)SocketChannel.class.getMethod("open", ProtocolFamily.class).invoke(null, family);
Class<?> addressClass = Class.forName("java.net.UnixDomainSocketAddress");
SocketAddress socketAddress = (SocketAddress)addressClass.getMethod("of", Path.class).invoke(null, path);
return new SocketChannelWithAddress(socketChannel, socketAddress);
}
catch (Throwable x)
{
String message = "Unix-Domain SocketChannels are available starting from Java 16, your Java version is: " + JavaVersion.VERSION;
throw new UnsupportedOperationException(message, x);
}
};
}
/**
* <p>Creates a new {@link SocketChannel} to connect to a {@link SocketAddress}
* derived from the input socket address.</p>
* <p>The input socket address represents the destination socket address to
* connect to, as it is typically specified by a URI authority, for example
* {@code localhost:8080} if the URI is {@code http://localhost:8080/path}.</p>
* <p>However, the returned socket address may be different as the implementation
* may use a Unix-Domain socket address to physically connect to the virtual
* destination socket address given as input.</p>
* <p>The return type is a pair/record holding the socket channel and the
* socket address, with the socket channel not yet connected.
* The implementation of this methods must not call
* {@link SocketChannel#connect(SocketAddress)}, as this is done later,
* after configuring the socket, by the {@link ClientConnector} implementation.</p>
*
* @param address the destination socket address, typically specified in a URI
* @param context the context to create the new socket channel
* @return a new {@link SocketChannel} with an associated {@link SocketAddress} to connect to
* @throws IOException if the socket channel or the socket address cannot be created
*/
public SocketChannelWithAddress newSocketChannelWithAddress(SocketAddress address, Map<String, Object> context) throws IOException;
}
}
}