Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use RetainableByteBuffer and write a new pool for it #6332

Merged
merged 1 commit into from Jul 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -54,10 +54,12 @@
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.io.ArrayRetainableByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.util.Fields;
import org.eclipse.jetty.util.Jetty;
Expand Down Expand Up @@ -193,12 +195,14 @@ protected void doStart() throws Exception
threadPool.setName(name);
setExecutor(threadPool);
}
int maxBucketSize = executor instanceof ThreadPool.SizedThreadPool
? ((ThreadPool.SizedThreadPool)executor).getMaxThreads() / 2
: ProcessorUtils.availableProcessors() * 2;
ByteBufferPool byteBufferPool = getByteBufferPool();
if (byteBufferPool == null)
setByteBufferPool(new MappedByteBufferPool(2048,
executor instanceof ThreadPool.SizedThreadPool
? ((ThreadPool.SizedThreadPool)executor).getMaxThreads() / 2
: ProcessorUtils.availableProcessors() * 2));
setByteBufferPool(new MappedByteBufferPool(2048, maxBucketSize));
if (getBean(RetainableByteBufferPool.class) == null)
addBean(new ArrayRetainableByteBufferPool(0, 2048, 65536, maxBucketSize));
Scheduler scheduler = getScheduler();
if (scheduler == null)
setScheduler(new ScheduledExecutorScheduler(name + "-scheduler", false));
Expand Down
Expand Up @@ -29,9 +29,9 @@
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.slf4j.Logger;
Expand All @@ -43,6 +43,7 @@ public class HttpReceiverOverHTTP extends HttpReceiver implements HttpParser.Res

private final LongAdder inMessages = new LongAdder();
private final HttpParser parser;
private final RetainableByteBufferPool retainableByteBufferPool;
private RetainableByteBuffer networkBuffer;
private boolean shutdown;
private boolean complete;
Expand All @@ -61,6 +62,8 @@ public HttpReceiverOverHTTP(HttpChannelOverHTTP channel)
parser.setHeaderCacheSize(httpTransport.getHeaderCacheSize());
parser.setHeaderCacheCaseSensitive(httpTransport.isHeaderCacheCaseSensitive());
}

this.retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(httpClient, httpClient.getByteBufferPool());
}

@Override
Expand Down Expand Up @@ -111,9 +114,8 @@ private void reacquireNetworkBuffer()
private RetainableByteBuffer newNetworkBuffer()
{
HttpClient client = getHttpDestination().getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
boolean direct = client.isUseInputDirectByteBuffers();
return new RetainableByteBuffer(bufferPool, client.getResponseBufferSize(), direct);
return retainableByteBufferPool.acquire(client.getResponseBufferSize(), direct);
}

private void releaseNetworkBuffer()
Expand Down Expand Up @@ -166,7 +168,7 @@ private void process()
return;
}

if (networkBuffer.getReferences() > 1)
if (networkBuffer.isRetained())
reacquireNetworkBuffer();

// The networkBuffer may have been reacquired.
Expand Down
Expand Up @@ -44,9 +44,9 @@
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
Expand All @@ -71,6 +71,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
private final ClientParser parser;
private RetainableByteBuffer networkBuffer;
private Object attachment;
private final RetainableByteBufferPool retainableByteBufferPool;

public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
{
Expand All @@ -81,6 +82,9 @@ public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, Pr
this.delegate = new Delegate(destination);
this.parser = new ClientParser(new ResponseListener());
requests.addLast(0);

HttpClient client = destination.getHttpClient();
this.retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(client, client.getByteBufferPool());
}

public HttpDestination getHttpDestination()
Expand Down Expand Up @@ -135,8 +139,7 @@ private void reacquireNetworkBuffer()
private RetainableByteBuffer newNetworkBuffer()
{
HttpClient client = destination.getHttpClient();
ByteBufferPool bufferPool = client.getByteBufferPool();
return new RetainableByteBuffer(bufferPool, client.getResponseBufferSize(), client.isUseInputDirectByteBuffers());
return retainableByteBufferPool.acquire(client.getResponseBufferSize(), client.isUseInputDirectByteBuffers());
}

private void releaseNetworkBuffer()
Expand All @@ -161,7 +164,7 @@ void process()
if (parse(networkBuffer.getBuffer()))
return;

if (networkBuffer.getReferences() > 1)
if (networkBuffer.isRetained())
reacquireNetworkBuffer();

// The networkBuffer may have been reacquired.
Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.LifeCycle;
Expand Down Expand Up @@ -67,7 +68,9 @@ public Connection newConnection(EndPoint endPoint, Map<String, Object> context)
parser.setMaxFrameLength(client.getMaxFrameLength());
parser.setMaxSettingsKeys(client.getMaxSettingsKeys());

