From 525fcb31194c62c44c912504ded177e30ff78c52 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Mon, 30 Aug 2021 16:00:15 +0200 Subject: [PATCH] Fixes #6603 - HTTP/2 max local stream count exceeded (#6639) Made MAX_CONCURRENT_STREAMS setting work on a per-connection basis. Updated Pool javadocs. Signed-off-by: Simone Bordet Co-authored-by: Greg Wilkins --- .../jetty/client/AbstractConnectionPool.java | 8 +- .../eclipse/jetty/client/ConnectionPool.java | 10 +- .../jetty/client/DuplexConnectionPool.java | 3 +- .../jetty/client/MultiplexConnectionPool.java | 21 +- .../client/MultiplexHttpDestination.java | 8 +- .../jetty/client/RandomConnectionPool.java | 2 +- .../client/RoundRobinConnectionPool.java | 2 +- .../org/eclipse/jetty/http2/HTTP2Session.java | 14 +- .../http/HttpClientTransportOverHTTP2.java | 7 +- .../client/http/HttpConnectionOverHTTP2.java | 10 +- .../client/http/MaxConcurrentStreamsTest.java | 106 ++++ .../http/MultiplexedConnectionPoolTest.java | 10 +- .../java/org/eclipse/jetty/util/Pool.java | 552 ++++++++++++++---- .../java/org/eclipse/jetty/util/PoolTest.java | 85 +-- 14 files changed, 612 insertions(+), 226 deletions(-) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java index 093cc37a8ff5..018dfb08c085 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java @@ -70,7 +70,12 @@ protected AbstractConnectionPool(Destination destination, int maxConnections, Ca protected AbstractConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester) { - this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester); + this(destination, Pool.StrategyType.FIRST, maxConnections, cache, requester); + } + + protected AbstractConnectionPool(HttpDestination destination, Pool.StrategyType strategy, int maxConnections, boolean cache, Callback requester) + { + this(destination, new Pool<>(strategy, maxConnections, cache), requester); } protected AbstractConnectionPool(HttpDestination destination, Pool pool, Callback requester) @@ -78,6 +83,7 @@ protected AbstractConnectionPool(HttpDestination destination, Pool p this.destination = destination; this.requester = requester; this.pool = pool; + pool.setMaxMultiplex(1); // Force the use of multiplexing. addBean(pool); } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java index 4ae507896777..88d0b97adf0c 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java @@ -29,7 +29,7 @@ public interface ConnectionPool extends Closeable { /** - * Optionally pre-create up to connectionCount + * Optionally pre-create up to {@code connectionCount} * connections so they are immediately ready for use. * @param connectionCount the number of connections to pre-start. */ @@ -104,7 +104,7 @@ interface Factory } /** - * Marks a connection pool as supporting multiplexed connections. + * Marks a connection as supporting multiplexed requests. */ interface Multiplexable { @@ -115,7 +115,11 @@ interface Multiplexable /** * @param maxMultiplex the max number of requests multiplexable on a single connection + * @deprecated do not use, as the maxMultiplex value is pulled, rather than pushed */ - void setMaxMultiplex(int maxMultiplex); + @Deprecated + default void setMaxMultiplex(int maxMultiplex) + { + } } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java index eab2f96915c1..07c6ea7f9a16 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java @@ -34,9 +34,10 @@ public DuplexConnectionPool(HttpDestination destination, int maxConnections, Cal public DuplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester) { - this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester); + super(destination, Pool.StrategyType.FIRST, maxConnections, cache, requester); } + @Deprecated public DuplexConnectionPool(HttpDestination destination, Pool pool, Callback requester) { super(destination, pool, requester); diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java index 9f4bfd7ee7fd..79df556bf32f 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java @@ -25,7 +25,7 @@ import org.eclipse.jetty.util.annotation.ManagedObject; @ManagedObject -public class MultiplexConnectionPool extends AbstractConnectionPool implements ConnectionPool.Multiplexable +public class MultiplexConnectionPool extends AbstractConnectionPool { public MultiplexConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex) { @@ -34,9 +34,26 @@ public MultiplexConnectionPool(HttpDestination destination, int maxConnections, public MultiplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester, int maxMultiplex) { - this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester, maxMultiplex); + this(destination, Pool.StrategyType.FIRST, maxConnections, cache, requester, maxMultiplex); } + public MultiplexConnectionPool(HttpDestination destination, Pool.StrategyType strategy, int maxConnections, boolean cache, Callback requester, int maxMultiplex) + { + super(destination, new Pool(strategy, maxConnections, cache) + { + @Override + protected int getMaxMultiplex(Connection connection) + { + int multiplex = (connection instanceof Multiplexable) + ? ((Multiplexable)connection).getMaxMultiplex() + : super.getMaxMultiplex(connection); + return multiplex > 0 ? multiplex : 1; + } + }, requester); + setMaxMultiplex(maxMultiplex); + } + + @Deprecated public MultiplexConnectionPool(HttpDestination destination, Pool pool, Callback requester, int maxMultiplex) { super(destination, pool, requester); diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java index bfa1dad3b514..1003a3a77a00 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java @@ -28,15 +28,15 @@ protected MultiplexHttpDestination(HttpClient client, Origin origin) public int getMaxRequestsPerConnection() { ConnectionPool connectionPool = getConnectionPool(); - if (connectionPool instanceof ConnectionPool.Multiplexable) - return ((ConnectionPool.Multiplexable)connectionPool).getMaxMultiplex(); + if (connectionPool instanceof AbstractConnectionPool) + return ((AbstractConnectionPool)connectionPool).getMaxMultiplex(); return 1; } public void setMaxRequestsPerConnection(int maxRequestsPerConnection) { ConnectionPool connectionPool = getConnectionPool(); - if (connectionPool instanceof ConnectionPool.Multiplexable) - ((ConnectionPool.Multiplexable)connectionPool).setMaxMultiplex(maxRequestsPerConnection); + if (connectionPool instanceof AbstractConnectionPool) + ((AbstractConnectionPool)connectionPool).setMaxMultiplex(maxRequestsPerConnection); } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java index e924f8d89e1e..c6a974876b33 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java @@ -31,6 +31,6 @@ public class RandomConnectionPool extends MultiplexConnectionPool { public RandomConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex) { - super(destination, new Pool<>(Pool.StrategyType.RANDOM, maxConnections, false), requester, maxMultiplex); + super(destination, Pool.StrategyType.RANDOM, maxConnections, false, requester, maxMultiplex); } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java index 641c65d8ca67..8e046b46ac4d 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java @@ -56,7 +56,7 @@ public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex) { - super(destination, new Pool<>(Pool.StrategyType.ROUND_ROBIN, maxConnections, false), requester, maxMultiplex); + super(destination, Pool.StrategyType.ROUND_ROBIN, maxConnections, false, requester, maxMultiplex); // If there are queued requests and connections get // closed due to idle timeout or overuse, we want to // aggressively try to open new connections to replace diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 3a0c7b4feea7..f94351526b01 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -738,7 +738,10 @@ protected IStream createLocalStream(int streamId, Promise promise) int maxCount = getMaxLocalStreams(); if (maxCount >= 0 && localCount >= maxCount) { - promise.failed(new IllegalStateException("Max local stream count " + maxCount + " exceeded")); + IllegalStateException failure = new IllegalStateException("Max local stream count " + maxCount + " exceeded: " + localCount); + if (LOG.isDebugEnabled()) + LOG.debug("Could not create local stream #{} for {}", streamId, this, failure); + promise.failed(failure); return null; } if (localStreamCount.compareAndSet(localCount, localCount + 1)) @@ -751,7 +754,7 @@ protected IStream createLocalStream(int streamId, Promise promise) stream.setIdleTimeout(getStreamIdleTimeout()); flowControl.onStreamCreated(stream); if (LOG.isDebugEnabled()) - LOG.debug("Created local {}", stream); + LOG.debug("Created local {} for {}", stream, this); return stream; } else @@ -786,6 +789,9 @@ protected IStream createRemoteStream(int streamId) int maxCount = getMaxRemoteStreams(); if (maxCount >= 0 && remoteCount - remoteClosing >= maxCount) { + IllegalStateException failure = new IllegalStateException("Max remote stream count " + maxCount + " exceeded: " + remoteCount + "+" + remoteClosing); + if (LOG.isDebugEnabled()) + LOG.debug("Could not create remote stream #{} for {}", streamId, this, failure); reset(null, new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.from(() -> onStreamDestroyed(streamId))); return null; } @@ -799,7 +805,7 @@ protected IStream createRemoteStream(int streamId) stream.setIdleTimeout(getStreamIdleTimeout()); flowControl.onStreamCreated(stream); if (LOG.isDebugEnabled()) - LOG.debug("Created remote {}", stream); + LOG.debug("Created remote {} for {}", stream, this); return stream; } else @@ -945,7 +951,7 @@ public void onFrame(Frame frame) private void onStreamCreated(int streamId) { if (LOG.isDebugEnabled()) - LOG.debug("Created stream #{} for {}", streamId, this); + LOG.debug("Creating stream #{} for {}", streamId, this); streamsState.onStreamCreated(); } diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java index 182ca869f098..743e00110f71 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java @@ -60,7 +60,9 @@ public HttpClientTransportOverHTTP2(HTTP2Client client) setConnectionPoolFactory(destination -> { HttpClient httpClient = getHttpClient(); - return new MultiplexConnectionPool(destination, httpClient.getMaxConnectionsPerDestination(), destination, httpClient.getMaxRequestsQueuedPerDestination()); + // Start with the minimum maxMultiplex; the SETTINGS frame from the + // server preface will override this value before any request is sent. + return new MultiplexConnectionPool(destination, httpClient.getMaxConnectionsPerDestination(), destination, 1); }); } @@ -211,9 +213,6 @@ private Promise connectionPromise() @Override public void onSettings(Session session, SettingsFrame frame) { - Map settings = frame.getSettings(); - if (settings.containsKey(SettingsFrame.MAX_CONCURRENT_STREAMS)) - destination().setMaxRequestsPerConnection(settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS)); if (!connection.isMarked()) onServerPreface(session); } diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java index 54c34582a688..deecd58f4b7d 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.eclipse.jetty.client.ConnectionPool; import org.eclipse.jetty.client.HttpChannel; import org.eclipse.jetty.client.HttpConnection; import org.eclipse.jetty.client.HttpDestination; @@ -36,13 +37,14 @@ import org.eclipse.jetty.client.SendFailure; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http2.ErrorCode; +import org.eclipse.jetty.http2.HTTP2Session; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.Sweeper; -public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable +public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable, ConnectionPool.Multiplexable { private static final Logger LOG = Log.getLogger(HttpConnection.class); @@ -74,6 +76,12 @@ public void setRecycleHttpChannels(boolean recycleHttpChannels) this.recycleHttpChannels = recycleHttpChannels; } + @Override + public int getMaxMultiplex() + { + return ((HTTP2Session)session).getMaxLocalStreams(); + } + @Override protected Iterator getHttpChannels() { diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java index 636fd75108f7..b75c293cf0cf 100644 --- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Queue; @@ -40,6 +41,7 @@ import org.eclipse.jetty.client.HttpDestination; import org.eclipse.jetty.client.HttpResponseException; import org.eclipse.jetty.client.MultiplexConnectionPool; +import org.eclipse.jetty.client.Origin; import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; @@ -76,6 +78,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; public class MaxConcurrentStreamsTest extends AbstractTest { @@ -545,6 +548,109 @@ public Connection newConnection(EndPoint endPoint, Map context) assertTrue(response3Latch.await(5, TimeUnit.SECONDS)); } + @Test + public void testDifferentMaxConcurrentStreamsForDifferentConnections() throws Exception + { + long processing = 125; + RawHTTP2ServerConnectionFactory http2 = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), new ServerSessionListener.Adapter() + { + private Session session1; + private Session session2; + + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + MetaData.Request request = (MetaData.Request)frame.getMetaData(); + switch (request.getURI().getPath()) + { + case "/prime": + { + session1 = stream.getSession(); + // Send another request from here to force the opening of the 2nd connection. + client.newRequest("localhost", connector.getLocalPort()).path("/prime2").send(result -> + { + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, result.getResponse().getStatus(), new HttpFields()); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + }); + break; + } + case "/prime2": + { + session2 = stream.getSession(); + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + break; + } + case "/update_max_streams": + { + Session session = stream.getSession() == session1 ? session2 : session1; + Map settings = new HashMap<>(); + settings.put(SettingsFrame.MAX_CONCURRENT_STREAMS, 2); + session.settings(new SettingsFrame(settings, false), Callback.NOOP); + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + break; + } + default: + { + sleep(processing); + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + break; + } + } + return null; + } + }); + http2.setMaxConcurrentStreams(1); + prepareServer(http2); + server.start(); + prepareClient(); + client.setMaxConnectionsPerDestination(2); + client.start(); + + // Prime the 2 connections. + primeConnection(); + + String host = "localhost"; + int port = connector.getLocalPort(); + + AbstractConnectionPool pool = (AbstractConnectionPool)client.resolveDestination(new Origin("http", host, port)).getConnectionPool(); + assertEquals(2, pool.getConnectionCount()); + + // Send a request on one connection, which sends back a SETTINGS frame on the other connection. + ContentResponse response = client.newRequest(host, port) + .path("/update_max_streams") + .send(); + assertEquals(HttpStatus.OK_200, response.getStatus()); + + // Send 4 requests at once: 1 should go on one connection, 2 on the other connection, and 1 queued. + int count = 4; + CountDownLatch latch = new CountDownLatch(count); + for (int i = 0; i < count; ++i) + { + client.newRequest(host, port) + .path("/" + i) + .send(result -> + { + if (result.isSucceeded()) + { + int status = result.getResponse().getStatus(); + if (status == HttpStatus.OK_200) + latch.countDown(); + else + fail("unexpected status " + status); + } + else + { + fail(result.getFailure()); + } + }); + } + + assertTrue(awaitLatch(latch, count * processing * 10, TimeUnit.MILLISECONDS)); + } + private void primeConnection() throws Exception { // Prime the connection so that the maxConcurrentStream setting arrives to the client. diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MultiplexedConnectionPoolTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MultiplexedConnectionPoolTest.java index 3c684a76fcd3..307260e8eb08 100644 --- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MultiplexedConnectionPoolTest.java +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MultiplexedConnectionPoolTest.java @@ -100,9 +100,7 @@ public void testMaxDurationConnectionsWithMultiplexedPool() throws Exception ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination -> { int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination(); - Pool pool = new Pool<>(Pool.StrategyType.FIRST, maxConnections, false); - poolRef.set(pool); - MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, pool, destination, MAX_MULTIPLEX) + MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, Pool.StrategyType.FIRST, maxConnections, false, destination, MAX_MULTIPLEX) { @Override protected void onCreated(Connection connection) @@ -116,6 +114,7 @@ protected void removed(Connection connection) poolRemoveCounter.incrementAndGet(); } }; + poolRef.set(connectionPool.getBean(Pool.class)); connectionPool.setMaxDuration(maxDuration); return connectionPool; }); @@ -161,9 +160,7 @@ public void testMaxDurationConnectionsWithMultiplexedPoolClosesExpiredConnection ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination -> { int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination(); - Pool pool = new Pool<>(Pool.StrategyType.FIRST, maxConnections, false); - poolRef.set(pool); - MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, pool, destination, MAX_MULTIPLEX) + MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, Pool.StrategyType.FIRST, maxConnections, false, destination, MAX_MULTIPLEX) { @Override protected void onCreated(Connection connection) @@ -177,6 +174,7 @@ protected void removed(Connection connection) poolRemoveCounter.incrementAndGet(); } }; + poolRef.set(connectionPool.getBean(Pool.class)); connectionPool.setMaxDuration(maxDuration); return connectionPool; }); diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java index 183b169249a3..921d6af5a417 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java @@ -31,6 +31,8 @@ import java.util.function.Function; import java.util.stream.Collectors; +import org.eclipse.jetty.util.annotation.ManagedAttribute; +import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.component.DumpableCollection; import org.eclipse.jetty.util.log.Log; @@ -38,24 +40,22 @@ import org.eclipse.jetty.util.thread.Locker; /** - * A fast pool of objects, with optional support for - * multiplexing, max usage count and several optimized strategies plus - * an optional {@link ThreadLocal} cache of the last release entry. - *

