Skip to content

Commit

Permalink
Fixes #5855 - HttpClient may not send queued requests.
Browse files Browse the repository at this point in the history
Changed the AbstractConnectionPool.acquire() logic to call tryCreate() even
when create=false.

This is necessary when e.g. a sender thread T2 with create=true steals a
connection whose creation was triggered by another sender thread T1.
In the old code, T2 did not trigger the creation of a connection, possibly
leaving a request queued.
In the new code, T2 would call tryCreate(queuedRequests), possibly triggering
the creation of a connection.

This change re-introduces the fact that when sending e.g. 20 requests
concurrently, 20+ connections may be created.

However, it is better to err on creating more than creating less and leaving
requests queued.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Jan 5, 2021
1 parent 343e7b2 commit e3690c6
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 55 deletions.
Expand Up @@ -190,11 +190,15 @@ public Connection acquire()
* <p>Returns an idle connection, if available;
* if an idle connection is not available, and the given {@code create} parameter is {@code true}
* or {@link #isMaximizeConnections()} is {@code true},
* then schedules the opening of a new connection, if possible within the configuration of this
* then attempts to open a new connection, if possible within the configuration of this
* connection pool (for example, if it does not exceed the max connection count);
* otherwise returns {@code null}.</p>
* otherwise it attempts to open a new connection, if the number of queued requests is
* greater than the number of pending connections;
* if no connection is available even after the attempts to open, return {@code null}.</p>
* <p>The {@code create} parameter is just a hint: the connection may be created even if
* {@code false}, or may not be created even if {@code true}.</p>
*
* @param create whether to schedule the opening of a connection if no idle connections are available
* @param create a hint to attempt to open a new connection if no idle connections are available
* @return an idle connection or {@code null} if no idle connections are available
* @see #tryCreate(int)
*/
Expand All @@ -203,9 +207,23 @@ protected Connection acquire(boolean create)
if (LOG.isDebugEnabled())
LOG.debug("Acquiring create={} on {}", create, this);
Connection connection = activate();
if (connection == null && (create || isMaximizeConnections()))
if (connection == null)
{
tryCreate(destination.getQueuedRequestCount());
if (create || isMaximizeConnections())
{
// Try to forcibly create a connection if none is available.
tryCreate(-1);
}
else
{
// QueuedRequests may be stale and different from pool.pending.
// So tryCreate() may be a no-operation (when queuedRequests < pool.pending);
// or tryCreate() may create more connections than necessary, when
// queuedRequests read below is stale and some request has just been
// dequeued to be processed causing queuedRequests > pool.pending.
int queuedRequests = destination.getQueuedRequestCount();
tryCreate(queuedRequests);
}
connection = activate();
}
return connection;
Expand All @@ -226,7 +244,7 @@ protected void tryCreate(int maxPending)
tryCreateAsync(maxPending);
}

private CompletableFuture<Void> tryCreateAsync(int maxPending)
private CompletableFuture<?> tryCreateAsync(int maxPending)
{
int connectionCount = getConnectionCount();
if (LOG.isDebugEnabled())
Expand All @@ -237,39 +255,42 @@ private CompletableFuture<Void> tryCreateAsync(int maxPending)
return CompletableFuture.completedFuture(null);

if (LOG.isDebugEnabled())
LOG.debug("Creating connection {}/{}", connectionCount, getMaxConnectionCount());
LOG.debug("Creating connection {}/{} at {}", connectionCount, getMaxConnectionCount(), entry);

CompletableFuture<Void> future = new CompletableFuture<>();
destination.newConnection(new Promise<Connection>()
Promise.Completable<Connection> future = new Promise.Completable<Connection>()
{
@Override
public void succeeded(Connection connection)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection {}/{} creation succeeded {}", connectionCount, getMaxConnectionCount(), connection);
if (!(connection instanceof Attachable))
LOG.debug("Connection {}/{} creation succeeded at {}: {}", connectionCount, getMaxConnectionCount(), entry, connection);
if (connection instanceof Attachable)
{
((Attachable)connection).setAttachment(entry);
onCreated(connection);
entry.enable(connection, false);
idle(connection, false);
complete(null);
proceed();
}
else
{
failed(new IllegalArgumentException("Invalid connection object: " + connection));
return;
}
((Attachable)connection).setAttachment(entry);
onCreated(connection);
entry.enable(connection, false);
idle(connection, false);
future.complete(null);
proceed();
}

@Override
public void failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection {}/{} creation failed", connectionCount, getMaxConnectionCount(), x);
LOG.debug("Connection {}/{} creation failed at {}", connectionCount, getMaxConnectionCount(), entry, x);
entry.remove();
future.completeExceptionally(x);
completeExceptionally(x);
requester.failed(x);
}
});
};

