diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java index 98551c0ad542..018dfb08c085 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java @@ -70,7 +70,12 @@ protected AbstractConnectionPool(Destination destination, int maxConnections, Ca protected AbstractConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester) { - this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester); + this(destination, Pool.StrategyType.FIRST, maxConnections, cache, requester); + } + + protected AbstractConnectionPool(HttpDestination destination, Pool.StrategyType strategy, int maxConnections, boolean cache, Callback requester) + { + this(destination, new Pool<>(strategy, maxConnections, cache), requester); } protected AbstractConnectionPool(HttpDestination destination, Pool pool, Callback requester) @@ -78,6 +83,7 @@ protected AbstractConnectionPool(HttpDestination destination, Pool p this.destination = destination; this.requester = requester; this.pool = pool; + pool.setMaxMultiplex(1); // Force the use of multiplexing. addBean(pool); } @@ -140,16 +146,6 @@ protected void setMaxMultiplex(int maxMultiplex) pool.setMaxMultiplex(maxMultiplex); } - protected void setMaxMultiplex(Connection connection, int maxMultiplex) - { - if (connection instanceof Attachable) - { - Object attachment = ((Attachable)connection).getAttachment(); - if (attachment instanceof EntryHolder) - ((EntryHolder)attachment).entry.setMaxMultiplex(maxMultiplex); - } - } - protected int getMaxUsageCount() { return pool.getMaxUsageCount(); @@ -538,8 +534,6 @@ public void succeeded(Connection connection) onCreated(connection); pending.decrementAndGet(); reserved.enable(connection, false); - if (connection instanceof Multiplexable) - setMaxMultiplex(connection, ((ConnectionPool.Multiplexable)connection).getMaxMultiplex()); idle(connection, false); complete(null); proceed(); diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java index d73056a82a8f..88d0b97adf0c 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java @@ -29,7 +29,7 @@ public interface ConnectionPool extends Closeable { /** - * Optionally pre-create up to connectionCount + * Optionally pre-create up to {@code connectionCount} * connections so they are immediately ready for use. * @param connectionCount the number of connections to pre-start. */ @@ -109,13 +109,17 @@ interface Factory interface Multiplexable { /** - * @return the max number of requests that can be multiplexed on a connection + * @return the max number of requests multiplexable on a single connection */ int getMaxMultiplex(); /** - * @param maxMultiplex the max number of requests that can be multiplexed on a connection + * @param maxMultiplex the max number of requests multiplexable on a single connection + * @deprecated do not use, as the maxMultiplex value is pulled, rather than pushed */ - void setMaxMultiplex(int maxMultiplex); + @Deprecated + default void setMaxMultiplex(int maxMultiplex) + { + } } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java index eab2f96915c1..07c6ea7f9a16 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/DuplexConnectionPool.java @@ -34,9 +34,10 @@ public DuplexConnectionPool(HttpDestination destination, int maxConnections, Cal public DuplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester) { - this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester); + super(destination, Pool.StrategyType.FIRST, maxConnections, cache, requester); } + @Deprecated public DuplexConnectionPool(HttpDestination destination, Pool pool, Callback requester) { super(destination, pool, requester); diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java index f95999b7baa2..79df556bf32f 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java @@ -34,9 +34,26 @@ public MultiplexConnectionPool(HttpDestination destination, int maxConnections, public MultiplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester, int maxMultiplex) { - this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester, maxMultiplex); + this(destination, Pool.StrategyType.FIRST, maxConnections, cache, requester, maxMultiplex); } + public MultiplexConnectionPool(HttpDestination destination, Pool.StrategyType strategy, int maxConnections, boolean cache, Callback requester, int maxMultiplex) + { + super(destination, new Pool(strategy, maxConnections, cache) + { + @Override + protected int getMaxMultiplex(Connection connection) + { + int multiplex = (connection instanceof Multiplexable) + ? ((Multiplexable)connection).getMaxMultiplex() + : super.getMaxMultiplex(connection); + return multiplex > 0 ? multiplex : 1; + } + }, requester); + setMaxMultiplex(maxMultiplex); + } + + @Deprecated public MultiplexConnectionPool(HttpDestination destination, Pool pool, Callback requester, int maxMultiplex) { super(destination, pool, requester); diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java index 9ba58785d18e..1003a3a77a00 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java @@ -18,8 +18,6 @@ package org.eclipse.jetty.client; -import org.eclipse.jetty.client.api.Connection; - public abstract class MultiplexHttpDestination extends HttpDestination { protected MultiplexHttpDestination(HttpClient client, Origin origin) @@ -41,11 +39,4 @@ public void setMaxRequestsPerConnection(int maxRequestsPerConnection) if (connectionPool instanceof AbstractConnectionPool) ((AbstractConnectionPool)connectionPool).setMaxMultiplex(maxRequestsPerConnection); } - - public void setMaxRequestsPerConnection(Connection connection, int maxRequestsPerConnection) - { - ConnectionPool connectionPool = getConnectionPool(); - if (connectionPool instanceof AbstractConnectionPool) - ((AbstractConnectionPool)connectionPool).setMaxMultiplex(connection, maxRequestsPerConnection); - } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java index e924f8d89e1e..c6a974876b33 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java @@ -31,6 +31,6 @@ public class RandomConnectionPool extends MultiplexConnectionPool { public RandomConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex) { - super(destination, new Pool<>(Pool.StrategyType.RANDOM, maxConnections, false), requester, maxMultiplex); + super(destination, Pool.StrategyType.RANDOM, maxConnections, false, requester, maxMultiplex); } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java index 641c65d8ca67..8e046b46ac4d 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java @@ -56,7 +56,7 @@ public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex) { - super(destination, new Pool<>(Pool.StrategyType.ROUND_ROBIN, maxConnections, false), requester, maxMultiplex); + super(destination, Pool.StrategyType.ROUND_ROBIN, maxConnections, false, requester, maxMultiplex); // If there are queued requests and connections get // closed due to idle timeout or overuse, we want to // aggressively try to open new connections to replace diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java index ad5ce4349eed..4be18e3feabf 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java @@ -211,26 +211,13 @@ private Promise connectionPromise() @Override public void onSettings(Session session, SettingsFrame frame) { - Map settings = frame.getSettings(); - Integer maxConcurrentStreams = settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS); - boolean[] initialized = new boolean[1]; - Connection connection = this.connection.get(initialized); - if (initialized[0]) - { - if (maxConcurrentStreams != null && connection != null) - destination().setMaxRequestsPerConnection(connection, maxConcurrentStreams); - } - else - { - onServerPreface(session, maxConcurrentStreams); - } + if (!connection.isMarked()) + onServerPreface(session); } - private void onServerPreface(Session session, Integer maxConcurrentStreams) + private void onServerPreface(Session session) { HttpConnectionOverHTTP2 connection = newHttpConnection(destination(), session); - if (maxConcurrentStreams != null) - connection.setMaxMultiplex(maxConcurrentStreams); if (this.connection.compareAndSet(null, connection, false, true)) connectionPromise().succeeded(connection); } diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java index c3bd1b660d35..deecd58f4b7d 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java @@ -37,6 +37,7 @@ import org.eclipse.jetty.client.SendFailure; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http2.ErrorCode; +import org.eclipse.jetty.http2.HTTP2Session; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; @@ -53,7 +54,6 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S private final AtomicInteger sweeps = new AtomicInteger(); private final Session session; private boolean recycleHttpChannels = true; - private int maxMultiplex = 1; public HttpConnectionOverHTTP2(HttpDestination destination, Session session) { @@ -76,14 +76,10 @@ public void setRecycleHttpChannels(boolean recycleHttpChannels) this.recycleHttpChannels = recycleHttpChannels; } + @Override public int getMaxMultiplex() { - return maxMultiplex; - } - - public void setMaxMultiplex(int maxMultiplex) - { - this.maxMultiplex = maxMultiplex; + return ((HTTP2Session)session).getMaxLocalStreams(); } @Override diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MultiplexedConnectionPoolTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MultiplexedConnectionPoolTest.java index 3c684a76fcd3..307260e8eb08 100644 --- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MultiplexedConnectionPoolTest.java +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MultiplexedConnectionPoolTest.java @@ -100,9 +100,7 @@ public void testMaxDurationConnectionsWithMultiplexedPool() throws Exception ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination -> { int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination(); - Pool pool = new Pool<>(Pool.StrategyType.FIRST, maxConnections, false); - poolRef.set(pool); - MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, pool, destination, MAX_MULTIPLEX) + MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, Pool.StrategyType.FIRST, maxConnections, false, destination, MAX_MULTIPLEX) { @Override protected void onCreated(Connection connection) @@ -116,6 +114,7 @@ protected void removed(Connection connection) poolRemoveCounter.incrementAndGet(); } }; + poolRef.set(connectionPool.getBean(Pool.class)); connectionPool.setMaxDuration(maxDuration); return connectionPool; }); @@ -161,9 +160,7 @@ public void testMaxDurationConnectionsWithMultiplexedPoolClosesExpiredConnection ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination -> { int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination(); - Pool pool = new Pool<>(Pool.StrategyType.FIRST, maxConnections, false); - poolRef.set(pool); - MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, pool, destination, MAX_MULTIPLEX) + MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, Pool.StrategyType.FIRST, maxConnections, false, destination, MAX_MULTIPLEX) { @Override protected void onCreated(Connection connection) @@ -177,6 +174,7 @@ protected void removed(Connection connection) poolRemoveCounter.incrementAndGet(); } }; + poolRef.set(connectionPool.getBean(Pool.class)); connectionPool.setMaxDuration(maxDuration); return connectionPool; }); diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java index 660108129a3d..629cc66d174b 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java @@ -40,8 +40,8 @@ import org.eclipse.jetty.util.thread.Locker; /** - *

A pool of objects, with optional support for - * multiplexing, max usage count and several optimized strategies plus + *

A pool of objects, with optional support for multiplexing, + * max usage count and several optimized strategies plus * an optional {@link ThreadLocal} cache of the last release entry.

*

When the method {@link #close()} is called, all {@link Closeable}s * object pooled by the pool are also closed.

@@ -68,8 +68,10 @@ public class Pool implements AutoCloseable, Dumpable private final ThreadLocal cache; private final AtomicInteger nextIndex; private volatile boolean closed; - private volatile int maxMultiplex = 1; - private volatile int maxUsageCount = -1; + @Deprecated + private volatile int maxUsage = -1; + @Deprecated + private volatile int maxMultiplex = -1; /** * The type of the strategy to use for the pool. @@ -182,25 +184,49 @@ public int getMaxEntries() /** * @return the default maximum multiplex count of entries + * @deprecated Multiplex functionalities will be removed */ @ManagedAttribute("The default maximum multiplex count of entries") + @Deprecated public int getMaxMultiplex() { - return maxMultiplex; + return maxMultiplex == -1 ? 1 : maxMultiplex; + } + + /** + *

Retrieves the max multiplex count for the given pooled object.

+ * + * @param pooled the pooled object + * @return the max multiplex count for the given pooled object + * @deprecated Multiplex functionalities will be removed + */ + @Deprecated + protected int getMaxMultiplex(T pooled) + { + return getMaxMultiplex(); } /** *

Sets the default maximum multiplex count for the Pool's entries.

- *

This value is used to initialize {@link Entry#maxMultiplex} - * when a new {@link Entry} is created.

* * @param maxMultiplex the default maximum multiplex count of entries + * @deprecated Multiplex functionalities will be removed */ + @Deprecated public final void setMaxMultiplex(int maxMultiplex) { if (maxMultiplex < 1) throw new IllegalArgumentException("Max multiplex must be >= 1"); - this.maxMultiplex = maxMultiplex; + try (Locker.Lock l = locker.lock()) + { + if (closed) + return; + + if (entries.stream().anyMatch(MonoEntry.class::isInstance)) + throw new IllegalStateException("Pool entries do not support multiplexing"); + + this.maxMultiplex = maxMultiplex; + } } /** @@ -208,11 +234,26 @@ public final void setMaxMultiplex(int maxMultiplex) * can be acquired.

* * @return the default maximum usage count of entries + * @deprecated MaxUsage functionalities will be removed */ @ManagedAttribute("The default maximum usage count of entries") + @Deprecated public int getMaxUsageCount() { - return maxUsageCount; + return maxUsage; + } + + /** + *

Retrieves the max usage count for the given pooled object.

+ * + * @param pooled the pooled object + * @return the max usage count for the given pooled object + * @deprecated MaxUsage functionalities will be removed + */ + @Deprecated + protected int getMaxUsageCount(T pooled) + { + return getMaxUsageCount(); } /** @@ -221,12 +262,13 @@ public int getMaxUsageCount() * than this new value are removed from the Pool and closed.

* * @param maxUsageCount the default maximum usage count of entries + * @deprecated MaxUsage functionalities will be removed */ + @Deprecated public final void setMaxUsageCount(int maxUsageCount) { if (maxUsageCount == 0) throw new IllegalArgumentException("Max usage count must be != 0"); - this.maxUsageCount = maxUsageCount; // Iterate the entries, remove overused ones and collect a list of the closeable removed ones. List copy; @@ -235,6 +277,11 @@ public final void setMaxUsageCount(int maxUsageCount) if (closed) return; + if (entries.stream().anyMatch(MonoEntry.class::isInstance)) + throw new IllegalStateException("Pool entries do not support max usage"); + + this.maxUsage = maxUsageCount; + copy = entries.stream() .filter(entry -> entry.isIdleAndOverUsed() && remove(entry) && entry.pooled instanceof Closeable) .map(entry -> (Closeable)entry.pooled) @@ -273,7 +320,7 @@ public Entry reserve(int allotment) if (allotment >= 0 && (getReservedCount() * getMaxMultiplex()) >= allotment) return null; - Entry entry = new Entry(); + Entry entry = newEntry(); entries.add(entry); return entry; } @@ -300,12 +347,19 @@ public Entry reserve() if (entries.size() >= maxEntries) return null; - Entry entry = new Entry(); + Entry entry = newEntry(); entries.add(entry); return entry; } } + protected Entry newEntry() + { + if (maxMultiplex >= 0 || maxUsage >= 0) + return new MultiEntry(); + return new MonoEntry(); + } + /** *

Acquires the entry from the pool at the specified index.

*

This method bypasses the thread-local cache mechanism.

@@ -359,7 +413,7 @@ public Entry acquire() int index = startIndex(size); - for (int tries = size; tries-- > 0; ) + for (int tries = size; tries-- > 0;) { try { @@ -445,7 +499,6 @@ public Entry acquire(Function.Entry, T> creator) * @return true if the entry was released and could be acquired again, * false if the entry should be removed by calling {@link #remove(Pool.Entry)} * and the object contained by the entry should be disposed. - * @throws NullPointerException if value is null */ public boolean release(Entry entry) { @@ -536,90 +589,62 @@ public void dump(Appendable out, String indent) throws IOException @Override public String toString() { - return String.format("%s@%x[size=%d closed=%s]", + return String.format("%s@%x[inUse=%d,size=%d,capacity=%d,closed=%b]", getClass().getSimpleName(), hashCode(), - entries.size(), - closed); + getInUseCount(), + size(), + getMaxEntries(), + isClosed()); } /** *

A Pool entry that holds metadata and a pooled object.

*/ - public class Entry + public abstract class Entry { - // hi: positive=open/maxUsage counter; negative=closed; MIN_VALUE pending - // lo: multiplexing counter - private final AtomicBiInteger state; - // The pooled item. This is not volatile as it is set once and then never changed. + // The pooled object. This is not volatile as it is set once and then never changed. // Other threads accessing must check the state field above first, so a good before/after // relationship exists to make a memory barrier. private T pooled; - private volatile int maxMultiplex; - - Entry() - { - this.state = new AtomicBiInteger(Integer.MIN_VALUE, 0); - this.maxMultiplex = Pool.this.maxMultiplex; - } - - /** - * @return the maximum multiplex count for this entry - */ - public int getMaxMultiplex() - { - return maxMultiplex; - } - - public void setMaxMultiplex(int maxMultiplex) - { - if (maxMultiplex < 1) - throw new IllegalArgumentException("Max multiplex must be >= 1"); - this.maxMultiplex = maxMultiplex; - } - - // for testing only - void setUsageCount(int usageCount) - { - this.state.getAndSetHi(usageCount); - } /** - *

Enables a {@link #reserve() reserved} Entry.

+ *

Enables this, previously {@link #reserve() reserved}, Entry.

*

An entry returned from the {@link #reserve()} method must be enabled with this method, * once and only once, before it is usable by the pool.

*

The entry may be enabled and not acquired, in which case it is immediately available to be * acquired, potentially by another thread; or it can be enabled and acquired atomically so that * no other thread can acquire it, although the acquire may still fail if the pool has been closed.

* - * @param pooled The pooled item for the entry - * @param acquire If true the entry is atomically enabled and acquired. - * @return whether the entry was enabled - * @throws IllegalStateException if the entry was already enabled + * @param pooled the pooled object for this Entry + * @param acquire whether this Entry should be atomically enabled and acquired + * @return whether this Entry was enabled + * @throws IllegalStateException if this Entry was already enabled */ public boolean enable(T pooled, boolean acquire) { Objects.requireNonNull(pooled); - if (state.getHi() != Integer.MIN_VALUE) + if (!isReserved()) { - if (state.getHi() == -1) + if (isClosed()) return false; // Pool has been closed throw new IllegalStateException("Entry already enabled: " + this); } this.pooled = pooled; - int usage = acquire ? 1 : 0; - if (!state.compareAndSet(Integer.MIN_VALUE, usage, 0, usage)) - { - this.pooled = null; - if (state.getHi() == -1) - return false; // Pool has been closed - throw new IllegalStateException("Entry already enabled: " + this); - } - return true; + if (tryEnable(acquire)) + return true; + + this.pooled = null; + if (isClosed()) + return false; // Pool has been closed + throw new IllegalStateException("Entry already enabled: " + this); } + /** + * @return the pooled object + */ public T getPooled() { return pooled; @@ -629,7 +654,7 @@ public T getPooled() *

Releases this Entry.

*

This is equivalent to calling {@link Pool#release(Pool.Entry)} passing this entry.

* - * @return whether the entry was released + * @return whether this Entry was released */ public boolean release() { @@ -637,53 +662,268 @@ public boolean release() } /** - *

Removes the entry.

+ *

Removes this Entry from the Pool.

*

This is equivalent to calling {@link Pool#remove(Pool.Entry)} passing this entry.

* - * @return whether the entry was removed + * @return whether this Entry was removed */ public boolean remove() { return Pool.this.remove(this); } + /** + *

Tries to enable, and possible also acquire, this Entry.

+ * + * @param acquire whether to also acquire this Entry + * @return whether this Entry was enabled + */ + abstract boolean tryEnable(boolean acquire); + + /** + *

Tries to acquire this Entry.

+ * + * @return whether this Entry was acquired + */ + abstract boolean tryAcquire(); + + /** + *

Tries to release this Entry.

+ * + * @return true if this Entry was released, + * false if {@link #tryRemove()} should be called. + */ + abstract boolean tryRelease(); + + /** + *

Tries to remove the entry by marking it as closed.

+ * + * @return whether the entry can be removed from the containing pool + */ + abstract boolean tryRemove(); + + /** + * @return whether this Entry is closed + */ + public abstract boolean isClosed(); + + /** + * @return whether this Entry is reserved + */ + public abstract boolean isReserved(); + + /** + * @return whether this Entry is idle + */ + public abstract boolean isIdle(); + + /** + * @return whether this entry is in use. + */ + public abstract boolean isInUse(); + + /** + * @return whether this entry has been used beyond {@link #getMaxUsageCount()} + * @deprecated MaxUsage functionalities will be removed + */ + @Deprecated + public boolean isOverUsed() + { + return false; + } + + boolean isIdleAndOverUsed() + { + return false; + } + + // Only for testing. + int getUsageCount() + { + return 0; + } + + // Only for testing. + void setUsageCount(int usageCount) + { + } + } + + /** + *

A Pool entry that holds metadata and a pooled object, + * that can only be acquired concurrently at most once, and + * can be acquired/released multiple times.

+ */ + private class MonoEntry extends Entry + { + // MIN_VALUE => pending; -1 => closed; 0 => idle; 1 => active; + private final AtomicInteger state = new AtomicInteger(Integer.MIN_VALUE); + + @Override + protected boolean tryEnable(boolean acquire) + { + return state.compareAndSet(Integer.MIN_VALUE, acquire ? 1 : 0); + } + + @Override + boolean tryAcquire() + { + while (true) + { + int s = state.get(); + if (s != 0) + return false; + if (state.compareAndSet(s, 1)) + return true; + } + } + + @Override + boolean tryRelease() + { + while (true) + { + int s = state.get(); + if (s < 0) + return false; + if (s == 0) + throw new IllegalStateException("Cannot release an already released entry"); + if (state.compareAndSet(s, 0)) + return true; + } + } + + @Override + boolean tryRemove() + { + state.set(-1); + return true; + } + + @Override + public boolean isClosed() + { + return state.get() < 0; + } + + @Override + public boolean isReserved() + { + return state.get() == Integer.MIN_VALUE; + } + + @Override + public boolean isIdle() + { + return state.get() == 0; + } + + @Override + public boolean isInUse() + { + return state.get() == 1; + } + + @Override + public String toString() + { + String s; + switch (state.get()) + { + case Integer.MIN_VALUE: + s = "PENDING"; + break; + case -1: + s = "CLOSED"; + break; + case 0: + s = "IDLE"; + break; + default: + s = "ACTIVE"; + } + return String.format("%s@%x{%s,pooled=%s}", + getClass().getSimpleName(), + hashCode(), + s, + getPooled()); + } + } + + /** + *

A Pool entry that holds metadata and a pooled object, + * that can be acquired concurrently multiple times, and + * can be acquired/released multiple times.

+ */ + class MultiEntry extends Entry + { + // hi: MIN_VALUE => pending; -1 => closed; 0+ => usage counter; + // lo: 0 => idle; positive => multiplex counter + private final AtomicBiInteger state; + + MultiEntry() + { + this.state = new AtomicBiInteger(Integer.MIN_VALUE, 0); + } + + @Override + void setUsageCount(int usageCount) + { + this.state.getAndSetHi(usageCount); + } + + @Override + protected boolean tryEnable(boolean acquire) + { + int usage = acquire ? 1 : 0; + return state.compareAndSet(Integer.MIN_VALUE, usage, 0, usage); + } + /** *

Tries to acquire the entry if possible by incrementing both the usage * count and the multiplex count.

* - * @return true if the usage count is <= maxUsageCount and - * the multiplex count is maxMultiplex and the entry is not closed, - * false otherwise. + * @return true if the usage count is less than {@link #getMaxUsageCount()} and + * the multiplex count is less than {@link #getMaxMultiplex(Object)} and + * the entry is not closed, false otherwise. */ + @Override boolean tryAcquire() { while (true) { long encoded = state.get(); int usageCount = AtomicBiInteger.getHi(encoded); + int multiplexCount = AtomicBiInteger.getLo(encoded); boolean closed = usageCount < 0; - int multiplexingCount = AtomicBiInteger.getLo(encoded); - int currentMaxUsageCount = maxUsageCount; - if (closed || multiplexingCount >= maxMultiplex || (currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount)) + if (closed) + return false; + T pooled = getPooled(); + int maxUsageCount = getMaxUsageCount(pooled); + int maxMultiplexed = getMaxMultiplex(pooled); + if (maxMultiplexed > 0 && multiplexCount >= maxMultiplexed) + return false; + if (maxUsageCount > 0 && usageCount >= maxUsageCount) return false; // Prevent overflowing the usage counter by capping it at Integer.MAX_VALUE. int newUsageCount = usageCount == Integer.MAX_VALUE ? Integer.MAX_VALUE : usageCount + 1; - if (state.compareAndSet(encoded, newUsageCount, multiplexingCount + 1)) + if (state.compareAndSet(encoded, newUsageCount, multiplexCount + 1)) return true; } } /** - *

Tries to release the entry if possible by decrementing the multiplexing + *

Tries to release the entry if possible by decrementing the multiplex * count unless the entity is closed.

* * @return true if the entry was released, * false if {@link #tryRemove()} should be called. */ + @Override boolean tryRelease() { - int newMultiplexingCount; + int newMultiplexCount; int usageCount; while (true) { @@ -693,25 +933,26 @@ boolean tryRelease() if (closed) return false; - newMultiplexingCount = AtomicBiInteger.getLo(encoded) - 1; - if (newMultiplexingCount < 0) + newMultiplexCount = AtomicBiInteger.getLo(encoded) - 1; + if (newMultiplexCount < 0) throw new IllegalStateException("Cannot release an already released entry"); - if (state.compareAndSet(encoded, usageCount, newMultiplexingCount)) + if (state.compareAndSet(encoded, usageCount, newMultiplexCount)) break; } - int currentMaxUsageCount = maxUsageCount; + int currentMaxUsageCount = maxUsage; boolean overUsed = currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount; - return !(overUsed && newMultiplexingCount == 0); + return !(overUsed && newMultiplexCount == 0); } /** - *

Try to remove the entry by marking it as closed and decrementing the multiplexing counter.

- *

The multiplexing counter will never go below zero and if it reaches zero, the entry is considered removed.

+ *

Tries to remove the entry by marking it as closed and decrementing the multiplex counter.

+ *

The multiplex counter will never go below zero and if it reaches zero, the entry is considered removed.

* * @return true if the entry can be removed from the containing pool, false otherwise. */ + @Override boolean tryRemove() { while (true) @@ -727,45 +968,52 @@ boolean tryRemove() } } + @Override public boolean isClosed() { return state.getHi() < 0; } + @Override public boolean isReserved() { return state.getHi() == Integer.MIN_VALUE; } + @Override public boolean isIdle() { long encoded = state.get(); return AtomicBiInteger.getHi(encoded) >= 0 && AtomicBiInteger.getLo(encoded) == 0; } + @Override public boolean isInUse() { long encoded = state.get(); return AtomicBiInteger.getHi(encoded) >= 0 && AtomicBiInteger.getLo(encoded) > 0; } + @Override public boolean isOverUsed() { - int currentMaxUsageCount = maxUsageCount; + int maxUsageCount = getMaxUsageCount(); int usageCount = state.getHi(); - return currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount; + return maxUsageCount > 0 && usageCount >= maxUsageCount; } + @Override boolean isIdleAndOverUsed() { - int currentMaxUsageCount = maxUsageCount; + int maxUsageCount = getMaxUsageCount(); long encoded = state.get(); int usageCount = AtomicBiInteger.getHi(encoded); int multiplexCount = AtomicBiInteger.getLo(encoded); - return currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount && multiplexCount == 0; + return maxUsageCount > 0 && usageCount >= maxUsageCount && multiplexCount == 0; } - public int getUsageCount() + @Override + int getUsageCount() { return Math.max(state.getHi(), 0); } @@ -777,16 +1025,17 @@ public String toString() int usageCount = AtomicBiInteger.getHi(encoded); int multiplexCount = AtomicBiInteger.getLo(encoded); - String state = usageCount < 0 ? "CLOSED" : multiplexCount == 0 ? "IDLE" : "INUSE"; + String state = usageCount < 0 + ? (usageCount == Integer.MIN_VALUE ? "PENDING" : "CLOSED") + : (multiplexCount == 0 ? "IDLE" : "ACTIVE"); - return String.format("%s@%x{%s, usage=%d, multiplex=%d/%d, pooled=%s}", + return String.format("%s@%x{%s,usage=%d,multiplex=%d,pooled=%s}", getClass().getSimpleName(), hashCode(), state, Math.max(usageCount, 0), Math.max(multiplexCount, 0), - getMaxMultiplex(), - pooled); + getPooled()); } } } diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java index f8b2ffdb2302..8b319a96bc2c 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java @@ -86,7 +86,7 @@ public static Stream strategy() public void testAcquireRelease(Factory factory) { Pool pool = factory.getPool(1); - pool.reserve(-1).enable(new CloseableHolder("aaa"), false); + pool.reserve().enable(new CloseableHolder("aaa"), false); assertThat(pool.size(), is(1)); assertThat(pool.getReservedCount(), is(0)); assertThat(pool.getIdleCount(), is(1)); @@ -130,7 +130,7 @@ public void testAcquireRelease(Factory factory) public void testRemoveBeforeRelease(Factory factory) { Pool pool = factory.getPool(1); - pool.reserve(-1).enable(new CloseableHolder("aaa"), false); + pool.reserve().enable(new CloseableHolder("aaa"), false); Pool.Entry e1 = pool.acquire(); assertThat(pool.remove(e1), is(true)); @@ -222,69 +222,6 @@ public void testReserve(Factory factory) assertThrows(IllegalStateException.class, () -> e.enable(new CloseableHolder("xxx"), false)); } - @ParameterizedTest - @MethodSource(value = "strategy") - public void testDeprecatedReserve(Factory factory) - { - Pool pool = factory.getPool(2); - - // Reserve an entry - Pool.Entry e1 = pool.reserve(-1); - assertThat(pool.size(), is(1)); - assertThat(pool.getReservedCount(), is(1)); - assertThat(pool.getIdleCount(), is(0)); - assertThat(pool.getInUseCount(), is(0)); - - // max reservations - assertNull(pool.reserve(1)); - assertThat(pool.size(), is(1)); - assertThat(pool.getReservedCount(), is(1)); - assertThat(pool.getIdleCount(), is(0)); - assertThat(pool.getInUseCount(), is(0)); - - // enable the entry - e1.enable(new CloseableHolder("aaa"), false); - assertThat(pool.size(), is(1)); - assertThat(pool.getReservedCount(), is(0)); - assertThat(pool.getIdleCount(), is(1)); - assertThat(pool.getInUseCount(), is(0)); - - // Reserve another entry - Pool.Entry e2 = pool.reserve(-1); - assertThat(pool.size(), is(2)); - assertThat(pool.getReservedCount(), is(1)); - assertThat(pool.getIdleCount(), is(1)); - assertThat(pool.getInUseCount(), is(0)); - - // remove the reservation - e2.remove(); - assertThat(pool.size(), is(1)); - assertThat(pool.getReservedCount(), is(0)); - assertThat(pool.getIdleCount(), is(1)); - assertThat(pool.getInUseCount(), is(0)); - - // Reserve another entry - Pool.Entry e3 = pool.reserve(-1); - assertThat(pool.size(), is(2)); - assertThat(pool.getReservedCount(), is(1)); - assertThat(pool.getIdleCount(), is(1)); - assertThat(pool.getInUseCount(), is(0)); - - // enable and acquire the entry - e3.enable(new CloseableHolder("bbb"), true); - assertThat(pool.size(), is(2)); - assertThat(pool.getReservedCount(), is(0)); - assertThat(pool.getIdleCount(), is(1)); - assertThat(pool.getInUseCount(), is(1)); - - // can't reenable - assertThrows(IllegalStateException.class, () -> e3.enable(new CloseableHolder("xxx"), false)); - - // Can't enable acquired entry - Pool.Entry e = pool.acquire(); - assertThrows(IllegalStateException.class, () -> e.enable(new CloseableHolder("xxx"), false)); - } - @ParameterizedTest @MethodSource(value = "strategy") public void testReserveNegativeMaxPending(Factory factory) @@ -356,22 +293,6 @@ public void testValuesContainsAcquiredEntries(Factory factory) assertThat(pool.values().isEmpty(), is(false)); } - @ParameterizedTest - @MethodSource(value = "strategy") - public void testAcquireAt(Factory factory) - { - Pool pool = factory.getPool(2); - - pool.reserve(-1).enable(new CloseableHolder("aaa"), false); - pool.reserve(-1).enable(new CloseableHolder("bbb"), false); - - assertThat(pool.acquireAt(2), nullValue()); - assertThat(pool.acquireAt(0), notNullValue()); - assertThat(pool.acquireAt(0), nullValue()); - assertThat(pool.acquireAt(1), notNullValue()); - assertThat(pool.acquireAt(1), nullValue()); - } - @ParameterizedTest @MethodSource(value = "strategy") public void testMaxUsageCount(Factory factory) @@ -608,6 +529,7 @@ public void testUsageCountAfterReachingMaxMultiplexLimit(Factory factory) public void testDynamicMaxUsageCountChangeOverflowMaxInt(Factory factory) { Pool pool = factory.getPool(1); + pool.setMaxMultiplex(1); Pool.Entry entry = pool.reserve(); entry.enable(new CloseableHolder("aaa"), false); entry.setUsageCount(Integer.MAX_VALUE); @@ -627,6 +549,7 @@ public void testDynamicMaxUsageCountChangeOverflowMaxInt(Factory factory) public void testDynamicMaxUsageCountChangeSweep(Factory factory) { Pool pool = factory.getPool(2); + pool.setMaxUsageCount(100); Pool.Entry entry1 = pool.reserve(); entry1.enable(new CloseableHolder("aaa"), false); Pool.Entry entry2 = pool.reserve();