Skip to content

Commit

Permalink
#8161 make RetainableByteBufferPool a 1st class citizen in the API
Browse files Browse the repository at this point in the history
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
  • Loading branch information
lorban committed Jun 15, 2022
1 parent 66de7ba commit 0471fe0
Show file tree
Hide file tree
Showing 52 changed files with 283 additions and 106 deletions.
Expand Up @@ -387,7 +387,8 @@ public void writeLine(String line, Callback callback)

// Wrap the "telnet" ClientConnectionFactory with the SslClientConnectionFactory.
connectionFactory = new SslClientConnectionFactory(clientConnector.getSslContextFactory(),
clientConnector.getByteBufferPool(), clientConnector.getExecutor(), connectionFactory);
clientConnector.getByteBufferPool(), clientConnector.getRetainableByteBufferPool(),
clientConnector.getExecutor(), connectionFactory);

// We will obtain a SslConnection now.
CompletableFuture<SslConnection> connectionPromise = new Promise.Completable<>();
Expand Down
Expand Up @@ -627,6 +627,14 @@ public ByteBufferPool getByteBufferPool()
return connector.getByteBufferPool();
}

/**
* @return the {@link RetainableByteBufferPool} of this HttpClient
*/
public RetainableByteBufferPool getRetainableByteBufferPool()
{
return connector.getRetainableByteBufferPool();
}

/**
* @param byteBufferPool the {@link ByteBufferPool} of this HttpClient
*/
Expand Down Expand Up @@ -1157,7 +1165,7 @@ protected ClientConnectionFactory newSslClientConnectionFactory(SslContextFactor
{
if (sslContextFactory == null)
sslContextFactory = getSslContextFactory();
return new SslClientConnectionFactory(sslContextFactory, getByteBufferPool(), getExecutor(), connectionFactory);
return new SslClientConnectionFactory(sslContextFactory, getByteBufferPool(), getRetainableByteBufferPool(), getExecutor(), connectionFactory);
}

private class ContentDecoderFactorySet implements Set<ContentDecoder.Factory>
Expand Down
Expand Up @@ -64,7 +64,7 @@ public HttpReceiverOverHTTP(HttpChannelOverHTTP channel)
parser.setHeaderCacheCaseSensitive(httpTransport.isHeaderCacheCaseSensitive());
}

this.retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(httpClient, httpClient.getByteBufferPool());
this.retainableByteBufferPool = httpClient.getRetainableByteBufferPool();
}