- * When the method {@link #close()} is called, all {@link Closeable}s in the pool - * are also closed. - *

- * @param + *

A pool of objects, with optional support for multiplexing, + * max usage count and several optimized strategies plus + * an optional {@link ThreadLocal} cache of the last release entry.

+ *

When the method {@link #close()} is called, all {@link Closeable}s + * object pooled by the pool are also closed.

+ * + * @param the type of the pooled objects */ +@ManagedObject public class Pool implements AutoCloseable, Dumpable { private static final Logger LOGGER = Log.getLogger(Pool.class); private final List entries = new CopyOnWriteArrayList<>(); - private final int maxEntries; private final StrategyType strategyType; - /* * The cache is used to avoid hammering on the first index of the entry list. * Caches can become poisoned (i.e.: containing entries that are in use) when @@ -68,8 +68,10 @@ public class Pool implements AutoCloseable, Dumpable private final ThreadLocal cache; private final AtomicInteger nextIndex; private volatile boolean closed; - private volatile int maxMultiplex = 1; - private volatile int maxUsageCount = -1; + @Deprecated + private volatile int maxUsage = -1; + @Deprecated + private volatile int maxMultiplex = -1; /** * The type of the strategy to use for the pool. @@ -104,7 +106,7 @@ public enum StrategyType * random strategy but with more predictable behaviour. * No entries are favoured and contention is reduced. */ - ROUND_ROBIN, + ROUND_ROBIN } /** @@ -122,6 +124,7 @@ public Pool(StrategyType strategyType, int maxEntries) /** * Construct a Pool with the specified thread-local cache size and * an optional {@link ThreadLocal} cache. + * * @param strategyType The strategy to used for looking up entries. * @param maxEntries the maximum amount of entries that the pool will accept. * @param cache True if a {@link ThreadLocal} cache should be used to try the most recently released entry. @@ -131,66 +134,141 @@ public Pool(StrategyType strategyType, int maxEntries, boolean cache) this.maxEntries = maxEntries; this.strategyType = strategyType; this.cache = cache ? new ThreadLocal<>() : null; - nextIndex = strategyType == StrategyType.ROUND_ROBIN ? new AtomicInteger() : null; + this.nextIndex = strategyType == StrategyType.ROUND_ROBIN ? new AtomicInteger() : null; } + /** + * @return the number of reserved entries + */ + @ManagedAttribute("The number of reserved entries") public int getReservedCount() { return (int)entries.stream().filter(Entry::isReserved).count(); } + /** + * @return the number of idle entries + */ + @ManagedAttribute("The number of idle entries") public int getIdleCount() { return (int)entries.stream().filter(Entry::isIdle).count(); } + /** + * @return the number of in-use entries + */ + @ManagedAttribute("The number of in-use entries") public int getInUseCount() { return (int)entries.stream().filter(Entry::isInUse).count(); } + /** + * @return the number of closed entries + */ + @ManagedAttribute("The number of closed entries") public int getClosedCount() { return (int)entries.stream().filter(Entry::isClosed).count(); } + /** + * @return the maximum number of entries + */ + @ManagedAttribute("The maximum number of entries") public int getMaxEntries() { return maxEntries; } + /** + * @return the default maximum multiplex count of entries + * @deprecated Multiplex functionalities will be removed + */ + @ManagedAttribute("The default maximum multiplex count of entries") + @Deprecated public int getMaxMultiplex() { - return maxMultiplex; + return maxMultiplex == -1 ? 1 : maxMultiplex; + } + + /** + *

Retrieves the max multiplex count for the given pooled object.

+ * + * @param pooled the pooled object + * @return the max multiplex count for the given pooled object + * @deprecated Multiplex functionalities will be removed + */ + @Deprecated + protected int getMaxMultiplex(T pooled) + { + return getMaxMultiplex(); } + /** + *

Sets the default maximum multiplex count for the Pool's entries.

+ * + * @param maxMultiplex the default maximum multiplex count of entries + * @deprecated Multiplex functionalities will be removed + */ + @Deprecated public final void setMaxMultiplex(int maxMultiplex) { if (maxMultiplex < 1) throw new IllegalArgumentException("Max multiplex must be >= 1"); - this.maxMultiplex = maxMultiplex; + try (Locker.Lock l = locker.lock()) + { + if (closed) + return; + + if (entries.stream().anyMatch(MonoEntry.class::isInstance)) + throw new IllegalStateException("Pool entries do not support multiplexing"); + + this.maxMultiplex = maxMultiplex; + } } /** - * Get the maximum number of times the entries of the pool - * can be acquired. - * @return the max usage count. + *

Returns the maximum number of times the entries of the pool + * can be acquired.

+ * + * @return the default maximum usage count of entries + * @deprecated MaxUsage functionalities will be removed */ + @ManagedAttribute("The default maximum usage count of entries") + @Deprecated public int getMaxUsageCount() { - return maxUsageCount; + return maxUsage; + } + + /** + *

Retrieves the max usage count for the given pooled object.

+ * + * @param pooled the pooled object + * @return the max usage count for the given pooled object + * @deprecated MaxUsage functionalities will be removed + */ + @Deprecated + protected int getMaxUsageCount(T pooled) + { + return getMaxUsageCount(); } /** - * Change the max usage count of the pool's entries. All existing - * idle entries over this new max usage are removed and closed. - * @param maxUsageCount the max usage count. + *

Sets the maximum usage count for the Pool's entries.

+ *

All existing idle entries that have a usage count larger + * than this new value are removed from the Pool and closed.

+ * + * @param maxUsageCount the default maximum usage count of entries + * @deprecated MaxUsage functionalities will be removed */ + @Deprecated public final void setMaxUsageCount(int maxUsageCount) { if (maxUsageCount == 0) throw new IllegalArgumentException("Max usage count must be != 0"); - this.maxUsageCount = maxUsageCount; // Iterate the entries, remove overused ones and collect a list of the closeable removed ones. List copy; @@ -199,6 +277,11 @@ public final void setMaxUsageCount(int maxUsageCount) if (closed) return; + if (entries.stream().anyMatch(MonoEntry.class::isInstance)) + throw new IllegalStateException("Pool entries do not support max usage"); + + this.maxUsage = maxUsageCount; + copy = entries.stream() .filter(entry -> entry.isIdleAndOverUsed() && remove(entry) && entry.pooled instanceof Closeable) .map(entry -> (Closeable)entry.pooled) @@ -210,10 +293,10 @@ public final void setMaxUsageCount(int maxUsageCount) } /** - * Create a new disabled slot into the pool. - * The returned entry must ultimately have the {@link Entry#enable(Object, boolean)} + *

Creates a new disabled slot into the pool.

+ *

The returned entry must ultimately have the {@link Entry#enable(Object, boolean)} * method called or be removed via {@link Pool.Entry#remove()} or - * {@link Pool#remove(Pool.Entry)}. + * {@link Pool#remove(Pool.Entry)}.

* * @param allotment the desired allotment, where each entry handles an allotment of maxMultiplex, * or a negative number to always trigger the reservation of a new entry. @@ -237,17 +320,17 @@ public Entry reserve(int allotment) if (allotment >= 0 && (getReservedCount() * getMaxMultiplex()) >= allotment) return null; - Entry entry = new Entry(); + Entry entry = newEntry(); entries.add(entry); return entry; } } /** - * Create a new disabled slot into the pool. - * The returned entry must ultimately have the {@link Entry#enable(Object, boolean)} + *

Creates a new disabled slot into the pool.

+ *

The returned entry must ultimately have the {@link Entry#enable(Object, boolean)} * method called or be removed via {@link Pool.Entry#remove()} or - * {@link Pool#remove(Pool.Entry)}. + * {@link Pool#remove(Pool.Entry)}.

* * @return a disabled entry that is contained in the pool, * or null if the pool is closed or if the pool already contains @@ -264,17 +347,28 @@ public Entry reserve() if (entries.size() >= maxEntries) return null; - Entry entry = new Entry(); + Entry entry = newEntry(); entries.add(entry); return entry; } } + private Entry newEntry() + { + // Do not allow more than 2 implementations of Entry, otherwise call sites in Pool + // referencing Entry methods will become mega-morphic and kill the performance. + if (maxMultiplex >= 0 || maxUsage >= 0) + return new MultiEntry(); + return new MonoEntry(); + } + /** - * Acquire the entry from the pool at the specified index. This method bypasses the thread-local mechanism. - * @deprecated No longer supported. Instead use a {@link StrategyType} to configure the pool. + *

Acquires the entry from the pool at the specified index.

+ *

This method bypasses the thread-local cache mechanism.

+ * * @param idx the index of the entry to acquire. * @return the specified entry or null if there is none at the specified index or if it is not available. + * @deprecated No longer supported. Instead use a {@link StrategyType} to configure the pool. */ @Deprecated public Entry acquireAt(int idx) @@ -296,8 +390,11 @@ public Entry acquireAt(int idx) } /** - * Acquire an entry from the pool. - * Only enabled entries will be returned from this method and their enable method must not be called. + *

Acquires an entry from the pool.

+ *

Only enabled entries will be returned from this method + * and their {@link Entry#enable(Object, boolean)} + * method must not be called.

+ * * @return an entry from the pool or null if none is available. */ public Entry acquire() @@ -359,8 +456,8 @@ private int startIndex(int size) } /** - * Utility method to acquire an entry from the pool, - * reserving and creating a new entry if necessary. + *

Acquires an entry from the pool, + * reserving and creating a new entry if necessary.

* * @param creator a function to create the pooled value for a reserved entry. * @return an entry from the pool or null if none is available. @@ -396,15 +493,14 @@ public Entry acquire(Function.Entry, T> creator) } /** - * This method will return an acquired object to the pool. Objects - * that are acquired from the pool but never released will result - * in a memory leak. + *

Releases an {@link #acquire() acquired} entry to the pool.

+ *

Entries that are acquired from the pool but never released + * will result in a memory leak.

* * @param entry the value to return to the pool * @return true if the entry was released and could be acquired again, * false if the entry should be removed by calling {@link #remove(Pool.Entry)} * and the object contained by the entry should be disposed. - * @throws NullPointerException if value is null */ public boolean release(Entry entry) { @@ -418,7 +514,7 @@ public boolean release(Entry entry) } /** - * Remove a value from the pool. + *

Removes an entry from the pool.

* * @param entry the value to remove * @return true if the entry was removed, false otherwise @@ -495,78 +591,72 @@ public void dump(Appendable out, String indent) throws IOException @Override public String toString() { - return String.format("%s@%x[size=%d closed=%s]", + return String.format("%s@%x[inUse=%d,size=%d,capacity=%d,closed=%b]", getClass().getSimpleName(), hashCode(), - entries.size(), - closed); + getInUseCount(), + size(), + getMaxEntries(), + isClosed()); } - public class Entry + /** + *

A Pool entry that holds metadata and a pooled object.

+ */ + public abstract class Entry { - // hi: positive=open/maxUsage counter; negative=closed; MIN_VALUE pending - // lo: multiplexing counter - private final AtomicBiInteger state; - - // The pooled item. This is not volatile as it is set once and then never changed. + // The pooled object. This is not volatile as it is set once and then never changed. // Other threads accessing must check the state field above first, so a good before/after // relationship exists to make a memory barrier. private T pooled; - Entry() - { - this.state = new AtomicBiInteger(Integer.MIN_VALUE, 0); - } - - // for testing only - void setUsageCount(int usageCount) - { - this.state.getAndSetHi(usageCount); - } - - /** Enable a reserved entry {@link Entry}. - * An entry returned from the {@link #reserve()} method must be enabled with this method, - * once and only once, before it is usable by the pool. - * The entry may be enabled and not acquired, in which case it is immediately available to be + /** + *

Enables this, previously {@link #reserve() reserved}, Entry.

+ *

An entry returned from the {@link #reserve()} method must be enabled with this method, + * once and only once, before it is usable by the pool.

+ *

The entry may be enabled and not acquired, in which case it is immediately available to be * acquired, potentially by another thread; or it can be enabled and acquired atomically so that - * no other thread can acquire it, although the acquire may still fail if the pool has been closed. - * @param pooled The pooled item for the entry - * @param acquire If true the entry is atomically enabled and acquired. - * @return true If the entry was enabled. - * @throws IllegalStateException if the entry was already enabled + * no other thread can acquire it, although the acquire may still fail if the pool has been closed.

+ * + * @param pooled the pooled object for this Entry + * @param acquire whether this Entry should be atomically enabled and acquired + * @return whether this Entry was enabled + * @throws IllegalStateException if this Entry was already enabled */ public boolean enable(T pooled, boolean acquire) { Objects.requireNonNull(pooled); - if (state.getHi() != Integer.MIN_VALUE) + if (!isReserved()) { - if (state.getHi() == -1) + if (isClosed()) return false; // Pool has been closed throw new IllegalStateException("Entry already enabled: " + this); } this.pooled = pooled; - int usage = acquire ? 1 : 0; - if (!state.compareAndSet(Integer.MIN_VALUE, usage, 0, usage)) - { - this.pooled = null; - if (state.getHi() == -1) - return false; // Pool has been closed - throw new IllegalStateException("Entry already enabled: " + this); - } - return true; + if (tryEnable(acquire)) + return true; + + this.pooled = null; + if (isClosed()) + return false; // Pool has been closed + throw new IllegalStateException("Entry already enabled: " + this); } + /** + * @return the pooled object + */ public T getPooled() { return pooled; } /** - * Release the entry. - * This is equivalent to calling {@link Pool#release(Pool.Entry)} passing this entry. - * @return true if released. + *

Releases this Entry.

+ *

This is equivalent to calling {@link Pool#release(Pool.Entry)} passing this entry.

+ * + * @return whether this Entry was released */ public boolean release() { @@ -574,9 +664,10 @@ public boolean release() } /** - * Remove the entry. - * This is equivalent to calling {@link Pool#remove(Pool.Entry)} passing this entry. - * @return true if remove. + *

Removes this Entry from the Pool.

+ *

This is equivalent to calling {@link Pool#remove(Pool.Entry)} passing this entry.

+ * + * @return whether this Entry was removed */ public boolean remove() { @@ -584,40 +675,257 @@ public boolean remove() } /** - * Try to acquire the entry if possible by incrementing both the usage - * count and the multiplex count. - * @return true if the usage count is <= maxUsageCount and - * the multiplex count is maxMultiplex and the entry is not closed, - * false otherwise. + *

Tries to enable, and possible also acquire, this Entry.

+ * + * @param acquire whether to also acquire this Entry + * @return whether this Entry was enabled + */ + abstract boolean tryEnable(boolean acquire); + + /** + *

Tries to acquire this Entry.

+ * + * @return whether this Entry was acquired + */ + abstract boolean tryAcquire(); + + /** + *

Tries to release this Entry.

+ * + * @return true if this Entry was released, + * false if {@link #tryRemove()} should be called. + */ + abstract boolean tryRelease(); + + /** + *

Tries to remove the entry by marking it as closed.

+ * + * @return whether the entry can be removed from the containing pool + */ + abstract boolean tryRemove(); + + /** + * @return whether this Entry is closed + */ + public abstract boolean isClosed(); + + /** + * @return whether this Entry is reserved + */ + public abstract boolean isReserved(); + + /** + * @return whether this Entry is idle + */ + public abstract boolean isIdle(); + + /** + * @return whether this entry is in use. + */ + public abstract boolean isInUse(); + + /** + * @return whether this entry has been used beyond {@link #getMaxUsageCount()} + * @deprecated MaxUsage functionalities will be removed + */ + @Deprecated + public boolean isOverUsed() + { + return false; + } + + boolean isIdleAndOverUsed() + { + return false; + } + + // Only for testing. + int getUsageCount() + { + return 0; + } + + // Only for testing. + void setUsageCount(int usageCount) + { + } + } + + /** + *

A Pool entry that holds metadata and a pooled object, + * that can only be acquired concurrently at most once, and + * can be acquired/released multiple times.

+ */ + private class MonoEntry extends Entry + { + // MIN_VALUE => pending; -1 => closed; 0 => idle; 1 => active; + private final AtomicInteger state = new AtomicInteger(Integer.MIN_VALUE); + + @Override + protected boolean tryEnable(boolean acquire) + { + return state.compareAndSet(Integer.MIN_VALUE, acquire ? 1 : 0); + } + + @Override + boolean tryAcquire() + { + while (true) + { + int s = state.get(); + if (s != 0) + return false; + if (state.compareAndSet(s, 1)) + return true; + } + } + + @Override + boolean tryRelease() + { + while (true) + { + int s = state.get(); + if (s < 0) + return false; + if (s == 0) + throw new IllegalStateException("Cannot release an already released entry"); + if (state.compareAndSet(s, 0)) + return true; + } + } + + @Override + boolean tryRemove() + { + state.set(-1); + return true; + } + + @Override + public boolean isClosed() + { + return state.get() < 0; + } + + @Override + public boolean isReserved() + { + return state.get() == Integer.MIN_VALUE; + } + + @Override + public boolean isIdle() + { + return state.get() == 0; + } + + @Override + public boolean isInUse() + { + return state.get() == 1; + } + + @Override + public String toString() + { + String s; + switch (state.get()) + { + case Integer.MIN_VALUE: + s = "PENDING"; + break; + case -1: + s = "CLOSED"; + break; + case 0: + s = "IDLE"; + break; + default: + s = "ACTIVE"; + } + return String.format("%s@%x{%s,pooled=%s}", + getClass().getSimpleName(), + hashCode(), + s, + getPooled()); + } + } + + /** + *

A Pool entry that holds metadata and a pooled object, + * that can be acquired concurrently multiple times, and + * can be acquired/released multiple times.

+ */ + class MultiEntry extends Entry + { + // hi: MIN_VALUE => pending; -1 => closed; 0+ => usage counter; + // lo: 0 => idle; positive => multiplex counter + private final AtomicBiInteger state; + + MultiEntry() + { + this.state = new AtomicBiInteger(Integer.MIN_VALUE, 0); + } + + @Override + void setUsageCount(int usageCount) + { + this.state.getAndSetHi(usageCount); + } + + @Override + protected boolean tryEnable(boolean acquire) + { + int usage = acquire ? 1 : 0; + return state.compareAndSet(Integer.MIN_VALUE, usage, 0, usage); + } + + /** + *

Tries to acquire the entry if possible by incrementing both the usage + * count and the multiplex count.

+ * + * @return true if the usage count is less than {@link #getMaxUsageCount()} and + * the multiplex count is less than {@link #getMaxMultiplex(Object)} and + * the entry is not closed, false otherwise. */ + @Override boolean tryAcquire() { while (true) { long encoded = state.get(); int usageCount = AtomicBiInteger.getHi(encoded); + int multiplexCount = AtomicBiInteger.getLo(encoded); boolean closed = usageCount < 0; - int multiplexingCount = AtomicBiInteger.getLo(encoded); - int currentMaxUsageCount = maxUsageCount; - if (closed || multiplexingCount >= maxMultiplex || (currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount)) + if (closed) + return false; + T pooled = getPooled(); + int maxUsageCount = getMaxUsageCount(pooled); + int maxMultiplexed = getMaxMultiplex(pooled); + if (maxMultiplexed > 0 && multiplexCount >= maxMultiplexed) + return false; + if (maxUsageCount > 0 && usageCount >= maxUsageCount) return false; // Prevent overflowing the usage counter by capping it at Integer.MAX_VALUE. int newUsageCount = usageCount == Integer.MAX_VALUE ? Integer.MAX_VALUE : usageCount + 1; - if (state.compareAndSet(encoded, newUsageCount, multiplexingCount + 1)) + if (state.compareAndSet(encoded, newUsageCount, multiplexCount + 1)) return true; } } /** - * Try to release the entry if possible by decrementing the multiplexing - * count unless the entity is closed. + *

Tries to release the entry if possible by decrementing the multiplex + * count unless the entity is closed.

+ * * @return true if the entry was released, * false if {@link #tryRemove()} should be called. */ + @Override boolean tryRelease() { - int newMultiplexingCount; + int newMultiplexCount; int usageCount; while (true) { @@ -627,24 +935,26 @@ boolean tryRelease() if (closed) return false; - newMultiplexingCount = AtomicBiInteger.getLo(encoded) - 1; - if (newMultiplexingCount < 0) + newMultiplexCount = AtomicBiInteger.getLo(encoded) - 1; + if (newMultiplexCount < 0) throw new IllegalStateException("Cannot release an already released entry"); - if (state.compareAndSet(encoded, usageCount, newMultiplexingCount)) + if (state.compareAndSet(encoded, usageCount, newMultiplexCount)) break; } - int currentMaxUsageCount = maxUsageCount; + int currentMaxUsageCount = maxUsage; boolean overUsed = currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount; - return !(overUsed && newMultiplexingCount == 0); + return !(overUsed && newMultiplexCount == 0); } /** - * Try to remove the entry by marking it as closed and decrementing the multiplexing counter. - * The multiplexing counter will never go below zero and if it reaches zero, the entry is considered removed. + *

Tries to remove the entry by marking it as closed and decrementing the multiplex counter.

+ *

The multiplex counter will never go below zero and if it reaches zero, the entry is considered removed.

+ * * @return true if the entry can be removed from the containing pool, false otherwise. */ + @Override boolean tryRemove() { while (true) @@ -660,45 +970,52 @@ boolean tryRemove() } } + @Override public boolean isClosed() { return state.getHi() < 0; } + @Override public boolean isReserved() { return state.getHi() == Integer.MIN_VALUE; } + @Override public boolean isIdle() { long encoded = state.get(); return AtomicBiInteger.getHi(encoded) >= 0 && AtomicBiInteger.getLo(encoded) == 0; } + @Override public boolean isInUse() { long encoded = state.get(); return AtomicBiInteger.getHi(encoded) >= 0 && AtomicBiInteger.getLo(encoded) > 0; } + @Override public boolean isOverUsed() { - int currentMaxUsageCount = maxUsageCount; + int maxUsageCount = getMaxUsageCount(); int usageCount = state.getHi(); - return currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount; + return maxUsageCount > 0 && usageCount >= maxUsageCount; } + @Override boolean isIdleAndOverUsed() { - int currentMaxUsageCount = maxUsageCount; + int maxUsageCount = getMaxUsageCount(); long encoded = state.get(); int usageCount = AtomicBiInteger.getHi(encoded); int multiplexCount = AtomicBiInteger.getLo(encoded); - return currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount && multiplexCount == 0; + return maxUsageCount > 0 && usageCount >= maxUsageCount && multiplexCount == 0; } - public int getUsageCount() + @Override + int getUsageCount() { return Math.max(state.getHi(), 0); } @@ -710,16 +1027,17 @@ public String toString() int usageCount = AtomicBiInteger.getHi(encoded); int multiplexCount = AtomicBiInteger.getLo(encoded); - String state = usageCount < 0 ? "CLOSED" : multiplexCount == 0 ? "IDLE" : "INUSE"; + String state = usageCount < 0 + ? (usageCount == Integer.MIN_VALUE ? "PENDING" : "CLOSED") + : (multiplexCount == 0 ? "IDLE" : "ACTIVE"); - return String.format("%s@%x{%s, usage=%d, multiplex=%d/%d, pooled=%s}", + return String.format("%s@%x{%s,usage=%d,multiplex=%d,pooled=%s}", getClass().getSimpleName(), hashCode(), state, Math.max(usageCount, 0), Math.max(multiplexCount, 0), - getMaxMultiplex(), - pooled); + getPooled()); } } } diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java index f8b2ffdb2302..8b319a96bc2c 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java @@ -86,7 +86,7 @@ public static Stream strategy() public void testAcquireRelease(Factory factory) { Pool pool = factory.getPool(1); - pool.reserve(-1).enable(new CloseableHolder("aaa"), false); + pool.reserve().enable(new CloseableHolder("aaa"), false); assertThat(pool.size(), is(1)); assertThat(pool.getReservedCount(), is(0)); assertThat(pool.getIdleCount(), is(1)); @@ -130,7 +130,7 @@ public void testAcquireRelease(Factory factory) public void testRemoveBeforeRelease(Factory factory) { Pool pool = factory.getPool(1); - pool.reserve(-1).enable(new CloseableHolder("aaa"), false); + pool.reserve().enable(new CloseableHolder("aaa"), false); Pool.Entry e1 = pool.acquire(); assertThat(pool.remove(e1), is(true)); @@ -222,69 +222,6 @@ public void testReserve(Factory factory) assertThrows(IllegalStateException.class, () -> e.enable(new CloseableHolder("xxx"), false)); } - @ParameterizedTest - @MethodSource(value = "strategy") - public void testDeprecatedReserve(Factory factory) - { - Pool pool = factory.getPool(2); - - // Reserve an entry - Pool.Entry e1 = pool.reserve(-1); - assertThat(pool.size(), is(1)); - assertThat(pool.getReservedCount(), is(1)); - assertThat(pool.getIdleCount(), is(0)); - assertThat(pool.getInUseCount(), is(0)); - - // max reservations - assertNull(pool.reserve(1)); - assertThat(pool.size(), is(1)); - assertThat(pool.getReservedCount(), is(1)); - assertThat(pool.getIdleCount(), is(0)); - assertThat(pool.getInUseCount(), is(0)); - - // enable the entry - e1.enable(new CloseableHolder("aaa"), false); - assertThat(pool.size(), is(1)); - assertThat(pool.getReservedCount(), is(0)); - assertThat(pool.getIdleCount(), is(1)); - assertThat(pool.getInUseCount(), is(0)); - - // Reserve another entry - Pool.Entry e2 = pool.reserve(-1); - assertThat(pool.size(), is(2)); - assertThat(pool.getReservedCount(), is(1)); - assertThat(pool.getIdleCount(), is(1)); - assertThat(pool.getInUseCount(), is(0)); - - // remove the reservation - e2.remove(); - assertThat(pool.size(), is(1)); - assertThat(pool.getReservedCount(), is(0)); - assertThat(pool.getIdleCount(), is(1)); - assertThat(pool.getInUseCount(), is(0)); - - // Reserve another entry - Pool.Entry e3 = pool.reserve(-1); - assertThat(pool.size(), is(2)); - assertThat(pool.getReservedCount(), is(1)); - assertThat(pool.getIdleCount(), is(1)); - assertThat(pool.getInUseCount(), is(0)); - - // enable and acquire the entry - e3.enable(new CloseableHolder("bbb"), true); - assertThat(pool.size(), is(2)); - assertThat(pool.getReservedCount(), is(0)); - assertThat(pool.getIdleCount(), is(1)); - assertThat(pool.getInUseCount(), is(1)); - - // can't reenable - assertThrows(IllegalStateException.class, () -> e3.enable(new CloseableHolder("xxx"), false)); - - // Can't enable acquired entry - Pool.Entry e = pool.acquire(); - assertThrows(IllegalStateException.class, () -> e.enable(new CloseableHolder("xxx"), false)); - } - @ParameterizedTest @MethodSource(value = "strategy") public void testReserveNegativeMaxPending(Factory factory) @@ -356,22 +293,6 @@ public void testValuesContainsAcquiredEntries(Factory factory) assertThat(pool.values().isEmpty(), is(false)); } - @ParameterizedTest - @MethodSource(value = "strategy") - public void testAcquireAt(Factory factory) - { - Pool pool = factory.getPool(2); - - pool.reserve(-1).enable(new CloseableHolder("aaa"), false); - pool.reserve(-1).enable(new CloseableHolder("bbb"), false); - - assertThat(pool.acquireAt(2), nullValue()); - assertThat(pool.acquireAt(0), notNullValue()); - assertThat(pool.acquireAt(0), nullValue()); - assertThat(pool.acquireAt(1), notNullValue()); - assertThat(pool.acquireAt(1), nullValue()); - } - @ParameterizedTest @MethodSource(value = "strategy") public void testMaxUsageCount(Factory factory) @@ -608,6 +529,7 @@ public void testUsageCountAfterReachingMaxMultiplexLimit(Factory factory) public void testDynamicMaxUsageCountChangeOverflowMaxInt(Factory factory) { Pool pool = factory.getPool(1); + pool.setMaxMultiplex(1); Pool.Entry entry = pool.reserve(); entry.enable(new CloseableHolder("aaa"), false); entry.setUsageCount(Integer.MAX_VALUE); @@ -627,6 +549,7 @@ public void testDynamicMaxUsageCountChangeOverflowMaxInt(Factory factory) public void testDynamicMaxUsageCountChangeSweep(Factory factory) { Pool pool = factory.getPool(2); + pool.setMaxUsageCount(100); Pool.Entry entry1 = pool.reserve(); entry1.enable(new CloseableHolder("aaa"), false); Pool.Entry entry2 = pool.reserve();