Skip to content

Commit

Permalink
Enhanced fix for #5855
Browse files Browse the repository at this point in the history
updates from review:
 + always call tryCreate, now passing demanded to indicate if demand should be incremented
 + simplify demand > supply calculation
 + if not multiplexing reduce demand on connection failure.  If multiplexing, failure will not decrease demand, so some additional connections may be created until supply catches up.
  • Loading branch information
gregw committed Jan 6, 2021
1 parent 8258302 commit 482c943
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 39 deletions.
Expand Up @@ -51,7 +51,7 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
{
private static final Logger LOG = Log.getLogger(AbstractConnectionPool.class);

private final AtomicBiInteger pending = new AtomicBiInteger(); // hi==reserved; lo==demand
private final AtomicBiInteger reservations = new AtomicBiInteger(); // hi==pending; lo==demand
private final HttpDestination destination;
private final Callback requester;
private final Pool<Connection> pool;
Expand Down Expand Up @@ -97,7 +97,7 @@ public CompletableFuture<Void> preCreateConnections(int connectionCount)
Pool<Connection>.Entry entry = pool.reserve();
if (entry == null)
break;
pending.add(1, 0);
reservations.add(1, 0);

Promise.Completable<Connection> future = new FutureConnection(entry);
futures.add(future);
Expand Down Expand Up @@ -218,7 +218,7 @@ public Connection acquire()
*
* @param create a hint to attempt to open a new connection if no idle connections are available
* @return an idle connection or {@code null} if no idle connections are available
* @see #tryCreate()
* @see #tryCreate(boolean)
*/
protected Connection acquire(boolean create)
{
Expand All @@ -227,58 +227,73 @@ protected Connection acquire(boolean create)
Connection connection = activate();
if (connection == null)
{
if (create || isMaximizeConnections())
tryCreate();
tryCreate(create || isMaximizeConnections());
connection = activate();
}
return connection;
}

/**
* <p>Schedules the opening of a new connection.</p>
* <p>Whether a new connection is scheduled for opening is determined by the {@code maxPending} parameter:
* if {@code maxPending} is greater than the current number of connections scheduled for opening,
* then this method returns without scheduling the opening of a new connection;
* if {@code maxPending} is negative, a new connection is always scheduled for opening.</p>
*
* or a negative number to always trigger the opening of a new connection
* <p>Try creating a new connection.</p>
* <p>Whether a new connection is determined by the {@code demanded} parameter
* and a count kept of previous demand and supply:</p>
* <ul>
* <li>The demand is incremented for every call to tryCreate with {@code demanded} true</li>
* <li>If the demand is greater than the pending connections time the {@link #getMaxMultiplex()} factor,
* then the method tries to reserve an entry in the pool so it can schedule a new connection.</li>
* <li>If a new connection is scheduled, then the pending count is incremented</li>
* <li>Once a scheduled new connection completes successfully, pending is decremented and demand is
* reduced by the {@link #getMaxMultiplex()} factor.</li>
* </ul>
*/
protected void tryCreate()
protected void tryCreate(boolean demanded)
{
int connectionCount = getConnectionCount();
if (LOG.isDebugEnabled())
LOG.debug("Try creating connection {}/{} with {} pending", connectionCount, getMaxConnectionCount(), getPendingConnectionCount());

// If we have already reserved sufficient multiplexed connections, then do not create another
// If we have already pending sufficient multiplexed connections, then do not create another
int multiplexed = getMaxMultiplex();
Pool<Connection>.Entry entry = null;
while (true)
{
long encoded = pending.get();
int reserved = getHi(encoded);
long encoded = reservations.get();
int pending = getHi(encoded);
int demand = getLo(encoded);

// If we have already reserved enough connections, just increment demand and return
if (reserved * multiplexed > demand && (pending.compareAndSet(encoded, reserved, demand + 1)))
if (demanded)
demand++;
int supply = pending * multiplexed;

// Do we need a new connections?
if (supply >= demand)
{
if (!reservations.compareAndSet(encoded, pending, demand))
continue;
if (entry != null)
entry.release();
return;
}

// otherwise increase reservations and demand
if (pending.compareAndSet(encoded, reserved + 1, demand + 1))
break;
}
// Try to reserve an entry to create
if (entry == null)
entry = pool.reserve();
if (entry == null)
{
if (!reservations.compareAndSet(encoded, pending, demand))
continue;
return;
}

Pool<Connection>.Entry entry = pool.reserve();
if (entry == null)
{
// pool is full, so decrement reservations and return
pending.add(-1, 0);
if (!reservations.compareAndSet(encoded, pending + 1, demand))
continue;
// Create the connection
if (LOG.isDebugEnabled())
LOG.debug("Creating connection {}/{} at {}", connectionCount, getMaxConnectionCount(), entry);
Promise<Connection> future = new FutureConnection(entry);
destination.newConnection(future);
return;
}

if (LOG.isDebugEnabled())
LOG.debug("Creating connection {}/{} at {}", connectionCount, getMaxConnectionCount(), entry);

Promise<Connection> future = new FutureConnection(entry);
destination.newConnection(future);
}

protected void proceed()
Expand Down Expand Up @@ -480,15 +495,16 @@ public void succeeded(Connection connection)
((Attachable)connection).setAttachment(reserved);
onCreated(connection);

pending.updateAndGet(encoded -> AtomicBiInteger.encode(getHi(encoded) - 1, Math.max(0, getLo(encoded) - getMaxMultiplex())));
reservations.updateAndGet(encoded -> AtomicBiInteger.encode(getHi(encoded) - 1, Math.max(0, getLo(encoded) - getMaxMultiplex())));
reserved.enable(connection, false);
idle(connection, false);
complete(null);
proceed();
}
else
{
pending.add(-1, 0);
// reduce pending on failure and if not multiplexing also reduce demand
reservations.add(-1, getMaxMultiplex() == 1 ? -1 : 0);
failed(new IllegalArgumentException("Invalid connection object: " + connection));
}
}
Expand All @@ -498,7 +514,8 @@ public void failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection creation failed {}", reserved, x);
pending.add(-1, 0);
// reduce pending on failure and if not multiplexing also reduce demand
reservations.add(-1, getMaxMultiplex() == 1 ? -1 : 0);
reserved.remove();
completeExceptionally(x);
requester.failed(x);
Expand Down
Expand Up @@ -29,6 +29,6 @@ public static Connection acquire(AbstractConnectionPool connectionPool, boolean

public static void tryCreate(AbstractConnectionPool connectionPool, int pending)
{
connectionPool.tryCreate();
connectionPool.tryCreate(pending > 0);
}
}
3 changes: 1 addition & 2 deletions jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java
Expand Up @@ -229,8 +229,7 @@ public Entry reserve(int allotment)
if (space <= 0)
return null;

long pending = entries.stream().filter(Entry::isReserved).count();
if (allotment >= 0 && (pending * getMaxMultiplex()) >= allotment)
if (allotment >= 0 && (getReservedCount() * getMaxMultiplex()) >= allotment)
return null;

Entry entry = new Entry();
Expand Down

0 comments on commit 482c943

Please sign in to comment.