diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java index d8ad678c430c..e4d5358cf78d 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTLSTest.java @@ -22,12 +22,14 @@ import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import javax.net.ssl.SSLEngine; @@ -36,6 +38,8 @@ import javax.net.ssl.SSLHandshakeException; import javax.net.ssl.SSLPeerUnverifiedException; import javax.net.ssl.SSLSocket; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP; @@ -43,12 +47,16 @@ import org.eclipse.jetty.http.HttpHeaderValue; import org.eclipse.jetty.http.HttpScheme; import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.io.ArrayByteBufferPool; +import org.eclipse.jetty.io.ArrayRetainableByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ClientConnectionFactory; import org.eclipse.jetty.io.ClientConnector; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.ConnectionStatistics; import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.RetainableByteBuffer; +import org.eclipse.jetty.io.RetainableByteBufferPool; import org.eclipse.jetty.io.ssl.SslClientConnectionFactory; import org.eclipse.jetty.io.ssl.SslConnection; import org.eclipse.jetty.io.ssl.SslHandshakeListener; @@ -56,11 +64,13 @@ import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConnectionFactory; +import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.SecureRequestCustomizer; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.SslConnectionFactory; import org.eclipse.jetty.toolchain.test.Net; +import org.eclipse.jetty.util.Pool; import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.thread.ExecutorThreadPool; @@ -71,9 +81,14 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledForJreRange; import org.junit.jupiter.api.condition.JRE; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -682,12 +697,7 @@ protected int networkFill(ByteBuffer input) throws IOException // Trigger the creation of a new connection, but don't use it. ConnectionPoolHelper.tryCreate(connectionPool); // Verify that the connection has been created. - while (true) - { - Thread.sleep(50); - if (connectionPool.getConnectionCount() == 1) - break; - } + await().atMost(5, TimeUnit.SECONDS).until(connectionPool::getConnectionCount, is(1)); // Wait for the server to idle timeout the connection. Thread.sleep(idleTimeout + idleTimeout / 2); @@ -698,6 +708,299 @@ protected int networkFill(ByteBuffer input) throws IOException assertEquals(0, clientBytes.get()); } + @Test + public void testEncryptedInputBufferRepooling() throws Exception + { + SslContextFactory.Server serverTLSFactory = createServerSslContextFactory(); + QueuedThreadPool serverThreads = new QueuedThreadPool(); + serverThreads.setName("server"); + server = new Server(serverThreads); + var retainableByteBufferPool = new ArrayRetainableByteBufferPool() + { + @Override + public Pool poolFor(int capacity, boolean direct) + { + return super.poolFor(capacity, direct); + } + }; + server.addBean(retainableByteBufferPool); + HttpConfiguration httpConfig = new HttpConfiguration(); + httpConfig.addCustomizer(new SecureRequestCustomizer()); + HttpConnectionFactory http = new HttpConnectionFactory(httpConfig); + SslConnectionFactory ssl = new SslConnectionFactory(serverTLSFactory, http.getProtocol()) + { + @Override + protected SslConnection newSslConnection(Connector connector, EndPoint endPoint, SSLEngine engine) + { + ByteBufferPool byteBufferPool = connector.getByteBufferPool(); + RetainableByteBufferPool retainableByteBufferPool = connector.getBean(RetainableByteBufferPool.class); + return new SslConnection(retainableByteBufferPool, byteBufferPool, connector.getExecutor(), endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption()) + { + @Override + protected int networkFill(ByteBuffer input) throws IOException + { + int n = super.networkFill(input); + if (n > 0) + throw new IOException("boom"); + return n; + } + }; + } + }; + connector = new ServerConnector(server, 1, 1, ssl, http); + server.addConnector(connector); + server.setHandler(new EmptyServerHandler()); + server.start(); + + SslContextFactory.Client clientTLSFactory = createClientSslContextFactory(); + ClientConnector clientConnector = new ClientConnector(); + clientConnector.setSelectors(1); + clientConnector.setSslContextFactory(clientTLSFactory); + QueuedThreadPool clientThreads = new QueuedThreadPool(); + clientThreads.setName("client"); + clientConnector.setExecutor(clientThreads); + client = new HttpClient(new HttpClientTransportOverHTTP(clientConnector)); + client.setExecutor(clientThreads); + client.start(); + + assertThrows(Exception.class, () -> client.newRequest("localhost", connector.getLocalPort()).scheme(HttpScheme.HTTPS.asString()).send()); + + Pool bucket = retainableByteBufferPool.poolFor(16 * 1024 + 1, ssl.isDirectBuffersForEncryption()); + assertEquals(1, bucket.size()); + assertEquals(1, bucket.getIdleCount()); + } + + @Test + public void testEncryptedOutputBufferRepooling() throws Exception + { + SslContextFactory.Server serverTLSFactory = createServerSslContextFactory(); + QueuedThreadPool serverThreads = new QueuedThreadPool(); + serverThreads.setName("server"); + server = new Server(serverThreads); + List leakedBuffers = new ArrayList<>(); + ArrayByteBufferPool byteBufferPool = new ArrayByteBufferPool() + { + @Override + public ByteBuffer acquire(int size, boolean direct) + { + ByteBuffer acquired = super.acquire(size, direct); + leakedBuffers.add(acquired); + return acquired; + } + + @Override + public void release(ByteBuffer buffer) + { + leakedBuffers.remove(buffer); + super.release(buffer); + } + }; + server.addBean(byteBufferPool); + HttpConfiguration httpConfig = new HttpConfiguration(); + httpConfig.addCustomizer(new SecureRequestCustomizer()); + HttpConnectionFactory http = new HttpConnectionFactory(httpConfig); + SslConnectionFactory ssl = new SslConnectionFactory(serverTLSFactory, http.getProtocol()) + { + @Override + protected SslConnection newSslConnection(Connector connector, EndPoint endPoint, SSLEngine engine) + { + ByteBufferPool byteBufferPool = connector.getByteBufferPool(); + RetainableByteBufferPool retainableByteBufferPool = connector.getBean(RetainableByteBufferPool.class); + return new SslConnection(retainableByteBufferPool, byteBufferPool, connector.getExecutor(), endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption()) + { + @Override + protected boolean networkFlush(ByteBuffer output) throws IOException + { + throw new IOException("bang"); + } + }; + } + }; + connector = new ServerConnector(server, 1, 1, ssl, http); + server.addConnector(connector); + server.setHandler(new EmptyServerHandler()); + server.start(); + + SslContextFactory.Client clientTLSFactory = createClientSslContextFactory(); + ClientConnector clientConnector = new ClientConnector(); + clientConnector.setSelectors(1); + clientConnector.setSslContextFactory(clientTLSFactory); + QueuedThreadPool clientThreads = new QueuedThreadPool(); + clientThreads.setName("client"); + clientConnector.setExecutor(clientThreads); + client = new HttpClient(new HttpClientTransportOverHTTP(clientConnector)); + client.setExecutor(clientThreads); + client.start(); + + assertThrows(Exception.class, () -> client.newRequest("localhost", connector.getLocalPort()).scheme(HttpScheme.HTTPS.asString()).send()); + + await().atMost(5, TimeUnit.SECONDS).until(() -> leakedBuffers, is(empty())); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testEncryptedOutputBufferRepoolingAfterNetworkFlushReturnsFalse(boolean close) throws Exception + { + SslContextFactory.Server serverTLSFactory = createServerSslContextFactory(); + QueuedThreadPool serverThreads = new QueuedThreadPool(); + serverThreads.setName("server"); + server = new Server(serverThreads); + List leakedBuffers = new ArrayList<>(); + ArrayByteBufferPool byteBufferPool = new ArrayByteBufferPool() + { + @Override + public ByteBuffer acquire(int size, boolean direct) + { + ByteBuffer acquired = super.acquire(size, direct); + leakedBuffers.add(acquired); + return acquired; + } + + @Override + public void release(ByteBuffer buffer) + { + leakedBuffers.remove(buffer); + super.release(buffer); + } + }; + server.addBean(byteBufferPool); + HttpConfiguration httpConfig = new HttpConfiguration(); + httpConfig.addCustomizer(new SecureRequestCustomizer()); + HttpConnectionFactory http = new HttpConnectionFactory(httpConfig); + AtomicBoolean failFlush = new AtomicBoolean(false); + SslConnectionFactory ssl = new SslConnectionFactory(serverTLSFactory, http.getProtocol()) + { + @Override + protected SslConnection newSslConnection(Connector connector, EndPoint endPoint, SSLEngine engine) + { + ByteBufferPool byteBufferPool = connector.getByteBufferPool(); + RetainableByteBufferPool retainableByteBufferPool = connector.getBean(RetainableByteBufferPool.class); + return new SslConnection(retainableByteBufferPool, byteBufferPool, connector.getExecutor(), endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption()) + { + @Override + protected boolean networkFlush(ByteBuffer output) throws IOException + { + if (failFlush.get()) + return false; + return super.networkFlush(output); + } + }; + } + }; + connector = new ServerConnector(server, 1, 1, ssl, http); + server.addConnector(connector); + server.setHandler(new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) + { + failFlush.set(true); + if (close) + jettyRequest.getHttpChannel().getEndPoint().close(); + else + jettyRequest.getHttpChannel().getEndPoint().shutdownOutput(); + } + }); + server.start(); + + SslContextFactory.Client clientTLSFactory = createClientSslContextFactory(); + ClientConnector clientConnector = new ClientConnector(); + clientConnector.setSelectors(1); + clientConnector.setSslContextFactory(clientTLSFactory); + QueuedThreadPool clientThreads = new QueuedThreadPool(); + clientThreads.setName("client"); + clientConnector.setExecutor(clientThreads); + client = new HttpClient(new HttpClientTransportOverHTTP(clientConnector)); + client.setExecutor(clientThreads); + client.start(); + + assertThrows(Exception.class, () -> client.newRequest("localhost", connector.getLocalPort()).scheme(HttpScheme.HTTPS.asString()).send()); + + await().atMost(5, TimeUnit.SECONDS).until(() -> leakedBuffers, is(empty())); + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testEncryptedOutputBufferRepoolingAfterNetworkFlushThrows(boolean close) throws Exception + { + SslContextFactory.Server serverTLSFactory = createServerSslContextFactory(); + QueuedThreadPool serverThreads = new QueuedThreadPool(); + serverThreads.setName("server"); + server = new Server(serverThreads); + List leakedBuffers = new ArrayList<>(); + ArrayByteBufferPool byteBufferPool = new ArrayByteBufferPool() + { + @Override + public ByteBuffer acquire(int size, boolean direct) + { + ByteBuffer acquired = super.acquire(size, direct); + leakedBuffers.add(acquired); + return acquired; + } + + @Override + public void release(ByteBuffer buffer) + { + leakedBuffers.remove(buffer); + super.release(buffer); + } + }; + server.addBean(byteBufferPool); + HttpConfiguration httpConfig = new HttpConfiguration(); + httpConfig.addCustomizer(new SecureRequestCustomizer()); + HttpConnectionFactory http = new HttpConnectionFactory(httpConfig); + AtomicBoolean failFlush = new AtomicBoolean(false); + SslConnectionFactory ssl = new SslConnectionFactory(serverTLSFactory, http.getProtocol()) + { + @Override + protected SslConnection newSslConnection(Connector connector, EndPoint endPoint, SSLEngine engine) + { + ByteBufferPool byteBufferPool = connector.getByteBufferPool(); + RetainableByteBufferPool retainableByteBufferPool = connector.getBean(RetainableByteBufferPool.class); + return new SslConnection(retainableByteBufferPool, byteBufferPool, connector.getExecutor(), endPoint, engine, isDirectBuffersForEncryption(), isDirectBuffersForDecryption()) + { + @Override + protected boolean networkFlush(ByteBuffer output) throws IOException + { + if (failFlush.get()) + throw new IOException(); + return super.networkFlush(output); + } + }; + } + }; + connector = new ServerConnector(server, 1, 1, ssl, http); + server.addConnector(connector); + server.setHandler(new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + failFlush.set(true); + if (close) + jettyRequest.getHttpChannel().getEndPoint().close(); + else + jettyRequest.getHttpChannel().getEndPoint().shutdownOutput(); + } + }); + server.start(); + + SslContextFactory.Client clientTLSFactory = createClientSslContextFactory(); + ClientConnector clientConnector = new ClientConnector(); + clientConnector.setSelectors(1); + clientConnector.setSslContextFactory(clientTLSFactory); + QueuedThreadPool clientThreads = new QueuedThreadPool(); + clientThreads.setName("client"); + clientConnector.setExecutor(clientThreads); + client = new HttpClient(new HttpClientTransportOverHTTP(clientConnector)); + client.setExecutor(clientThreads); + client.start(); + + assertThrows(Exception.class, () -> client.newRequest("localhost", connector.getLocalPort()).scheme(HttpScheme.HTTPS.asString()).send()); + + await().atMost(5, TimeUnit.SECONDS).until(() -> leakedBuffers, is(empty())); + } + @Test public void testNeverUsedConnectionThenClientIdleTimeout() throws Exception { @@ -780,12 +1083,7 @@ protected int networkFill(ByteBuffer input) throws IOException // Trigger the creation of a new connection, but don't use it. ConnectionPoolHelper.tryCreate(connectionPool); // Verify that the connection has been created. - while (true) - { - Thread.sleep(50); - if (connectionPool.getConnectionCount() == 1) - break; - } + await().atMost(5, TimeUnit.SECONDS).until(connectionPool::getConnectionCount, is(1)); // Wait for the client to idle timeout the connection. Thread.sleep(idleTimeout + idleTimeout / 2); diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractByteBufferPool.java index 03237c035ea1..759935f4bda0 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractByteBufferPool.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractByteBufferPool.java @@ -22,6 +22,10 @@ import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedOperation; +/** + * The {@code maxHeapMemory} and {@code maxDirectMemory} default heuristic is to use {@link Runtime#maxMemory()} + * divided by 4.

