Skip to content

Commit

Permalink
#6322 Use RetainableByteBuffer and write a new pool for it
Browse files Browse the repository at this point in the history
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
  • Loading branch information
lorban committed Jul 12, 2021
1 parent a760f71 commit cfc0a9b
Show file tree
Hide file tree
Showing 20 changed files with 944 additions and 133 deletions.
Expand Up @@ -54,10 +54,12 @@
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.io.ArrayRetainableByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.util.Fields;
import org.eclipse.jetty.util.Jetty;
Expand Down Expand Up @@ -193,12 +195,14 @@ protected void doStart() throws Exception
threadPool.setName(name);
setExecutor(threadPool);
}
int maxBucketSize = executor instanceof ThreadPool.SizedThreadPool
? ((ThreadPool.SizedThreadPool)executor).getMaxThreads() / 2
: ProcessorUtils.availableProcessors() * 2;
ByteBufferPool byteBufferPool = getByteBufferPool();
if (byteBufferPool == null)
setByteBufferPool(new MappedByteBufferPool(2048,
executor instanceof ThreadPool.SizedThreadPool
? ((ThreadPool.SizedThreadPool)executor).getMaxThreads() / 2
: ProcessorUtils.availableProcessors() * 2));
setByteBufferPool(new MappedByteBufferPool(2048, maxBucketSize));
if (getBean(RetainableByteBufferPool.class) == null)
addBean(new ArrayRetainableByteBufferPool(0, 2048, 65536, maxBucketSize));
Scheduler scheduler = getScheduler();
if (scheduler == null)
setScheduler(new ScheduledExecutorScheduler(name + "-scheduler", false));
Expand Down
Expand Up @@ -29,9 +29,9 @@
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.slf4j.Logger;
Expand All @@ -43,6 +43,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res

private final LongAdder inMessages = new LongAdder();
private final HttpParser parser;
private final RetainableByteBufferPool retainableByteBufferPool;
private RetainableByteBuffer networkBuffer;
private boolean shutdown;
private boolean complete;
Expand All @@ -61,6 +62,8 @@ public HttpReceiverOverHTTP(HttpChannelOverHTTP channel)
parser.setHeaderCacheSize(httpTransport.getHeaderCacheSize());
parser.setHeaderCacheCaseSensitive(httpTransport.isHeaderCacheCaseSensitive());
}

this.retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(httpClient, httpClient.getByteBufferPool());
}

@Override
Expand Down Expand Up @@ -111,9 +114,8 @@ private void reacquireNetworkBuffer()
private RetainableByteBuffer newNetworkBuffer()
{
HttpClient client = getHttpDestination().getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
boolean direct = client.isUseInputDirectByteBuffers();
return new RetainableByteBuffer(bufferPool, client.getResponseBufferSize(), direct);
return retainableByteBufferPool.acquire(client.getResponseBufferSize(), direct);
}

private void releaseNetworkBuffer()
Expand Down Expand Up @@ -166,7 +168,7 @@ private void process()
return;
}

if (networkBuffer.getReferences() > 1)
if (networkBuffer.isRetained())
reacquireNetworkBuffer();

// The networkBuffer may have been reacquired.
Expand Down
Expand Up @@ -44,9 +44,9 @@
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
Expand All @@ -71,6 +71,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
private final ClientParser parser;
private RetainableByteBuffer networkBuffer;
private Object attachment;
private final RetainableByteBufferPool retainableByteBufferPool;

public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
{
Expand All @@ -81,6 +82,9 @@ public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, Pr
this.delegate = new Delegate(destination);
this.parser = new ClientParser(new ResponseListener());
requests.addLast(0);

HttpClient client = destination.getHttpClient();
this.retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(client, client.getByteBufferPool());
}

public HttpDestination getHttpDestination()
Expand Down Expand Up @@ -135,8 +139,7 @@ private void reacquireNetworkBuffer()
private RetainableByteBuffer newNetworkBuffer()
{
HttpClient client = destination.getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
return new RetainableByteBuffer(bufferPool, client.getResponseBufferSize(), client.isUseInputDirectByteBuffers());
return retainableByteBufferPool.acquire(client.getResponseBufferSize(), client.isUseInputDirectByteBuffers());
}

private void releaseNetworkBuffer()
Expand All @@ -161,7 +164,7 @@ void process()
if (parse(networkBuffer.getBuffer()))
return;

if (networkBuffer.getReferences() > 1)
if (networkBuffer.isRetained())
reacquireNetworkBuffer();

// The networkBuffer may have been reacquired.
Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.LifeCycle;
Expand Down Expand Up @@ -67,7 +68,9 @@ public Connection newConnection(EndPoint endPoint, Map<String, Object> context)
parser.setMaxFrameLength(client.getMaxFrameLength());
parser.setMaxSettingsKeys(client.getMaxSettingsKeys());

HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint,
RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(client, byteBufferPool);

