Skip to content

Commit

Permalink
Fixes #5855 - HttpClient may not send queued requests. (#5856)
Browse files Browse the repository at this point in the history
Changed the AbstractConnectionPool.acquire() logic to call tryCreate() even
when create=false.

This is necessary when e.g. a sender thread T2 with create=true steals a
connection whose creation was triggered by another sender thread T1.
In the old code, T2 did not trigger the creation of a connection, possibly
leaving a request queued.
In the new code, T2 would call tryCreate(), possibly triggering
the creation of a connection.

This change re-introduces the fact that when sending e.g. 20 requests
concurrently, 20+ connections may be created.

However, it is better to err on creating more than creating less and leaving
requests queued.

Further refactoring moved field pending from Pool to AbstractConnectionPool.
As a consequence, AbstractConnectionPool.tryCreate() now performs a 
demand/supply calculation to decide whether to create a new connection.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
Co-authored-by: Greg Wilkins <gregw@webtide.com>
  • Loading branch information
sbordet and gregw committed Jan 7, 2021
1 parent b45c326 commit 403d5ec
Show file tree
Hide file tree
Showing 8 changed files with 365 additions and 172 deletions.
Expand Up @@ -20,9 +20,12 @@

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.eclipse.jetty.client.api.Connection;
Expand All @@ -46,6 +49,7 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
{
private static final Logger LOG = Log.getLogger(AbstractConnectionPool.class);

private final AtomicInteger pending = new AtomicInteger();
private final HttpDestination destination;
private final Callback requester;
private final Pool<Connection> pool;
Expand Down Expand Up @@ -82,12 +86,23 @@ protected void doStop() throws Exception
@Override
public CompletableFuture<Void> preCreateConnections(int connectionCount)
{
CompletableFuture<?>[] futures = new CompletableFuture[connectionCount];
if (LOG.isDebugEnabled())
LOG.debug("Pre-creating connections {}/{}", connectionCount, getMaxConnectionCount());

List<CompletableFuture<?>> futures = new ArrayList<>();
for (int i = 0; i < connectionCount; i++)
{
futures[i] = tryCreateAsync(getMaxConnectionCount());
Pool<Connection>.Entry entry = pool.reserve();
if (entry == null)
break;
pending.incrementAndGet();
Promise.Completable<Connection> future = new FutureConnection(entry);
futures.add(future);
if (LOG.isDebugEnabled())
LOG.debug("Pre-creating connection {}/{} at {}", futures.size(), getMaxConnectionCount(), entry);
destination.newConnection(future);
}
return CompletableFuture.allOf(futures);
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}

protected int getMaxMultiplex()
Expand Down Expand Up @@ -148,7 +163,7 @@ public int getPendingCount()
@ManagedAttribute(value = "The number of pending connections", readonly = true)
public int getPendingConnectionCount()
{
return pool.getReservedCount();
return pending.get();
}

@Override
Expand Down Expand Up @@ -190,88 +205,82 @@ public Connection acquire()
* <p>Returns an idle connection, if available;
* if an idle connection is not available, and the given {@code create} parameter is {@code true}
* or {@link #isMaximizeConnections()} is {@code true},
* then schedules the opening of a new connection, if possible within the configuration of this
* then attempts to open a new connection, if possible within the configuration of this
* connection pool (for example, if it does not exceed the max connection count);
* otherwise returns {@code null}.</p>
* otherwise it attempts to open a new connection, if the number of queued requests is
* greater than the number of pending connections;
* if no connection is available even after the attempts to open, return {@code null}.</p>
* <p>The {@code create} parameter is just a hint: the connection may be created even if
* {@code false}, or may not be created even if {@code true}.</p>
*
* @param create whether to schedule the opening of a connection if no idle connections are available
* @param create a hint to attempt to open a new connection if no idle connections are available
* @return an idle connection or {@code null} if no idle connections are available
* @see #tryCreate(int)
* @see #tryCreate(boolean)
*/
protected Connection acquire(boolean create)
{
if (LOG.isDebugEnabled())
LOG.debug("Acquiring create={} on {}", create, this);
Connection connection = activate();
if (connection == null && (create || isMaximizeConnections()))
if (connection == null)
{
tryCreate(destination.getQueuedRequestCount());
tryCreate(create);
connection = activate();
}
return connection;
}

/**
* <p>Schedules the opening of a new connection.</p>
* <p>Whether a new connection is scheduled for opening is determined by the {@code maxPending} parameter:
* if {@code maxPending} is greater than the current number of connections scheduled for opening,
* then this method returns without scheduling the opening of a new connection;
* if {@code maxPending} is negative, a new connection is always scheduled for opening.</p>
* <p>Tries to create a new connection.</p>
* <p>Whether a new connection is created is determined by the {@code create} parameter
* and a count of demand and supply, where the demand is derived from the number of
* queued requests, and the supply is the number of pending connections time the
* {@link #getMaxMultiplex()} factor: if the demand is less than the supply, the
* connection will not be created.</p>
* <p>Since the number of queued requests used to derive the demand may be a stale
* value, it is possible that few more connections than strictly necessary may be
* created, but enough to satisfy the demand.</p>
*
* @param maxPending the max desired number of connections scheduled for opening,
* or a negative number to always trigger the opening of a new connection
* @param create a hint to request to create a connection
*/
protected void tryCreate(int maxPending)
{
tryCreateAsync(maxPending);
}

private CompletableFuture<Void> tryCreateAsync(int maxPending)
protected void tryCreate(boolean create)
{
int connectionCount = getConnectionCount();
if (LOG.isDebugEnabled())
LOG.debug("Try creating connection {}/{} with {}/{} pending", connectionCount, getMaxConnectionCount(), getPendingConnectionCount(), maxPending);
LOG.debug("Try creating connection {}/{} with {} pending", connectionCount, getMaxConnectionCount(), getPendingConnectionCount());

Pool<Connection>.Entry entry = pool.reserve(maxPending);
if (entry == null)
return CompletableFuture.completedFuture(null);
// If we have already pending sufficient multiplexed connections, then do not create another.
int multiplexed = getMaxMultiplex();
while (true)
{
int pending = this.pending.get();
int supply = pending * multiplexed;
int demand = destination.getQueuedRequestCount() + (create ? 1 : 0);

if (LOG.isDebugEnabled())
LOG.debug("Creating connection {}/{}", connectionCount, getMaxConnectionCount());
boolean tryCreate = isMaximizeConnections() || supply < demand;

CompletableFuture<Void> future = new CompletableFuture<>();
destination.newConnection(new Promise<Connection>()
{
@Override
public void succeeded(Connection connection)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection {}/{} creation succeeded {}", connectionCount, getMaxConnectionCount(), connection);
if (!(connection instanceof Attachable))
{
failed(new IllegalArgumentException("Invalid connection object: " + connection));
return;
}
((Attachable)connection).setAttachment(entry);
onCreated(connection);
entry.enable(connection, false);
idle(connection, false);
future.complete(null);
proceed();
}
if (LOG.isDebugEnabled())
LOG.debug("Try creating({}) connection, pending/demand/supply: {}/{}/{}, result={}", create, pending, demand, supply, tryCreate);

@Override
public void failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection {}/{} creation failed", connectionCount, getMaxConnectionCount(), x);
entry.remove();
future.completeExceptionally(x);
requester.failed(x);
}
});
if (!tryCreate)
return;

if (this.pending.compareAndSet(pending, pending + 1))
break;
}

return future;
// Create the connection.
Pool<Connection>.Entry entry = pool.reserve();
if (entry == null)
{
pending.decrementAndGet();
return;
}

if (LOG.isDebugEnabled())
LOG.debug("Creating connection {}/{} at {}", connectionCount, getMaxConnectionCount(), entry);
Promise<Connection> future = new FutureConnection(entry);
destination.newConnection(future);
}