@Override
Expand Down
Expand Up @@ -633,7 +633,7 @@ public void testNeverUsedConnectionThenServerIdleTimeout() throws Exception
@Override
protected SslConnection newSslConnection(Connector connector, EndPoint endPoint, SSLEngine engine)
{
return new SslConnection(connector.getByteBufferPool(), connector.getExecutor(), endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption())
return new SslConnection(connector.getRetainableByteBufferPool(), connector.getByteBufferPool(), connector.getExecutor(), endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption())
{
@Override
protected int networkFill(ByteBuffer input) throws IOException
Expand Down Expand Up @@ -667,12 +667,12 @@ protected ClientConnectionFactory newSslClientConnectionFactory(SslContextFactor
{
if (sslContextFactory == null)
sslContextFactory = getSslContextFactory();
return new SslClientConnectionFactory(sslContextFactory, getByteBufferPool(), getExecutor(), connectionFactory)
return new SslClientConnectionFactory(sslContextFactory, getByteBufferPool(), getRetainableByteBufferPool(), getExecutor(), connectionFactory)
{
@Override
protected SslConnection newSslConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, SSLEngine engine)
protected SslConnection newSslConnection(RetainableByteBufferPool retainableByteBufferPool, ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, SSLEngine engine)
{
return new SslConnection(byteBufferPool, executor, endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption())
return new SslConnection(retainableByteBufferPool, byteBufferPool, executor, endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption())
{
@Override
protected int networkFill(ByteBuffer input) throws IOException
Expand Down Expand Up @@ -1017,7 +1017,7 @@ public void testNeverUsedConnectionThenClientIdleTimeout() throws Exception
@Override
protected SslConnection newSslConnection(Connector connector, EndPoint endPoint, SSLEngine engine)
{
return new SslConnection(connector.getByteBufferPool(), connector.getExecutor(), endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption())
return new SslConnection(connector.getRetainableByteBufferPool(), connector.getByteBufferPool(), connector.getExecutor(), endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption())
{
@Override
protected int networkFill(ByteBuffer input) throws IOException
Expand Down Expand Up @@ -1052,12 +1052,12 @@ protected ClientConnectionFactory newSslClientConnectionFactory(SslContextFactor
{
if (sslContextFactory == null)
sslContextFactory = getSslContextFactory();
return new SslClientConnectionFactory(sslContextFactory, getByteBufferPool(), getExecutor(), connectionFactory)
return new SslClientConnectionFactory(sslContextFactory, getByteBufferPool(), getRetainableByteBufferPool(), getExecutor(), connectionFactory)
{
@Override
protected SslConnection newSslConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, SSLEngine engine)
protected SslConnection newSslConnection(RetainableByteBufferPool retainableByteBufferPool, ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, SSLEngine engine)
{
return new SslConnection(byteBufferPool, executor, endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption())
return new SslConnection(retainableByteBufferPool, byteBufferPool, executor, endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption())
{
@Override
protected int networkFill(ByteBuffer input) throws IOException
Expand Down Expand Up @@ -1114,12 +1114,12 @@ protected ClientConnectionFactory newSslClientConnectionFactory(SslContextFactor
{
if (sslContextFactory == null)
sslContextFactory = getSslContextFactory();
return new SslClientConnectionFactory(sslContextFactory, getByteBufferPool(), getExecutor(), connectionFactory)
return new SslClientConnectionFactory(sslContextFactory, getByteBufferPool(), getRetainableByteBufferPool(), getExecutor(), connectionFactory)
{
@Override
protected SslConnection newSslConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, SSLEngine engine)
protected SslConnection newSslConnection(RetainableByteBufferPool retainableByteBufferPool, ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, SSLEngine engine)
{
return new SslConnection(byteBufferPool, executor, endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption())
return new SslConnection(retainableByteBufferPool, byteBufferPool, executor, endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption())
{
@Override
protected SSLEngineResult wrap(SSLEngine sslEngine, ByteBuffer[] input, ByteBuffer output) throws SSLException
Expand Down Expand Up @@ -1158,7 +1158,7 @@ public void testTLSLargeFragments() throws Exception
@Override
protected SslConnection newSslConnection(Connector connector, EndPoint endPoint, SSLEngine engine)
{
return new SslConnection(connector.getByteBufferPool(), connector.getExecutor(), endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption())
return new SslConnection(connector.getRetainableByteBufferPool(), connector.getByteBufferPool(), connector.getExecutor(), endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption())
{
@Override
protected SSLEngineResult unwrap(SSLEngine sslEngine, ByteBuffer input, ByteBuffer output) throws SSLException
Expand Down Expand Up @@ -1194,12 +1194,12 @@ protected ClientConnectionFactory newSslClientConnectionFactory(SslContextFactor
{
if (sslContextFactory == null)
sslContextFactory = getSslContextFactory();
return new SslClientConnectionFactory(sslContextFactory, getByteBufferPool(), getExecutor(), connectionFactory)
return new SslClientConnectionFactory(sslContextFactory, getByteBufferPool(), getRetainableByteBufferPool(), getExecutor(), connectionFactory)
{
@Override
protected SslConnection newSslConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, SSLEngine engine)
protected SslConnection newSslConnection(RetainableByteBufferPool retainableByteBufferPool, ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, SSLEngine engine)
{
return new SslConnection(byteBufferPool, executor, endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption())
return new SslConnection(retainableByteBufferPool, byteBufferPool, executor, endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption())
{
@Override
protected SSLEngineResult wrap(SSLEngine sslEngine, ByteBuffer[] input, ByteBuffer output) throws SSLException
Expand Down
Expand Up @@ -157,7 +157,7 @@ protected boolean onReadTimeout(Throwable timeout)
@Override
protected SslConnection newSslConnection(Connector connector, EndPoint endPoint, SSLEngine engine)
{
return new SslConnection(connector.getByteBufferPool(), connector.getExecutor(), endPoint, engine)
return new SslConnection(connector.getRetainableByteBufferPool(), connector.getByteBufferPool(), connector.getExecutor(), endPoint, engine)
{
@Override
protected DecryptedEndPoint newDecryptedEndPoint()
Expand Down
Expand Up @@ -19,10 +19,12 @@
import javax.net.ssl.SSLHandshakeException;

import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ArrayRetainableByteBufferPool;
import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.BufferUtil;
Expand All @@ -44,12 +46,13 @@ public void testSslConnectionClosedBeforeFill() throws Exception
sslContextFactory.start();

ByteBufferPool byteBufferPool = new MappedByteBufferPool();
RetainableByteBufferPool retainableByteBufferPool = new ArrayRetainableByteBufferPool();
QueuedThreadPool threadPool = new QueuedThreadPool();
threadPool.start();
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
SSLEngine sslEngine = sslContextFactory.newSSLEngine();
sslEngine.setUseClientMode(false);
SslConnection sslConnection = new SslConnection(byteBufferPool, threadPool, endPoint, sslEngine);
SslConnection sslConnection = new SslConnection(retainableByteBufferPool, byteBufferPool, threadPool, endPoint, sslEngine);
EndPoint sslEndPoint = sslConnection.getDecryptedEndPoint();
sslEndPoint.setConnection(new AbstractConnection(sslEndPoint, threadPool)
{
Expand Down
Expand Up @@ -79,7 +79,7 @@ public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, Pr
this.parser = new ClientParser(new ResponseListener());
requests.addLast(0);
HttpClient client = destination.getHttpClient();
this.networkByteBufferPool = RetainableByteBufferPool.findOrAdapt(client, client.getByteBufferPool());
this.networkByteBufferPool = client.getRetainableByteBufferPool();
}

public HttpDestination getHttpDestination()
Expand Down
Expand Up @@ -50,7 +50,7 @@ public ServerFCGIConnection(Connector connector, EndPoint endPoint, HttpConfigur
{
super(endPoint, connector.getExecutor());
this.connector = connector;
this.networkByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, connector.getByteBufferPool());
this.networkByteBufferPool = connector.getRetainableByteBufferPool();
this.flusher = new Flusher(endPoint);
this.configuration = configuration;
this.sendStatus200 = sendStatus200;
Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
Expand Down Expand Up @@ -158,6 +159,11 @@ public ByteBufferPool getByteBufferPool()
return connector.getByteBufferPool();
}

public RetainableByteBufferPool getRetainableByteBufferPool()
{
return connector.getRetainableByteBufferPool();
}

public void setByteBufferPool(ByteBufferPool bufferPool)
{
connector.setByteBufferPool(bufferPool);
Expand Down Expand Up @@ -437,7 +443,7 @@ private ClientConnectionFactory newClientConnectionFactory(SslContextFactory ssl
{
if (isUseALPN())
factory = new ALPNClientConnectionFactory(getExecutor(), factory, getProtocols());
factory = new SslClientConnectionFactory(sslContextFactory, getByteBufferPool(), getExecutor(), factory);
factory = new SslClientConnectionFactory(sslContextFactory, getByteBufferPool(), getRetainableByteBufferPool(), getExecutor(), factory);
}
return factory;
}
Expand Down
Expand Up @@ -68,7 +68,7 @@ public Connection newConnection(EndPoint endPoint, Map<String, Object> context)
parser.setMaxFrameLength(client.getMaxFrameLength());
parser.setMaxSettingsKeys(client.getMaxSettingsKeys());

RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(client, byteBufferPool);
RetainableByteBufferPool retainableByteBufferPool = client.getRetainableByteBufferPool();

HTTP2ClientConnection connection = new HTTP2ClientConnection(client, retainableByteBufferPool, executor, endPoint,
parser, session, client.getInputBufferSize(), promise, listener);
Expand Down
Expand Up @@ -280,7 +280,7 @@ public Connection newConnection(Connector connector, EndPoint endPoint)
parser.setMaxFrameLength(getMaxFrameLength());
parser.setMaxSettingsKeys(getMaxSettingsKeys());

RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, connector.getByteBufferPool());
RetainableByteBufferPool retainableByteBufferPool = connector.getRetainableByteBufferPool();

HTTP2Connection connection = new HTTP2ServerConnection(retainableByteBufferPool, connector.getExecutor(),
endPoint, httpConfiguration, parser, session, getInputBufferSize(), listener);
Expand Down
Expand Up @@ -23,7 +23,7 @@ public class ClientHTTP3StreamConnection extends HTTP3StreamConnection

public ClientHTTP3StreamConnection(QuicStreamEndPoint endPoint, ClientHTTP3Session session, MessageParser parser)
{
super(endPoint, session.getQuicSession().getExecutor(), session.getQuicSession().getByteBufferPool(), parser);
super(endPoint, session.getQuicSession().getExecutor(), session.getQuicSession().getRetainableByteBufferPool(), parser);
this.session = session;
}

Expand Down
Expand Up @@ -26,7 +26,6 @@
import org.eclipse.jetty.http3.internal.parser.MessageParser;
import org.eclipse.jetty.http3.internal.parser.ParserListener;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.quic.common.QuicStreamEndPoint;
Expand Down Expand Up @@ -54,10 +53,10 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
private boolean noData;
private boolean remotelyClosed;

public HTTP3StreamConnection(QuicStreamEndPoint endPoint, Executor executor, ByteBufferPool byteBufferPool, MessageParser parser)
public HTTP3StreamConnection(QuicStreamEndPoint endPoint, Executor executor, RetainableByteBufferPool retainableByteBufferPool, MessageParser parser)
{
super(endPoint, executor);
this.buffers = RetainableByteBufferPool.findOrAdapt(null, byteBufferPool);
this.buffers = retainableByteBufferPool;
this.parser = parser;
parser.init(MessageListener::new);
}
Expand Down
Expand Up @@ -19,6 +19,7 @@
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.PreEncodedHttpField;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.quic.server.QuicServerConnector;
import org.eclipse.jetty.server.ConnectionFactory;
import org.eclipse.jetty.server.Server;
Expand All @@ -41,12 +42,17 @@ public class HTTP3ServerConnector extends QuicServerConnector

public HTTP3ServerConnector(Server server, SslContextFactory.Server sslContextFactory, ConnectionFactory... factories)
{
this(server, null, null, null, sslContextFactory, factories);
this(server, null, null, null, null, sslContextFactory, factories);
}

public HTTP3ServerConnector(Server server, Executor executor, Scheduler scheduler, ByteBufferPool bufferPool, SslContextFactory.Server sslContextFactory, ConnectionFactory... factories)
{
super(server, executor, scheduler, bufferPool, sslContextFactory, factories);
this(server, executor, scheduler, bufferPool, null, sslContextFactory, factories);
}

public HTTP3ServerConnector(Server server, Executor executor, Scheduler scheduler, ByteBufferPool bufferPool, RetainableByteBufferPool retainableBufferPool, SslContextFactory.Server sslContextFactory, ConnectionFactory... factories)
{
super(server, executor, scheduler, bufferPool, retainableBufferPool, sslContextFactory, factories);
// Max concurrent streams that a client can open.
getQuicConfiguration().setMaxBidirectionalRemoteStreams(128);
// HTTP/3 requires a few mandatory unidirectional streams.
Expand Down
Expand Up @@ -31,7 +31,7 @@ public class ServerHTTP3StreamConnection extends HTTP3StreamConnection

public ServerHTTP3StreamConnection(Connector connector, HttpConfiguration httpConfiguration, QuicStreamEndPoint endPoint, ServerHTTP3Session session, MessageParser parser)
{
super(endPoint, connector.getExecutor(), connector.getByteBufferPool(), parser);
super(endPoint, connector.getExecutor(), connector.getRetainableByteBufferPool(), parser);
this.connector = connector;
this.httpConfiguration = httpConfiguration;
this.session = session;
Expand Down

0 comments on commit 0471fe0

Please sign in to comment.