Skip to content

Commit

Permalink
Issue #6109 - Introduce carriers for HttpClient transports
Browse files Browse the repository at this point in the history
Introduced org.eclipse.jetty.io.Connectable.
Retrofitted HttpClientTransport implementation to take a Connectable
instead of ClientConnector.

In future, a different Connectable implementation can be passed to
HttpClientTransport implementation so that it may delegate to concrete
ClientConnector implementation that use TCP, or Unix Domain sockets,
or QUIC over UDP.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Apr 1, 2021
1 parent d7de3ea commit 64ca8c9
Show file tree
Hide file tree
Showing 10 changed files with 356 additions and 64 deletions.
Expand Up @@ -14,50 +14,43 @@
package org.eclipse.jetty.client;

import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.Map;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.Connectable;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.Container;

@ManagedObject
public abstract class AbstractConnectorHttpClientTransport extends AbstractHttpClientTransport
{
private final ClientConnector connector;
private final Connectable connector;

protected AbstractConnectorHttpClientTransport(ClientConnector connector)
protected AbstractConnectorHttpClientTransport(Connectable connector)
{
this.connector = connector;
addBean(connector);
}

public ClientConnector getClientConnector()
{
return connector;
ClientConnector result = null;
if (connector instanceof ClientConnector)
result = (ClientConnector)connector;
else if (connector instanceof Container)
result = ((Container)connector).getContainedBeans(ClientConnector.class).stream().findFirst().orElse(null);
if (result == null)
throw new IllegalArgumentException(ClientConnector.class.getName() + " not found in transport " + this);
return result;
}

@ManagedAttribute(value = "The number of selectors", readonly = true)
public int getSelectors()
{
return connector.getSelectors();
}

@Override
protected void doStart() throws Exception
{
HttpClient httpClient = getHttpClient();
connector.setBindAddress(httpClient.getBindAddress());
connector.setByteBufferPool(httpClient.getByteBufferPool());
connector.setConnectBlocking(httpClient.isConnectBlocking());
connector.setConnectTimeout(Duration.ofMillis(httpClient.getConnectTimeout()));
connector.setExecutor(httpClient.getExecutor());
connector.setIdleTimeout(Duration.ofMillis(httpClient.getIdleTimeout()));
connector.setScheduler(httpClient.getScheduler());
connector.setSslContextFactory(httpClient.getSslContextFactory());
super.doStart();
return getClientConnector().getSelectors();
}

@Override
Expand Down
Expand Up @@ -159,7 +159,9 @@ public HttpClient(HttpClientTransport transport)
{
this.transport = Objects.requireNonNull(transport);
addBean(transport);
this.connector = ((AbstractHttpClientTransport)transport).getBean(ClientConnector.class);
this.connector = ((AbstractHttpClientTransport)transport).getContainedBeans(ClientConnector.class).stream()
.findFirst()
.orElseThrow(() -> new IllegalArgumentException(ClientConnector.class.getName() + " not found in transport " + transport));
addBean(handlers);
addBean(decoderFactories);
}
Expand Down
Expand Up @@ -38,6 +38,7 @@
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.Connectable;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;

Expand Down Expand Up @@ -95,7 +96,7 @@ public HttpClientTransportDynamic()
* @param connector the ClientConnector used by this transport
* @param factoryInfos the <em>application protocols</em> that this transport can speak
*/
public HttpClientTransportDynamic(ClientConnector connector, ClientConnectionFactory.Info... factoryInfos)
public HttpClientTransportDynamic(Connectable connector, ClientConnectionFactory.Info... factoryInfos)
{
super(connector);
addBean(connector);
Expand Down Expand Up @@ -179,7 +180,7 @@ public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map<Stri
{
if (destination.isSecure() && protocol.isNegotiate())
{
factory = new ALPNClientConnectionFactory(getClientConnector().getExecutor(), this::newNegotiatedConnection, protocol.getProtocols());
factory = new ALPNClientConnectionFactory(getHttpClient().getExecutor(), this::newNegotiatedConnection, protocol.getProtocols());
}
else
{
Expand Down
Expand Up @@ -25,8 +25,8 @@
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.Connectable;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.ProcessorUtils;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;

Expand All @@ -41,21 +41,27 @@ public class HttpClientTransportOverHTTP extends AbstractConnectorHttpClientTran

public HttpClientTransportOverHTTP()
{
this(Math.max(1, ProcessorUtils.availableProcessors() / 2));
this(1);
}

public HttpClientTransportOverHTTP(int selectors)
{
this(new ClientConnector());
getClientConnector().setSelectors(selectors);
this(newClientConnector(selectors));
}

public HttpClientTransportOverHTTP(ClientConnector connector)
public HttpClientTransportOverHTTP(Connectable connector)
{
super(connector);
setConnectionPoolFactory(destination -> new DuplexConnectionPool(destination, getHttpClient().getMaxConnectionsPerDestination(), destination));
}

private static Connectable newClientConnector(int selectors)
{
ClientConnector connector = new ClientConnector();
connector.setSelectors(selectors);
return connector;
}

@Override
public Origin newOrigin(HttpRequest request)
{
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.eclipse.jetty.fcgi.FCGI;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.Connectable;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.ProcessorUtils;
import org.eclipse.jetty.util.Promise;
Expand All @@ -47,11 +48,10 @@ public HttpClientTransportOverFCGI(String scriptRoot)

public HttpClientTransportOverFCGI(int selectors, String scriptRoot)
{
this(new ClientConnector(), scriptRoot);
getClientConnector().setSelectors(selectors);
this(newClientConnector(selectors), scriptRoot);
}

public HttpClientTransportOverFCGI(ClientConnector connector, String scriptRoot)
public HttpClientTransportOverFCGI(Connectable connector, String scriptRoot)
{
super(connector);
this.scriptRoot = scriptRoot;
Expand All @@ -63,6 +63,13 @@ public HttpClientTransportOverFCGI(ClientConnector connector, String scriptRoot)
});
}

private static ClientConnector newClientConnector(int selectors)
{
ClientConnector connector = new ClientConnector();
connector.setSelectors(selectors);
return connector;
}

@ManagedAttribute(value = "The scripts root directory", readonly = true)
public String getScriptRoot()
{
Expand Down
Expand Up @@ -31,10 +31,12 @@
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.Connectable;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.Container;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.Scheduler;
Expand Down Expand Up @@ -101,7 +103,7 @@
@ManagedObject
public class HTTP2Client extends ContainerLifeCycle
{
private final ClientConnector connector;
private final Connectable connector;
private int inputBufferSize = 8192;
private List<String> protocols = List.of("h2");
private int initialSessionRecvWindow = 16 * 1024 * 1024;
Expand All @@ -121,45 +123,52 @@ public HTTP2Client()
this(new ClientConnector());
}

public HTTP2Client(ClientConnector connector)
public HTTP2Client(Connectable connector)
{
this.connector = connector;
addBean(connector);
}

public ClientConnector getClientConnector()
{
return connector;
ClientConnector result = null;
if (connector instanceof ClientConnector)
result = (ClientConnector)connector;
else if (connector instanceof Container)
result = ((Container)connector).getContainedBeans(ClientConnector.class).stream().findAny().orElse(null);
if (result == null)
throw new IllegalArgumentException(ClientConnector.class.getName() + " not found in " + this);
return result;
}

public Executor getExecutor()
{
return connector.getExecutor();
return getClientConnector().getExecutor();
}

public void setExecutor(Executor executor)
{
connector.setExecutor(executor);
getClientConnector().setExecutor(executor);
}

public Scheduler getScheduler()
{
return connector.getScheduler();
return getClientConnector().getScheduler();
}

public void setScheduler(Scheduler scheduler)
{
connector.setScheduler(scheduler);
getClientConnector().setScheduler(scheduler);
}

public ByteBufferPool getByteBufferPool()
{
return connector.getByteBufferPool();
return getClientConnector().getByteBufferPool();
}

public void setByteBufferPool(ByteBufferPool bufferPool)
{
connector.setByteBufferPool(bufferPool);
getClientConnector().setByteBufferPool(bufferPool);
}

public FlowControlStrategy.Factory getFlowControlStrategyFactory()
Expand All @@ -175,23 +184,23 @@ public void setFlowControlStrategyFactory(FlowControlStrategy.Factory flowContro
@ManagedAttribute("The number of selectors")
public int getSelectors()
{
return connector.getSelectors();
return getClientConnector().getSelectors();
}

public void setSelectors(int selectors)
{
connector.setSelectors(selectors);
getClientConnector().setSelectors(selectors);
}

@ManagedAttribute("The idle timeout in milliseconds")
public long getIdleTimeout()
{
return connector.getIdleTimeout().toMillis();
return getClientConnector().getIdleTimeout().toMillis();
}

public void setIdleTimeout(long idleTimeout)
{
connector.setIdleTimeout(Duration.ofMillis(idleTimeout));
getClientConnector().setIdleTimeout(Duration.ofMillis(idleTimeout));
}

@ManagedAttribute("The stream idle timeout in milliseconds")
Expand All @@ -208,33 +217,33 @@ public void setStreamIdleTimeout(long streamIdleTimeout)
@ManagedAttribute("The connect timeout in milliseconds")
public long getConnectTimeout()
{
return connector.getConnectTimeout().toMillis();
return getClientConnector().getConnectTimeout().toMillis();
}

public void setConnectTimeout(long connectTimeout)
{
connector.setConnectTimeout(Duration.ofMillis(connectTimeout));
getClientConnector().setConnectTimeout(Duration.ofMillis(connectTimeout));
}

@ManagedAttribute("Whether the connect() operation is blocking")
public boolean isConnectBlocking()
{
return connector.isConnectBlocking();
return getClientConnector().isConnectBlocking();
}

public void setConnectBlocking(boolean connectBlocking)
{
connector.setConnectBlocking(connectBlocking);
getClientConnector().setConnectBlocking(connectBlocking);
}

public SocketAddress getBindAddress()
{
return connector.getBindAddress();
return getClientConnector().getBindAddress();
}

public void setBindAddress(SocketAddress bindAddress)
{
connector.setBindAddress(bindAddress);
getClientConnector().setBindAddress(bindAddress);
}

@ManagedAttribute("The size of the buffer used to read from the network")
Expand Down Expand Up @@ -404,7 +413,7 @@ public void accept(SocketChannel channel, ClientConnectionFactory factory, Sessi
{
Map<String, Object> context = contextFrom(factory, listener, promise, null);
context.put(ClientConnector.CONNECTION_PROMISE_CONTEXT_KEY, Promise.from(ioConnection -> {}, promise::failed));
connector.accept(channel, context);
getClientConnector().accept(channel, context);
}

private Map<String, Object> contextFrom(ClientConnectionFactory factory, Session.Listener listener, Promise<Session> promise, Map<String, Object> context)
Expand Down
Expand Up @@ -17,6 +17,7 @@
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Objects;

import org.eclipse.jetty.alpn.client.ALPNClientConnectionFactory;
import org.eclipse.jetty.client.AbstractHttpClientTransport;
Expand Down Expand Up @@ -47,8 +48,8 @@ public class HttpClientTransportOverHTTP2 extends AbstractHttpClientTransport

public HttpClientTransportOverHTTP2(HTTP2Client client)
{
this.client = client;
addBean(client.getClientConnector(), false);
this.client = Objects.requireNonNull(client);
addBean(client);
setConnectionPoolFactory(destination ->
{
HttpClient httpClient = getHttpClient();
Expand Down Expand Up @@ -93,17 +94,9 @@ protected void doStart() throws Exception
client.setUseInputDirectByteBuffers(httpClient.isUseInputDirectByteBuffers());
client.setUseOutputDirectByteBuffers(httpClient.isUseOutputDirectByteBuffers());
}
addBean(client);
super.doStart();
}

@Override
protected void doStop() throws Exception
{
super.doStop();
removeBean(client);
}

@Override
public Origin newOrigin(HttpRequest request)
{
Expand Down
Expand Up @@ -35,12 +35,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClientConnector extends ContainerLifeCycle
public class ClientConnector extends ContainerLifeCycle implements Connectable
{
public static final String CLIENT_CONNECTOR_CONTEXT_KEY = "org.eclipse.jetty.client.connector";
public static final String REMOTE_SOCKET_ADDRESS_CONTEXT_KEY = CLIENT_CONNECTOR_CONTEXT_KEY + ".remoteSocketAddress";
public static final String CLIENT_CONNECTION_FACTORY_CONTEXT_KEY = CLIENT_CONNECTOR_CONTEXT_KEY + ".clientConnectionFactory";
public static final String CONNECTION_PROMISE_CONTEXT_KEY = CLIENT_CONNECTOR_CONTEXT_KEY + ".connectionPromise";
private static final Logger LOG = LoggerFactory.getLogger(ClientConnector.class);

private Executor executor;
Expand Down Expand Up @@ -211,6 +209,7 @@ protected SelectorManager newSelectorManager()
return new ClientSelectorManager(getExecutor(), getScheduler(), getSelectors());
}

@Override
public void connect(SocketAddress address, Map<String, Object> context)
{
SocketChannel channel = null;
Expand Down

0 comments on commit 64ca8c9

Please sign in to comment.