Skip to content

Commit

Permalink
Fixes #6372 - Review socket options configuration (#6610)
Browse files Browse the repository at this point in the history
* Fixes #6372 - Review socket options configuration

Introduced in ClientConnector:

* tcpNoDelay
* reusePort
* receiveBufferSize
* sendBufferSize

Reworked configuration of socket options in ClientConnector.
JMX-ified ClientConnector.

Introduced reusePort in ServerConnector.
Updated server modules with the new reusePort property.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Aug 13, 2021
1 parent 2b0161e commit dbc0ce7
Show file tree
Hide file tree
Showing 10 changed files with 307 additions and 21 deletions.
Expand Up @@ -923,8 +923,10 @@ public void setMaxRedirects(int maxRedirects)

/**
* @return whether TCP_NODELAY is enabled
* @deprecated use {@link ClientConnector#isTCPNoDelay()} instead
*/
@ManagedAttribute(value = "Whether the TCP_NODELAY option is enabled", name = "tcpNoDelay")
@Deprecated
public boolean isTCPNoDelay()
{
return tcpNoDelay;
Expand All @@ -933,7 +935,9 @@ public boolean isTCPNoDelay()
/**
* @param tcpNoDelay whether TCP_NODELAY is enabled
* @see java.net.Socket#setTcpNoDelay(boolean)
* @deprecated use {@link ClientConnector#setTCPNoDelay(boolean)} instead
*/
@Deprecated
public void setTCPNoDelay(boolean tcpNoDelay)
{
this.tcpNoDelay = tcpNoDelay;
Expand Down
Expand Up @@ -73,6 +73,7 @@
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.EndPoint;
Expand Down Expand Up @@ -1909,6 +1910,45 @@ public long getLength()
assertTrue(serverOnErrorLatch.await(5, TimeUnit.SECONDS), "serverOnErrorLatch didn't finish");
}

@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
public void testBindAddress(Scenario scenario) throws Exception
{
String bindAddress = "127.0.0.2";
start(scenario, new EmptyServerHandler()
{
@Override
protected void service(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
{
assertEquals(bindAddress, request.getRemoteAddr());
}
});

client.setBindAddress(new InetSocketAddress(bindAddress, 0));

CountDownLatch latch = new CountDownLatch(1);
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.path("/1")
.onRequestBegin(r ->
{
client.newRequest("localhost", connector.getLocalPort())
.scheme(scenario.getScheme())
.path("/2")
.send(result ->
{
assertTrue(result.isSucceeded());
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
latch.countDown();
});
})
.timeout(5, TimeUnit.SECONDS)
.send();

assertTrue(latch.await(5, TimeUnit.SECONDS));
assertEquals(HttpStatus.OK_200, response.getStatus());
}

private void assertCopyRequest(Request original)
{
Request copy = client.copyRequest((HttpRequest)original, original.getURI());
Expand Down
Expand Up @@ -93,6 +93,8 @@ protected void doStart() throws Exception
client.setInputBufferSize(httpClient.getResponseBufferSize());
client.setUseInputDirectByteBuffers(httpClient.isUseInputDirectByteBuffers());
client.setUseOutputDirectByteBuffers(httpClient.isUseOutputDirectByteBuffers());
client.setConnectBlocking(httpClient.isConnectBlocking());
client.setBindAddress(httpClient.getBindAddress());
}
addBean(client);
super.doStart();
Expand Down
188 changes: 171 additions & 17 deletions jetty-io/src/main/java/org/eclipse/jetty/io/ClientConnector.java
Expand Up @@ -18,6 +18,7 @@
import java.net.ProtocolFamily;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketOption;
import java.net.StandardProtocolFamily;
import java.net.StandardSocketOptions;
import java.nio.channels.SelectableChannel;
Expand All @@ -33,6 +34,8 @@
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.JavaVersion;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
Expand All @@ -41,6 +44,32 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* <p>The client-side component that connects to server sockets.</p>
* <p>ClientConnector delegates the handling of {@link SocketChannel}s
* to a {@link SelectorManager}, and centralizes the configuration of
* necessary components such as the executor, the scheduler, etc.</p>
* <p>ClientConnector offers a low-level API that can be used to
* connect {@link SocketChannel}s to listening servers via the
* {@link #connect(SocketAddress, Map)} method.</p>
* <p>However, a ClientConnector instance is typically just configured
* and then passed to an HttpClient transport, so that applications
* can use high-level APIs to make HTTP requests to servers:</p>
* <pre>
* // Create a ClientConnector instance.
* ClientConnector connector = new ClientConnector();
*
* // Configure the ClientConnector.
* connector.setSelectors(1);
* connector.setSslContextFactory(new SslContextFactory.Client());
*
* // Pass it to the HttpClient transport.
* HttpClientTransport transport = new HttpClientTransportDynamic(connector);
* HttpClient httpClient = new HttpClient(transport);
* httpClient.start();
* </pre>
*/
@ManagedObject
public class ClientConnector extends ContainerLifeCycle
{
public static final String CLIENT_CONNECTOR_CONTEXT_KEY = "org.eclipse.jetty.client.connector";
Expand All @@ -49,6 +78,12 @@ public class ClientConnector extends ContainerLifeCycle
public static final String CONNECTION_PROMISE_CONTEXT_KEY = CLIENT_CONNECTOR_CONTEXT_KEY + ".connectionPromise";
private static final Logger LOG = LoggerFactory.getLogger(ClientConnector.class);

/**
* <p>Creates a ClientConnector configured to connect via Unix-Domain sockets to the given Unix-Domain path</p>
*
* @param path the Unix-Domain path to connect to
* @return a ClientConnector that connects to the given Unix-Domain path
*/
public static ClientConnector forUnixDomain(Path path)
{
return new ClientConnector(SocketChannelWithAddress.Factory.forUnixDomain(path));
Expand All @@ -65,7 +100,11 @@ public static ClientConnector forUnixDomain(Path path)
private Duration connectTimeout = Duration.ofSeconds(5);
private Duration idleTimeout = Duration.ofSeconds(30);
private SocketAddress bindAddress;
private boolean tcpNoDelay = true;
private boolean reuseAddress = true;
private boolean reusePort;
private int receiveBufferSize = -1;
private int sendBufferSize = -1;

public ClientConnector()
{
Expand Down Expand Up @@ -129,6 +168,10 @@ public void setSslContextFactory(SslContextFactory.Client sslContextFactory)
this.sslContextFactory = sslContextFactory;
}

/**
* @return the number of NIO selectors
*/
@ManagedAttribute("The number of NIO selectors")
public int getSelectors()
{
return selectors;
Expand All @@ -141,6 +184,10 @@ public void setSelectors(int selectors)
this.selectors = selectors;
}

/**
* @return whether {@link #connect(SocketAddress, Map)} operations are performed in blocking mode
*/
@ManagedAttribute("Whether connect operations are performed in blocking mode")
public boolean isConnectBlocking()
{
return connectBlocking;
Expand All @@ -151,6 +198,10 @@ public void setConnectBlocking(boolean connectBlocking)
this.connectBlocking = connectBlocking;
}

/**
* @return the timeout of {@link #connect(SocketAddress, Map)} operations
*/
@ManagedAttribute("The timeout of connect operations")
public Duration getConnectTimeout()
{
return connectTimeout;
Expand All @@ -163,6 +214,10 @@ public void setConnectTimeout(Duration connectTimeout)
selectorManager.setConnectTimeout(connectTimeout.toMillis());
}

/**
* @return the max duration for which a connection can be idle (that is, without traffic of bytes in either direction)
*/
@ManagedAttribute("The duration for which a connection can be idle")
public Duration getIdleTimeout()
{
return idleTimeout;
Expand All @@ -173,26 +228,120 @@ public void setIdleTimeout(Duration idleTimeout)
this.idleTimeout = idleTimeout;
}

/**
* @return the address to bind a socket to before the connect operation
*/
@ManagedAttribute("The socket address to bind sockets to before the connect operation")
public SocketAddress getBindAddress()
{
return bindAddress;
}

/**
* <p>Sets the bind address of sockets before the connect operation.</p>
* <p>In multi-homed hosts, you may want to connect from a specific address:</p>
* <pre>
* clientConnector.setBindAddress(new InetSocketAddress("127.0.0.2", 0));
* </pre>
* <p>Note the use of the port {@code 0} to indicate that a different ephemeral port
* should be used for each different connection.</p>
* <p>In the rare cases where you want to use the same port for all connections,
* you must also call {@link #setReusePort(boolean) setReusePort(true)}.</p>
*
* @param bindAddress the socket address to bind to before the connect operation
*/
public void setBindAddress(SocketAddress bindAddress)
{
this.bindAddress = bindAddress;
}

/**
* @return whether small TCP packets are sent without delay
*/
@ManagedAttribute("Whether small TCP packets are sent without delay")
public boolean isTCPNoDelay()
{
return tcpNoDelay;
}

public void setTCPNoDelay(boolean tcpNoDelay)
{
this.tcpNoDelay = tcpNoDelay;
}

/**
* @return whether rebinding is allowed with sockets in tear-down states
*/
@ManagedAttribute("Whether rebinding is allowed with sockets in tear-down states")
public boolean getReuseAddress()
{
return reuseAddress;
}

/**
* <p>Sets whether it is allowed to bind a socket to a socket address
* that may be in use by another socket in tear-down state, for example
* in TIME_WAIT state.</p>
* <p>This is useful when ClientConnector is restarted: an existing connection
* may still be using a network address (same host and same port) that is also
* chosen for a new connection.</p>
*
* @param reuseAddress whether rebinding is allowed with sockets in tear-down states
* @see #setReusePort(boolean)
*/
public void setReuseAddress(boolean reuseAddress)
{
this.reuseAddress = reuseAddress;
}

/**
* @return whether binding to same host and port is allowed
*/
@ManagedAttribute("Whether binding to same host and port is allowed")
public boolean isReusePort()
{
return reusePort;
}

/**
* <p>Sets whether it is allowed to bind multiple sockets to the same
* socket address (same host and same port).</p>
*
* @param reusePort whether binding to same host and port is allowed
*/
public void setReusePort(boolean reusePort)
{
this.reusePort = reusePort;
}

/**
* @return the receive buffer size in bytes, or -1 for the default value
*/
@ManagedAttribute("The receive buffer size in bytes")
public int getReceiveBufferSize()
{
return receiveBufferSize;
}

public void setReceiveBufferSize(int receiveBufferSize)
{
this.receiveBufferSize = receiveBufferSize;
}

/**
* @return the send buffer size in bytes, or -1 for the default value
*/
@ManagedAttribute("The send buffer size in bytes")
public int getSendBufferSize()
{
return sendBufferSize;
}

public void setSendBufferSize(int sendBufferSize)
{
this.sendBufferSize = sendBufferSize;
}

@Override
protected void doStart() throws Exception
{
Expand Down Expand Up @@ -246,10 +395,12 @@ public void connect(SocketAddress address, Map<String, Object> context)
SocketChannelWithAddress channelWithAddress = factory.newSocketChannelWithAddress(address, context);
channel = channelWithAddress.getSocketChannel();
address = channelWithAddress.getSocketAddress();

configure(channel);

SocketAddress bindAddress = getBindAddress();
if (bindAddress != null)
bind(channel, bindAddress);
configure(channel);

boolean connected = true;
boolean blocking = isConnectBlocking() && address instanceof InetSocketAddress;
Expand Down Expand Up @@ -306,33 +457,36 @@ public void accept(SocketChannel channel, Map<String, Object> context)
}
}

private void bind(SocketChannel channel, SocketAddress bindAddress)
private void bind(SocketChannel channel, SocketAddress bindAddress) throws IOException
{
try
{
boolean reuseAddress = getReuseAddress();
if (LOG.isDebugEnabled())
LOG.debug("Binding to {} reusing address {}", bindAddress, reuseAddress);
channel.setOption(StandardSocketOptions.SO_REUSEADDR, reuseAddress);
channel.bind(bindAddress);
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not bind {}", channel);
}
if (LOG.isDebugEnabled())
LOG.debug("Binding {} to {}", channel, bindAddress);
channel.bind(bindAddress);
}

protected void configure(SocketChannel channel) throws IOException
{
setSocketOption(channel, StandardSocketOptions.TCP_NODELAY, isTCPNoDelay());
setSocketOption(channel, StandardSocketOptions.SO_REUSEADDR, getReuseAddress());
setSocketOption(channel, StandardSocketOptions.SO_REUSEPORT, isReusePort());
int receiveBufferSize = getReceiveBufferSize();
if (receiveBufferSize >= 0)
setSocketOption(channel, StandardSocketOptions.SO_RCVBUF, receiveBufferSize);
int sendBufferSize = getSendBufferSize();
if (sendBufferSize >= 0)
setSocketOption(channel, StandardSocketOptions.SO_SNDBUF, sendBufferSize);
}

private <T> void setSocketOption(SocketChannel channel, SocketOption<T> option, T value)
{
try
{
channel.setOption(StandardSocketOptions.TCP_NODELAY, true);
channel.setOption(option, value);
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not configure {}", channel);
LOG.debug("Could not configure {} to {} on {}", option, value, channel);
}
}

Expand Down
1 change: 1 addition & 0 deletions jetty-server/src/main/config/etc/jetty-http.xml
Expand Up @@ -39,6 +39,7 @@
<Set name="acceptorPriorityDelta" property="jetty.http.acceptorPriorityDelta" />
<Set name="acceptQueueSize" property="jetty.http.acceptQueueSize" />
<Set name="reuseAddress"><Property name="jetty.http.reuseAddress" default="true"/></Set>
<Set name="reusePort"><Property name="jetty.http.reusePort" default="false"/></Set>
<Set name="acceptedTcpNoDelay"><Property name="jetty.http.acceptedTcpNoDelay" default="true"/></Set>
<Set name="acceptedReceiveBufferSize" property="jetty.http.acceptedReceiveBufferSize" />
<Set name="acceptedSendBufferSize" property="jetty.http.acceptedSendBufferSize" />
Expand Down

0 comments on commit dbc0ce7

Please sign in to comment.