diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java index cb22c753113e..98551c0ad542 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java @@ -142,18 +142,11 @@ protected void setMaxMultiplex(int maxMultiplex) protected void setMaxMultiplex(Connection connection, int maxMultiplex) { - if (connection == null) - { - setMaxMultiplex(maxMultiplex); - } - else + if (connection instanceof Attachable) { - if (connection instanceof Attachable) - { - Object attachment = ((Attachable)connection).getAttachment(); - if (attachment instanceof EntryHolder) - ((EntryHolder)attachment).entry.setMaxMultiplex(maxMultiplex); - } + Object attachment = ((Attachable)connection).getAttachment(); + if (attachment instanceof EntryHolder) + ((EntryHolder)attachment).entry.setMaxMultiplex(maxMultiplex); } } @@ -545,6 +538,8 @@ public void succeeded(Connection connection) onCreated(connection); pending.decrementAndGet(); reserved.enable(connection, false); + if (connection instanceof Multiplexable) + setMaxMultiplex(connection, ((ConnectionPool.Multiplexable)connection).getMaxMultiplex()); idle(connection, false); complete(null); proceed(); diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java index 27ee72fb82d0..d73056a82a8f 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java @@ -104,27 +104,18 @@ interface Factory } /** - * Marks a connection pool as supporting multiplexed connections. + * Marks a connection as supporting multiplexed requests. */ interface Multiplexable { /** - * @return the default max number of requests multiplexable on a connection + * @return the max number of requests that can be multiplexed on a connection */ int getMaxMultiplex(); /** - * @param maxMultiplex the default max number of requests multiplexable on a connection + * @param maxMultiplex the max number of requests that can be multiplexed on a connection */ void setMaxMultiplex(int maxMultiplex); - - /** - * @param connection the multiplexed connection - * @param maxMultiplex the max number of requests multiplexable on the given connection - */ - default void setMaxMultiplex(Connection connection, int maxMultiplex) - { - setMaxMultiplex(maxMultiplex); - } } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java index f52ca50b54f9..f95999b7baa2 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java @@ -25,7 +25,7 @@ import org.eclipse.jetty.util.annotation.ManagedObject; @ManagedObject -public class MultiplexConnectionPool extends AbstractConnectionPool implements ConnectionPool.Multiplexable +public class MultiplexConnectionPool extends AbstractConnectionPool { public MultiplexConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex) { @@ -56,12 +56,6 @@ public void setMaxMultiplex(int maxMultiplex) super.setMaxMultiplex(maxMultiplex); } - @Override - public void setMaxMultiplex(Connection connection, int maxMultiplex) - { - super.setMaxMultiplex(connection, maxMultiplex); - } - @Override @ManagedAttribute(value = "The maximum amount of times a connection is used before it gets closed") public int getMaxUsageCount() diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java index 925e66c79d8b..9ba58785d18e 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java @@ -30,20 +30,22 @@ protected MultiplexHttpDestination(HttpClient client, Origin origin) public int getMaxRequestsPerConnection() { ConnectionPool connectionPool = getConnectionPool(); - if (connectionPool instanceof ConnectionPool.Multiplexable) - return ((ConnectionPool.Multiplexable)connectionPool).getMaxMultiplex(); + if (connectionPool instanceof AbstractConnectionPool) + return ((AbstractConnectionPool)connectionPool).getMaxMultiplex(); return 1; } public void setMaxRequestsPerConnection(int maxRequestsPerConnection) { - setMaxRequestsPerConnection(null, maxRequestsPerConnection); + ConnectionPool connectionPool = getConnectionPool(); + if (connectionPool instanceof AbstractConnectionPool) + ((AbstractConnectionPool)connectionPool).setMaxMultiplex(maxRequestsPerConnection); } public void setMaxRequestsPerConnection(Connection connection, int maxRequestsPerConnection) { ConnectionPool connectionPool = getConnectionPool(); - if (connectionPool instanceof ConnectionPool.Multiplexable) - ((ConnectionPool.Multiplexable)connectionPool).setMaxMultiplex(connection, maxRequestsPerConnection); + if (connectionPool instanceof AbstractConnectionPool) + ((AbstractConnectionPool)connectionPool).setMaxMultiplex(connection, maxRequestsPerConnection); } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 78e60b357dd3..f94351526b01 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -738,7 +738,7 @@ protected IStream createLocalStream(int streamId, Promise promise) int maxCount = getMaxLocalStreams(); if (maxCount >= 0 && localCount >= maxCount) { - IllegalStateException failure = new IllegalStateException("Max local stream count " + maxCount + " exceeded"); + IllegalStateException failure = new IllegalStateException("Max local stream count " + maxCount + " exceeded: " + localCount); if (LOG.isDebugEnabled()) LOG.debug("Could not create local stream #{} for {}", streamId, this, failure); promise.failed(failure); @@ -789,8 +789,9 @@ protected IStream createRemoteStream(int streamId) int maxCount = getMaxRemoteStreams(); if (maxCount >= 0 && remoteCount - remoteClosing >= maxCount) { + IllegalStateException failure = new IllegalStateException("Max remote stream count " + maxCount + " exceeded: " + remoteCount + "+" + remoteClosing); if (LOG.isDebugEnabled()) - LOG.debug("Could not create remote stream #{} for {}", streamId, this); + LOG.debug("Could not create remote stream #{} for {}", streamId, this, failure); reset(null, new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.from(() -> onStreamDestroyed(streamId))); return null; } diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java index 77356249a0eb..ad5ce4349eed 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java @@ -60,7 +60,7 @@ public HttpClientTransportOverHTTP2(HTTP2Client client) setConnectionPoolFactory(destination -> { HttpClient httpClient = getHttpClient(); - return new MultiplexConnectionPool(destination, httpClient.getMaxConnectionsPerDestination(), destination, httpClient.getMaxRequestsQueuedPerDestination()); + return new MultiplexConnectionPool(destination, httpClient.getMaxConnectionsPerDestination(), destination, 1); }); } @@ -213,15 +213,24 @@ public void onSettings(Session session, SettingsFrame frame) { Map settings = frame.getSettings(); Integer maxConcurrentStreams = settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS); - if (maxConcurrentStreams != null) - destination().setMaxRequestsPerConnection(connection.getReference(), maxConcurrentStreams); - if (!connection.isMarked()) - onServerPreface(session); + boolean[] initialized = new boolean[1]; + Connection connection = this.connection.get(initialized); + if (initialized[0]) + { + if (maxConcurrentStreams != null && connection != null) + destination().setMaxRequestsPerConnection(connection, maxConcurrentStreams); + } + else + { + onServerPreface(session, maxConcurrentStreams); + } } - private void onServerPreface(Session session) + private void onServerPreface(Session session, Integer maxConcurrentStreams) { HttpConnectionOverHTTP2 connection = newHttpConnection(destination(), session); + if (maxConcurrentStreams != null) + connection.setMaxMultiplex(maxConcurrentStreams); if (this.connection.compareAndSet(null, connection, false, true)) connectionPromise().succeeded(connection); } diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java index 54c34582a688..c3bd1b660d35 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import org.eclipse.jetty.client.ConnectionPool; import org.eclipse.jetty.client.HttpChannel; import org.eclipse.jetty.client.HttpConnection; import org.eclipse.jetty.client.HttpDestination; @@ -42,7 +43,7 @@ import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.Sweeper; -public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable +public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable, ConnectionPool.Multiplexable { private static final Logger LOG = Log.getLogger(HttpConnection.class); @@ -52,6 +53,7 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S private final AtomicInteger sweeps = new AtomicInteger(); private final Session session; private boolean recycleHttpChannels = true; + private int maxMultiplex = 1; public HttpConnectionOverHTTP2(HttpDestination destination, Session session) { @@ -74,6 +76,16 @@ public void setRecycleHttpChannels(boolean recycleHttpChannels) this.recycleHttpChannels = recycleHttpChannels; } + public int getMaxMultiplex() + { + return maxMultiplex; + } + + public void setMaxMultiplex(int maxMultiplex) + { + this.maxMultiplex = maxMultiplex; + } + @Override protected Iterator getHttpChannels() { diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java index e7138b297abd..907ed44bdca3 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java @@ -31,6 +31,8 @@ import java.util.function.Function; import java.util.stream.Collectors; +import org.eclipse.jetty.util.annotation.ManagedAttribute; +import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.component.DumpableCollection; import org.eclipse.jetty.util.log.Log; @@ -38,24 +40,22 @@ import org.eclipse.jetty.util.thread.Locker; /** - * A fast pool of objects, with optional support for + *

A pool of objects, with optional support for * multiplexing, max usage count and several optimized strategies plus - * an optional {@link ThreadLocal} cache of the last release entry. - *

- * When the method {@link #close()} is called, all {@link Closeable}s in the pool - * are also closed. - *

- * @param + * an optional {@link ThreadLocal} cache of the last release entry.

+ *

When the method {@link #close()} is called, all {@link Closeable}s + * object pooled by the pool are also closed.

+ * + * @param the type of the pooled objects */ +@ManagedObject public class Pool implements AutoCloseable, Dumpable { private static final Logger LOGGER = Log.getLogger(Pool.class); private final List entries = new CopyOnWriteArrayList<>(); - private final int maxEntries; private final StrategyType strategyType; - /* * The cache is used to avoid hammering on the first index of the entry list. * Caches can become poisoned (i.e.: containing entries that are in use) when @@ -104,7 +104,7 @@ public enum StrategyType * random strategy but with more predictable behaviour. * No entries are favoured and contention is reduced. */ - ROUND_ROBIN, + ROUND_ROBIN } /** @@ -122,6 +122,7 @@ public Pool(StrategyType strategyType, int maxEntries) /** * Construct a Pool with the specified thread-local cache size and * an optional {@link ThreadLocal} cache. + * * @param strategyType The strategy to used for looking up entries. * @param maxEntries the maximum amount of entries that the pool will accept. * @param cache True if a {@link ThreadLocal} cache should be used to try the most recently released entry. @@ -134,65 +135,92 @@ public Pool(StrategyType strategyType, int maxEntries, boolean cache) this.nextIndex = strategyType == StrategyType.ROUND_ROBIN ? new AtomicInteger() : null; } + /** + * @return the number of reserved entries + */ + @ManagedAttribute("The number of reserved entries") public int getReservedCount() { return (int)entries.stream().filter(Entry::isReserved).count(); } + /** + * @return the number of idle entries + */ + @ManagedAttribute("The number of idle entries") public int getIdleCount() { return (int)entries.stream().filter(Entry::isIdle).count(); } + /** + * @return the number of in-use entries + */ + @ManagedAttribute("The number of in-use entries") public int getInUseCount() { return (int)entries.stream().filter(Entry::isInUse).count(); } + /** + * @return the number of closed entries + */ + @ManagedAttribute("The number of closed entries") public int getClosedCount() { return (int)entries.stream().filter(Entry::isClosed).count(); } + /** + * @return the maximum number of entries + */ + @ManagedAttribute("The maximum number of entries") public int getMaxEntries() { return maxEntries; } + /** + * @return the default maximum multiplex count of entries + */ + @ManagedAttribute("The default maximum multiplex count of entries") public int getMaxMultiplex() { return maxMultiplex; } + /** + *

Sets the default maximum multiplex count for the Pool's entries.

+ *

This value is used to initialize {@link Entry#maxMultiplex} + * when a new {@link Entry} is created.

+ * + * @param maxMultiplex the default maximum multiplex count of entries + */ public final void setMaxMultiplex(int maxMultiplex) { if (maxMultiplex < 1) throw new IllegalArgumentException("Max multiplex must be >= 1"); this.maxMultiplex = maxMultiplex; - - try (Locker.Lock l = locker.lock()) - { - if (closed) - return; - - entries.forEach(entry -> entry.setMaxMultiplex(maxMultiplex)); - } } /** - * Get the maximum number of times the entries of the pool - * can be acquired. - * @return the max usage count. + *

Returns the maximum number of times the entries of the pool + * can be acquired.

+ * + * @return the default maximum usage count of entries */ + @ManagedAttribute("The default maximum usage count of entries") public int getMaxUsageCount() { return maxUsageCount; } /** - * Change the max usage count of the pool's entries. All existing - * idle entries over this new max usage are removed and closed. - * @param maxUsageCount the max usage count. + *

Sets the maximum usage count for the Pool's entries.

+ *

All existing idle entries that have a usage count larger + * than this new value are removed from the Pool and closed.

+ * + * @param maxUsageCount the default maximum usage count of entries */ public final void setMaxUsageCount(int maxUsageCount) { @@ -218,10 +246,10 @@ public final void setMaxUsageCount(int maxUsageCount) } /** - * Create a new disabled slot into the pool. - * The returned entry must ultimately have the {@link Entry#enable(Object, boolean)} + *

Creates a new disabled slot into the pool.

+ *

The returned entry must ultimately have the {@link Entry#enable(Object, boolean)} * method called or be removed via {@link Pool.Entry#remove()} or - * {@link Pool#remove(Pool.Entry)}. + * {@link Pool#remove(Pool.Entry)}.

* * @param allotment the desired allotment, where each entry handles an allotment of maxMultiplex, * or a negative number to always trigger the reservation of a new entry. @@ -252,10 +280,10 @@ public Entry reserve(int allotment) } /** - * Create a new disabled slot into the pool. - * The returned entry must ultimately have the {@link Entry#enable(Object, boolean)} + *

Creates a new disabled slot into the pool.

+ *

The returned entry must ultimately have the {@link Entry#enable(Object, boolean)} * method called or be removed via {@link Pool.Entry#remove()} or - * {@link Pool#remove(Pool.Entry)}. + * {@link Pool#remove(Pool.Entry)}.

* * @return a disabled entry that is contained in the pool, * or null if the pool is closed or if the pool already contains @@ -279,10 +307,12 @@ public Entry reserve() } /** - * Acquire the entry from the pool at the specified index. This method bypasses the thread-local mechanism. - * @deprecated No longer supported. Instead use a {@link StrategyType} to configure the pool. + *

Acquires the entry from the pool at the specified index.

+ *

This method bypasses the thread-local cache mechanism.

+ * * @param idx the index of the entry to acquire. * @return the specified entry or null if there is none at the specified index or if it is not available. + * @deprecated No longer supported. Instead use a {@link StrategyType} to configure the pool. */ @Deprecated public Entry acquireAt(int idx) @@ -304,8 +334,11 @@ public Entry acquireAt(int idx) } /** - * Acquire an entry from the pool. - * Only enabled entries will be returned from this method and their enable method must not be called. + *

Acquires an entry from the pool.

+ *

Only enabled entries will be returned from this method + * and their {@link Entry#enable(Object, boolean)} + * method must not be called.

+ * * @return an entry from the pool or null if none is available. */ public Entry acquire() @@ -326,7 +359,7 @@ public Entry acquire() int index = startIndex(size); - for (int tries = size; tries-- > 0;) + for (int tries = size; tries-- > 0; ) { try { @@ -367,8 +400,8 @@ private int startIndex(int size) } /** - * Utility method to acquire an entry from the pool, - * reserving and creating a new entry if necessary. + *

Acquires an entry from the pool, + * reserving and creating a new entry if necessary.

* * @param creator a function to create the pooled value for a reserved entry. * @return an entry from the pool or null if none is available. @@ -404,9 +437,9 @@ public Entry acquire(Function.Entry, T> creator) } /** - * This method will return an acquired object to the pool. Objects - * that are acquired from the pool but never released will result - * in a memory leak. + *

Releases an {@link #acquire() acquired} entry to the pool.

+ *

Entries that are acquired from the pool but never released + * will result in a memory leak.

* * @param entry the value to return to the pool * @return true if the entry was released and could be acquired again, @@ -426,7 +459,7 @@ public boolean release(Entry entry) } /** - * Remove a value from the pool. + *

Removes an entry from the pool.

* * @param entry the value to remove * @return true if the entry was removed, false otherwise @@ -510,6 +543,9 @@ public String toString() closed); } + /** + *

A Pool entry that holds metadata and a pooled object.

+ */ public class Entry { // hi: positive=open/maxUsage counter; negative=closed; MIN_VALUE pending @@ -527,6 +563,9 @@ public class Entry this.maxMultiplex = Pool.this.maxMultiplex; } + /** + * @return the maximum multiplex count for this entry + */ public int getMaxMultiplex() { return maxMultiplex; @@ -545,15 +584,17 @@ void setUsageCount(int usageCount) this.state.getAndSetHi(usageCount); } - /** Enable a reserved entry {@link Entry}. - * An entry returned from the {@link #reserve()} method must be enabled with this method, - * once and only once, before it is usable by the pool. - * The entry may be enabled and not acquired, in which case it is immediately available to be + /** + *

Enables a {@link #reserve() reserved} Entry.

+ *

An entry returned from the {@link #reserve()} method must be enabled with this method, + * once and only once, before it is usable by the pool.

+ *

The entry may be enabled and not acquired, in which case it is immediately available to be * acquired, potentially by another thread; or it can be enabled and acquired atomically so that - * no other thread can acquire it, although the acquire may still fail if the pool has been closed. + * no other thread can acquire it, although the acquire may still fail if the pool has been closed.

+ * * @param pooled The pooled item for the entry * @param acquire If true the entry is atomically enabled and acquired. - * @return true If the entry was enabled. + * @return whether the entry was enabled * @throws IllegalStateException if the entry was already enabled */ public boolean enable(T pooled, boolean acquire) @@ -585,9 +626,10 @@ public T getPooled() } /** - * Release the entry. - * This is equivalent to calling {@link Pool#release(Pool.Entry)} passing this entry. - * @return true if released. + *

Releases this Entry.

+ *

This is equivalent to calling {@link Pool#release(Entry)} passing this entry.

+ * + * @return whether the entry was released */ public boolean release() { @@ -595,9 +637,10 @@ public boolean release() } /** - * Remove the entry. - * This is equivalent to calling {@link Pool#remove(Pool.Entry)} passing this entry. - * @return true if remove. + *

Removes the entry.

+ *

This is equivalent to calling {@link Pool#remove(Entry)} passing this entry.

+ * + * @return whether the entry was removed */ public boolean remove() { @@ -605,8 +648,9 @@ public boolean remove() } /** - * Try to acquire the entry if possible by incrementing both the usage - * count and the multiplex count. + *

Tries to acquire the entry if possible by incrementing both the usage + * count and the multiplex count.

+ * * @return true if the usage count is <= maxUsageCount and * the multiplex count is maxMultiplex and the entry is not closed, * false otherwise. @@ -631,8 +675,9 @@ boolean tryAcquire() } /** - * Try to release the entry if possible by decrementing the multiplexing - * count unless the entity is closed. + *

Tries to release the entry if possible by decrementing the multiplexing + * count unless the entity is closed.

+ * * @return true if the entry was released, * false if {@link #tryRemove()} should be called. */ @@ -662,8 +707,9 @@ boolean tryRelease() } /** - * Try to remove the entry by marking it as closed and decrementing the multiplexing counter. - * The multiplexing counter will never go below zero and if it reaches zero, the entry is considered removed. + *

Try to remove the entry by marking it as closed and decrementing the multiplexing counter.

+ *

The multiplexing counter will never go below zero and if it reaches zero, the entry is considered removed.

+ * * @return true if the entry can be removed from the containing pool, false otherwise. */ boolean tryRemove()