Skip to content

Commit

Permalink
Enhanced fix for #5855 (#5858)
Browse files Browse the repository at this point in the history
Issue #5855 - HttpClient may not send queued requests 

Moved field pending from Pool to AbstractConnectionPool.
As a consequence, AbstractConnectionPool.tryCreate() now performs a demand/supply calculation to decide whether to create a new connection.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
gregw committed Jan 7, 2021
1 parent e3690c6 commit 6f14c1c
Show file tree
Hide file tree
Showing 6 changed files with 248 additions and 150 deletions.
Expand Up @@ -20,9 +20,12 @@

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import org.eclipse.jetty.client.api.Connection;
Expand All @@ -46,6 +49,7 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
{
private static final Logger LOG = Log.getLogger(AbstractConnectionPool.class);

private final AtomicInteger pending = new AtomicInteger();
private final HttpDestination destination;
private final Callback requester;
private final Pool<Connection> pool;
Expand Down Expand Up @@ -82,12 +86,23 @@ protected void doStop() throws Exception
@Override
public CompletableFuture<Void> preCreateConnections(int connectionCount)
{
CompletableFuture<?>[] futures = new CompletableFuture[connectionCount];
if (LOG.isDebugEnabled())
LOG.debug("Pre-creating connections {}/{}", connectionCount, getMaxConnectionCount());

List<CompletableFuture<?>> futures = new ArrayList<>();
for (int i = 0; i < connectionCount; i++)
{
futures[i] = tryCreateAsync(getMaxConnectionCount());
Pool<Connection>.Entry entry = pool.reserve();
if (entry == null)
break;
pending.incrementAndGet();
Promise.Completable<Connection> future = new FutureConnection(entry);
futures.add(future);
if (LOG.isDebugEnabled())
LOG.debug("Pre-creating connection {}/{} at {}", futures.size(), getMaxConnectionCount(), entry);
destination.newConnection(future);
}
return CompletableFuture.allOf(futures);
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}

protected int getMaxMultiplex()
Expand Down Expand Up @@ -148,7 +163,7 @@ public int getPendingCount()
@ManagedAttribute(value = "The number of pending connections", readonly = true)
public int getPendingConnectionCount()
{
return pool.getReservedCount();
return pending.get();
}

@Override
Expand Down Expand Up @@ -200,7 +215,7 @@ public Connection acquire()
*
* @param create a hint to attempt to open a new connection if no idle connections are available
* @return an idle connection or {@code null} if no idle connections are available
* @see #tryCreate(int)
* @see #tryCreate(boolean)
*/
protected Connection acquire(boolean create)
{
Expand All @@ -209,90 +224,63 @@ protected Connection acquire(boolean create)
Connection connection = activate();
if (connection == null)
{
if (create || isMaximizeConnections())
{
// Try to forcibly create a connection if none is available.
tryCreate(-1);
}
else
{
// QueuedRequests may be stale and different from pool.pending.
// So tryCreate() may be a no-operation (when queuedRequests < pool.pending);
// or tryCreate() may create more connections than necessary, when
// queuedRequests read below is stale and some request has just been
// dequeued to be processed causing queuedRequests > pool.pending.
int queuedRequests = destination.getQueuedRequestCount();
tryCreate(queuedRequests);
}
tryCreate(create);
connection = activate();
}
return connection;
}

/**
* <p>Schedules the opening of a new connection.</p>
* <p>Whether a new connection is scheduled for opening is determined by the {@code maxPending} parameter:
* if {@code maxPending} is greater than the current number of connections scheduled for opening,
* then this method returns without scheduling the opening of a new connection;
* if {@code maxPending} is negative, a new connection is always scheduled for opening.</p>
* <p>Tries to create a new connection.</p>
* <p>Whether a new connection is created is determined by the {@code create} parameter
* and a count of demand and supply, where the demand is derived from the number of
* queued requests, and the supply is the number of pending connections time the
* {@link #getMaxMultiplex()} factor: is the demand is less than the supply, the
* connection will not be created.</p>
* <p>Since the number of queued requests used to derive the demand may be a stale
* value, it is possible that few more connections than strictly necessary may be
* created, but enough to satisfy the demand.</p>
*
* @param maxPending the max desired number of connections scheduled for opening,
* or a negative number to always trigger the opening of a new connection
* @param create a hint to request to create a connection
*/
protected void tryCreate(int maxPending)
{
tryCreateAsync(maxPending);
}

private CompletableFuture<?> tryCreateAsync(int maxPending)
protected void tryCreate(boolean create)
{
int connectionCount = getConnectionCount();
if (LOG.isDebugEnabled())
LOG.debug("Try creating connection {}/{} with {}/{} pending", connectionCount, getMaxConnectionCount(), getPendingConnectionCount(), maxPending);
LOG.debug("Try creating connection {}/{} with {} pending", connectionCount, getMaxConnectionCount(), getPendingConnectionCount());

Pool<Connection>.Entry entry = pool.reserve(maxPending);
if (entry == null)
return CompletableFuture.completedFuture(null);
// If we have already pending sufficient multiplexed connections, then do not create another.
int multiplexed = getMaxMultiplex();
while (true)
{
int pending = this.pending.get();
int supply = pending * multiplexed;
int demand = destination.getQueuedRequestCount() + (create ? 1 : 0);

if (LOG.isDebugEnabled())
LOG.debug("Creating connection {}/{} at {}", connectionCount, getMaxConnectionCount(), entry);
boolean tryCreate = isMaximizeConnections() || supply < demand;

Promise.Completable<Connection> future = new Promise.Completable<Connection>()
{
@Override
public void succeeded(Connection connection)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection {}/{} creation succeeded at {}: {}", connectionCount, getMaxConnectionCount(), entry, connection);
if (connection instanceof Attachable)
{
((Attachable)connection).setAttachment(entry);
onCreated(connection);
entry.enable(connection, false);
idle(connection, false);
complete(null);
proceed();
}
else
{
failed(new IllegalArgumentException("Invalid connection object: " + connection));
}
}
if (LOG.isDebugEnabled())
LOG.debug("Try creating({}) connection, pending/demand/supply: {}/{}/{}, result={}", create, pending, demand, supply, tryCreate);

@Override
public void failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection {}/{} creation failed at {}", connectionCount, getMaxConnectionCount(), entry, x);
entry.remove();
completeExceptionally(x);
requester.failed(x);
}
};
if (!tryCreate)
return;

destination.newConnection(future);
if (this.pending.compareAndSet(pending, pending + 1))
break;
}