+ */ @ManagedObject abstract class AbstractByteBufferPool implements ByteBufferPool { @@ -37,8 +41,8 @@ abstract class AbstractByteBufferPool implements ByteBufferPool * * @param factor the capacity factor * @param maxQueueLength the maximum ByteBuffer queue length - * @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic. - * @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic. + * @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic + * @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic */ protected AbstractByteBufferPool(int factor, int maxQueueLength, long maxHeapMemory, long maxDirectMemory) { diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java index 0cdb05235316..02a6130c9aee 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java @@ -34,6 +34,8 @@ *

Given a capacity {@code factor} of 1024, the first array element holds a queue of ByteBuffers * each of capacity 1024, the second array element holds a queue of ByteBuffers each of capacity * 2048, and so on.

+ * The {@code maxHeapMemory} and {@code maxDirectMemory} default heuristic is to use {@link Runtime#maxMemory()} + * divided by 4.

*/ @ManagedObject public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpable @@ -48,6 +50,7 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpa /** * Creates a new ArrayByteBufferPool with a default configuration. + * Both {@code maxHeapMemory} and {@code maxDirectMemory} default to 0 to use default heuristic. */ public ArrayByteBufferPool() { @@ -56,6 +59,7 @@ public ArrayByteBufferPool() /** * Creates a new ArrayByteBufferPool with the given configuration. + * Both {@code maxHeapMemory} and {@code maxDirectMemory} default to 0 to use default heuristic. * * @param minCapacity the minimum ByteBuffer capacity * @param factor the capacity factor @@ -68,6 +72,7 @@ public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity) /** * Creates a new ArrayByteBufferPool with the given configuration. + * Both {@code maxHeapMemory} and {@code maxDirectMemory} default to 0 to use default heuristic. * * @param minCapacity the minimum ByteBuffer capacity * @param factor the capacity factor @@ -86,8 +91,8 @@ public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int max * @param factor the capacity factor * @param maxCapacity the maximum ByteBuffer capacity * @param maxQueueLength the maximum ByteBuffer queue length - * @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic. - * @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic. + * @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic + * @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic */ public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxQueueLength, long maxHeapMemory, long maxDirectMemory) { diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayRetainableByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayRetainableByteBufferPool.java index 3fe42c147ee1..cf4c0e239bf4 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayRetainableByteBufferPool.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayRetainableByteBufferPool.java @@ -30,6 +30,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + *

A {@link RetainableByteBuffer} pool where RetainableByteBuffers are held in {@link Pool}s that are + * held in array elements.

+ *

Given a capacity {@code factor} of 1024, the first array element holds a Pool of RetainableByteBuffers + * each of capacity 1024, the second array element holds a Pool of RetainableByteBuffers each of capacity + * 2048, and so on.

+ * The {@code maxHeapMemory} and {@code maxDirectMemory} default heuristic is to use {@link Runtime#maxMemory()} + * divided by 4.

+ */ @ManagedObject public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool, Dumpable { @@ -45,21 +54,56 @@ public class ArrayRetainableByteBufferPool implements RetainableByteBufferPool, private final AtomicLong _currentDirectMemory = new AtomicLong(); private final Function _bucketIndexFor; + /** + * Creates a new ArrayRetainableByteBufferPool with a default configuration. + * Both {@code maxHeapMemory} and {@code maxDirectMemory} default to 0 to use default heuristic. + */ public ArrayRetainableByteBufferPool() { - this(0, -1, -1, Integer.MAX_VALUE, -1L, -1L); + this(0, -1, -1, Integer.MAX_VALUE); } + /** + * Creates a new ArrayRetainableByteBufferPool with the given configuration. + * Both {@code maxHeapMemory} and {@code maxDirectMemory} default to 0 to use default heuristic. + * + * @param minCapacity the minimum ByteBuffer capacity + * @param factor the capacity factor + * @param maxCapacity the maximum ByteBuffer capacity + * @param maxBucketSize the maximum number of ByteBuffers for each bucket + */ public ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize) { - this(minCapacity, factor, maxCapacity, maxBucketSize, -1L, -1L); + this(minCapacity, factor, maxCapacity, maxBucketSize, 0L, 0L); } + /** + * Creates a new ArrayRetainableByteBufferPool with the given configuration. + * + * @param minCapacity the minimum ByteBuffer capacity + * @param factor the capacity factor + * @param maxCapacity the maximum ByteBuffer capacity + * @param maxBucketSize the maximum number of ByteBuffers for each bucket + * @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic + * @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic + */ public ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory) { this(minCapacity, factor, maxCapacity, maxBucketSize, maxHeapMemory, maxDirectMemory, null, null); } + /** + * Creates a new ArrayRetainableByteBufferPool with the given configuration. + * + * @param minCapacity the minimum ByteBuffer capacity + * @param factor the capacity factor + * @param maxCapacity the maximum ByteBuffer capacity + * @param maxBucketSize the maximum number of ByteBuffers for each bucket + * @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic + * @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic + * @param bucketIndexFor a {@link Function} that takes a capacity and returns a bucket index + * @param bucketCapacity a {@link Function} that takes a bucket index and returns a capacity + */ protected ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxBucketSize, long maxHeapMemory, long maxDirectMemory, Function bucketIndexFor, Function bucketCapacity) { @@ -91,8 +135,8 @@ protected ArrayRetainableByteBufferPool(int minCapacity, int factor, int maxCapa _maxCapacity = maxCapacity; _direct = directArray; _indirect = indirectArray; - _maxHeapMemory = maxHeapMemory; - _maxDirectMemory = maxDirectMemory; + _maxHeapMemory = (maxHeapMemory != 0L) ? maxHeapMemory : Runtime.getRuntime().maxMemory() / 4; + _maxDirectMemory = (maxDirectMemory != 0L) ? maxDirectMemory : Runtime.getRuntime().maxMemory() / 4; _bucketIndexFor = bucketIndexFor; } @@ -156,6 +200,11 @@ private RetainableByteBuffer newRetainableByteBuffer(int capacity, boolean direc return retainableByteBuffer; } + protected Pool poolFor(int capacity, boolean direct) + { + return bucketFor(capacity, direct); + } + private Bucket bucketFor(int capacity, boolean direct) { if (capacity < _minCapacity) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java index aeb34eab9c25..62214a1f866a 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ssl/SslConnection.java @@ -419,8 +419,10 @@ public String toConnectionString() connection instanceof AbstractConnection ? ((AbstractConnection)connection).toConnectionString() : connection); } - private void releaseEncryptedInputBuffer() + private void releaseEmptyEncryptedInputBuffer() { + if (!_lock.isHeldByCurrentThread()) + throw new IllegalStateException(); if (_encryptedInput != null && !_encryptedInput.hasRemaining()) { _encryptedInput.release(); @@ -428,8 +430,10 @@ private void releaseEncryptedInputBuffer() } } - protected void releaseDecryptedInputBuffer() + private void releaseEmptyDecryptedInputBuffer() { + if (!_lock.isHeldByCurrentThread()) + throw new IllegalStateException(); if (_decryptedInput != null && !_decryptedInput.hasRemaining()) { _bufferPool.release(_decryptedInput); @@ -437,7 +441,31 @@ protected void releaseDecryptedInputBuffer() } } - private void releaseEncryptedOutputBuffer() + private void discardInputBuffers() + { + if (!_lock.isHeldByCurrentThread()) + throw new IllegalStateException(); + if (_encryptedInput != null) + _encryptedInput.clear(); + BufferUtil.clear(_decryptedInput); + releaseEmptyInputBuffers(); + } + + private void releaseEmptyInputBuffers() + { + releaseEmptyEncryptedInputBuffer(); + releaseEmptyDecryptedInputBuffer(); + } + + private void discardEncryptedOutputBuffer() + { + if (!_lock.isHeldByCurrentThread()) + throw new IllegalStateException(); + BufferUtil.clear(_encryptedOutput); + releaseEmptyEncryptedOutputBuffer(); + } + + private void releaseEmptyEncryptedOutputBuffer() { if (!_lock.isHeldByCurrentThread()) throw new IllegalStateException(); @@ -759,7 +787,7 @@ public int fill(ByteBuffer buffer) throws IOException // See also system property "jsse.SSLEngine.acceptLargeFragments". if (BufferUtil.isEmpty(_decryptedInput) && appBufferSize < getApplicationBufferSize()) { - releaseDecryptedInputBuffer(); + releaseEmptyDecryptedInputBuffer(); continue; } throw new IllegalStateException("Unexpected unwrap result " + unwrap); @@ -790,6 +818,7 @@ public int fill(ByteBuffer buffer) throws IOException } catch (Throwable x) { + discardInputBuffers(); Throwable f = handleException(x, "fill"); Throwable failure = handshakeFailed(f); if (_flushState == FlushState.WAIT_FOR_FILL) @@ -801,8 +830,7 @@ public int fill(ByteBuffer buffer) throws IOException } finally { - releaseEncryptedInputBuffer(); - releaseDecryptedInputBuffer(); + releaseEmptyInputBuffers(); if (_flushState == FlushState.WAIT_FOR_FILL) { @@ -988,26 +1016,26 @@ public boolean flush(ByteBuffer... appOuts) throws IOException } } - // finish of any previous flushes - if (_encryptedOutput != null) + Boolean result = null; + try { - int remaining = _encryptedOutput.remaining(); - if (remaining > 0) + // finish of any previous flushes + if (_encryptedOutput != null) { - boolean flushed = networkFlush(_encryptedOutput); - int written = remaining - _encryptedOutput.remaining(); - if (written > 0) - _bytesOut.addAndGet(written); - if (!flushed) - return false; + int remaining = _encryptedOutput.remaining(); + if (remaining > 0) + { + boolean flushed = networkFlush(_encryptedOutput); + int written = remaining - _encryptedOutput.remaining(); + if (written > 0) + _bytesOut.addAndGet(written); + if (!flushed) + return false; + } } - } - boolean isEmpty = BufferUtil.isEmpty(appOuts); + boolean isEmpty = BufferUtil.isEmpty(appOuts); - Boolean result = null; - try - { if (_flushState != FlushState.IDLE) return result = false; @@ -1121,7 +1149,7 @@ public boolean flush(ByteBuffer... appOuts) throws IOException // See also system property "jsse.SSLEngine.acceptLargeFragments". if (packetBufferSize < getPacketBufferSize()) { - releaseEncryptedOutputBuffer(); + releaseEmptyEncryptedOutputBuffer(); continue; } throw new IllegalStateException("Unexpected wrap result " + wrap); @@ -1159,12 +1187,13 @@ public boolean flush(ByteBuffer... appOuts) throws IOException } catch (Throwable x) { + discardEncryptedOutputBuffer(); Throwable failure = handleException(x, "flush"); throw handshakeFailed(failure); } finally { - releaseEncryptedOutputBuffer(); + releaseEmptyEncryptedOutputBuffer(); if (LOG.isDebugEnabled()) LOG.debug(" endPoint.close()), write); + }, t -> disconnect()), write); } } } if (close) - endPoint.close(); + disconnect(); else ensureFillInterested(); } catch (Throwable x) { - LOG.trace("IGNORED", x); - endPoint.close(); + if (LOG.isTraceEnabled()) + LOG.trace("IGNORED", x); + disconnect(); + } + } + + private void disconnect() + { + try (AutoLock l = _lock.lock()) + { + discardEncryptedOutputBuffer(); } + getEndPoint().close(); } private void closeOutbound() @@ -1382,9 +1426,12 @@ private boolean isOutboundDone() @Override public void doClose() { + try (AutoLock l = _lock.lock()) + { + discardInputBuffers(); + } // First send the TLS Close Alert, then the FIN. - doShutdownOutput(); - getEndPoint().close(); + doShutdownOutput(true); super.doClose(); } @@ -1537,7 +1584,7 @@ public void succeeded() { if (LOG.isDebugEnabled()) LOG.debug("IncompleteWriteCB succeeded {}", SslConnection.this); - releaseEncryptedOutputBuffer(); + releaseEmptyEncryptedOutputBuffer(); _flushState = FlushState.IDLE; interested = _fillState == FillState.INTERESTED; @@ -1563,8 +1610,7 @@ public void failed(final Throwable x) if (LOG.isDebugEnabled()) LOG.debug("IncompleteWriteCB failed {}", SslConnection.this, x); - BufferUtil.clear(_encryptedOutput); - releaseEncryptedOutputBuffer(); + discardEncryptedOutputBuffer(); _flushState = FlushState.IDLE; failFillInterest = _fillState == FillState.WAIT_FOR_FLUSH ||