destination.newConnection(future);

return future;
}
Expand Down
Expand Up @@ -109,7 +109,7 @@ public HttpDestination(HttpClient client, Origin origin)
protected void doStart() throws Exception
{
this.connectionPool = newConnectionPool(client);
addBean(connectionPool);
addBean(connectionPool, true);
super.doStart();
Sweeper sweeper = client.getBean(Sweeper.class);
if (sweeper != null && connectionPool instanceof Sweeper.Sweepable)
Expand Down Expand Up @@ -311,17 +311,19 @@ public void send()

private void send(boolean create)
{
if (getHttpExchanges().isEmpty())
return;
process(create);
if (!getHttpExchanges().isEmpty())
process(create);
}

private void process(boolean create)
{
// The loop is necessary in case of a new multiplexed connection,
// when a single thread notified of the connection opening must
// process all queued exchanges.
// In other cases looping is a work-stealing optimization.
// It is also necessary when thread T1 cannot acquire a connection
// (for example, it has been stolen by thread T2 and the pool has
// enough pending reservations). T1 returns without doing anything
// and therefore it is T2 that must send both request R1 and R2.
while (true)
{
Connection connection;
Expand All @@ -331,14 +333,15 @@ private void process(boolean create)
connection = connectionPool.acquire();
if (connection == null)
break;
ProcessResult result = process(connection);
if (result == ProcessResult.FINISH)
boolean proceed = process(connection);
if (proceed)
create = false;
else
break;
create = result == ProcessResult.RESTART;
}
}

private ProcessResult process(Connection connection)
private boolean process(Connection connection)
{
HttpClient client = getHttpClient();
HttpExchange exchange = getHttpExchanges().poll();
Expand All @@ -354,7 +357,7 @@ private ProcessResult process(Connection connection)
LOG.debug("{} is stopping", client);
connection.close();
}
return ProcessResult.FINISH;
return false;
}
else
{
Expand All @@ -372,17 +375,15 @@ private ProcessResult process(Connection connection)
// is created. Aborting the exchange a second time will result in
// a no-operation, so we just abort here to cover that edge case.
exchange.abort(cause);
return getHttpExchanges().size() > 0
? (released ? ProcessResult.CONTINUE : ProcessResult.RESTART)
: ProcessResult.FINISH;
return getQueuedRequestCount() > 0;
}

SendFailure failure = send(connection, exchange);
if (failure == null)
{
// Aggressively send other queued requests
// in case connections are multiplexed.
return getQueuedRequestCount() > 0 ? ProcessResult.CONTINUE : ProcessResult.FINISH;
return getQueuedRequestCount() > 0;
}

if (LOG.isDebugEnabled())
Expand All @@ -392,10 +393,10 @@ private ProcessResult process(Connection connection)
// Resend this exchange, likely on another connection,
// and return false to avoid to re-enter this method.
send(exchange);
return ProcessResult.FINISH;
return false;
}
request.abort(failure.failure);
return getHttpExchanges().size() > 0 ? ProcessResult.RESTART : ProcessResult.FINISH;
return getQueuedRequestCount() > 0;
}
}

Expand Down Expand Up @@ -474,7 +475,7 @@ else if (removed)
// Process queued requests that may be waiting.
// We may create a connection that is not
// needed, but it will eventually idle timeout.
process(true);
send(true);
}
return removed;
}
Expand Down Expand Up @@ -541,8 +542,8 @@ public String toString()
asString(),
hashCode(),
proxy == null ? "" : "(via " + proxy + ")",
exchanges.size(),
connectionPool);
getQueuedRequestCount(),
getConnectionPool());
}

