From 6ea6bdaba760f6dabe83fee065f2d14c6f5b7581 Mon Sep 17 00:00:00 2001
From: Simone Bordet
Date: Wed, 18 Aug 2021 17:02:33 +0200
Subject: [PATCH 1/7] Fixes #6603 - HTTP/2 max local stream count exceeded
Made MAX_CONCURRENT_STREAMS setting work on a per-connection basis.
Signed-off-by: Simone Bordet
---
.../jetty/client/AbstractConnectionPool.java | 17 +++
.../eclipse/jetty/client/ConnectionPool.java | 13 ++-
.../jetty/client/MultiplexConnectionPool.java | 6 +
.../client/MultiplexHttpDestination.java | 9 +-
.../org/eclipse/jetty/http2/HTTP2Session.java | 13 ++-
.../http/HttpClientTransportOverHTTP2.java | 5 +-
.../client/http/MaxConcurrentStreamsTest.java | 106 ++++++++++++++++++
.../java/org/eclipse/jetty/util/Pool.java | 25 ++++-
8 files changed, 183 insertions(+), 11 deletions(-)
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java
index 093cc37a8ff5..cb22c753113e 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java
@@ -140,6 +140,23 @@ protected void setMaxMultiplex(int maxMultiplex)
pool.setMaxMultiplex(maxMultiplex);
}
+ protected void setMaxMultiplex(Connection connection, int maxMultiplex)
+ {
+ if (connection == null)
+ {
+ setMaxMultiplex(maxMultiplex);
+ }
+ else
+ {
+ if (connection instanceof Attachable)
+ {
+ Object attachment = ((Attachable)connection).getAttachment();
+ if (attachment instanceof EntryHolder)
+ ((EntryHolder)attachment).entry.setMaxMultiplex(maxMultiplex);
+ }
+ }
+ }
+
protected int getMaxUsageCount()
{
return pool.getMaxUsageCount();
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java
index 4ae507896777..27ee72fb82d0 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java
@@ -109,13 +109,22 @@ interface Factory
interface Multiplexable
{
/**
- * @return the max number of requests multiplexable on a single connection
+ * @return the default max number of requests multiplexable on a connection
*/
int getMaxMultiplex();
/**
- * @param maxMultiplex the max number of requests multiplexable on a single connection
+ * @param maxMultiplex the default max number of requests multiplexable on a connection
*/
void setMaxMultiplex(int maxMultiplex);
+
+ /**
+ * @param connection the multiplexed connection
+ * @param maxMultiplex the max number of requests multiplexable on the given connection
+ */
+ default void setMaxMultiplex(Connection connection, int maxMultiplex)
+ {
+ setMaxMultiplex(maxMultiplex);
+ }
}
}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java
index 9f4bfd7ee7fd..f52ca50b54f9 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java
@@ -56,6 +56,12 @@ public void setMaxMultiplex(int maxMultiplex)
super.setMaxMultiplex(maxMultiplex);
}
+ @Override
+ public void setMaxMultiplex(Connection connection, int maxMultiplex)
+ {
+ super.setMaxMultiplex(connection, maxMultiplex);
+ }
+
@Override
@ManagedAttribute(value = "The maximum amount of times a connection is used before it gets closed")
public int getMaxUsageCount()
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java
index bfa1dad3b514..925e66c79d8b 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java
@@ -18,6 +18,8 @@
package org.eclipse.jetty.client;
+import org.eclipse.jetty.client.api.Connection;
+
public abstract class MultiplexHttpDestination extends HttpDestination
{
protected MultiplexHttpDestination(HttpClient client, Origin origin)
@@ -34,9 +36,14 @@ public int getMaxRequestsPerConnection()
}
public void setMaxRequestsPerConnection(int maxRequestsPerConnection)
+ {
+ setMaxRequestsPerConnection(null, maxRequestsPerConnection);
+ }
+
+ public void setMaxRequestsPerConnection(Connection connection, int maxRequestsPerConnection)
{
ConnectionPool connectionPool = getConnectionPool();
if (connectionPool instanceof ConnectionPool.Multiplexable)
- ((ConnectionPool.Multiplexable)connectionPool).setMaxMultiplex(maxRequestsPerConnection);
+ ((ConnectionPool.Multiplexable)connectionPool).setMaxMultiplex(connection, maxRequestsPerConnection);
}
}
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java
index 793982a3c38d..b6680b9a2ce8 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java
@@ -737,7 +737,10 @@ protected IStream createLocalStream(int streamId, Promise promise)
int maxCount = getMaxLocalStreams();
if (maxCount >= 0 && localCount >= maxCount)
{
- promise.failed(new IllegalStateException("Max local stream count " + maxCount + " exceeded"));
+ IllegalStateException failure = new IllegalStateException("Max local stream count " + maxCount + " exceeded");
+ if (LOG.isDebugEnabled())
+ LOG.debug("Could not create local stream #{} for {}", streamId, this, failure);
+ promise.failed(failure);
return null;
}
if (localStreamCount.compareAndSet(localCount, localCount + 1))
@@ -750,7 +753,7 @@ protected IStream createLocalStream(int streamId, Promise promise)
stream.setIdleTimeout(getStreamIdleTimeout());
flowControl.onStreamCreated(stream);
if (LOG.isDebugEnabled())
- LOG.debug("Created local {}", stream);
+ LOG.debug("Created local {} for {}", stream, this);
return stream;
}
else
@@ -785,6 +788,8 @@ protected IStream createRemoteStream(int streamId)
int maxCount = getMaxRemoteStreams();
if (maxCount >= 0 && remoteCount - remoteClosing >= maxCount)
{
+ if (LOG.isDebugEnabled())
+ LOG.debug("Could not create remote stream #{} for {}", streamId, this);
reset(null, new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.from(() -> onStreamDestroyed(streamId)));
return null;
}
@@ -798,7 +803,7 @@ protected IStream createRemoteStream(int streamId)
stream.setIdleTimeout(getStreamIdleTimeout());
flowControl.onStreamCreated(stream);
if (LOG.isDebugEnabled())
- LOG.debug("Created remote {}", stream);
+ LOG.debug("Created remote {} for {}", stream, this);
return stream;
}
else
@@ -944,7 +949,7 @@ public void onFrame(Frame frame)
private void onStreamCreated(int streamId)
{
if (LOG.isDebugEnabled())
- LOG.debug("Created stream #{} for {}", streamId, this);
+ LOG.debug("Creating stream #{} for {}", streamId, this);
streamsState.onStreamCreated();
}
diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java
index 182ca869f098..77356249a0eb 100644
--- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java
+++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java
@@ -212,8 +212,9 @@ private Promise connectionPromise()
public void onSettings(Session session, SettingsFrame frame)
{
Map settings = frame.getSettings();
- if (settings.containsKey(SettingsFrame.MAX_CONCURRENT_STREAMS))
- destination().setMaxRequestsPerConnection(settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS));
+ Integer maxConcurrentStreams = settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS);
+ if (maxConcurrentStreams != null)
+ destination().setMaxRequestsPerConnection(connection.getReference(), maxConcurrentStreams);
if (!connection.isMarked())
onServerPreface(session);
}
diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java
index 636fd75108f7..b75c293cf0cf 100644
--- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java
+++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
@@ -40,6 +41,7 @@
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpResponseException;
import org.eclipse.jetty.client.MultiplexConnectionPool;
+import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
@@ -76,6 +78,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
public class MaxConcurrentStreamsTest extends AbstractTest
{
@@ -545,6 +548,109 @@ public Connection newConnection(EndPoint endPoint, Map context)
assertTrue(response3Latch.await(5, TimeUnit.SECONDS));
}
+ @Test
+ public void testDifferentMaxConcurrentStreamsForDifferentConnections() throws Exception
+ {
+ long processing = 125;
+ RawHTTP2ServerConnectionFactory http2 = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), new ServerSessionListener.Adapter()
+ {
+ private Session session1;
+ private Session session2;
+
+ @Override
+ public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
+ {
+ MetaData.Request request = (MetaData.Request)frame.getMetaData();
+ switch (request.getURI().getPath())
+ {
+ case "/prime":
+ {
+ session1 = stream.getSession();
+ // Send another request from here to force the opening of the 2nd connection.
+ client.newRequest("localhost", connector.getLocalPort()).path("/prime2").send(result ->
+ {
+ MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, result.getResponse().getStatus(), new HttpFields());
+ stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
+ });
+ break;
+ }
+ case "/prime2":
+ {
+ session2 = stream.getSession();
+ MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
+ stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
+ break;
+ }
+ case "/update_max_streams":
+ {
+ Session session = stream.getSession() == session1 ? session2 : session1;
+ Map settings = new HashMap<>();
+ settings.put(SettingsFrame.MAX_CONCURRENT_STREAMS, 2);
+ session.settings(new SettingsFrame(settings, false), Callback.NOOP);
+ MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
+ stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
+ break;
+ }
+ default:
+ {
+ sleep(processing);
+ MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
+ stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
+ break;
+ }
+ }
+ return null;
+ }
+ });
+ http2.setMaxConcurrentStreams(1);
+ prepareServer(http2);
+ server.start();
+ prepareClient();
+ client.setMaxConnectionsPerDestination(2);
+ client.start();
+
+ // Prime the 2 connections.
+ primeConnection();
+
+ String host = "localhost";
+ int port = connector.getLocalPort();
+
+ AbstractConnectionPool pool = (AbstractConnectionPool)client.resolveDestination(new Origin("http", host, port)).getConnectionPool();
+ assertEquals(2, pool.getConnectionCount());
+
+ // Send a request on one connection, which sends back a SETTINGS frame on the other connection.
+ ContentResponse response = client.newRequest(host, port)
+ .path("/update_max_streams")
+ .send();
+ assertEquals(HttpStatus.OK_200, response.getStatus());
+
+ // Send 4 requests at once: 1 should go on one connection, 2 on the other connection, and 1 queued.
+ int count = 4;
+ CountDownLatch latch = new CountDownLatch(count);
+ for (int i = 0; i < count; ++i)
+ {
+ client.newRequest(host, port)
+ .path("/" + i)
+ .send(result ->
+ {
+ if (result.isSucceeded())
+ {
+ int status = result.getResponse().getStatus();
+ if (status == HttpStatus.OK_200)
+ latch.countDown();
+ else
+ fail("unexpected status " + status);
+ }
+ else
+ {
+ fail(result.getFailure());
+ }
+ });
+ }
+
+ assertTrue(awaitLatch(latch, count * processing * 10, TimeUnit.MILLISECONDS));
+ }
+
private void primeConnection() throws Exception
{
// Prime the connection so that the maxConcurrentStream setting arrives to the client.
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java
index 183b169249a3..e7138b297abd 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java
@@ -131,7 +131,7 @@ public Pool(StrategyType strategyType, int maxEntries, boolean cache)
this.maxEntries = maxEntries;
this.strategyType = strategyType;
this.cache = cache ? new ThreadLocal<>() : null;
- nextIndex = strategyType == StrategyType.ROUND_ROBIN ? new AtomicInteger() : null;
+ this.nextIndex = strategyType == StrategyType.ROUND_ROBIN ? new AtomicInteger() : null;
}
public int getReservedCount()
@@ -169,6 +169,14 @@ public final void setMaxMultiplex(int maxMultiplex)
if (maxMultiplex < 1)
throw new IllegalArgumentException("Max multiplex must be >= 1");
this.maxMultiplex = maxMultiplex;
+
+ try (Locker.Lock l = locker.lock())
+ {
+ if (closed)
+ return;
+
+ entries.forEach(entry -> entry.setMaxMultiplex(maxMultiplex));
+ }
}
/**
@@ -507,15 +515,28 @@ public class Entry
// hi: positive=open/maxUsage counter; negative=closed; MIN_VALUE pending
// lo: multiplexing counter
private final AtomicBiInteger state;
-
// The pooled item. This is not volatile as it is set once and then never changed.
// Other threads accessing must check the state field above first, so a good before/after
// relationship exists to make a memory barrier.
private T pooled;
+ private volatile int maxMultiplex;
Entry()
{
this.state = new AtomicBiInteger(Integer.MIN_VALUE, 0);
+ this.maxMultiplex = Pool.this.maxMultiplex;
+ }
+
+ public int getMaxMultiplex()
+ {
+ return maxMultiplex;
+ }
+
+ public void setMaxMultiplex(int maxMultiplex)
+ {
+ if (maxMultiplex < 1)
+ throw new IllegalArgumentException("Max multiplex must be >= 1");
+ this.maxMultiplex = maxMultiplex;
}
// for testing only
From cb102d9afac3723fabaf37451d959671dd7b5b92 Mon Sep 17 00:00:00 2001
From: Simone Bordet
Date: Fri, 20 Aug 2021 00:35:23 +0200
Subject: [PATCH 2/7] Issue #6603 - HTTP/2 max local stream count exceeded
Updates after review.
Updated the maxMultiplex mechanism to always work on Pool.Entry, rather than on Pool.
Updated Pool javadocs.
Signed-off-by: Simone Bordet
---
.../jetty/client/AbstractConnectionPool.java | 17 +-
.../eclipse/jetty/client/ConnectionPool.java | 15 +-
.../jetty/client/MultiplexConnectionPool.java | 8 +-
.../client/MultiplexHttpDestination.java | 12 +-
.../org/eclipse/jetty/http2/HTTP2Session.java | 5 +-
.../http/HttpClientTransportOverHTTP2.java | 21 ++-
.../client/http/HttpConnectionOverHTTP2.java | 14 +-
.../java/org/eclipse/jetty/util/Pool.java | 164 +++++++++++-------
8 files changed, 153 insertions(+), 103 deletions(-)
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java
index cb22c753113e..98551c0ad542 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java
@@ -142,18 +142,11 @@ protected void setMaxMultiplex(int maxMultiplex)
protected void setMaxMultiplex(Connection connection, int maxMultiplex)
{
- if (connection == null)
- {
- setMaxMultiplex(maxMultiplex);
- }
- else
+ if (connection instanceof Attachable)
{
- if (connection instanceof Attachable)
- {
- Object attachment = ((Attachable)connection).getAttachment();
- if (attachment instanceof EntryHolder)
- ((EntryHolder)attachment).entry.setMaxMultiplex(maxMultiplex);
- }
+ Object attachment = ((Attachable)connection).getAttachment();
+ if (attachment instanceof EntryHolder)
+ ((EntryHolder)attachment).entry.setMaxMultiplex(maxMultiplex);
}
}
@@ -545,6 +538,8 @@ public void succeeded(Connection connection)
onCreated(connection);
pending.decrementAndGet();
reserved.enable(connection, false);
+ if (connection instanceof Multiplexable)
+ setMaxMultiplex(connection, ((ConnectionPool.Multiplexable)connection).getMaxMultiplex());
idle(connection, false);
complete(null);
proceed();
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java
index 27ee72fb82d0..d73056a82a8f 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java
@@ -104,27 +104,18 @@ interface Factory
}
/**
- * Marks a connection pool as supporting multiplexed connections.
+ * Marks a connection as supporting multiplexed requests.
*/
interface Multiplexable
{
/**
- * @return the default max number of requests multiplexable on a connection
+ * @return the max number of requests that can be multiplexed on a connection
*/
int getMaxMultiplex();
/**
- * @param maxMultiplex the default max number of requests multiplexable on a connection
+ * @param maxMultiplex the max number of requests that can be multiplexed on a connection
*/
void setMaxMultiplex(int maxMultiplex);
-
- /**
- * @param connection the multiplexed connection
- * @param maxMultiplex the max number of requests multiplexable on the given connection
- */
- default void setMaxMultiplex(Connection connection, int maxMultiplex)
- {
- setMaxMultiplex(maxMultiplex);
- }
}
}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java
index f52ca50b54f9..f95999b7baa2 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java
@@ -25,7 +25,7 @@
import org.eclipse.jetty.util.annotation.ManagedObject;
@ManagedObject
-public class MultiplexConnectionPool extends AbstractConnectionPool implements ConnectionPool.Multiplexable
+public class MultiplexConnectionPool extends AbstractConnectionPool
{
public MultiplexConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
@@ -56,12 +56,6 @@ public void setMaxMultiplex(int maxMultiplex)
super.setMaxMultiplex(maxMultiplex);
}
- @Override
- public void setMaxMultiplex(Connection connection, int maxMultiplex)
- {
- super.setMaxMultiplex(connection, maxMultiplex);
- }
-
@Override
@ManagedAttribute(value = "The maximum amount of times a connection is used before it gets closed")
public int getMaxUsageCount()
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java
index 925e66c79d8b..9ba58785d18e 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java
@@ -30,20 +30,22 @@ protected MultiplexHttpDestination(HttpClient client, Origin origin)
public int getMaxRequestsPerConnection()
{
ConnectionPool connectionPool = getConnectionPool();
- if (connectionPool instanceof ConnectionPool.Multiplexable)
- return ((ConnectionPool.Multiplexable)connectionPool).getMaxMultiplex();
+ if (connectionPool instanceof AbstractConnectionPool)
+ return ((AbstractConnectionPool)connectionPool).getMaxMultiplex();
return 1;
}
public void setMaxRequestsPerConnection(int maxRequestsPerConnection)
{
- setMaxRequestsPerConnection(null, maxRequestsPerConnection);
+ ConnectionPool connectionPool = getConnectionPool();
+ if (connectionPool instanceof AbstractConnectionPool)
+ ((AbstractConnectionPool)connectionPool).setMaxMultiplex(maxRequestsPerConnection);
}
public void setMaxRequestsPerConnection(Connection connection, int maxRequestsPerConnection)
{
ConnectionPool connectionPool = getConnectionPool();
- if (connectionPool instanceof ConnectionPool.Multiplexable)
- ((ConnectionPool.Multiplexable)connectionPool).setMaxMultiplex(connection, maxRequestsPerConnection);
+ if (connectionPool instanceof AbstractConnectionPool)
+ ((AbstractConnectionPool)connectionPool).setMaxMultiplex(connection, maxRequestsPerConnection);
}
}
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java
index b6680b9a2ce8..e8bef982c808 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java
@@ -737,7 +737,7 @@ protected IStream createLocalStream(int streamId, Promise promise)
int maxCount = getMaxLocalStreams();
if (maxCount >= 0 && localCount >= maxCount)
{
- IllegalStateException failure = new IllegalStateException("Max local stream count " + maxCount + " exceeded");
+ IllegalStateException failure = new IllegalStateException("Max local stream count " + maxCount + " exceeded: " + localCount);
if (LOG.isDebugEnabled())
LOG.debug("Could not create local stream #{} for {}", streamId, this, failure);
promise.failed(failure);
@@ -788,8 +788,9 @@ protected IStream createRemoteStream(int streamId)
int maxCount = getMaxRemoteStreams();
if (maxCount >= 0 && remoteCount - remoteClosing >= maxCount)
{
+ IllegalStateException failure = new IllegalStateException("Max remote stream count " + maxCount + " exceeded: " + remoteCount + "+" + remoteClosing);
if (LOG.isDebugEnabled())
- LOG.debug("Could not create remote stream #{} for {}", streamId, this);
+ LOG.debug("Could not create remote stream #{} for {}", streamId, this, failure);
reset(null, new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.from(() -> onStreamDestroyed(streamId)));
return null;
}
diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java
index 77356249a0eb..ad5ce4349eed 100644
--- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java
+++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java
@@ -60,7 +60,7 @@ public HttpClientTransportOverHTTP2(HTTP2Client client)
setConnectionPoolFactory(destination ->
{
HttpClient httpClient = getHttpClient();
- return new MultiplexConnectionPool(destination, httpClient.getMaxConnectionsPerDestination(), destination, httpClient.getMaxRequestsQueuedPerDestination());
+ return new MultiplexConnectionPool(destination, httpClient.getMaxConnectionsPerDestination(), destination, 1);
});
}
@@ -213,15 +213,24 @@ public void onSettings(Session session, SettingsFrame frame)
{
Map settings = frame.getSettings();
Integer maxConcurrentStreams = settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS);
- if (maxConcurrentStreams != null)
- destination().setMaxRequestsPerConnection(connection.getReference(), maxConcurrentStreams);
- if (!connection.isMarked())
- onServerPreface(session);
+ boolean[] initialized = new boolean[1];
+ Connection connection = this.connection.get(initialized);
+ if (initialized[0])
+ {
+ if (maxConcurrentStreams != null && connection != null)
+ destination().setMaxRequestsPerConnection(connection, maxConcurrentStreams);
+ }
+ else
+ {
+ onServerPreface(session, maxConcurrentStreams);
+ }
}
- private void onServerPreface(Session session)
+ private void onServerPreface(Session session, Integer maxConcurrentStreams)
{
HttpConnectionOverHTTP2 connection = newHttpConnection(destination(), session);
+ if (maxConcurrentStreams != null)
+ connection.setMaxMultiplex(maxConcurrentStreams);
if (this.connection.compareAndSet(null, connection, false, true))
connectionPromise().succeeded(connection);
}
diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java
index 54c34582a688..c3bd1b660d35 100644
--- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java
+++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java
@@ -28,6 +28,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.eclipse.jetty.client.ConnectionPool;
import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
@@ -42,7 +43,7 @@
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Sweeper;
-public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable
+public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable, ConnectionPool.Multiplexable
{
private static final Logger LOG = Log.getLogger(HttpConnection.class);
@@ -52,6 +53,7 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
private final AtomicInteger sweeps = new AtomicInteger();
private final Session session;
private boolean recycleHttpChannels = true;
+ private int maxMultiplex = 1;
public HttpConnectionOverHTTP2(HttpDestination destination, Session session)
{
@@ -74,6 +76,16 @@ public void setRecycleHttpChannels(boolean recycleHttpChannels)
this.recycleHttpChannels = recycleHttpChannels;
}
+ public int getMaxMultiplex()
+ {
+ return maxMultiplex;
+ }
+
+ public void setMaxMultiplex(int maxMultiplex)
+ {
+ this.maxMultiplex = maxMultiplex;
+ }
+
@Override
protected Iterator getHttpChannels()
{
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java
index e7138b297abd..907ed44bdca3 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java
@@ -31,6 +31,8 @@
import java.util.function.Function;
import java.util.stream.Collectors;
+import org.eclipse.jetty.util.annotation.ManagedAttribute;
+import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.log.Log;
@@ -38,24 +40,22 @@
import org.eclipse.jetty.util.thread.Locker;
/**
- * A fast pool of objects, with optional support for
+ *
A pool of objects, with optional support for
* multiplexing, max usage count and several optimized strategies plus
- * an optional {@link ThreadLocal} cache of the last release entry.
- *
- * When the method {@link #close()} is called, all {@link Closeable}s in the pool
- * are also closed.
- *
- * @param
+ * an optional {@link ThreadLocal} cache of the last release entry.
+ *
When the method {@link #close()} is called, all {@link Closeable}s
+ * object pooled by the pool are also closed.
+ *
+ * @param the type of the pooled objects
*/
+@ManagedObject
public class Pool implements AutoCloseable, Dumpable
{
private static final Logger LOGGER = Log.getLogger(Pool.class);
private final List entries = new CopyOnWriteArrayList<>();
-
private final int maxEntries;
private final StrategyType strategyType;
-
/*
* The cache is used to avoid hammering on the first index of the entry list.
* Caches can become poisoned (i.e.: containing entries that are in use) when
@@ -104,7 +104,7 @@ public enum StrategyType
* random strategy but with more predictable behaviour.
* No entries are favoured and contention is reduced.
*/
- ROUND_ROBIN,
+ ROUND_ROBIN
}
/**
@@ -122,6 +122,7 @@ public Pool(StrategyType strategyType, int maxEntries)
/**
* Construct a Pool with the specified thread-local cache size and
* an optional {@link ThreadLocal} cache.
+ *
* @param strategyType The strategy to used for looking up entries.
* @param maxEntries the maximum amount of entries that the pool will accept.
* @param cache True if a {@link ThreadLocal} cache should be used to try the most recently released entry.
@@ -134,65 +135,92 @@ public Pool(StrategyType strategyType, int maxEntries, boolean cache)
this.nextIndex = strategyType == StrategyType.ROUND_ROBIN ? new AtomicInteger() : null;
}
+ /**
+ * @return the number of reserved entries
+ */
+ @ManagedAttribute("The number of reserved entries")
public int getReservedCount()
{
return (int)entries.stream().filter(Entry::isReserved).count();
}
+ /**
+ * @return the number of idle entries
+ */
+ @ManagedAttribute("The number of idle entries")
public int getIdleCount()
{
return (int)entries.stream().filter(Entry::isIdle).count();
}
+ /**
+ * @return the number of in-use entries
+ */
+ @ManagedAttribute("The number of in-use entries")
public int getInUseCount()
{
return (int)entries.stream().filter(Entry::isInUse).count();
}
+ /**
+ * @return the number of closed entries
+ */
+ @ManagedAttribute("The number of closed entries")
public int getClosedCount()
{
return (int)entries.stream().filter(Entry::isClosed).count();
}
+ /**
+ * @return the maximum number of entries
+ */
+ @ManagedAttribute("The maximum number of entries")
public int getMaxEntries()
{
return maxEntries;
}
+ /**
+ * @return the default maximum multiplex count of entries
+ */
+ @ManagedAttribute("The default maximum multiplex count of entries")
public int getMaxMultiplex()
{
return maxMultiplex;
}
+ /**
+ *
Sets the default maximum multiplex count for the Pool's entries.
+ *
This value is used to initialize {@link Entry#maxMultiplex}
+ * when a new {@link Entry} is created.
+ *
+ * @param maxMultiplex the default maximum multiplex count of entries
+ */
public final void setMaxMultiplex(int maxMultiplex)
{
if (maxMultiplex < 1)
throw new IllegalArgumentException("Max multiplex must be >= 1");
this.maxMultiplex = maxMultiplex;
-
- try (Locker.Lock l = locker.lock())
- {
- if (closed)
- return;
-
- entries.forEach(entry -> entry.setMaxMultiplex(maxMultiplex));
- }
}
/**
- * Get the maximum number of times the entries of the pool
- * can be acquired.
- * @return the max usage count.
+ *
Returns the maximum number of times the entries of the pool
+ * can be acquired.
+ *
+ * @return the default maximum usage count of entries
*/
+ @ManagedAttribute("The default maximum usage count of entries")
public int getMaxUsageCount()
{
return maxUsageCount;
}
/**
- * Change the max usage count of the pool's entries. All existing
- * idle entries over this new max usage are removed and closed.
- * @param maxUsageCount the max usage count.
+ *
Sets the maximum usage count for the Pool's entries.
+ *
All existing idle entries that have a usage count larger
+ * than this new value are removed from the Pool and closed.
+ *
+ * @param maxUsageCount the default maximum usage count of entries
*/
public final void setMaxUsageCount(int maxUsageCount)
{
@@ -218,10 +246,10 @@ public final void setMaxUsageCount(int maxUsageCount)
}
/**
- * Create a new disabled slot into the pool.
- * The returned entry must ultimately have the {@link Entry#enable(Object, boolean)}
+ *
Creates a new disabled slot into the pool.
+ *
The returned entry must ultimately have the {@link Entry#enable(Object, boolean)}
* method called or be removed via {@link Pool.Entry#remove()} or
- * {@link Pool#remove(Pool.Entry)}.
+ * {@link Pool#remove(Pool.Entry)}.
*
* @param allotment the desired allotment, where each entry handles an allotment of maxMultiplex,
* or a negative number to always trigger the reservation of a new entry.
@@ -252,10 +280,10 @@ public Entry reserve(int allotment)
}
/**
- * Create a new disabled slot into the pool.
- * The returned entry must ultimately have the {@link Entry#enable(Object, boolean)}
+ *
Creates a new disabled slot into the pool.
+ *
The returned entry must ultimately have the {@link Entry#enable(Object, boolean)}
* method called or be removed via {@link Pool.Entry#remove()} or
- * {@link Pool#remove(Pool.Entry)}.
+ * {@link Pool#remove(Pool.Entry)}.
*
* @return a disabled entry that is contained in the pool,
* or null if the pool is closed or if the pool already contains
@@ -279,10 +307,12 @@ public Entry reserve()
}
/**
- * Acquire the entry from the pool at the specified index. This method bypasses the thread-local mechanism.
- * @deprecated No longer supported. Instead use a {@link StrategyType} to configure the pool.
+ *
Acquires the entry from the pool at the specified index.
+ *
This method bypasses the thread-local cache mechanism.
+ *
* @param idx the index of the entry to acquire.
* @return the specified entry or null if there is none at the specified index or if it is not available.
+ * @deprecated No longer supported. Instead use a {@link StrategyType} to configure the pool.
*/
@Deprecated
public Entry acquireAt(int idx)
@@ -304,8 +334,11 @@ public Entry acquireAt(int idx)
}
/**
- * Acquire an entry from the pool.
- * Only enabled entries will be returned from this method and their enable method must not be called.
+ *
Acquires an entry from the pool.
+ *
Only enabled entries will be returned from this method
+ * and their {@link Entry#enable(Object, boolean)}
+ * method must not be called.
+ *
* @return an entry from the pool or null if none is available.
*/
public Entry acquire()
@@ -326,7 +359,7 @@ public Entry acquire()
int index = startIndex(size);
- for (int tries = size; tries-- > 0;)
+ for (int tries = size; tries-- > 0; )
{
try
{
@@ -367,8 +400,8 @@ private int startIndex(int size)
}
/**
- * Utility method to acquire an entry from the pool,
- * reserving and creating a new entry if necessary.
+ *
Acquires an entry from the pool,
+ * reserving and creating a new entry if necessary.
*
* @param creator a function to create the pooled value for a reserved entry.
* @return an entry from the pool or null if none is available.
@@ -404,9 +437,9 @@ public Entry acquire(Function.Entry, T> creator)
}
/**
- * This method will return an acquired object to the pool. Objects
- * that are acquired from the pool but never released will result
- * in a memory leak.
+ *
Releases an {@link #acquire() acquired} entry to the pool.
+ *
Entries that are acquired from the pool but never released
+ * will result in a memory leak.
*
* @param entry the value to return to the pool
* @return true if the entry was released and could be acquired again,
@@ -426,7 +459,7 @@ public boolean release(Entry entry)
}
/**
- * Remove a value from the pool.
+ *
Removes an entry from the pool.
*
* @param entry the value to remove
* @return true if the entry was removed, false otherwise
@@ -510,6 +543,9 @@ public String toString()
closed);
}
+ /**
+ *
A Pool entry that holds metadata and a pooled object.
+ */
public class Entry
{
// hi: positive=open/maxUsage counter; negative=closed; MIN_VALUE pending
@@ -527,6 +563,9 @@ public class Entry
this.maxMultiplex = Pool.this.maxMultiplex;
}
+ /**
+ * @return the maximum multiplex count for this entry
+ */
public int getMaxMultiplex()
{
return maxMultiplex;
@@ -545,15 +584,17 @@ void setUsageCount(int usageCount)
this.state.getAndSetHi(usageCount);
}
- /** Enable a reserved entry {@link Entry}.
- * An entry returned from the {@link #reserve()} method must be enabled with this method,
- * once and only once, before it is usable by the pool.
- * The entry may be enabled and not acquired, in which case it is immediately available to be
+ /**
+ *
Enables a {@link #reserve() reserved} Entry.
+ *
An entry returned from the {@link #reserve()} method must be enabled with this method,
+ * once and only once, before it is usable by the pool.
+ *
The entry may be enabled and not acquired, in which case it is immediately available to be
* acquired, potentially by another thread; or it can be enabled and acquired atomically so that
- * no other thread can acquire it, although the acquire may still fail if the pool has been closed.
+ * no other thread can acquire it, although the acquire may still fail if the pool has been closed.
+ *
* @param pooled The pooled item for the entry
* @param acquire If true the entry is atomically enabled and acquired.
- * @return true If the entry was enabled.
+ * @return whether the entry was enabled
* @throws IllegalStateException if the entry was already enabled
*/
public boolean enable(T pooled, boolean acquire)
@@ -585,9 +626,10 @@ public T getPooled()
}
/**
- * Release the entry.
- * This is equivalent to calling {@link Pool#release(Pool.Entry)} passing this entry.
- * @return true if released.
+ *
Releases this Entry.
+ *
This is equivalent to calling {@link Pool#release(Entry)} passing this entry.
+ *
+ * @return whether the entry was released
*/
public boolean release()
{
@@ -595,9 +637,10 @@ public boolean release()
}
/**
- * Remove the entry.
- * This is equivalent to calling {@link Pool#remove(Pool.Entry)} passing this entry.
- * @return true if remove.
+ *
Removes the entry.
+ *
This is equivalent to calling {@link Pool#remove(Entry)} passing this entry.
+ *
+ * @return whether the entry was removed
*/
public boolean remove()
{
@@ -605,8 +648,9 @@ public boolean remove()
}
/**
- * Try to acquire the entry if possible by incrementing both the usage
- * count and the multiplex count.
+ *
Tries to acquire the entry if possible by incrementing both the usage
+ * count and the multiplex count.
+ *
* @return true if the usage count is <= maxUsageCount and
* the multiplex count is maxMultiplex and the entry is not closed,
* false otherwise.
@@ -631,8 +675,9 @@ boolean tryAcquire()
}
/**
- * Try to release the entry if possible by decrementing the multiplexing
- * count unless the entity is closed.
+ *
Tries to release the entry if possible by decrementing the multiplexing
+ * count unless the entity is closed.
+ *
* @return true if the entry was released,
* false if {@link #tryRemove()} should be called.
*/
@@ -662,8 +707,9 @@ boolean tryRelease()
}
/**
- * Try to remove the entry by marking it as closed and decrementing the multiplexing counter.
- * The multiplexing counter will never go below zero and if it reaches zero, the entry is considered removed.
+ *
Try to remove the entry by marking it as closed and decrementing the multiplexing counter.
+ *
The multiplexing counter will never go below zero and if it reaches zero, the entry is considered removed.
+ *
* @return true if the entry can be removed from the containing pool, false otherwise.
*/
boolean tryRemove()
From a1297cac0ac9a1ebd2b2c00fad0baee6de15fc08 Mon Sep 17 00:00:00 2001
From: Greg Wilkins
Date: Fri, 20 Aug 2021 11:39:13 +1000
Subject: [PATCH 3/7] Thought bubble on abstracting out multiplexing from the
pool
---
.../jetty/client/AbstractConnectionPool.java | 16 +--
.../client/MultiplexHttpDestination.java | 9 --
.../http/HttpClientTransportOverHTTP2.java | 19 +--
.../java/org/eclipse/jetty/util/Pool.java | 127 +++++++++---------
.../java/org/eclipse/jetty/util/PoolTest.java | 22 +--
5 files changed, 82 insertions(+), 111 deletions(-)
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java
index 98551c0ad542..f8ec4235bc33 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java
@@ -132,22 +132,12 @@ public void setMaxDuration(long maxDurationInMs)
protected int getMaxMultiplex()
{
- return pool.getMaxMultiplex();
+ return pool.getMaxInUse();
}
protected void setMaxMultiplex(int maxMultiplex)
{
- pool.setMaxMultiplex(maxMultiplex);
- }
-
- protected void setMaxMultiplex(Connection connection, int maxMultiplex)
- {
- if (connection instanceof Attachable)
- {
- Object attachment = ((Attachable)connection).getAttachment();
- if (attachment instanceof EntryHolder)
- ((EntryHolder)attachment).entry.setMaxMultiplex(maxMultiplex);
- }
+ pool.setMaxInUse(maxMultiplex);
}
protected int getMaxUsageCount()
@@ -538,8 +528,6 @@ public void succeeded(Connection connection)
onCreated(connection);
pending.decrementAndGet();
reserved.enable(connection, false);
- if (connection instanceof Multiplexable)
- setMaxMultiplex(connection, ((ConnectionPool.Multiplexable)connection).getMaxMultiplex());
idle(connection, false);
complete(null);
proceed();
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java
index 9ba58785d18e..1003a3a77a00 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java
@@ -18,8 +18,6 @@
package org.eclipse.jetty.client;
-import org.eclipse.jetty.client.api.Connection;
-
public abstract class MultiplexHttpDestination extends HttpDestination
{
protected MultiplexHttpDestination(HttpClient client, Origin origin)
@@ -41,11 +39,4 @@ public void setMaxRequestsPerConnection(int maxRequestsPerConnection)
if (connectionPool instanceof AbstractConnectionPool)
((AbstractConnectionPool)connectionPool).setMaxMultiplex(maxRequestsPerConnection);
}
-
- public void setMaxRequestsPerConnection(Connection connection, int maxRequestsPerConnection)
- {
- ConnectionPool connectionPool = getConnectionPool();
- if (connectionPool instanceof AbstractConnectionPool)
- ((AbstractConnectionPool)connectionPool).setMaxMultiplex(connection, maxRequestsPerConnection);
- }
}
diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java
index ad5ce4349eed..ec5003a2d62f 100644
--- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java
+++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java
@@ -211,26 +211,15 @@ private Promise connectionPromise()
@Override
public void onSettings(Session session, SettingsFrame frame)
{
- Map settings = frame.getSettings();
- Integer maxConcurrentStreams = settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS);
boolean[] initialized = new boolean[1];
- Connection connection = this.connection.get(initialized);
- if (initialized[0])
- {
- if (maxConcurrentStreams != null && connection != null)
- destination().setMaxRequestsPerConnection(connection, maxConcurrentStreams);
- }
- else
- {
- onServerPreface(session, maxConcurrentStreams);
- }
+ this.connection.get(initialized);
+ if (!initialized[0])
+ onServerPreface(session);
}
- private void onServerPreface(Session session, Integer maxConcurrentStreams)
+ private void onServerPreface(Session session)
{
HttpConnectionOverHTTP2 connection = newHttpConnection(destination(), session);
- if (maxConcurrentStreams != null)
- connection.setMaxMultiplex(maxConcurrentStreams);
if (this.connection.compareAndSet(null, connection, false, true))
connectionPromise().succeeded(connection);
}
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java
index 907ed44bdca3..5df9f5229a8e 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java
@@ -28,6 +28,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -40,8 +41,8 @@
import org.eclipse.jetty.util.thread.Locker;
/**
- *
A pool of objects, with optional support for
- * multiplexing, max usage count and several optimized strategies plus
+ *
A pool of objects, with optional support for availability checks,
+ * max usage count and several optimized strategies plus
* an optional {@link ThreadLocal} cache of the last release entry.
*
When the method {@link #close()} is called, all {@link Closeable}s
* object pooled by the pool are also closed.
@@ -67,8 +68,9 @@ public class Pool implements AutoCloseable, Dumpable
private final Locker locker = new Locker();
private final ThreadLocal cache;
private final AtomicInteger nextIndex;
+ private final BiFunction checkInUse;
private volatile boolean closed;
- private volatile int maxMultiplex = 1;
+ private volatile int maxInUse = 1;
private volatile int maxUsageCount = -1;
/**
@@ -128,11 +130,32 @@ public Pool(StrategyType strategyType, int maxEntries)
* @param cache True if a {@link ThreadLocal} cache should be used to try the most recently released entry.
*/
public Pool(StrategyType strategyType, int maxEntries, boolean cache)
+ {
+ this(strategyType, maxEntries, cache, null);
+ }
+
+ /**
+ * Construct a Pool with the specified thread-local cache size and
+ * an optional {@link ThreadLocal} cache.
+ *
+ * @param strategyType The strategy to used for looking up entries.
+ * @param maxEntries the maximum amount of entries that the pool will accept.
+ * @param cache True if a {@link ThreadLocal} cache should be used to try the most recently released entry.
+ * @param checkInUse A bi-function to check if the entry inUse count is OK to allocate another usage. If null
+ * then the inUse count will be checked against {@link #maxInUse}
+ */
+ public Pool(StrategyType strategyType, int maxEntries, boolean cache, BiFunction checkInUse)
{
this.maxEntries = maxEntries;
this.strategyType = strategyType;
this.cache = cache ? new ThreadLocal<>() : null;
this.nextIndex = strategyType == StrategyType.ROUND_ROBIN ? new AtomicInteger() : null;
+ this.checkInUse = checkInUse == null ? this::checkMaxInUse : checkInUse;
+ }
+
+ private Boolean checkMaxInUse(T item, Integer inUse)
+ {
+ return inUse < maxInUse;
}
/**
@@ -181,26 +204,24 @@ public int getMaxEntries()
}
/**
- * @return the default maximum multiplex count of entries
+ * @return the default maximum in-use count of entries
*/
- @ManagedAttribute("The default maximum multiplex count of entries")
- public int getMaxMultiplex()
+ @ManagedAttribute("The default maximum in-use count of entries")
+ public int getMaxInUse()
{
- return maxMultiplex;
+ return maxInUse;
}
/**
- *
Sets the default maximum multiplex count for the Pool's entries.
- *
This value is used to initialize {@link Entry#maxMultiplex}
- * when a new {@link Entry} is created.
+ *
Sets the default maximum in-use count for the Pool's entries.
*
- * @param maxMultiplex the default maximum multiplex count of entries
+ * @param maxMultiplex the default maximum in-use count of entries
*/
- public final void setMaxMultiplex(int maxMultiplex)
+ public final void setMaxInUse(int maxMultiplex)
{
if (maxMultiplex < 1)
- throw new IllegalArgumentException("Max multiplex must be >= 1");
- this.maxMultiplex = maxMultiplex;
+ throw new IllegalArgumentException("Max in-use must be >= 1");
+ this.maxInUse = maxMultiplex;
}
/**
@@ -270,7 +291,7 @@ public Entry reserve(int allotment)
if (space <= 0)
return null;
- if (allotment >= 0 && (getReservedCount() * getMaxMultiplex()) >= allotment)
+ if (allotment >= 0 && getReservedCount() >= allotment)
return null;
Entry entry = new Entry();
@@ -548,34 +569,17 @@ public String toString()
*/
public class Entry
{
- // hi: positive=open/maxUsage counter; negative=closed; MIN_VALUE pending
- // lo: multiplexing counter
+ // hi: negative==closed else total-usage counter; negative=closed; MIN_VALUE pending
+ // lo: in-use counter
private final AtomicBiInteger state;
// The pooled item. This is not volatile as it is set once and then never changed.
// Other threads accessing must check the state field above first, so a good before/after
// relationship exists to make a memory barrier.
private T pooled;
- private volatile int maxMultiplex;
Entry()
{
this.state = new AtomicBiInteger(Integer.MIN_VALUE, 0);
- this.maxMultiplex = Pool.this.maxMultiplex;
- }
-
- /**
- * @return the maximum multiplex count for this entry
- */
- public int getMaxMultiplex()
- {
- return maxMultiplex;
- }
-
- public void setMaxMultiplex(int maxMultiplex)
- {
- if (maxMultiplex < 1)
- throw new IllegalArgumentException("Max multiplex must be >= 1");
- this.maxMultiplex = maxMultiplex;
}
// for testing only
@@ -601,9 +605,9 @@ public boolean enable(T pooled, boolean acquire)
{
Objects.requireNonNull(pooled);
- if (state.getHi() != Integer.MIN_VALUE)
+ if (!isReserved())
{
- if (state.getHi() == -1)
+ if (isClosed())
return false; // Pool has been closed
throw new IllegalStateException("Entry already enabled: " + this);
}
@@ -612,7 +616,7 @@ public boolean enable(T pooled, boolean acquire)
if (!state.compareAndSet(Integer.MIN_VALUE, usage, 0, usage))
{
this.pooled = null;
- if (state.getHi() == -1)
+ if (isClosed())
return false; // Pool has been closed
throw new IllegalStateException("Entry already enabled: " + this);
}
@@ -649,10 +653,10 @@ public boolean remove()
/**
*
Tries to acquire the entry if possible by incrementing both the usage
- * count and the multiplex count.
+ * count and the in-use count.
*
* @return true if the usage count is <= maxUsageCount and
- * the multiplex count is maxMultiplex and the entry is not closed,
+ * the in-use count is maxMultiplex and the entry is not closed,
* false otherwise.
*/
boolean tryAcquire()
@@ -662,20 +666,20 @@ boolean tryAcquire()
long encoded = state.get();
int usageCount = AtomicBiInteger.getHi(encoded);
boolean closed = usageCount < 0;
- int multiplexingCount = AtomicBiInteger.getLo(encoded);
- int currentMaxUsageCount = maxUsageCount;
- if (closed || multiplexingCount >= maxMultiplex || (currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount))
+ int inUseCount = AtomicBiInteger.getLo(encoded);
+ int maxUsageCount = Pool.this.maxUsageCount;
+ if (closed || !checkInUse.apply(pooled, inUseCount) || (maxUsageCount > 0 && usageCount >= maxUsageCount))
return false;
// Prevent overflowing the usage counter by capping it at Integer.MAX_VALUE.
int newUsageCount = usageCount == Integer.MAX_VALUE ? Integer.MAX_VALUE : usageCount + 1;
- if (state.compareAndSet(encoded, newUsageCount, multiplexingCount + 1))
+ if (state.compareAndSet(encoded, newUsageCount, inUseCount + 1))
return true;
}
}
/**
- *
Tries to release the entry if possible by decrementing the multiplexing
+ *
Tries to release the entry if possible by decrementing the in-use
* count unless the entity is closed.
*
* @return true if the entry was released,
@@ -683,7 +687,7 @@ boolean tryAcquire()
*/
boolean tryRelease()
{
- int newMultiplexingCount;
+ int newInUseCount;
int usageCount;
while (true)
{
@@ -693,22 +697,22 @@ boolean tryRelease()
if (closed)
return false;
- newMultiplexingCount = AtomicBiInteger.getLo(encoded) - 1;
- if (newMultiplexingCount < 0)
+ newInUseCount = AtomicBiInteger.getLo(encoded) - 1;
+ if (newInUseCount < 0)
throw new IllegalStateException("Cannot release an already released entry");
- if (state.compareAndSet(encoded, usageCount, newMultiplexingCount))
+ if (state.compareAndSet(encoded, usageCount, newInUseCount))
break;
}
int currentMaxUsageCount = maxUsageCount;
boolean overUsed = currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount;
- return !(overUsed && newMultiplexingCount == 0);
+ return !(overUsed && newInUseCount == 0);
}
/**
- *
Try to remove the entry by marking it as closed and decrementing the multiplexing counter.
- *
The multiplexing counter will never go below zero and if it reaches zero, the entry is considered removed.
+ *
Try to remove the entry by marking it as closed and decrementing the in-use counter.
+ *
The in-use counter will never go below zero and if it reaches zero, the entry is considered removed.
*
* @return true if the entry can be removed from the containing pool, false otherwise.
*/
@@ -718,12 +722,12 @@ boolean tryRemove()
{
long encoded = state.get();
int usageCount = AtomicBiInteger.getHi(encoded);
- int multiplexCount = AtomicBiInteger.getLo(encoded);
- int newMultiplexCount = Math.max(multiplexCount - 1, 0);
+ int inUseCount = AtomicBiInteger.getLo(encoded);
+ int newInUseCount = Math.max(inUseCount - 1, 0);
- boolean removed = state.compareAndSet(usageCount, -1, multiplexCount, newMultiplexCount);
+ boolean removed = state.compareAndSet(usageCount, -1, inUseCount, newInUseCount);
if (removed)
- return newMultiplexCount == 0;
+ return newInUseCount == 0;
}
}
@@ -761,8 +765,8 @@ boolean isIdleAndOverUsed()
int currentMaxUsageCount = maxUsageCount;
long encoded = state.get();
int usageCount = AtomicBiInteger.getHi(encoded);
- int multiplexCount = AtomicBiInteger.getLo(encoded);
- return currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount && multiplexCount == 0;
+ int inUseCount = AtomicBiInteger.getLo(encoded);
+ return currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount && inUseCount == 0;
}
public int getUsageCount()
@@ -775,17 +779,16 @@ public String toString()
{
long encoded = state.get();
int usageCount = AtomicBiInteger.getHi(encoded);
- int multiplexCount = AtomicBiInteger.getLo(encoded);
+ int inUseCount = AtomicBiInteger.getLo(encoded);
- String state = usageCount < 0 ? "CLOSED" : multiplexCount == 0 ? "IDLE" : "INUSE";
+ String state = usageCount < 0 ? "CLOSED" : inUseCount == 0 ? "IDLE" : "INUSE";
- return String.format("%s@%x{%s, usage=%d, multiplex=%d/%d, pooled=%s}",
+ return String.format("%s@%x{%s, used=%d, inUse=%d, pooled=%s}",
getClass().getSimpleName(),
hashCode(),
state,
Math.max(usageCount, 0),
- Math.max(multiplexCount, 0),
- getMaxMultiplex(),
+ Math.max(inUseCount, 0),
pooled);
}
}
diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java
index f8b2ffdb2302..c8e1355e9122 100644
--- a/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java
+++ b/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java
@@ -170,7 +170,7 @@ public void testMaxPoolSize(Factory factory)
public void testReserve(Factory factory)
{
Pool pool = factory.getPool(2);
- pool.setMaxMultiplex(2);
+ pool.setMaxInUse(2);
// Reserve an entry
Pool.Entry e1 = pool.reserve();
@@ -400,7 +400,7 @@ public void testMaxUsageCount(Factory factory)
public void testMaxMultiplex(Factory factory)
{
Pool pool = factory.getPool(2);
- pool.setMaxMultiplex(3);
+ pool.setMaxInUse(3);
Map counts = new HashMap<>();
AtomicInteger a = new AtomicInteger();
@@ -434,7 +434,7 @@ public void testMaxMultiplex(Factory factory)
public void testRemoveMultiplexed(Factory factory)
{
Pool pool = factory.getPool(1);
- pool.setMaxMultiplex(2);
+ pool.setMaxInUse(2);
pool.reserve().enable(new CloseableHolder("aaa"), false);
Pool.Entry e1 = pool.acquire();
@@ -464,7 +464,7 @@ public void testRemoveMultiplexed(Factory factory)
public void testMultiplexRemoveThenAcquireThenReleaseRemove(Factory factory)
{
Pool pool = factory.getPool(1);
- pool.setMaxMultiplex(2);
+ pool.setMaxInUse(2);
pool.reserve().enable(new CloseableHolder("aaa"), false);
Pool.Entry e1 = pool.acquire();
@@ -482,7 +482,7 @@ public void testMultiplexRemoveThenAcquireThenReleaseRemove(Factory factory)
public void testNonMultiplexRemoveAfterAcquire(Factory factory)
{
Pool pool = factory.getPool(1);
- pool.setMaxMultiplex(2);
+ pool.setMaxInUse(2);
pool.reserve().enable(new CloseableHolder("aaa"), false);
Pool.Entry e1 = pool.acquire();
@@ -495,7 +495,7 @@ public void testNonMultiplexRemoveAfterAcquire(Factory factory)
public void testMultiplexRemoveAfterAcquire(Factory factory)
{
Pool pool = factory.getPool(1);
- pool.setMaxMultiplex(2);
+ pool.setMaxInUse(2);
pool.reserve().enable(new CloseableHolder("aaa"), false);
Pool.Entry e1 = pool.acquire();
@@ -544,7 +544,7 @@ public void testRemoveNonEnabledEntry(Factory factory)
public void testMultiplexMaxUsageReachedAcquireThenRemove(Factory factory)
{
Pool pool = factory.getPool(1);
- pool.setMaxMultiplex(2);
+ pool.setMaxInUse(2);
pool.setMaxUsageCount(3);
pool.reserve().enable(new CloseableHolder("aaa"), false);
@@ -565,7 +565,7 @@ public void testMultiplexMaxUsageReachedAcquireThenRemove(Factory factory)
public void testMultiplexMaxUsageReachedAcquireThenReleaseThenRemove(Factory factory)
{
Pool pool = factory.getPool(1);
- pool.setMaxMultiplex(2);
+ pool.setMaxInUse(2);
pool.setMaxUsageCount(3);
pool.reserve().enable(new CloseableHolder("aaa"), false);
@@ -590,7 +590,7 @@ public void testMultiplexMaxUsageReachedAcquireThenReleaseThenRemove(Factory fac
public void testUsageCountAfterReachingMaxMultiplexLimit(Factory factory)
{
Pool pool = factory.getPool(1);
- pool.setMaxMultiplex(2);
+ pool.setMaxInUse(2);
pool.setMaxUsageCount(10);
pool.reserve().enable(new CloseableHolder("aaa"), false);
@@ -645,8 +645,8 @@ public void testDynamicMaxUsageCountChangeSweep(Factory factory)
@Test
public void testConfigLimits()
{
- assertThrows(IllegalArgumentException.class, () -> new Pool(FIRST, 1).setMaxMultiplex(0));
- assertThrows(IllegalArgumentException.class, () -> new Pool(FIRST, 1).setMaxMultiplex(-1));
+ assertThrows(IllegalArgumentException.class, () -> new Pool(FIRST, 1).setMaxInUse(0));
+ assertThrows(IllegalArgumentException.class, () -> new Pool(FIRST, 1).setMaxInUse(-1));
assertThrows(IllegalArgumentException.class, () -> new Pool(FIRST, 1).setMaxUsageCount(0));
}
From e4d1fb95746f165687a16c37fef4669bb29721b9 Mon Sep 17 00:00:00 2001
From: Greg Wilkins
Date: Fri, 20 Aug 2021 18:27:06 +1000
Subject: [PATCH 4/7] revert to the original naming
---
.../jetty/client/AbstractConnectionPool.java | 4 +-
.../java/org/eclipse/jetty/util/Pool.java | 56 ++++++++++---------
.../java/org/eclipse/jetty/util/PoolTest.java | 22 ++++----
3 files changed, 43 insertions(+), 39 deletions(-)
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java
index f8ec4235bc33..093cc37a8ff5 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java
@@ -132,12 +132,12 @@ public void setMaxDuration(long maxDurationInMs)
protected int getMaxMultiplex()
{
- return pool.getMaxInUse();
+ return pool.getMaxMultiplex();
}
protected void setMaxMultiplex(int maxMultiplex)
{
- pool.setMaxInUse(maxMultiplex);
+ pool.setMaxMultiplex(maxMultiplex);
}
protected int getMaxUsageCount()
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java
index 5df9f5229a8e..400c3ab2f48f 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java
@@ -28,7 +28,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.BiFunction;
+import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -68,10 +68,11 @@ public class Pool implements AutoCloseable, Dumpable
private final Locker locker = new Locker();
private final ThreadLocal cache;
private final AtomicInteger nextIndex;
- private final BiFunction checkInUse;
+ private final BiPredicate available;
private volatile boolean closed;
- private volatile int maxInUse = 1;
- private volatile int maxUsageCount = -1;
+ private volatile int maxUsage = -1;
+ @Deprecated
+ private volatile int maxMultiplex = 1;
/**
* The type of the strategy to use for the pool.
@@ -141,21 +142,21 @@ public Pool(StrategyType strategyType, int maxEntries, boolean cache)
* @param strategyType The strategy to used for looking up entries.
* @param maxEntries the maximum amount of entries that the pool will accept.
* @param cache True if a {@link ThreadLocal} cache should be used to try the most recently released entry.
- * @param checkInUse A bi-function to check if the entry inUse count is OK to allocate another usage. If null
- * then the inUse count will be checked against {@link #maxInUse}
+ * @param available A bi predicate to check if the entry available for another usage. If null
+ * then the active count will be checked against {@link #maxMultiplex}
*/
- public Pool(StrategyType strategyType, int maxEntries, boolean cache, BiFunction checkInUse)
+ public Pool(StrategyType strategyType, int maxEntries, boolean cache, BiPredicate available)
{
this.maxEntries = maxEntries;
this.strategyType = strategyType;
this.cache = cache ? new ThreadLocal<>() : null;
this.nextIndex = strategyType == StrategyType.ROUND_ROBIN ? new AtomicInteger() : null;
- this.checkInUse = checkInUse == null ? this::checkMaxInUse : checkInUse;
+ this.available = available == null ? this::isAvailable : available;
}
- private Boolean checkMaxInUse(T item, Integer inUse)
+ private boolean isAvailable(T item, Integer inUse)
{
- return inUse < maxInUse;
+ return inUse < maxMultiplex;
}
/**
@@ -205,23 +206,27 @@ public int getMaxEntries()
/**
* @return the default maximum in-use count of entries
+ * @deprecated Use available predicate in {@link #Pool(StrategyType, int, boolean, BiPredicate)}
*/
- @ManagedAttribute("The default maximum in-use count of entries")
- public int getMaxInUse()
+ @Deprecated
+ @ManagedAttribute("The default maximum multiplex count of entries")
+ public int getMaxMultiplex()
{
- return maxInUse;
+ return maxMultiplex;
}
/**
- *
Sets the default maximum in-use count for the Pool's entries.
+ *
Sets the default maximum multiplex count for the Pool's entries.
*
- * @param maxMultiplex the default maximum in-use count of entries
+ * @param maxMultiplex the default maximum multiplex count of entries
+ * @deprecated Use available predicate in {@link #Pool(StrategyType, int, boolean, BiPredicate)}
*/
- public final void setMaxInUse(int maxMultiplex)
+ @Deprecated
+ public final void setMaxMultiplex(int maxMultiplex)
{
if (maxMultiplex < 1)
- throw new IllegalArgumentException("Max in-use must be >= 1");
- this.maxInUse = maxMultiplex;
+ throw new IllegalArgumentException("Max multiplex must be >= 1");
+ this.maxMultiplex = maxMultiplex;
}
/**
@@ -233,7 +238,7 @@ public final void setMaxInUse(int maxMultiplex)
@ManagedAttribute("The default maximum usage count of entries")
public int getMaxUsageCount()
{
- return maxUsageCount;
+ return maxUsage;
}
/**
@@ -247,7 +252,7 @@ public final void setMaxUsageCount(int maxUsageCount)
{
if (maxUsageCount == 0)
throw new IllegalArgumentException("Max usage count must be != 0");
- this.maxUsageCount = maxUsageCount;
+ this.maxUsage = maxUsageCount;
// Iterate the entries, remove overused ones and collect a list of the closeable removed ones.
List copy;
@@ -667,8 +672,8 @@ boolean tryAcquire()
int usageCount = AtomicBiInteger.getHi(encoded);
boolean closed = usageCount < 0;
int inUseCount = AtomicBiInteger.getLo(encoded);
- int maxUsageCount = Pool.this.maxUsageCount;
- if (closed || !checkInUse.apply(pooled, inUseCount) || (maxUsageCount > 0 && usageCount >= maxUsageCount))
+ int maxUsageCount = Pool.this.maxUsage;
+ if (closed || !available.test(pooled, inUseCount) || (maxUsageCount > 0 && usageCount >= maxUsageCount))
return false;
// Prevent overflowing the usage counter by capping it at Integer.MAX_VALUE.
@@ -705,7 +710,7 @@ boolean tryRelease()
break;
}
- int currentMaxUsageCount = maxUsageCount;
+ int currentMaxUsageCount = maxUsage;
boolean overUsed = currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount;
return !(overUsed && newInUseCount == 0);
}
@@ -755,18 +760,17 @@ public boolean isInUse()
public boolean isOverUsed()
{
- int currentMaxUsageCount = maxUsageCount;
+ int currentMaxUsageCount = maxUsage;
int usageCount = state.getHi();
return currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount;
}
boolean isIdleAndOverUsed()
{
- int currentMaxUsageCount = maxUsageCount;
long encoded = state.get();
int usageCount = AtomicBiInteger.getHi(encoded);
int inUseCount = AtomicBiInteger.getLo(encoded);
- return currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount && inUseCount == 0;
+ return maxUsage > 0 && usageCount >= maxUsage && inUseCount == 0;
}
public int getUsageCount()
diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java
index c8e1355e9122..f8b2ffdb2302 100644
--- a/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java
+++ b/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java
@@ -170,7 +170,7 @@ public void testMaxPoolSize(Factory factory)
public void testReserve(Factory factory)
{
Pool pool = factory.getPool(2);
- pool.setMaxInUse(2);
+ pool.setMaxMultiplex(2);
// Reserve an entry
Pool.Entry e1 = pool.reserve();
@@ -400,7 +400,7 @@ public void testMaxUsageCount(Factory factory)
public void testMaxMultiplex(Factory factory)
{
Pool pool = factory.getPool(2);
- pool.setMaxInUse(3);
+ pool.setMaxMultiplex(3);
Map counts = new HashMap<>();
AtomicInteger a = new AtomicInteger();
@@ -434,7 +434,7 @@ public void testMaxMultiplex(Factory factory)
public void testRemoveMultiplexed(Factory factory)
{
Pool pool = factory.getPool(1);
- pool.setMaxInUse(2);
+ pool.setMaxMultiplex(2);
pool.reserve().enable(new CloseableHolder("aaa"), false);
Pool.Entry e1 = pool.acquire();
@@ -464,7 +464,7 @@ public void testRemoveMultiplexed(Factory factory)
public void testMultiplexRemoveThenAcquireThenReleaseRemove(Factory factory)
{
Pool pool = factory.getPool(1);
- pool.setMaxInUse(2);
+ pool.setMaxMultiplex(2);
pool.reserve().enable(new CloseableHolder("aaa"), false);
Pool.Entry e1 = pool.acquire();
@@ -482,7 +482,7 @@ public void testMultiplexRemoveThenAcquireThenReleaseRemove(Factory factory)
public void testNonMultiplexRemoveAfterAcquire(Factory factory)
{
Pool pool = factory.getPool(1);
- pool.setMaxInUse(2);
+ pool.setMaxMultiplex(2);
pool.reserve().enable(new CloseableHolder("aaa"), false);
Pool.Entry e1 = pool.acquire();
@@ -495,7 +495,7 @@ public void testNonMultiplexRemoveAfterAcquire(Factory factory)
public void testMultiplexRemoveAfterAcquire(Factory factory)
{
Pool pool = factory.getPool(1);
- pool.setMaxInUse(2);
+ pool.setMaxMultiplex(2);
pool.reserve().enable(new CloseableHolder("aaa"), false);
Pool.Entry e1 = pool.acquire();
@@ -544,7 +544,7 @@ public void testRemoveNonEnabledEntry(Factory factory)
public void testMultiplexMaxUsageReachedAcquireThenRemove(Factory factory)
{
Pool pool = factory.getPool(1);
- pool.setMaxInUse(2);
+ pool.setMaxMultiplex(2);
pool.setMaxUsageCount(3);
pool.reserve().enable(new CloseableHolder("aaa"), false);
@@ -565,7 +565,7 @@ public void testMultiplexMaxUsageReachedAcquireThenRemove(Factory factory)
public void testMultiplexMaxUsageReachedAcquireThenReleaseThenRemove(Factory factory)
{
Pool pool = factory.getPool(1);
- pool.setMaxInUse(2);
+ pool.setMaxMultiplex(2);
pool.setMaxUsageCount(3);
pool.reserve().enable(new CloseableHolder("aaa"), false);
@@ -590,7 +590,7 @@ public void testMultiplexMaxUsageReachedAcquireThenReleaseThenRemove(Factory fac
public void testUsageCountAfterReachingMaxMultiplexLimit(Factory factory)
{
Pool pool = factory.getPool(1);
- pool.setMaxInUse(2);
+ pool.setMaxMultiplex(2);
pool.setMaxUsageCount(10);
pool.reserve().enable(new CloseableHolder("aaa"), false);
@@ -645,8 +645,8 @@ public void testDynamicMaxUsageCountChangeSweep(Factory factory)
@Test
public void testConfigLimits()
{
- assertThrows(IllegalArgumentException.class, () -> new Pool(FIRST, 1).setMaxInUse(0));
- assertThrows(IllegalArgumentException.class, () -> new Pool(FIRST, 1).setMaxInUse(-1));
+ assertThrows(IllegalArgumentException.class, () -> new Pool(FIRST, 1).setMaxMultiplex(0));
+ assertThrows(IllegalArgumentException.class, () -> new Pool(FIRST, 1).setMaxMultiplex(-1));
assertThrows(IllegalArgumentException.class, () -> new Pool(FIRST, 1).setMaxUsageCount(0));
}
From 5834e5f9824ffe127dcecd32315751ddc9846052 Mon Sep 17 00:00:00 2001
From: Greg Wilkins
Date: Thu, 26 Aug 2021 10:59:03 +1000
Subject: [PATCH 5/7] Review suggestion
Use interface
---
.../main/java/org/eclipse/jetty/util/Pool.java | 16 ++++++++++------
1 file changed, 10 insertions(+), 6 deletions(-)
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java
index 400c3ab2f48f..4f86f15c462e 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java
@@ -28,7 +28,6 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -68,7 +67,7 @@ public class Pool implements AutoCloseable, Dumpable
private final Locker locker = new Locker();
private final ThreadLocal cache;
private final AtomicInteger nextIndex;
- private final BiPredicate available;
+ private final Availability available;
private volatile boolean closed;
private volatile int maxUsage = -1;
@Deprecated
@@ -145,7 +144,7 @@ public Pool(StrategyType strategyType, int maxEntries, boolean cache)
* @param available A bi predicate to check if the entry available for another usage. If null
* then the active count will be checked against {@link #maxMultiplex}
*/
- public Pool(StrategyType strategyType, int maxEntries, boolean cache, BiPredicate available)
+ public Pool(StrategyType strategyType, int maxEntries, boolean cache, Availability available)
{
this.maxEntries = maxEntries;
this.strategyType = strategyType;
@@ -154,7 +153,7 @@ public Pool(StrategyType strategyType, int maxEntries, boolean cache, BiPredicat
this.available = available == null ? this::isAvailable : available;
}
- private boolean isAvailable(T item, Integer inUse)
+ private boolean isAvailable(T item, int inUse)
{
return inUse < maxMultiplex;
}
@@ -206,7 +205,7 @@ public int getMaxEntries()
/**
* @return the default maximum in-use count of entries
- * @deprecated Use available predicate in {@link #Pool(StrategyType, int, boolean, BiPredicate)}
+ * @deprecated Use available predicate in {@link #Pool(StrategyType, int, boolean, Availability)}
*/
@Deprecated
@ManagedAttribute("The default maximum multiplex count of entries")
@@ -219,7 +218,7 @@ public int getMaxMultiplex()
*
Sets the default maximum multiplex count for the Pool's entries.
*
* @param maxMultiplex the default maximum multiplex count of entries
- * @deprecated Use available predicate in {@link #Pool(StrategyType, int, boolean, BiPredicate)}
+ * @deprecated Use available predicate in {@link #Pool(StrategyType, int, boolean, Availability)}
*/
@Deprecated
public final void setMaxMultiplex(int maxMultiplex)
@@ -796,4 +795,9 @@ public String toString()
pooled);
}
}
+
+ public interface Availability
+ {
+ boolean test(T item, int inUse);
+ }
}
From 7b78b90cd1c13c03314eb778f83b66c63f9c0a3f Mon Sep 17 00:00:00 2001
From: Greg Wilkins
Date: Fri, 27 Aug 2021 12:36:13 +1000
Subject: [PATCH 6/7] Work In Progress
---
.../jetty/client/AbstractConnectionPool.java | 8 +-
.../eclipse/jetty/client/ConnectionPool.java | 16 -
.../jetty/client/DuplexConnectionPool.java | 3 +-
.../jetty/client/MultiplexConnectionPool.java | 26 +-
.../jetty/client/RandomConnectionPool.java | 2 +-
.../client/RoundRobinConnectionPool.java | 2 +-
.../client/http/HttpConnectionOverHTTP2.java | 14 +-
.../http/MultiplexedConnectionPoolTest.java | 41 +-
.../java/org/eclipse/jetty/util/Pool.java | 367 ++++++++++++------
.../java/org/eclipse/jetty/util/PoolTest.java | 87 +----
10 files changed, 324 insertions(+), 242 deletions(-)
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java
index 093cc37a8ff5..45cabd6ced4a 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java
@@ -70,7 +70,12 @@ protected AbstractConnectionPool(Destination destination, int maxConnections, Ca
protected AbstractConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester)
{
- this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester);
+ this(destination, Pool.StrategyType.FIRST, maxConnections, cache, requester);
+ }
+
+ protected AbstractConnectionPool(HttpDestination destination, Pool.StrategyType strategy, int maxConnections, boolean cache, Callback requester)
+ {
+ this(destination, new Pool<>(strategy, maxConnections, cache), requester);
}
protected AbstractConnectionPool(HttpDestination destination, Pool pool, Callback requester)
@@ -78,6 +83,7 @@ protected AbstractConnectionPool(HttpDestination destination, Pool p
this.destination = destination;
this.requester = requester;
this.pool = pool;
+ pool.setMaxMultiplex(1); // Force the use of MultiUseEntry
addBean(pool);
}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java
index d73056a82a8f..208f4186a060 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java
@@ -102,20 +102,4 @@ interface Factory
*/
ConnectionPool newConnectionPool(HttpDestination destination);
}
-
- /**
- * Marks a connection as supporting multiplexed requests.
- */
- interface Multiplexable
- {
- /**
- * @return the max number of requests that can be multiplexed on a connection
- */
- int getMaxMultiplex();
-
- /**
- * @param maxMultiplex the max number of requests that can be multiplexed on a connection
- */
- void setMaxMultiplex(int maxMultiplex);
- }
}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java
index eab2f96915c1..07c6ea7f9a16 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java
@@ -34,9 +34,10 @@ public DuplexConnectionPool(HttpDestination destination, int maxConnections, Cal
public DuplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester)
{
- this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester);
+ super(destination, Pool.StrategyType.FIRST, maxConnections, cache, requester);
}
+ @Deprecated
public DuplexConnectionPool(HttpDestination destination, Pool pool, Callback requester)
{
super(destination, pool, requester);
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java
index f95999b7baa2..4e16b6f7de8b 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java
@@ -34,12 +34,21 @@ public MultiplexConnectionPool(HttpDestination destination, int maxConnections,
public MultiplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester, int maxMultiplex)
{
- this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester, maxMultiplex);
+ this(destination, Pool.StrategyType.FIRST, maxConnections, cache, requester, maxMultiplex);
}
- public MultiplexConnectionPool(HttpDestination destination, Pool pool, Callback requester, int maxMultiplex)
+ public MultiplexConnectionPool(HttpDestination destination, Pool.StrategyType strategy, int maxConnections, boolean cache, Callback requester, int maxMultiplex)
{
- super(destination, pool, requester);
+ super(destination, new Pool(strategy, maxConnections, cache)
+ {
+ @Override
+ protected int getMaxMultiplex(Connection connection)
+ {
+ if (connection instanceof Multiplexable)
+ return ((Multiplexable)connection).getMaxMultiplex();
+ return super.getMaxMultiplex(connection);
+ }
+ }, requester);
setMaxMultiplex(maxMultiplex);
}
@@ -68,4 +77,15 @@ public void setMaxUsageCount(int maxUsageCount)
{
super.setMaxUsageCount(maxUsageCount);
}
+
+ /**
+ * Marks a connection as supporting multiplexed requests.
+ */
+ public interface Multiplexable extends Connection
+ {
+ /**
+ * @return the max number of requests that can be multiplexed on a connection
+ */
+ int getMaxMultiplex();
+ }
}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java
index e924f8d89e1e..c6a974876b33 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java
@@ -31,6 +31,6 @@ public class RandomConnectionPool extends MultiplexConnectionPool
{
public RandomConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
- super(destination, new Pool<>(Pool.StrategyType.RANDOM, maxConnections, false), requester, maxMultiplex);
+ super(destination, Pool.StrategyType.RANDOM, maxConnections, false, requester, maxMultiplex);
}
}
diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java
index 641c65d8ca67..8e046b46ac4d 100644
--- a/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java
+++ b/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java
@@ -56,7 +56,7 @@ public RoundRobinConnectionPool(HttpDestination destination, int maxConnections,
public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
- super(destination, new Pool<>(Pool.StrategyType.ROUND_ROBIN, maxConnections, false), requester, maxMultiplex);
+ super(destination, Pool.StrategyType.ROUND_ROBIN, maxConnections, false, requester, maxMultiplex);
// If there are queued requests and connections get
// closed due to idle timeout or overuse, we want to
// aggressively try to open new connections to replace
diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java
index c3bd1b660d35..a3c9857d5e89 100644
--- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java
+++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java
@@ -28,22 +28,23 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import org.eclipse.jetty.client.ConnectionPool;
import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpRequest;
+import org.eclipse.jetty.client.MultiplexConnectionPool;
import org.eclipse.jetty.client.SendFailure;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http2.ErrorCode;
+import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Sweeper;
-public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable, ConnectionPool.Multiplexable
+public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable, MultiplexConnectionPool.Multiplexable
{
private static final Logger LOG = Log.getLogger(HttpConnection.class);
@@ -53,7 +54,6 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
private final AtomicInteger sweeps = new AtomicInteger();
private final Session session;
private boolean recycleHttpChannels = true;
- private int maxMultiplex = 1;
public HttpConnectionOverHTTP2(HttpDestination destination, Session session)
{
@@ -76,14 +76,10 @@ public void setRecycleHttpChannels(boolean recycleHttpChannels)
this.recycleHttpChannels = recycleHttpChannels;
}
+ @Override
public int getMaxMultiplex()
{
- return maxMultiplex;
- }
-
- public void setMaxMultiplex(int maxMultiplex)
- {
- this.maxMultiplex = maxMultiplex;
+ return ((HTTP2Session)session).getMaxLocalStreams();
}
@Override
diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MultiplexedConnectionPoolTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MultiplexedConnectionPoolTest.java
index 3c684a76fcd3..e5d42df4f427 100644
--- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MultiplexedConnectionPoolTest.java
+++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MultiplexedConnectionPoolTest.java
@@ -100,10 +100,12 @@ public void testMaxDurationConnectionsWithMultiplexedPool() throws Exception
ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination ->
{
int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination();
- Pool pool = new Pool<>(Pool.StrategyType.FIRST, maxConnections, false);
- poolRef.set(pool);
- MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, pool, destination, MAX_MULTIPLEX)
+ MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, Pool.StrategyType.FIRST, maxConnections, false, destination, MAX_MULTIPLEX)
{
+ {
+ poolRef.set(getBean(Pool.class));
+ }
+
@Override
protected void onCreated(Connection connection)
{
@@ -161,22 +163,27 @@ public void testMaxDurationConnectionsWithMultiplexedPoolClosesExpiredConnection
ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination ->
{
int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination();
- Pool pool = new Pool<>(Pool.StrategyType.FIRST, maxConnections, false);
- poolRef.set(pool);
- MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, pool, destination, MAX_MULTIPLEX)
- {
- @Override
- protected void onCreated(Connection connection)
+ MultiplexConnectionPool connectionPool =
+ new MultiplexConnectionPool(destination, Pool.StrategyType.FIRST, maxConnections, false, destination, MAX_MULTIPLEX)
{
- poolCreateCounter.incrementAndGet();
- }
+ {
+ poolRef.set(getBean(Pool.class));
+ }
+
+ @Override
+ protected void onCreated(Connection connection)
+ {
+ poolCreateCounter.incrementAndGet();
+ }
+
+ @Override
+ protected void removed(Connection connection)
+ {
+ poolRemoveCounter.incrementAndGet();
+ }
+ };
+
- @Override
- protected void removed(Connection connection)
- {
- poolRemoveCounter.incrementAndGet();
- }
- };
connectionPool.setMaxDuration(maxDuration);
return connectionPool;
});
diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java
index 4f86f15c462e..8d6ed40458b8 100644
--- a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java
+++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java
@@ -67,11 +67,9 @@ public class Pool implements AutoCloseable, Dumpable
private final Locker locker = new Locker();
private final ThreadLocal cache;
private final AtomicInteger nextIndex;
- private final Availability available;
private volatile boolean closed;
private volatile int maxUsage = -1;
- @Deprecated
- private volatile int maxMultiplex = 1;
+ private volatile int maxMultiplex = -1;
/**
* The type of the strategy to use for the pool.
@@ -130,32 +128,11 @@ public Pool(StrategyType strategyType, int maxEntries)
* @param cache True if a {@link ThreadLocal} cache should be used to try the most recently released entry.
*/
public Pool(StrategyType strategyType, int maxEntries, boolean cache)
- {
- this(strategyType, maxEntries, cache, null);
- }
-
- /**
- * Construct a Pool with the specified thread-local cache size and
- * an optional {@link ThreadLocal} cache.
- *
- * @param strategyType The strategy to used for looking up entries.
- * @param maxEntries the maximum amount of entries that the pool will accept.
- * @param cache True if a {@link ThreadLocal} cache should be used to try the most recently released entry.
- * @param available A bi predicate to check if the entry available for another usage. If null
- * then the active count will be checked against {@link #maxMultiplex}
- */
- public Pool(StrategyType strategyType, int maxEntries, boolean cache, Availability available)
{
this.maxEntries = maxEntries;
this.strategyType = strategyType;
this.cache = cache ? new ThreadLocal<>() : null;
this.nextIndex = strategyType == StrategyType.ROUND_ROBIN ? new AtomicInteger() : null;
- this.available = available == null ? this::isAvailable : available;
- }
-
- private boolean isAvailable(T item, int inUse)
- {
- return inUse < maxMultiplex;
}
/**
@@ -205,27 +182,37 @@ public int getMaxEntries()
/**
* @return the default maximum in-use count of entries
- * @deprecated Use available predicate in {@link #Pool(StrategyType, int, boolean, Availability)}
*/
- @Deprecated
@ManagedAttribute("The default maximum multiplex count of entries")
public int getMaxMultiplex()
{
- return maxMultiplex;
+ return maxMultiplex == -1 ? 1 : maxMultiplex;
+ }
+
+ protected int getMaxMultiplex(T item)
+ {
+ return getMaxMultiplex();
}
/**
*
Sets the default maximum multiplex count for the Pool's entries.
*
* @param maxMultiplex the default maximum multiplex count of entries
- * @deprecated Use available predicate in {@link #Pool(StrategyType, int, boolean, Availability)}
*/
- @Deprecated
public final void setMaxMultiplex(int maxMultiplex)
{
if (maxMultiplex < 1)
throw new IllegalArgumentException("Max multiplex must be >= 1");
- this.maxMultiplex = maxMultiplex;
+ try (Locker.Lock l = locker.lock())
+ {
+ if (closed)
+ return;
+
+ if (entries.stream().anyMatch(SingleUseEntry.class::isInstance))
+ throw new IllegalStateException("SingleUse Entries in use");
+
+ this.maxMultiplex = maxMultiplex;
+ }
}
/**
@@ -240,6 +227,11 @@ public int getMaxUsageCount()
return maxUsage;
}
+ protected int getMaxUsageCount(T item)
+ {
+ return getMaxUsageCount();
+ }
+
/**
*
Sets the maximum usage count for the Pool's entries.
*
All existing idle entries that have a usage count larger
@@ -251,7 +243,6 @@ public final void setMaxUsageCount(int maxUsageCount)
{
if (maxUsageCount == 0)
throw new IllegalArgumentException("Max usage count must be != 0");
- this.maxUsage = maxUsageCount;
// Iterate the entries, remove overused ones and collect a list of the closeable removed ones.
List copy;
@@ -260,6 +251,11 @@ public final void setMaxUsageCount(int maxUsageCount)
if (closed)
return;
+ if (entries.stream().anyMatch(SingleUseEntry.class::isInstance))
+ throw new IllegalStateException("SingleUse Entries in use");
+
+ this.maxUsage = maxUsageCount;
+
copy = entries.stream()
.filter(entry -> entry.isIdleAndOverUsed() && remove(entry) && entry.pooled instanceof Closeable)
.map(entry -> (Closeable)entry.pooled)
@@ -271,37 +267,12 @@ public final void setMaxUsageCount(int maxUsageCount)
}
/**
- *
Creates a new disabled slot into the pool.
- *
The returned entry must ultimately have the {@link Entry#enable(Object, boolean)}
- * method called or be removed via {@link Pool.Entry#remove()} or
- * {@link Pool#remove(Pool.Entry)}.
- *
- * @param allotment the desired allotment, where each entry handles an allotment of maxMultiplex,
- * or a negative number to always trigger the reservation of a new entry.
- * @return a disabled entry that is contained in the pool,
- * or null if the pool is closed or if the pool already contains
- * {@link #getMaxEntries()} entries, or the allotment has already been reserved
- * @deprecated Use {@link #reserve()} instead
+ * @deprecated Use {@link #reserve()}
*/
@Deprecated
public Entry reserve(int allotment)
{
- try (Locker.Lock l = locker.lock())
- {
- if (closed)
- return null;
-
- int space = maxEntries - entries.size();
- if (space <= 0)
- return null;
-
- if (allotment >= 0 && getReservedCount() >= allotment)
- return null;
-
- Entry entry = new Entry();
- entries.add(entry);
- return entry;
- }
+ return null;
}
/**
@@ -325,36 +296,25 @@ public Entry reserve()
if (entries.size() >= maxEntries)
return null;
- Entry entry = new Entry();
+ Entry entry = newEntry();
entries.add(entry);
return entry;
}
}
+ protected Entry newEntry()
+ {
+ if (maxMultiplex >= 0 || maxUsage >= 0)
+ return new MultiUseEntry();
+ return new SingleUseEntry();
+ }
+
/**
- *
Acquires the entry from the pool at the specified index.
- *
This method bypasses the thread-local cache mechanism.
- *
- * @param idx the index of the entry to acquire.
- * @return the specified entry or null if there is none at the specified index or if it is not available.
- * @deprecated No longer supported. Instead use a {@link StrategyType} to configure the pool.
+ * @deprecated Use {@link #acquire()}
*/
@Deprecated
public Entry acquireAt(int idx)
{
- if (closed)
- return null;
-
- try
- {
- Entry entry = entries.get(idx);
- if (entry.tryAcquire())
- return entry;
- }
- catch (IndexOutOfBoundsException e)
- {
- // no entry at that index
- }
return null;
}
@@ -561,9 +521,10 @@ public void dump(Appendable out, String indent) throws IOException
@Override
public String toString()
{
- return String.format("%s@%x[size=%d closed=%s]",
+ return String.format("%s@%x[inUse=%d/%d, closed=%s]",
getClass().getSimpleName(),
hashCode(),
+ getInUseCount(),
entries.size(),
closed);
}
@@ -571,26 +532,12 @@ public String toString()
/**
*
A Pool entry that holds metadata and a pooled object.
*/
- public class Entry
+ public abstract class Entry
{
- // hi: negative==closed else total-usage counter; negative=closed; MIN_VALUE pending
- // lo: in-use counter
- private final AtomicBiInteger state;
// The pooled item. This is not volatile as it is set once and then never changed.
// Other threads accessing must check the state field above first, so a good before/after
// relationship exists to make a memory barrier.
- private T pooled;
-
- Entry()
- {
- this.state = new AtomicBiInteger(Integer.MIN_VALUE, 0);
- }
-
- // for testing only
- void setUsageCount(int usageCount)
- {
- this.state.getAndSetHi(usageCount);
- }
+ protected T pooled;
/**
*
Enables a {@link #reserve() reserved} Entry.
@@ -616,18 +563,18 @@ public boolean enable(T pooled, boolean acquire)
throw new IllegalStateException("Entry already enabled: " + this);
}
this.pooled = pooled;
- int usage = acquire ? 1 : 0;
- if (!state.compareAndSet(Integer.MIN_VALUE, usage, 0, usage))
- {
- this.pooled = null;
- if (isClosed())
- return false; // Pool has been closed
- throw new IllegalStateException("Entry already enabled: " + this);
- }
- return true;
+ if (tryEnable(acquire))
+ return true;
+
+ this.pooled = null;
+ if (isClosed())
+ return false; // Pool has been closed
+ throw new IllegalStateException("Entry already enabled: " + this);
}
+ protected abstract boolean tryEnable(boolean acquire);
+
public T getPooled()
{
return pooled;
@@ -655,6 +602,207 @@ public boolean remove()
return Pool.this.remove(this);
}
+ /**
+ *
Tries to acquire the entry if possible by incrementing both the usage
+ * count and the in-use count.
+ *
+ * @return true if the usage count is <= maxUsageCount and
+ * the in-use count is maxMultiplex and the entry is not closed,
+ * false otherwise.
+ */
+ abstract boolean tryAcquire();
+
+ /**
+ *
Tries to release the entry if possible by decrementing the in-use
+ * count unless the entity is closed.
+ *
+ * @return true if the entry was released,
+ * false if {@link #tryRemove()} should be called.
+ */
+ abstract boolean tryRelease();
+
+ /**
+ *
Try to remove the entry by marking it as closed.
+ *
+ * @return true if the entry can be removed from the containing pool, false otherwise.
+ */
+ abstract boolean tryRemove();
+
+ public abstract boolean isClosed();
+
+ public abstract boolean isReserved();
+
+ public abstract boolean isIdle();
+
+ public abstract boolean isInUse();
+
+ @Deprecated
+ public boolean isOverUsed()
+ {
+ return false;
+ }
+
+ @Deprecated
+ boolean isIdleAndOverUsed()
+ {
+ return false;
+ }
+
+ public int getUsageCount()
+ {
+ return isIdle() ? 0 : 1;
+ }
+ }
+
+ /**
+ *
A Pool entry that holds metadata and a pooled object.
Tries to acquire the entry if possible by incrementing both the usage
+ * count and the in-use count.
+ *
+ * @return true if the usage count is <= maxUsageCount and
+ * the in-use count is maxMultiplex and the entry is not closed,
+ * false otherwise.
+ */
+ @Override
+ boolean tryAcquire()
+ {
+ while (true)
+ {
+ int s = state.get();
+ if (s != 0)
+ return false;
+ if (state.compareAndSet(s, 1))
+ return true;
+ }
+ }
+
+ /**
+ *
Tries to release the entry if possible by decrementing the in-use
+ * count unless the entity is closed.
+ *
+ * @return true if the entry was released,
+ * false if {@link #tryRemove()} should be called.
+ */
+ @Override
+ boolean tryRelease()
+ {
+ while (true)
+ {
+ int s = state.get();
+ if (s < 0)
+ return false;
+ if (s == 0)
+ throw new IllegalStateException("Cannot release an already released entry");
+ if (state.compareAndSet(s, 0))
+ return true;
+ }
+ }
+
+ /**
+ *