From e2690cc420ccc47a8c42be2a7e46634cb4449970 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Wed, 1 Sep 2021 10:27:40 +0200 Subject: [PATCH] Fixes #6603 - HTTP/2 max local stream count exceeded (#6639) (#6682) * Fixes #6603 - HTTP/2 max local stream count exceeded (#6639) Made MAX_CONCURRENT_STREAMS setting work on a per-connection basis. Updated Pool javadocs. Signed-off-by: Simone Bordet Signed-off-by: Greg Wilkins Co-authored-by: Greg Wilkins (cherry picked from commit 525fcb31194c62c44c912504ded177e30ff78c52) --- .../jetty/client/AbstractConnectionPool.java | 8 +- .../eclipse/jetty/client/ConnectionPool.java | 10 +- .../jetty/client/DuplexConnectionPool.java | 3 +- .../jetty/client/MultiplexConnectionPool.java | 21 +- .../client/MultiplexHttpDestination.java | 8 +- .../jetty/client/RandomConnectionPool.java | 2 +- .../client/RoundRobinConnectionPool.java | 2 +- .../org/eclipse/jetty/http2/HTTP2Session.java | 14 +- .../http/HTTPSessionListenerPromise.java | 8 - .../client/http/HttpConnectionOverHTTP2.java | 9 +- .../client/http/MaxConcurrentStreamsTest.java | 107 ++++ .../http/MultiplexedConnectionPoolTest.java | 10 +- .../io/ArrayRetainableByteBufferPoolTest.java | 2 +- .../java/org/eclipse/jetty/util/Pool.java | 546 ++++++++++++++---- .../java/org/eclipse/jetty/util/PoolTest.java | 65 +-- 15 files changed, 604 insertions(+), 211 deletions(-) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java index 7d9b386f0ebe..e2a3b539563b 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java @@ -55,7 +55,12 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen protected AbstractConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester) { - this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester); + this(destination, Pool.StrategyType.FIRST, maxConnections, cache, requester); + } + + protected AbstractConnectionPool(HttpDestination destination, Pool.StrategyType strategy, int maxConnections, boolean cache, Callback requester) + { + this(destination, new Pool<>(strategy, maxConnections, cache), requester); } protected AbstractConnectionPool(HttpDestination destination, Pool pool, Callback requester) @@ -63,6 +68,7 @@ protected AbstractConnectionPool(HttpDestination destination, Pool p this.destination = destination; this.requester = requester; this.pool = pool; + pool.setMaxMultiplex(1); // Force the use of multiplexing. addBean(pool); } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java index 687ba809fbfc..8868f9a44941 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java @@ -24,7 +24,7 @@ public interface ConnectionPool extends Closeable { /** - * Optionally pre-create up to connectionCount + * Optionally pre-create up to {@code connectionCount} * connections so they are immediately ready for use. * @param connectionCount the number of connections to pre-start. */ @@ -106,7 +106,7 @@ interface Factory } /** - * Marks a connection pool as supporting multiplexed connections. + * Marks a connection as supporting multiplexed requests. */ interface Multiplexable { @@ -117,7 +117,11 @@ interface Multiplexable /** * @param maxMultiplex the max number of requests multiplexable on a single connection + * @deprecated do not use, as the maxMultiplex value is pulled, rather than pushed */ - void setMaxMultiplex(int maxMultiplex); + @Deprecated + default void setMaxMultiplex(int maxMultiplex) + { + } } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java index 26ca41d2a8f9..3b7e23b291e1 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java @@ -29,9 +29,10 @@ public DuplexConnectionPool(HttpDestination destination, int maxConnections, Cal public DuplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester) { - this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester); + super(destination, Pool.StrategyType.FIRST, maxConnections, cache, requester); } + @Deprecated public DuplexConnectionPool(HttpDestination destination, Pool pool, Callback requester) { super(destination, pool, requester); diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java index 92f55d59472a..4c1fb4dfb577 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java @@ -20,7 +20,7 @@ import org.eclipse.jetty.util.annotation.ManagedObject; @ManagedObject -public class MultiplexConnectionPool extends AbstractConnectionPool implements ConnectionPool.Multiplexable +public class MultiplexConnectionPool extends AbstractConnectionPool { public MultiplexConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex) { @@ -29,9 +29,26 @@ public MultiplexConnectionPool(HttpDestination destination, int maxConnections, public MultiplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester, int maxMultiplex) { - this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester, maxMultiplex); + this(destination, Pool.StrategyType.FIRST, maxConnections, cache, requester, maxMultiplex); } + public MultiplexConnectionPool(HttpDestination destination, Pool.StrategyType strategy, int maxConnections, boolean cache, Callback requester, int maxMultiplex) + { + super(destination, new Pool(strategy, maxConnections, cache) + { + @Override + protected int getMaxMultiplex(Connection connection) + { + int multiplex = (connection instanceof Multiplexable) + ? ((Multiplexable)connection).getMaxMultiplex() + : super.getMaxMultiplex(connection); + return multiplex > 0 ? multiplex : 1; + } + }, requester); + setMaxMultiplex(maxMultiplex); + } + + @Deprecated public MultiplexConnectionPool(HttpDestination destination, Pool pool, Callback requester, int maxMultiplex) { super(destination, pool, requester); diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java index 2a8dd114d800..cd42b25cfad1 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java @@ -35,15 +35,15 @@ public MultiplexHttpDestination(HttpClient client, Origin origin) public int getMaxRequestsPerConnection() { ConnectionPool connectionPool = getConnectionPool(); - if (connectionPool instanceof ConnectionPool.Multiplexable) - return ((ConnectionPool.Multiplexable)connectionPool).getMaxMultiplex(); + if (connectionPool instanceof AbstractConnectionPool) + return ((AbstractConnectionPool)connectionPool).getMaxMultiplex(); return 1; } public void setMaxRequestsPerConnection(int maxRequestsPerConnection) { ConnectionPool connectionPool = getConnectionPool(); - if (connectionPool instanceof ConnectionPool.Multiplexable) - ((ConnectionPool.Multiplexable)connectionPool).setMaxMultiplex(maxRequestsPerConnection); + if (connectionPool instanceof AbstractConnectionPool) + ((AbstractConnectionPool)connectionPool).setMaxMultiplex(maxRequestsPerConnection); } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java index b6459cde83ec..ee2c5ccd9385 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java @@ -26,6 +26,6 @@ public class RandomConnectionPool extends MultiplexConnectionPool { public RandomConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex) { - super(destination, new Pool<>(Pool.StrategyType.RANDOM, maxConnections, false), requester, maxMultiplex); + super(destination, Pool.StrategyType.RANDOM, maxConnections, false, requester, maxMultiplex); } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java index 3cb3def4a610..e78124dca8b1 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java @@ -51,7 +51,7 @@ public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex) { - super(destination, new Pool<>(Pool.StrategyType.ROUND_ROBIN, maxConnections, false), requester, maxMultiplex); + super(destination, Pool.StrategyType.ROUND_ROBIN, maxConnections, false, requester, maxMultiplex); // If there are queued requests and connections get // closed due to idle timeout or overuse, we want to // aggressively try to open new connections to replace diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 3f05639f6a41..6a999a68794e 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -786,7 +786,10 @@ protected IStream createLocalStream(int streamId, MetaData.Request request, Cons int maxCount = getMaxLocalStreams(); if (maxCount >= 0 && localCount >= maxCount) { - failFn.accept(new IllegalStateException("Max local stream count " + maxCount + " exceeded")); + IllegalStateException failure = new IllegalStateException("Max local stream count " + maxCount + " exceeded: " + localCount); + if (LOG.isDebugEnabled()) + LOG.debug("Could not create local stream #{} for {}", streamId, this, failure); + failFn.accept(failure); return null; } if (localStreamCount.compareAndSet(localCount, localCount + 1)) @@ -799,7 +802,7 @@ protected IStream createLocalStream(int streamId, MetaData.Request request, Cons stream.setIdleTimeout(getStreamIdleTimeout()); flowControl.onStreamCreated(stream); if (LOG.isDebugEnabled()) - LOG.debug("Created local {}", stream); + LOG.debug("Created local {} for {}", stream, this); return stream; } else @@ -834,6 +837,9 @@ protected IStream createRemoteStream(int streamId, MetaData.Request request) int maxCount = getMaxRemoteStreams(); if (maxCount >= 0 && remoteCount - remoteClosing >= maxCount) { + IllegalStateException failure = new IllegalStateException("Max remote stream count " + maxCount + " exceeded: " + remoteCount + "+" + remoteClosing); + if (LOG.isDebugEnabled()) + LOG.debug("Could not create remote stream #{} for {}", streamId, this, failure); reset(null, new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.from(() -> onStreamDestroyed(streamId))); return null; } @@ -847,7 +853,7 @@ protected IStream createRemoteStream(int streamId, MetaData.Request request) stream.setIdleTimeout(getStreamIdleTimeout()); flowControl.onStreamCreated(stream); if (LOG.isDebugEnabled()) - LOG.debug("Created remote {}", stream); + LOG.debug("Created remote {} for {}", stream, this); return stream; } else @@ -1019,7 +1025,7 @@ void scheduleTimeout(HTTP2Stream stream) private void onStreamCreated(int streamId) { if (LOG.isDebugEnabled()) - LOG.debug("Created stream #{} for {}", streamId, this); + LOG.debug("Creating stream #{} for {}", streamId, this); streamsState.onStreamCreated(); } diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HTTPSessionListenerPromise.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HTTPSessionListenerPromise.java index c8a4490fe30e..f2ddff59202f 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HTTPSessionListenerPromise.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HTTPSessionListenerPromise.java @@ -65,14 +65,6 @@ private Promise httpConnectionPromise() @Override public void onSettings(Session session, SettingsFrame frame) { - Map settings = frame.getSettings(); - if (settings.containsKey(SettingsFrame.MAX_CONCURRENT_STREAMS)) - { - HttpDestination destination = destination(); - if (destination instanceof HttpDestination.Multiplexed) - ((HttpDestination.Multiplexed)destination).setMaxRequestsPerConnection(settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS)); - } - // The first SETTINGS frame is the server preface reply. if (!connection.isMarked()) onServerPreface(session); } diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java index 3c9f895f95ef..b3377a3074cd 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.eclipse.jetty.client.ConnectionPool; import org.eclipse.jetty.client.HttpChannel; import org.eclipse.jetty.client.HttpConnection; import org.eclipse.jetty.client.HttpDestination; @@ -46,7 +47,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable +public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable, ConnectionPool.Multiplexable { private static final Logger LOG = LoggerFactory.getLogger(HttpConnection.class); @@ -78,6 +79,12 @@ public void setRecycleHttpChannels(boolean recycleHttpChannels) this.recycleHttpChannels = recycleHttpChannels; } + @Override + public int getMaxMultiplex() + { + return ((HTTP2Session)session).getMaxLocalStreams(); + } + @Override protected Iterator getHttpChannels() { diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java index d3d97965b04a..7e61028f1bbf 100644 --- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java @@ -16,6 +16,7 @@ import java.io.IOException; import java.net.SocketAddress; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Queue; @@ -71,6 +72,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; public class MaxConcurrentStreamsTest extends AbstractTest { @@ -538,6 +540,111 @@ public Connection newConnection(EndPoint endPoint, Map context) assertTrue(response3Latch.await(5, TimeUnit.SECONDS)); } + @Test + public void testDifferentMaxConcurrentStreamsForDifferentConnections() throws Exception + { + long processing = 125; + RawHTTP2ServerConnectionFactory http2 = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), new ServerSessionListener.Adapter() + { + private Session session1; + private Session session2; + + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + MetaData.Request request = (MetaData.Request)frame.getMetaData(); + switch (request.getURI().getPath()) + { + case "/prime": + { + session1 = stream.getSession(); + // Send another request from here to force the opening of the 2nd connection. + client.newRequest("localhost", connector.getLocalPort()).path("/prime2").send(result -> + { + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, result.getResponse().getStatus(), HttpFields.EMPTY); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + }); + break; + } + case "/prime2": + { + session2 = stream.getSession(); + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + break; + } + case "/update_max_streams": + { + Session session = stream.getSession() == session1 ? session2 : session1; + Map settings = new HashMap<>(); + settings.put(SettingsFrame.MAX_CONCURRENT_STREAMS, 2); + session.settings(new SettingsFrame(settings, false), Callback.NOOP); + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + break; + } + default: + { + sleep(processing); + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + break; + } + } + return null; + } + }); + http2.setMaxConcurrentStreams(1); + prepareServer(http2); + server.start(); + prepareClient(); + client.setMaxConnectionsPerDestination(2); + client.start(); + + // Prime the 2 connections. + primeConnection(); + + String host = "localhost"; + int port = connector.getLocalPort(); + + assertEquals(1, client.getDestinations().size()); + HttpDestination destination = (HttpDestination)client.getDestinations().get(0); + AbstractConnectionPool pool = (AbstractConnectionPool)destination.getConnectionPool(); + assertEquals(2, pool.getConnectionCount()); + + // Send a request on one connection, which sends back a SETTINGS frame on the other connection. + ContentResponse response = client.newRequest(host, port) + .path("/update_max_streams") + .send(); + assertEquals(HttpStatus.OK_200, response.getStatus()); + + // Send 4 requests at once: 1 should go on one connection, 2 on the other connection, and 1 queued. + int count = 4; + CountDownLatch latch = new CountDownLatch(count); + for (int i = 0; i < count; ++i) + { + client.newRequest(host, port) + .path("/" + i) + .send(result -> + { + if (result.isSucceeded()) + { + int status = result.getResponse().getStatus(); + if (status == HttpStatus.OK_200) + latch.countDown(); + else + fail("unexpected status " + status); + } + else + { + fail(result.getFailure()); + } + }); + } + + assertTrue(awaitLatch(latch, count * processing * 10, TimeUnit.MILLISECONDS)); + } + private void primeConnection() throws Exception { // Prime the connection so that the maxConcurrentStream setting arrives to the client. diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MultiplexedConnectionPoolTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MultiplexedConnectionPoolTest.java index 7d68e961c7f0..706f98aeb1d6 100644 --- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MultiplexedConnectionPoolTest.java +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MultiplexedConnectionPoolTest.java @@ -95,9 +95,7 @@ public void testMaxDurationConnectionsWithMultiplexedPool() throws Exception ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination -> { int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination(); - Pool pool = new Pool<>(Pool.StrategyType.FIRST, maxConnections, false); - poolRef.set(pool); - MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, pool, destination, MAX_MULTIPLEX) + MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, Pool.StrategyType.FIRST, maxConnections, false, destination, MAX_MULTIPLEX) { @Override protected void onCreated(Connection connection) @@ -111,6 +109,7 @@ protected void removed(Connection connection) poolRemoveCounter.incrementAndGet(); } }; + poolRef.set(connectionPool.getBean(Pool.class)); connectionPool.setMaxDuration(maxDuration); return connectionPool; }); @@ -156,9 +155,7 @@ public void testMaxDurationConnectionsWithMultiplexedPoolClosesExpiredConnection ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination -> { int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination(); - Pool pool = new Pool<>(Pool.StrategyType.FIRST, maxConnections, false); - poolRef.set(pool); - MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, pool, destination, MAX_MULTIPLEX) + MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, Pool.StrategyType.FIRST, maxConnections, false, destination, MAX_MULTIPLEX) { @Override protected void onCreated(Connection connection) @@ -172,6 +169,7 @@ protected void removed(Connection connection) poolRemoveCounter.incrementAndGet(); } }; + poolRef.set(connectionPool.getBean(Pool.class)); connectionPool.setMaxDuration(maxDuration); return connectionPool; }); diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayRetainableByteBufferPoolTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayRetainableByteBufferPoolTest.java index f0b7201588d4..ab245571e1b7 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayRetainableByteBufferPoolTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayRetainableByteBufferPoolTest.java @@ -347,7 +347,7 @@ public void testExponentialPool() throws IOException b3.release(); b4.getBuffer().limit(b4.getBuffer().capacity() - 2); - assertThat(pool.dump(), containsString("[size=4 closed=false]{capacity=4,inuse=3(75%)")); + assertThat(pool.dump(), containsString("]{capacity=4,inuse=3(75%)")); } /** diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java index 5eedf0218566..556ad61f1d5a 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java @@ -26,6 +26,8 @@ import java.util.function.Function; import java.util.stream.Collectors; +import org.eclipse.jetty.util.annotation.ManagedAttribute; +import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.component.DumpableCollection; import org.eclipse.jetty.util.thread.AutoLock; @@ -33,24 +35,22 @@ import org.slf4j.LoggerFactory; /** - * A fast pool of objects, with optional support for - * multiplexing, max usage count and several optimized strategies plus - * an optional {@link ThreadLocal} cache of the last release entry. - *