/**
Expand Down Expand Up @@ -610,9 +611,4 @@ private void schedule(long expiresAt)
}
}
}

private enum ProcessResult
{
RESTART, CONTINUE, FINISH
}
}
Expand Up @@ -23,10 +23,12 @@
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

Expand Down Expand Up @@ -244,9 +246,12 @@ else if (serverClose)
}

@ParameterizedTest
@MethodSource("pools")
@MethodSource("poolsNoRoundRobin")
public void testQueuedRequestsDontOpenTooManyConnections(ConnectionPoolFactory factory) throws Exception
{
// Round robin connection pool does open a few more
// connections than expected, exclude it from this test.

startServer(new EmptyServerHandler());

HttpClientTransport transport = new HttpClientTransportOverHTTP(1);
Expand Down Expand Up @@ -300,11 +305,10 @@ public void resolve(String host, int port, Promise<List<InetSocketAddress>> prom
}

@ParameterizedTest
@MethodSource("poolsNoRoundRobin")
public void testConcurrentRequestsDontOpenTooManyConnections(ConnectionPoolFactory factory) throws Exception
@MethodSource("pools")
public void testConcurrentRequestsWithSlowAddressResolver(ConnectionPoolFactory factory) throws Exception
{
// Round robin connection pool does open a few more
// connections than expected, exclude it from this test.
// ConnectionPools may open a few more connections than expected.

startServer(new EmptyServerHandler());

Expand Down Expand Up @@ -351,9 +355,81 @@ public void resolve(String host, int port, Promise<List<InetSocketAddress>> prom
assertTrue(latch.await(count, TimeUnit.SECONDS));
List<Destination> destinations = client.getDestinations();
assertEquals(1, destinations.size());
}

@ParameterizedTest
@MethodSource("pools")
public void testConcurrentRequestsAllBlockedOnServerWithLargeConnectionPool(ConnectionPoolFactory factory) throws Exception
{
int count = 50;
testConcurrentRequestsAllBlockedOnServer(factory, count, 2 * count);
}

@ParameterizedTest
@MethodSource("pools")
public void testConcurrentRequestsAllBlockedOnServerWithExactConnectionPool(ConnectionPoolFactory factory) throws Exception
{
int count = 50;
testConcurrentRequestsAllBlockedOnServer(factory, count, count);
}

private void testConcurrentRequestsAllBlockedOnServer(ConnectionPoolFactory factory, int count, int maxConnections) throws Exception
{
CyclicBarrier barrier = new CyclicBarrier(count);

QueuedThreadPool serverThreads = new QueuedThreadPool(2 * count);
serverThreads.setName("server");
server = new Server(serverThreads);
connector = new ServerConnector(server);
server.addConnector(connector);
server.setHandler(new EmptyServerHandler()
{
@Override
protected void service(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
try
{
barrier.await();
}
catch (Exception x)
{
throw new ServletException(x);
}
}
});
server.start();

QueuedThreadPool clientThreads = new QueuedThreadPool(2 * count);
clientThreads.setName("client");
HttpClientTransport transport = new HttpClientTransportOverHTTP(1);
transport.setConnectionPoolFactory(factory.factory);
client = new HttpClient(transport, null);
client.setExecutor(clientThreads);
client.setMaxConnectionsPerDestination(maxConnections);
client.start();

// Send N requests to the server, all waiting on the server.
// This should open N connections, and the test verifies that
// all N are sent (i.e. the client does not keep any queued).
CountDownLatch latch = new CountDownLatch(count);
for (int i = 0; i < count; ++i)
{
int id = i;
clientThreads.execute(() -> client.newRequest("localhost", connector.getLocalPort())
.path("/" + id)
.send(result ->
{
if (result.isSucceeded())
latch.countDown();
}));
}

assertTrue(latch.await(5, TimeUnit.SECONDS), "server requests " + barrier.getNumberWaiting() + "<" + count + " - client: " + client.dump());
List<Destination> destinations = client.getDestinations();
assertEquals(1, destinations.size());
HttpDestination destination = (HttpDestination)destinations.get(0);
AbstractConnectionPool connectionPool = (AbstractConnectionPool)destination.getConnectionPool();
assertThat(connectionPool.getConnectionCount(), Matchers.lessThanOrEqualTo(count));
assertThat(connectionPool.getConnectionCount(), Matchers.greaterThanOrEqualTo(count));
}

@ParameterizedTest
Expand Down
Expand Up @@ -64,10 +64,12 @@ public void testAcquireWithEmptyQueue(Scenario scenario) throws Exception
destination.start();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Connection connection = connectionPool.acquire();
assertNull(connection);
// There are no queued requests, so no connection should be created.
connection = peekIdleConnection(connectionPool, 1, TimeUnit.SECONDS);
assertNull(connection);
if (connection == null)
{
// There are no queued requests, so the newly created connection will be idle.
connection = peekIdleConnection(connectionPool, 5, TimeUnit.SECONDS);
}
assertNotNull(connection);
}
}

Expand Down

0 comments on commit e3690c6

Please sign in to comment.