protected void proceed()
Expand Down Expand Up @@ -444,13 +453,58 @@ public boolean sweep()
@Override
public String toString()
{
return String.format("%s@%x[c=%d/%d/%d,a=%d,i=%d]",
return String.format("%s@%x[c=%d/%d/%d,a=%d,i=%d,q=%d]",
getClass().getSimpleName(),
hashCode(),
getPendingConnectionCount(),
getConnectionCount(),
getMaxConnectionCount(),
getActiveConnectionCount(),
getIdleConnectionCount());
getIdleConnectionCount(),
destination.getQueuedRequestCount());
}

private class FutureConnection extends Promise.Completable<Connection>
{
private final Pool<Connection>.Entry reserved;

public FutureConnection(Pool<Connection>.Entry reserved)
{
this.reserved = reserved;
}

@Override
public void succeeded(Connection connection)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection creation succeeded {}: {}", reserved, connection);
if (connection instanceof Attachable)
{
((Attachable)connection).setAttachment(reserved);
onCreated(connection);
pending.decrementAndGet();
reserved.enable(connection, false);
idle(connection, false);
complete(null);
proceed();
}
else
{
// reduce pending on failure and if not multiplexing also reduce demand
failed(new IllegalArgumentException("Invalid connection object: " + connection));
}
}