- * When the method {@link #close()} is called, all {@link Closeable}s in the pool - * are also closed. - *

- * @param + *

A pool of objects, with optional support for multiplexing, + * max usage count and several optimized strategies plus + * an optional {@link ThreadLocal} cache of the last release entry.

+ *

When the method {@link #close()} is called, all {@link Closeable}s + * object pooled by the pool are also closed.

+ * + * @param the type of the pooled objects */ +@ManagedObject public class Pool implements AutoCloseable, Dumpable { private static final Logger LOGGER = LoggerFactory.getLogger(Pool.class); private final List entries = new CopyOnWriteArrayList<>(); - private final int maxEntries; private final StrategyType strategyType; - /* * The cache is used to avoid hammering on the first index of the entry list. * Caches can become poisoned (i.e.: containing entries that are in use) when @@ -63,8 +63,10 @@ public class Pool implements AutoCloseable, Dumpable private final ThreadLocal cache; private final AtomicInteger nextIndex; private volatile boolean closed; - private volatile int maxMultiplex = 1; - private volatile int maxUsageCount = -1; + @Deprecated + private volatile int maxUsage = -1; + @Deprecated + private volatile int maxMultiplex = -1; /** * The type of the strategy to use for the pool. @@ -99,7 +101,7 @@ public enum StrategyType * random strategy but with more predictable behaviour. * No entries are favoured and contention is reduced. */ - ROUND_ROBIN, + ROUND_ROBIN } /** @@ -117,6 +119,7 @@ public Pool(StrategyType strategyType, int maxEntries) /** * Construct a Pool with the specified thread-local cache size and * an optional {@link ThreadLocal} cache. + * * @param strategyType The strategy to used for looking up entries. * @param maxEntries the maximum amount of entries that the pool will accept. * @param cache True if a {@link ThreadLocal} cache should be used to try the most recently released entry. @@ -126,66 +129,141 @@ public Pool(StrategyType strategyType, int maxEntries, boolean cache) this.maxEntries = maxEntries; this.strategyType = strategyType; this.cache = cache ? new ThreadLocal<>() : null; - nextIndex = strategyType == StrategyType.ROUND_ROBIN ? new AtomicInteger() : null; + this.nextIndex = strategyType == StrategyType.ROUND_ROBIN ? new AtomicInteger() : null; } + /** + * @return the number of reserved entries + */ + @ManagedAttribute("The number of reserved entries") public int getReservedCount() { return (int)entries.stream().filter(Entry::isReserved).count(); } + /** + * @return the number of idle entries + */ + @ManagedAttribute("The number of idle entries") public int getIdleCount() { return (int)entries.stream().filter(Entry::isIdle).count(); } + /** + * @return the number of in-use entries + */ + @ManagedAttribute("The number of in-use entries") public int getInUseCount() { return (int)entries.stream().filter(Entry::isInUse).count(); } + /** + * @return the number of closed entries + */ + @ManagedAttribute("The number of closed entries") public int getClosedCount() { return (int)entries.stream().filter(Entry::isClosed).count(); } + /** + * @return the maximum number of entries + */ + @ManagedAttribute("The maximum number of entries") public int getMaxEntries() { return maxEntries; } + /** + * @return the default maximum multiplex count of entries + * @deprecated Multiplex functionalities will be removed + */ + @ManagedAttribute("The default maximum multiplex count of entries") + @Deprecated public int getMaxMultiplex() { - return maxMultiplex; + return maxMultiplex == -1 ? 1 : maxMultiplex; + } + + /** + *

Retrieves the max multiplex count for the given pooled object.

+ * + * @param pooled the pooled object + * @return the max multiplex count for the given pooled object + * @deprecated Multiplex functionalities will be removed + */ + @Deprecated + protected int getMaxMultiplex(T pooled) + { + return getMaxMultiplex(); } + /** + *

Sets the default maximum multiplex count for the Pool's entries.

+ * + * @param maxMultiplex the default maximum multiplex count of entries + * @deprecated Multiplex functionalities will be removed + */ + @Deprecated public final void setMaxMultiplex(int maxMultiplex) { if (maxMultiplex < 1) throw new IllegalArgumentException("Max multiplex must be >= 1"); - this.maxMultiplex = maxMultiplex; + try (AutoLock l = lock.lock()) + { + if (closed) + return; + + if (entries.stream().anyMatch(MonoEntry.class::isInstance)) + throw new IllegalStateException("Pool entries do not support multiplexing"); + + this.maxMultiplex = maxMultiplex; + } } /** - * Get the maximum number of times the entries of the pool - * can be acquired. - * @return the max usage count. + *

Returns the maximum number of times the entries of the pool + * can be acquired.

+ * + * @return the default maximum usage count of entries + * @deprecated MaxUsage functionalities will be removed */ + @ManagedAttribute("The default maximum usage count of entries") + @Deprecated public int getMaxUsageCount() { - return maxUsageCount; + return maxUsage; + } + + /** + *

Retrieves the max usage count for the given pooled object.

+ * + * @param pooled the pooled object + * @return the max usage count for the given pooled object + * @deprecated MaxUsage functionalities will be removed + */ + @Deprecated + protected int getMaxUsageCount(T pooled) + { + return getMaxUsageCount(); } /** - * Change the max usage count of the pool's entries. All existing - * idle entries over this new max usage are removed and closed. - * @param maxUsageCount the max usage count. + *

Sets the maximum usage count for the Pool's entries.

+ *

All existing idle entries that have a usage count larger + * than this new value are removed from the Pool and closed.

+ * + * @param maxUsageCount the default maximum usage count of entries + * @deprecated MaxUsage functionalities will be removed */ + @Deprecated public final void setMaxUsageCount(int maxUsageCount) { if (maxUsageCount == 0) throw new IllegalArgumentException("Max usage count must be != 0"); - this.maxUsageCount = maxUsageCount; // Iterate the entries, remove overused ones and collect a list of the closeable removed ones. List copy; @@ -194,6 +272,11 @@ public final void setMaxUsageCount(int maxUsageCount) if (closed) return; + if (entries.stream().anyMatch(MonoEntry.class::isInstance)) + throw new IllegalStateException("Pool entries do not support max usage"); + + this.maxUsage = maxUsageCount; + copy = entries.stream() .filter(entry -> entry.isIdleAndOverUsed() && remove(entry) && entry.pooled instanceof Closeable) .map(entry -> (Closeable)entry.pooled) @@ -205,10 +288,10 @@ public final void setMaxUsageCount(int maxUsageCount) } /** - * Create a new disabled slot into the pool. - * The returned entry must ultimately have the {@link Entry#enable(Object, boolean)} + *

Creates a new disabled slot into the pool.

+ *

The returned entry must ultimately have the {@link Entry#enable(Object, boolean)} * method called or be removed via {@link Pool.Entry#remove()} or - * {@link Pool#remove(Pool.Entry)}. + * {@link Pool#remove(Pool.Entry)}.

* * @param allotment the desired allotment, where each entry handles an allotment of maxMultiplex, * or a negative number to always trigger the reservation of a new entry. @@ -232,17 +315,17 @@ public Entry reserve(int allotment) if (allotment >= 0 && (getReservedCount() * getMaxMultiplex()) >= allotment) return null; - Entry entry = new Entry(); + Entry entry = newEntry(); entries.add(entry); return entry; } } /** - * Create a new disabled slot into the pool. - * The returned entry must ultimately have the {@link Entry#enable(Object, boolean)} + *

Creates a new disabled slot into the pool.

+ *

The returned entry must ultimately have the {@link Entry#enable(Object, boolean)} * method called or be removed via {@link Pool.Entry#remove()} or - * {@link Pool#remove(Pool.Entry)}. + * {@link Pool#remove(Pool.Entry)}.

* * @return a disabled entry that is contained in the pool, * or null if the pool is closed or if the pool already contains @@ -259,15 +342,27 @@ public Entry reserve() if (entries.size() >= maxEntries) return null; - Entry entry = new Entry(); + Entry entry = newEntry(); entries.add(entry); return entry; } } + private Entry newEntry() + { + // Do not allow more than 2 implementations of Entry, otherwise call sites in Pool + // referencing Entry methods will become mega-morphic and kill the performance. + if (maxMultiplex >= 0 || maxUsage >= 0) + return new MultiEntry(); + return new MonoEntry(); + } + /** - * Acquire an entry from the pool. - * Only enabled entries will be returned from this method and their enable method must not be called. + *

Acquires an entry from the pool.

+ *

Only enabled entries will be returned from this method + * and their {@link Entry#enable(Object, boolean)} + * method must not be called.

+ * * @return an entry from the pool or null if none is available. */ public Entry acquire() @@ -329,8 +424,8 @@ private int startIndex(int size) } /** - * Utility method to acquire an entry from the pool, - * reserving and creating a new entry if necessary. + *

Acquires an entry from the pool, + * reserving and creating a new entry if necessary.

* * @param creator a function to create the pooled value for a reserved entry. * @return an entry from the pool or null if none is available. @@ -366,15 +461,14 @@ public Entry acquire(Function.Entry, T> creator) } /** - * This method will return an acquired object to the pool. Objects - * that are acquired from the pool but never released will result - * in a memory leak. + *

Releases an {@link #acquire() acquired} entry to the pool.

+ *

Entries that are acquired from the pool but never released + * will result in a memory leak.

* * @param entry the value to return to the pool * @return true if the entry was released and could be acquired again, * false if the entry should be removed by calling {@link #remove(Pool.Entry)} * and the object contained by the entry should be disposed. - * @throws NullPointerException if value is null */ public boolean release(Entry entry) { @@ -388,7 +482,7 @@ public boolean release(Entry entry) } /** - * Remove a value from the pool. + *

Removes an entry from the pool.

* * @param entry the value to remove * @return true if the entry was removed, false otherwise @@ -465,78 +559,72 @@ public void dump(Appendable out, String indent) throws IOException @Override public String toString() { - return String.format("%s@%x[size=%d closed=%s]", + return String.format("%s@%x[inUse=%d,size=%d,max=%d,closed=%b]", getClass().getSimpleName(), hashCode(), - entries.size(), - closed); + getInUseCount(), + size(), + getMaxEntries(), + isClosed()); } - public class Entry + /** + *

A Pool entry that holds metadata and a pooled object.

+ */ + public abstract class Entry { - // hi: positive=open/maxUsage counter; negative=closed; MIN_VALUE pending - // lo: multiplexing counter - private final AtomicBiInteger state; - - // The pooled item. This is not volatile as it is set once and then never changed. + // The pooled object. This is not volatile as it is set once and then never changed. // Other threads accessing must check the state field above first, so a good before/after // relationship exists to make a memory barrier. private T pooled; - Entry() - { - this.state = new AtomicBiInteger(Integer.MIN_VALUE, 0); - } - - // for testing only - void setUsageCount(int usageCount) - { - this.state.getAndSetHi(usageCount); - } - - /** Enable a reserved entry {@link Entry}. - * An entry returned from the {@link #reserve()} method must be enabled with this method, - * once and only once, before it is usable by the pool. - * The entry may be enabled and not acquired, in which case it is immediately available to be + /** + *

Enables this, previously {@link #reserve() reserved}, Entry.

+ *

An entry returned from the {@link #reserve()} method must be enabled with this method, + * once and only once, before it is usable by the pool.

+ *

The entry may be enabled and not acquired, in which case it is immediately available to be * acquired, potentially by another thread; or it can be enabled and acquired atomically so that - * no other thread can acquire it, although the acquire may still fail if the pool has been closed. - * @param pooled The pooled item for the entry - * @param acquire If true the entry is atomically enabled and acquired. - * @return true If the entry was enabled. - * @throws IllegalStateException if the entry was already enabled + * no other thread can acquire it, although the acquire may still fail if the pool has been closed.

+ * + * @param pooled the pooled object for this Entry + * @param acquire whether this Entry should be atomically enabled and acquired + * @return whether this Entry was enabled + * @throws IllegalStateException if this Entry was already enabled */ public boolean enable(T pooled, boolean acquire) { Objects.requireNonNull(pooled); - if (state.getHi() != Integer.MIN_VALUE) + if (!isReserved()) { - if (state.getHi() == -1) + if (isClosed()) return false; // Pool has been closed throw new IllegalStateException("Entry already enabled: " + this); } this.pooled = pooled; - int usage = acquire ? 1 : 0; - if (!state.compareAndSet(Integer.MIN_VALUE, usage, 0, usage)) - { - this.pooled = null; - if (state.getHi() == -1) - return false; // Pool has been closed - throw new IllegalStateException("Entry already enabled: " + this); - } - return true; + if (tryEnable(acquire)) + return true; + + this.pooled = null; + if (isClosed()) + return false; // Pool has been closed + throw new IllegalStateException("Entry already enabled: " + this); } + /** + * @return the pooled object + */ public T getPooled() { return pooled; } /** - * Release the entry. - * This is equivalent to calling {@link Pool#release(Pool.Entry)} passing this entry. - * @return true if released. + *

Releases this Entry.

+ *

This is equivalent to calling {@link Pool#release(Pool.Entry)} passing this entry.

+ * + * @return whether this Entry was released */ public boolean release() { @@ -544,9 +632,10 @@ public boolean release() } /** - * Remove the entry. - * This is equivalent to calling {@link Pool#remove(Pool.Entry)} passing this entry. - * @return true if remove. + *

Removes this Entry from the Pool.

+ *

This is equivalent to calling {@link Pool#remove(Pool.Entry)} passing this entry.

+ * + * @return whether this Entry was removed */ public boolean remove() { @@ -554,40 +643,257 @@ public boolean remove() } /** - * Try to acquire the entry if possible by incrementing both the usage - * count and the multiplex count. - * @return true if the usage count is <= maxUsageCount and - * the multiplex count is maxMultiplex and the entry is not closed, - * false otherwise. + *

Tries to enable, and possible also acquire, this Entry.

+ * + * @param acquire whether to also acquire this Entry + * @return whether this Entry was enabled + */ + abstract boolean tryEnable(boolean acquire); + + /** + *

Tries to acquire this Entry.

+ * + * @return whether this Entry was acquired + */ + abstract boolean tryAcquire(); + + /** + *

Tries to release this Entry.

+ * + * @return true if this Entry was released, + * false if {@link #tryRemove()} should be called. + */ + abstract boolean tryRelease(); + + /** + *

Tries to remove the entry by marking it as closed.

+ * + * @return whether the entry can be removed from the containing pool + */ + abstract boolean tryRemove(); + + /** + * @return whether this Entry is closed + */ + public abstract boolean isClosed(); + + /** + * @return whether this Entry is reserved + */ + public abstract boolean isReserved(); + + /** + * @return whether this Entry is idle + */ + public abstract boolean isIdle(); + + /** + * @return whether this entry is in use. */ + public abstract boolean isInUse(); + + /** + * @return whether this entry has been used beyond {@link #getMaxUsageCount()} + * @deprecated MaxUsage functionalities will be removed + */ + @Deprecated + public boolean isOverUsed() + { + return false; + } + + boolean isIdleAndOverUsed() + { + return false; + } + + // Only for testing. + int getUsageCount() + { + return 0; + } + + // Only for testing. + void setUsageCount(int usageCount) + { + } + } + + /** + *

A Pool entry that holds metadata and a pooled object, + * that can only be acquired concurrently at most once, and + * can be acquired/released multiple times.

+ */ + private class MonoEntry extends Entry + { + // MIN_VALUE => pending; -1 => closed; 0 => idle; 1 => active; + private final AtomicInteger state = new AtomicInteger(Integer.MIN_VALUE); + + @Override + protected boolean tryEnable(boolean acquire) + { + return state.compareAndSet(Integer.MIN_VALUE, acquire ? 1 : 0); + } + + @Override + boolean tryAcquire() + { + while (true) + { + int s = state.get(); + if (s != 0) + return false; + if (state.compareAndSet(s, 1)) + return true; + } + } + + @Override + boolean tryRelease() + { + while (true) + { + int s = state.get(); + if (s < 0) + return false; + if (s == 0) + throw new IllegalStateException("Cannot release an already released entry"); + if (state.compareAndSet(s, 0)) + return true; + } + } + + @Override + boolean tryRemove() + { + state.set(-1); + return true; + } + + @Override + public boolean isClosed() + { + return state.get() < 0; + } + + @Override + public boolean isReserved() + { + return state.get() == Integer.MIN_VALUE; + } + + @Override + public boolean isIdle() + { + return state.get() == 0; + } + + @Override + public boolean isInUse() + { + return state.get() == 1; + } + + @Override + public String toString() + { + String s; + switch (state.get()) + { + case Integer.MIN_VALUE: + s = "PENDING"; + break; + case -1: + s = "CLOSED"; + break; + case 0: + s = "IDLE"; + break; + default: + s = "ACTIVE"; + } + return String.format("%s@%x{%s,pooled=%s}", + getClass().getSimpleName(), + hashCode(), + s, + getPooled()); + } + } + + /** + *

A Pool entry that holds metadata and a pooled object, + * that can be acquired concurrently multiple times, and + * can be acquired/released multiple times.

+ */ + class MultiEntry extends Entry + { + // hi: MIN_VALUE => pending; -1 => closed; 0+ => usage counter; + // lo: 0 => idle; positive => multiplex counter + private final AtomicBiInteger state; + + MultiEntry() + { + this.state = new AtomicBiInteger(Integer.MIN_VALUE, 0); + } + + @Override + void setUsageCount(int usageCount) + { + this.state.getAndSetHi(usageCount); + } + + @Override + protected boolean tryEnable(boolean acquire) + { + int usage = acquire ? 1 : 0; + return state.compareAndSet(Integer.MIN_VALUE, usage, 0, usage); + } + + /** + *

Tries to acquire the entry if possible by incrementing both the usage + * count and the multiplex count.

+ * + * @return true if the usage count is less than {@link #getMaxUsageCount()} and + * the multiplex count is less than {@link #getMaxMultiplex(Object)} and + * the entry is not closed, false otherwise. + */ + @Override boolean tryAcquire() { while (true) { long encoded = state.get(); int usageCount = AtomicBiInteger.getHi(encoded); + int multiplexCount = AtomicBiInteger.getLo(encoded); boolean closed = usageCount < 0; - int multiplexingCount = AtomicBiInteger.getLo(encoded); - int currentMaxUsageCount = maxUsageCount; - if (closed || multiplexingCount >= maxMultiplex || (currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount)) + if (closed) + return false; + T pooled = getPooled(); + int maxUsageCount = getMaxUsageCount(pooled); + int maxMultiplexed = getMaxMultiplex(pooled); + if (maxMultiplexed > 0 && multiplexCount >= maxMultiplexed) + return false; + if (maxUsageCount > 0 && usageCount >= maxUsageCount) return false; // Prevent overflowing the usage counter by capping it at Integer.MAX_VALUE. int newUsageCount = usageCount == Integer.MAX_VALUE ? Integer.MAX_VALUE : usageCount + 1; - if (state.compareAndSet(encoded, newUsageCount, multiplexingCount + 1)) + if (state.compareAndSet(encoded, newUsageCount, multiplexCount + 1)) return true; } } /** - * Try to release the entry if possible by decrementing the multiplexing - * count unless the entity is closed. + *

Tries to release the entry if possible by decrementing the multiplex + * count unless the entity is closed.

+ * * @return true if the entry was released, * false if {@link #tryRemove()} should be called. */ + @Override boolean tryRelease() { - int newMultiplexingCount; + int newMultiplexCount; int usageCount; while (true) { @@ -597,24 +903,26 @@ boolean tryRelease() if (closed) return false; - newMultiplexingCount = AtomicBiInteger.getLo(encoded) - 1; - if (newMultiplexingCount < 0) + newMultiplexCount = AtomicBiInteger.getLo(encoded) - 1; + if (newMultiplexCount < 0) throw new IllegalStateException("Cannot release an already released entry"); - if (state.compareAndSet(encoded, usageCount, newMultiplexingCount)) + if (state.compareAndSet(encoded, usageCount, newMultiplexCount)) break; } - int currentMaxUsageCount = maxUsageCount; + int currentMaxUsageCount = maxUsage; boolean overUsed = currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount; - return !(overUsed && newMultiplexingCount == 0); + return !(overUsed && newMultiplexCount == 0); } /** - * Try to remove the entry by marking it as closed and decrementing the multiplexing counter. - * The multiplexing counter will never go below zero and if it reaches zero, the entry is considered removed. + *

Tries to remove the entry by marking it as closed and decrementing the multiplex counter.

+ *

The multiplex counter will never go below zero and if it reaches zero, the entry is considered removed.

+ * * @return true if the entry can be removed from the containing pool, false otherwise. */ + @Override boolean tryRemove() { while (true) @@ -630,45 +938,52 @@ boolean tryRemove() } } + @Override public boolean isClosed() { return state.getHi() < 0; } + @Override public boolean isReserved() { return state.getHi() == Integer.MIN_VALUE; } + @Override public boolean isIdle() { long encoded = state.get(); return AtomicBiInteger.getHi(encoded) >= 0 && AtomicBiInteger.getLo(encoded) == 0; } + @Override public boolean isInUse() { long encoded = state.get(); return AtomicBiInteger.getHi(encoded) >= 0 && AtomicBiInteger.getLo(encoded) > 0; } + @Override public boolean isOverUsed() { - int currentMaxUsageCount = maxUsageCount; + int maxUsageCount = getMaxUsageCount(); int usageCount = state.getHi(); - return currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount; + return maxUsageCount > 0 && usageCount >= maxUsageCount; } + @Override boolean isIdleAndOverUsed() { - int currentMaxUsageCount = maxUsageCount; + int maxUsageCount = getMaxUsageCount(); long encoded = state.get(); int usageCount = AtomicBiInteger.getHi(encoded); int multiplexCount = AtomicBiInteger.getLo(encoded); - return currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount && multiplexCount == 0; + return maxUsageCount > 0 && usageCount >= maxUsageCount && multiplexCount == 0; } - public int getUsageCount() + @Override + int getUsageCount() { return Math.max(state.getHi(), 0); } @@ -680,16 +995,17 @@ public String toString() int usageCount = AtomicBiInteger.getHi(encoded); int multiplexCount = AtomicBiInteger.getLo(encoded); - String state = usageCount < 0 ? "CLOSED" : multiplexCount == 0 ? "IDLE" : "INUSE"; + String state = usageCount < 0 + ? (usageCount == Integer.MIN_VALUE ? "PENDING" : "CLOSED") + : (multiplexCount == 0 ? "IDLE" : "ACTIVE"); - return String.format("%s@%x{%s, usage=%d, multiplex=%d/%d, pooled=%s}", + return String.format("%s@%x{%s,usage=%d,multiplex=%d,pooled=%s}", getClass().getSimpleName(), hashCode(), state, Math.max(usageCount, 0), Math.max(multiplexCount, 0), - getMaxMultiplex(), - pooled); + getPooled()); } } } diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java index 4bf8ad6f48d8..b8f13c7661bc 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java @@ -217,69 +217,6 @@ public void testReserve(Factory factory) assertThrows(IllegalStateException.class, () -> e.enable(new CloseableHolder("xxx"), false)); } - @ParameterizedTest - @MethodSource(value = "strategy") - public void testDeprecatedReserve(Factory factory) - { - Pool pool = factory.getPool(2); - - // Reserve an entry - Pool.Entry e1 = pool.reserve(-1); - assertThat(pool.size(), is(1)); - assertThat(pool.getReservedCount(), is(1)); - assertThat(pool.getIdleCount(), is(0)); - assertThat(pool.getInUseCount(), is(0)); - - // max reservations - assertNull(pool.reserve(1)); - assertThat(pool.size(), is(1)); - assertThat(pool.getReservedCount(), is(1)); - assertThat(pool.getIdleCount(), is(0)); - assertThat(pool.getInUseCount(), is(0)); - - // enable the entry - e1.enable(new CloseableHolder("aaa"), false); - assertThat(pool.size(), is(1)); - assertThat(pool.getReservedCount(), is(0)); - assertThat(pool.getIdleCount(), is(1)); - assertThat(pool.getInUseCount(), is(0)); - - // Reserve another entry - Pool.Entry e2 = pool.reserve(-1); - assertThat(pool.size(), is(2)); - assertThat(pool.getReservedCount(), is(1)); - assertThat(pool.getIdleCount(), is(1)); - assertThat(pool.getInUseCount(), is(0)); - - // remove the reservation - e2.remove(); - assertThat(pool.size(), is(1)); - assertThat(pool.getReservedCount(), is(0)); - assertThat(pool.getIdleCount(), is(1)); - assertThat(pool.getInUseCount(), is(0)); - - // Reserve another entry - Pool.Entry e3 = pool.reserve(-1); - assertThat(pool.size(), is(2)); - assertThat(pool.getReservedCount(), is(1)); - assertThat(pool.getIdleCount(), is(1)); - assertThat(pool.getInUseCount(), is(0)); - - // enable and acquire the entry - e3.enable(new CloseableHolder("bbb"), true); - assertThat(pool.size(), is(2)); - assertThat(pool.getReservedCount(), is(0)); - assertThat(pool.getIdleCount(), is(1)); - assertThat(pool.getInUseCount(), is(1)); - - // can't reenable - assertThrows(IllegalStateException.class, () -> e3.enable(new CloseableHolder("xxx"), false)); - - // Can't enable acquired entry - Pool.Entry e = pool.acquire(); - assertThrows(IllegalStateException.class, () -> e.enable(new CloseableHolder("xxx"), false)); - } - @ParameterizedTest @MethodSource(value = "strategy") public void testReserveNegativeMaxPending(Factory factory) @@ -587,6 +524,7 @@ public void testUsageCountAfterReachingMaxMultiplexLimit(Factory factory) public void testDynamicMaxUsageCountChangeOverflowMaxInt(Factory factory) { Pool pool = factory.getPool(1); + pool.setMaxMultiplex(1); Pool.Entry entry = pool.reserve(); entry.enable(new CloseableHolder("aaa"), false); entry.setUsageCount(Integer.MAX_VALUE); @@ -606,6 +544,7 @@ public void testDynamicMaxUsageCountChangeOverflowMaxInt(Factory factory) public void testDynamicMaxUsageCountChangeSweep(Factory factory) { Pool pool = factory.getPool(2); + pool.setMaxUsageCount(100); Pool.Entry entry1 = pool.reserve(); entry1.enable(new CloseableHolder("aaa"), false); Pool.Entry entry2 = pool.reserve();