HTTP2ClientConnection connection = new HTTP2ClientConnection(client, byteBufferPool, executor, endPoint,
RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(client, byteBufferPool);

HTTP2ClientConnection connection = new HTTP2ClientConnection(client, retainableByteBufferPool, executor, endPoint,
parser, session, client.getInputBufferSize(), promise, listener);
connection.setUseInputDirectByteBuffers(client.isUseInputDirectByteBuffers());
connection.setUseOutputDirectByteBuffers(client.isUseOutputDirectByteBuffers());
Expand All @@ -81,9 +84,9 @@ private static class HTTP2ClientConnection extends HTTP2Connection implements Ca
private final Promise<Session> promise;
private final Session.Listener listener;

private HTTP2ClientConnection(HTTP2Client client, ByteBufferPool byteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise<Session> promise, Session.Listener listener)
private HTTP2ClientConnection(HTTP2Client client, RetainableByteBufferPool retainableByteBufferPool, Executor executor, EndPoint endpoint, Parser parser, ISession session, int bufferSize, Promise<Session> promise, Session.Listener listener)
{
super(byteBufferPool, executor, endpoint, parser, session, bufferSize);
super(retainableByteBufferPool, executor, endpoint, parser, session, bufferSize);
this.client = client;
this.promise = promise;
this.listener = listener;
Expand Down
Expand Up @@ -23,10 +23,10 @@
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.io.WriteFlusher;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
Expand All @@ -45,18 +45,18 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher.
private final Queue<Runnable> tasks = new ArrayDeque<>();
private final HTTP2Producer producer = new HTTP2Producer();
private final AtomicLong bytesIn = new AtomicLong();
private final ByteBufferPool byteBufferPool;
private final RetainableByteBufferPool retainableByteBufferPool;
private final Parser parser;
private final ISession session;
private final int bufferSize;
private final ExecutionStrategy strategy;
private boolean useInputDirectByteBuffers;
private boolean useOutputDirectByteBuffers;

public HTTP2Connection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize)
protected HTTP2Connection(RetainableByteBufferPool retainableByteBufferPool, Executor executor, EndPoint endPoint, Parser parser, ISession session, int bufferSize)
{
super(endPoint, executor);
this.byteBufferPool = byteBufferPool;
this.retainableByteBufferPool = retainableByteBufferPool;
this.parser = parser;
this.session = session;
this.bufferSize = bufferSize;
Expand Down Expand Up @@ -287,7 +287,7 @@ public Runnable produce()
return task;

// If more references than 1 (ie not just us), don't refill into buffer and risk compaction.
if (networkBuffer.getReferences() > 1)
if (networkBuffer.isRetained())
reacquireNetworkBuffer();
}

Expand Down Expand Up @@ -415,16 +415,43 @@ public void onConnectionFailure(int error, String reason)
}
}

private class NetworkBuffer extends RetainableByteBuffer implements Callback
private class NetworkBuffer implements Callback
sbordet marked this conversation as resolved.
Show resolved Hide resolved
{
private final RetainableByteBuffer delegate;

private NetworkBuffer()
{
super(byteBufferPool, bufferSize, isUseInputDirectByteBuffers());
delegate = retainableByteBufferPool.acquire(bufferSize, isUseInputDirectByteBuffers());
}

public ByteBuffer getBuffer()
{
return delegate.getBuffer();
}

public boolean isRetained()
{
return delegate.isRetained();
}

public boolean hasRemaining()
{
return delegate.hasRemaining();
}

public boolean release()
{
return delegate.release();
}

public void retain()
{
delegate.retain();
}

private void put(ByteBuffer source)
{
BufferUtil.append(getBuffer(), source);
BufferUtil.append(delegate.getBuffer(), source);
}

@Override
Expand All @@ -441,7 +468,7 @@ public void failed(Throwable failure)

private void completed(Throwable failure)
{
if (release() == 0)
if (delegate.release())
{
if (LOG.isDebugEnabled())
LOG.debug("Released retained {}", this, failure);
Expand Down
Expand Up @@ -37,6 +37,7 @@
import org.eclipse.jetty.http2.parser.WindowRateControl;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.server.AbstractConnectionFactory;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
Expand Down Expand Up @@ -279,7 +280,9 @@ public Connection newConnection(Connector connector, EndPoint endPoint)
parser.setMaxFrameLength(getMaxFrameLength());
parser.setMaxSettingsKeys(getMaxSettingsKeys());

HTTP2Connection connection = new HTTP2ServerConnection(connector.getByteBufferPool(), connector.getExecutor(),
RetainableByteBufferPool retainableByteBufferPool = RetainableByteBufferPool.findOrAdapt(connector, connector.getByteBufferPool());

HTTP2Connection connection = new HTTP2ServerConnection(retainableByteBufferPool, connector.getExecutor(),
endPoint, httpConfiguration, parser, session, getInputBufferSize(), listener);
connection.setUseInputDirectByteBuffers(isUseInputDirectByteBuffers());
connection.setUseOutputDirectByteBuffers(isUseOutputDirectByteBuffers());
Expand Down
Expand Up @@ -45,8 +45,8 @@
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.parser.ServerParser;
import org.eclipse.jetty.http2.parser.SettingsBodyParser;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBufferPool;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.BufferUtil;
Expand Down Expand Up @@ -87,9 +87,9 @@ public static boolean isSupportedProtocol(String protocol)
private final HttpConfiguration httpConfig;
private boolean recycleHttpChannels = true;

public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener)
public HTTP2ServerConnection(RetainableByteBufferPool retainableByteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener)
{
super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize);
super(retainableByteBufferPool, executor, endPoint, parser, session, inputBufferSize);
this.listener = listener;
this.httpConfig = httpConfig;
}
Expand Down