@Override
public void failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection creation failed {}", reserved, x);
// reduce pending on failure and if not multiplexing also reduce demand
pending.decrementAndGet();
reserved.remove();
completeExceptionally(x);
requester.failed(x);
}
}
}
Expand Up @@ -109,7 +109,7 @@ public HttpDestination(HttpClient client, Origin origin)
protected void doStart() throws Exception
{
this.connectionPool = newConnectionPool(client);
addBean(connectionPool);
addBean(connectionPool, true);
super.doStart();
Sweeper sweeper = client.getBean(Sweeper.class);
if (sweeper != null && connectionPool instanceof Sweeper.Sweepable)
Expand Down Expand Up @@ -311,17 +311,19 @@ public void send()

private void send(boolean create)
{
if (getHttpExchanges().isEmpty())
return;
process(create);
if (!getHttpExchanges().isEmpty())
process(create);
}

private void process(boolean create)
{
// The loop is necessary in case of a new multiplexed connection,
// when a single thread notified of the connection opening must
// process all queued exchanges.
// In other cases looping is a work-stealing optimization.
// It is also necessary when thread T1 cannot acquire a connection
// (for example, it has been stolen by thread T2 and the pool has
// enough pending reservations). T1 returns without doing anything
// and therefore it is T2 that must send both queued requests.
while (true)
{
Connection connection;
Expand All @@ -331,14 +333,15 @@ private void process(boolean create)
connection = connectionPool.acquire();
if (connection == null)
break;
ProcessResult result = process(connection);
if (result == ProcessResult.FINISH)
boolean proceed = process(connection);
if (proceed)
create = false;
else
break;
create = result == ProcessResult.RESTART;
}
}

private ProcessResult process(Connection connection)
private boolean process(Connection connection)
{
HttpClient client = getHttpClient();
HttpExchange exchange = getHttpExchanges().poll();
Expand All @@ -354,7 +357,7 @@ private ProcessResult process(Connection connection)
LOG.debug("{} is stopping", client);
connection.close();
}
return ProcessResult.FINISH;
return false;
}
else
{
Expand All @@ -372,17 +375,15 @@ private ProcessResult process(Connection connection)
// is created. Aborting the exchange a second time will result in
// a no-operation, so we just abort here to cover that edge case.
exchange.abort(cause);
return getHttpExchanges().size() > 0
? (released ? ProcessResult.CONTINUE : ProcessResult.RESTART)
: ProcessResult.FINISH;
return getQueuedRequestCount() > 0;
}

SendFailure failure = send(connection, exchange);
if (failure == null)
{
// Aggressively send other queued requests
// in case connections are multiplexed.
return getQueuedRequestCount() > 0 ? ProcessResult.CONTINUE : ProcessResult.FINISH;
return getQueuedRequestCount() > 0;
}

if (LOG.isDebugEnabled())
Expand All @@ -392,10 +393,10 @@ private ProcessResult process(Connection connection)
// Resend this exchange, likely on another connection,
// and return false to avoid to re-enter this method.
send(exchange);
return ProcessResult.FINISH;
return false;
}
request.abort(failure.failure);
return getHttpExchanges().size() > 0 ? ProcessResult.RESTART : ProcessResult.FINISH;
return getQueuedRequestCount() > 0;
}
}

Expand Down Expand Up @@ -474,7 +475,7 @@ else if (removed)
// Process queued requests that may be waiting.
// We may create a connection that is not
// needed, but it will eventually idle timeout.
process(true);
send(true);
}
return removed;
}
Expand Down Expand Up @@ -541,8 +542,8 @@ public String toString()
asString(),
hashCode(),
proxy == null ? "" : "(via " + proxy + ")",
exchanges.size(),
connectionPool);
getQueuedRequestCount(),
getConnectionPool());
}

/**
Expand Down Expand Up @@ -610,9 +611,4 @@ private void schedule(long expiresAt)
}
}
}

private enum ProcessResult
{
RESTART, CONTINUE, FINISH
}
}

0 comments on commit 403d5ec

Please sign in to comment.