HTTP2ClientConnection connection = new HTTP2ClientConnection(client, retainableByteBufferPool, executor, endPoint,
parser, session, client.getInputBufferSize(), promise, listener);
connection.setUseInputDirectByteBuffers(client.isUseInputDirectByteBuffers());
connection.setUseOutputDirectByteBuffers(client.isUseOutputDirectByteBuffers());
Expand All @@ -81,9 +84,9 @@ private static class HTTP2ClientConnection extends HTTP2Connection implements Ca
private final Promise<Session> promise;
private final Session.Listener listener;

private HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise<Session> promise, Session.Listener listener)
private HTTP2ClientConnection(HTTP2Client client, RetainableByteBufferPool retainableByteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise<Session> promise, Session.Listener listener)
{
super(byteBufferPool, executor, endpoint, parser, session, bufferSize);
super(retainableByteBufferPool, executor, endpoint, parser, session, bufferSize);
this.client = client;
this.promise = promise;
this.listener = listener;
Expand Down
Expand Up @@ -23,10 +23,10 @@
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
Expand All @@ -45,18 +45,18 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher.
private final Queue<Runnable> tasks = new ArrayDeque<>();
private final HTTP2Producer producer = new HTTP2Producer();
private final AtomicLong bytesIn = new AtomicLong();
private final ByteBufferPool byteBufferPool;
private final RetainableByteBufferPool retainableByteBufferPool;
private final Parser parser;
private final ISession session;
private final int bufferSize;
private final ExecutionStrategy strategy;
private boolean useInputDirectByteBuffers;
private boolean useOutputDirectByteBuffers;

public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize)
protected HTTP2Connection(RetainableByteBufferPool retainableByteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize)
{
super(endPoint, executor);
this.byteBufferPool = byteBufferPool;
this.retainableByteBufferPool = retainableByteBufferPool;
this.parser = parser;
this.session = session;
this.bufferSize = bufferSize;
Expand Down Expand Up @@ -287,7 +287,7 @@ public Runnable produce()
return task;

// If more references than 1 (ie not just us), don't refill into buffer and risk compaction.
if (networkBuffer.getReferences() > 1)
if (networkBuffer.isRetained())
reacquireNetworkBuffer();
}

Expand Down Expand Up @@ -415,16 +415,43 @@ public void onConnectionFailure(int error, String reason)
}
}

private class NetworkBuffer extends RetainableByteBuffer implements Callback
private class NetworkBuffer implements Callback
{
private final RetainableByteBuffer delegate;

private NetworkBuffer()
{
super(byteBufferPool, bufferSize, isUseInputDirectByteBuffers());
delegate = retainableByteBufferPool.acquire(bufferSize, isUseInputDirectByteBuffers());
}

public ByteBuffer getBuffer()
{
return delegate.getBuffer();
}

public boolean isRetained()
{
return delegate.isRetained();
}

public boolean hasRemaining()
{
return delegate.hasRemaining();
}

public boolean release()
{
return delegate.release();
}

public void retain()
{
delegate.retain();
}

private void put(ByteBuffer source)
{
BufferUtil.append(getBuffer(), source);
BufferUtil.append(delegate.getBuffer(), source);
}

@Override
Expand All @@ -441,7 +468,7 @@ public void failed(Throwable failure)

private void completed(Throwable failure)
{
if (release() == 0)
if (delegate.release())
{
if (LOG.isDebugEnabled())
LOG.debug("Released retained {}", this, failure);
Expand Down
Expand Up @@ -37,6 +37,7 @@
import org.eclipse.jetty.http2.parser.WindowRateControl;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.server.AbstractConnectionFactory;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
Expand Down Expand Up @@ -279,7 +280,9 @@ public Connection newConnection(Connector connector, EndPoint endPoint)
parser.setMaxFrameLength(getMaxFrameLength());
parser.setMaxSettingsKeys(getMaxSettingsKeys());

HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), connector.getExecutor(),
RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, connector.getByteBufferPool());

HTTP2Connection connection = new HTTP2ServerConnection(retainableByteBufferPool, connector.getExecutor(),
endPoint, httpConfiguration, parser, session, getInputBufferSize(), listener);
connection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
connection.setUseOutputDirectByteBuffers(isUseOutputDirectByteBuffers());
Expand Down
Expand Up @@ -45,8 +45,8 @@
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.parser.ServerParser;
import org.eclipse.jetty.http2.parser.SettingsBodyParser;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.BufferUtil;
Expand Down Expand Up @@ -87,9 +87,9 @@ public static boolean isSupportedProtocol(String protocol)
private final HttpConfiguration httpConfig;
private boolean recycleHttpChannels = true;

public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener)
public HTTP2ServerConnection(RetainableByteBufferPool retainableByteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener)
{
super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize);
super(retainableByteBufferPool, executor, endPoint, parser, session, inputBufferSize);
this.listener = listener;
this.httpConfig = httpConfig;
}
Expand Down

0 comments on commit cfc0a9b

Please sign in to comment.