return future;
// Create the connection.
Pool<Connection>.Entry entry = pool.reserve();
if (entry == null)
{
pending.decrementAndGet();
return;
}

if (LOG.isDebugEnabled())
LOG.debug("Creating connection {}/{} at {}", connectionCount, getMaxConnectionCount(), entry);
Promise<Connection> future = new FutureConnection(entry);
destination.newConnection(future);
}

protected void proceed()
Expand Down Expand Up @@ -465,13 +453,58 @@ public boolean sweep()
@Override
public String toString()
{
return String.format("%s@%x[c=%d/%d/%d,a=%d,i=%d]",
return String.format("%s@%x[c=%d/%d/%d,a=%d,i=%d,q=%d]",
getClass().getSimpleName(),
hashCode(),
getPendingConnectionCount(),
getConnectionCount(),
getMaxConnectionCount(),
getActiveConnectionCount(),
getIdleConnectionCount());
getIdleConnectionCount(),
destination.getQueuedRequestCount());
}

private class FutureConnection extends Promise.Completable<Connection>
{
private final Pool<Connection>.Entry reserved;

public FutureConnection(Pool<Connection>.Entry reserved)
{
this.reserved = reserved;
}

@Override
public void succeeded(Connection connection)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection creation succeeded {}: {}", reserved, connection);
if (connection instanceof Attachable)
{
((Attachable)connection).setAttachment(reserved);
onCreated(connection);
pending.decrementAndGet();
reserved.enable(connection, false);
idle(connection, false);
complete(null);
proceed();
}
else
{
// reduce pending on failure and if not multiplexing also reduce demand
failed(new IllegalArgumentException("Invalid connection object: " + connection));
}
}

