diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java index 3ef383ae255e..9f97ece9e1c4 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java @@ -51,7 +51,7 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen { private static final Logger LOG = Log.getLogger(AbstractConnectionPool.class); - private final AtomicBiInteger pending = new AtomicBiInteger(); // hi==reserved; lo==demand + private final AtomicBiInteger reservations = new AtomicBiInteger(); // hi==pending; lo==demand private final HttpDestination destination; private final Callback requester; private final Pool pool; @@ -97,7 +97,7 @@ public CompletableFuture preCreateConnections(int connectionCount) Pool.Entry entry = pool.reserve(); if (entry == null) break; - pending.add(1, 0); + reservations.add(1, 0); Promise.Completable future = new FutureConnection(entry); futures.add(future); @@ -218,7 +218,7 @@ public Connection acquire() * * @param create a hint to attempt to open a new connection if no idle connections are available * @return an idle connection or {@code null} if no idle connections are available - * @see #tryCreate() + * @see #tryCreate(boolean) */ protected Connection acquire(boolean create) { @@ -227,58 +227,73 @@ protected Connection acquire(boolean create) Connection connection = activate(); if (connection == null) { - if (create || isMaximizeConnections()) - tryCreate(); + tryCreate(create || isMaximizeConnections()); connection = activate(); } return connection; } /** - *

Schedules the opening of a new connection.

- *

Whether a new connection is scheduled for opening is determined by the {@code maxPending} parameter: - * if {@code maxPending} is greater than the current number of connections scheduled for opening, - * then this method returns without scheduling the opening of a new connection; - * if {@code maxPending} is negative, a new connection is always scheduled for opening.

- * - * or a negative number to always trigger the opening of a new connection + *

Try creating a new connection.

+ *

Whether a new connection is determined by the {@code demanded} parameter + * and a count kept of previous demand and supply:

+ *
    + *
  • The demand is incremented for every call to tryCreate with {@code demanded} true
  • + *
  • If the demand is greater than the pending connections time the {@link #getMaxMultiplex()} factor, + * then the method tries to reserve an entry in the pool so it can schedule a new connection.
  • + *
  • If a new connection is scheduled, then the pending count is incremented
  • + *
  • Once a scheduled new connection completes successfully, pending is decremented and demand is + * reduced by the {@link #getMaxMultiplex()} factor.
  • + *
*/ - protected void tryCreate() + protected void tryCreate(boolean demanded) { int connectionCount = getConnectionCount(); if (LOG.isDebugEnabled()) LOG.debug("Try creating connection {}/{} with {} pending", connectionCount, getMaxConnectionCount(), getPendingConnectionCount()); - // If we have already reserved sufficient multiplexed connections, then do not create another + // If we have already pending sufficient multiplexed connections, then do not create another int multiplexed = getMaxMultiplex(); + Pool.Entry entry = null; while (true) { - long encoded = pending.get(); - int reserved = getHi(encoded); + long encoded = reservations.get(); + int pending = getHi(encoded); int demand = getLo(encoded); - // If we have already reserved enough connections, just increment demand and return - if (reserved * multiplexed > demand && (pending.compareAndSet(encoded, reserved, demand + 1))) + if (demanded) + demand++; + int supply = pending * multiplexed; + + // Do we need a new connections? + if (supply >= demand) + { + if (!reservations.compareAndSet(encoded, pending, demand)) + continue; + if (entry != null) + entry.release(); return; + } - // otherwise increase reservations and demand - if (pending.compareAndSet(encoded, reserved + 1, demand + 1)) - break; - } + // Try to reserve an entry to create + if (entry == null) + entry = pool.reserve(); + if (entry == null) + { + if (!reservations.compareAndSet(encoded, pending, demand)) + continue; + return; + } - Pool.Entry entry = pool.reserve(); - if (entry == null) - { - // pool is full, so decrement reservations and return - pending.add(-1, 0); + if (!reservations.compareAndSet(encoded, pending + 1, demand)) + continue; + // Create the connection + if (LOG.isDebugEnabled()) + LOG.debug("Creating connection {}/{} at {}", connectionCount, getMaxConnectionCount(), entry); + Promise future = new FutureConnection(entry); + destination.newConnection(future); return; } - - if (LOG.isDebugEnabled()) - LOG.debug("Creating connection {}/{} at {}", connectionCount, getMaxConnectionCount(), entry); - - Promise future = new FutureConnection(entry); - destination.newConnection(future); } protected void proceed() @@ -480,7 +495,7 @@ public void succeeded(Connection connection) ((Attachable)connection).setAttachment(reserved); onCreated(connection); - pending.updateAndGet(encoded -> AtomicBiInteger.encode(getHi(encoded) - 1, Math.max(0, getLo(encoded) - getMaxMultiplex()))); + reservations.updateAndGet(encoded -> AtomicBiInteger.encode(getHi(encoded) - 1, Math.max(0, getLo(encoded) - getMaxMultiplex()))); reserved.enable(connection, false); idle(connection, false); complete(null); @@ -488,7 +503,8 @@ public void succeeded(Connection connection) } else { - pending.add(-1, 0); + // reduce pending on failure and if not multiplexing also reduce demand + reservations.add(-1, getMaxMultiplex() == 1 ? -1 : 0); failed(new IllegalArgumentException("Invalid connection object: " + connection)); } } @@ -498,7 +514,8 @@ public void failed(Throwable x) { if (LOG.isDebugEnabled()) LOG.debug("Connection creation failed {}", reserved, x); - pending.add(-1, 0); + // reduce pending on failure and if not multiplexing also reduce demand + reservations.add(-1, getMaxMultiplex() == 1 ? -1 : 0); reserved.remove(); completeExceptionally(x); requester.failed(x); diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolHelper.java b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolHelper.java index 4f8a0a11e5d0..e5cfd07d80f7 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolHelper.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolHelper.java @@ -29,6 +29,6 @@ public static Connection acquire(AbstractConnectionPool connectionPool, boolean public static void tryCreate(AbstractConnectionPool connectionPool, int pending) { - connectionPool.tryCreate(); + connectionPool.tryCreate(pending > 0); } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java index c2f0c7c2ae4c..c8bbe45237ec 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java @@ -229,8 +229,7 @@ public Entry reserve(int allotment) if (space <= 0) return null; - long pending = entries.stream().filter(Entry::isReserved).count(); - if (allotment >= 0 && (pending * getMaxMultiplex()) >= allotment) + if (allotment >= 0 && (getReservedCount() * getMaxMultiplex()) >= allotment) return null; Entry entry = new Entry();