@Override
public void failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection creation failed {}", reserved, x);
// reduce pending on failure and if not multiplexing also reduce demand
pending.decrementAndGet();
reserved.remove();
completeExceptionally(x);
requester.failed(x);
}
}
}
Expand Up @@ -19,7 +19,6 @@
package org.eclipse.jetty.client;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;

public class ConnectionPoolHelper
{
Expand All @@ -28,8 +27,8 @@ public static Connection acquire(AbstractConnectionPool connectionPool, boolean
return connectionPool.acquire(create);
}

public static void tryCreate(AbstractConnectionPool connectionPool, int pending)
public static void tryCreate(AbstractConnectionPool connectionPool)
{
connectionPool.tryCreate(pending);
connectionPool.tryCreate(true);
}
}
Expand Up @@ -651,7 +651,7 @@ protected int networkFill(ByteBuffer input) throws IOException
HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port);
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
// Trigger the creation of a new connection, but don't use it.
ConnectionPoolHelper.tryCreate(connectionPool, -1);
ConnectionPoolHelper.tryCreate(connectionPool);
// Verify that the connection has been created.
while (true)
{
Expand Down Expand Up @@ -747,7 +747,7 @@ protected int networkFill(ByteBuffer input) throws IOException
HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port);
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
// Trigger the creation of a new connection, but don't use it.
ConnectionPoolHelper.tryCreate(connectionPool, -1);
ConnectionPoolHelper.tryCreate(connectionPool);
// Verify that the connection has been created.
while (true)
{
Expand Down
Expand Up @@ -85,7 +85,7 @@ public void testAcquireWithOneExchangeQueued(Scenario scenario) throws Exception
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();

// Trigger creation of one connection.
ConnectionPoolHelper.tryCreate(connectionPool, 1);
ConnectionPoolHelper.tryCreate(connectionPool);

Connection connection = ConnectionPoolHelper.acquire(connectionPool, false);
if (connection == null)
Expand All @@ -106,7 +106,7 @@ public void testSecondAcquireAfterFirstAcquireWithEmptyQueueReturnsSameConnectio
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();

// Trigger creation of one connection.
ConnectionPoolHelper.tryCreate(connectionPool, 1);
ConnectionPoolHelper.tryCreate(connectionPool);

Connection connection1 = connectionPool.acquire();
if (connection1 == null)
Expand Down Expand Up @@ -158,7 +158,7 @@ protected void onCreated(Connection connection)
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();

// Trigger creation of one connection.
ConnectionPoolHelper.tryCreate(connectionPool, 1);
ConnectionPoolHelper.tryCreate(connectionPool);

// Make sure we entered idleCreated().
assertTrue(idleLatch.await(5, TimeUnit.SECONDS));
Expand All @@ -169,7 +169,7 @@ protected void onCreated(Connection connection)
assertNull(connection1);

// Trigger creation of a second connection.
ConnectionPoolHelper.tryCreate(connectionPool, 1);
ConnectionPoolHelper.tryCreate(connectionPool);

// Second attempt also returns null because we delayed idleCreated() above.
Connection connection2 = connectionPool.acquire();
Expand Down Expand Up @@ -197,7 +197,7 @@ public void testAcquireProcessReleaseAcquireReturnsSameConnection(Scenario scena
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();

// Trigger creation of one connection.
ConnectionPoolHelper.tryCreate(connectionPool, 1);
ConnectionPoolHelper.tryCreate(connectionPool);

Connection connection1 = connectionPool.acquire();
if (connection1 == null)
Expand Down Expand Up @@ -234,7 +234,7 @@ public void testIdleConnectionIdleTimeout(Scenario scenario) throws Exception
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();

// Trigger creation of one connection.
ConnectionPoolHelper.tryCreate(connectionPool, 1);
ConnectionPoolHelper.tryCreate(connectionPool);

Connection connection1 = connectionPool.acquire();
if (connection1 == null)
Expand Down

0 comments on commit 6f14c1c

Please sign in to comment.