Skip to content

Commit

Permalink
Enhanced fix for #5855
Browse files Browse the repository at this point in the history
Avoid the race on request queue size by enhancing Pool to track demand.
  • Loading branch information
gregw committed Jan 6, 2021
1 parent e3690c6 commit 5083864
Show file tree
Hide file tree
Showing 4 changed files with 246 additions and 92 deletions.
Expand Up @@ -20,7 +20,9 @@

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -82,12 +84,63 @@ protected void doStop() throws Exception
@Override
public CompletableFuture<Void> preCreateConnections(int connectionCount)
{
CompletableFuture<?>[] futures = new CompletableFuture[connectionCount];
for (int i = 0; i < connectionCount; i++)
if (LOG.isDebugEnabled())
LOG.debug("Precreating connections {}/{}", connectionCount, getMaxConnectionCount());

List<CompletableFuture<?>> futures = new ArrayList<>();
loop : for (int i = 0; i < connectionCount; i++)
{
futures[i] = tryCreateAsync(getMaxConnectionCount());
Pool<Connection>.Entry entry = pool.reserve();
while (entry == null)
{
if (pool.size() >= pool.getMaxEntries())
break loop;
if (pool.getMaxMultiplex() <= 1)
throw new IllegalStateException();
entry = pool.reserve();
}

final Pool<Connection>.Entry reserved = entry;

Promise.Completable<Connection> future = new Promise.Completable<Connection>()
{
@Override
public void succeeded(Connection connection)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection creation succeeded {}: {}", reserved, connection);
if (connection instanceof Attachable)
{
((Attachable)connection).setAttachment(reserved);
onCreated(connection);
reserved.enable(connection, false);
idle(connection, false);
complete(null);
proceed();
}
else
{
failed(new IllegalArgumentException("Invalid connection object: " + connection));
}
}

@Override
public void failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection creation failed {}", reserved, x);
reserved.remove();
completeExceptionally(x);
requester.failed(x);
}
};

futures.add(future);
if (LOG.isDebugEnabled())
LOG.debug("Creating connection {}/{} at {}", futures.size() + 1, getMaxConnectionCount(), reserved);
destination.newConnection(future);
}
return CompletableFuture.allOf(futures);
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
}

protected int getMaxMultiplex()
Expand Down Expand Up @@ -200,7 +253,7 @@ public Connection acquire()
*
* @param create a hint to attempt to open a new connection if no idle connections are available
* @return an idle connection or {@code null} if no idle connections are available
* @see #tryCreate(int)
* @see #tryCreate()
*/
protected Connection acquire(boolean create)
{
Expand All @@ -210,20 +263,7 @@ protected Connection acquire(boolean create)
if (connection == null)
{
if (create || isMaximizeConnections())
{
// Try to forcibly create a connection if none is available.
tryCreate(-1);
}
else
{
// QueuedRequests may be stale and different from pool.pending.
// So tryCreate() may be a no-operation (when queuedRequests < pool.pending);
// or tryCreate() may create more connections than necessary, when
// queuedRequests read below is stale and some request has just been
// dequeued to be processed causing queuedRequests > pool.pending.
int queuedRequests = destination.getQueuedRequestCount();
tryCreate(queuedRequests);
}
tryCreate();
connection = activate();
}
return connection;
Expand All @@ -236,28 +276,22 @@ protected Connection acquire(boolean create)
* then this method returns without scheduling the opening of a new connection;
* if {@code maxPending} is negative, a new connection is always scheduled for opening.</p>
*
* @param maxPending the max desired number of connections scheduled for opening,
* or a negative number to always trigger the opening of a new connection
*/
protected void tryCreate(int maxPending)
{
tryCreateAsync(maxPending);
}

private CompletableFuture<?> tryCreateAsync(int maxPending)
protected void tryCreate()
{
int connectionCount = getConnectionCount();
if (LOG.isDebugEnabled())
LOG.debug("Try creating connection {}/{} with {}/{} pending", connectionCount, getMaxConnectionCount(), getPendingConnectionCount(), maxPending);
LOG.debug("Try creating connection {}/{} with {} pending", connectionCount, getMaxConnectionCount(), getPendingConnectionCount());

Pool<Connection>.Entry entry = pool.reserve(maxPending);
Pool<Connection>.Entry entry = pool.reserve();
if (entry == null)
return CompletableFuture.completedFuture(null);
return;

if (LOG.isDebugEnabled())
LOG.debug("Creating connection {}/{} at {}", connectionCount, getMaxConnectionCount(), entry);

Promise.Completable<Connection> future = new Promise.Completable<Connection>()
Promise<Connection> future = new Promise<Connection>()
{
@Override
public void succeeded(Connection connection)
Expand All @@ -270,7 +304,6 @@ public void succeeded(Connection connection)
onCreated(connection);
entry.enable(connection, false);
idle(connection, false);
complete(null);
proceed();
}
else
Expand All @@ -285,14 +318,11 @@ public void failed(Throwable x)
if (LOG.isDebugEnabled())
LOG.debug("Connection {}/{} creation failed at {}", connectionCount, getMaxConnectionCount(), entry, x);
entry.remove();
completeExceptionally(x);
requester.failed(x);
}
};

destination.newConnection(future);

return future;
}

protected void proceed()
Expand Down
Expand Up @@ -19,7 +19,6 @@
package org.eclipse.jetty.client;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;

public class ConnectionPoolHelper
{
Expand All @@ -30,6 +29,6 @@ public static Connection acquire(AbstractConnectionPool connectionPool, boolean

public static void tryCreate(AbstractConnectionPool connectionPool, int pending)
{
connectionPool.tryCreate(pending);
connectionPool.tryCreate();
}
}
89 changes: 79 additions & 10 deletions jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java
Expand Up @@ -54,7 +54,7 @@ public class Pool<T> implements AutoCloseable, Dumpable
private final List<Entry> entries = new CopyOnWriteArrayList<>();

private final int maxEntries;
private final AtomicInteger pending = new AtomicInteger();
private final AtomicBiInteger pending = new AtomicBiInteger(); // Lo reserved; Hi demand
private final StrategyType strategyType;

/*
Expand Down Expand Up @@ -137,7 +137,12 @@ public Pool(StrategyType strategyType, int maxEntries, boolean cache)

public int getReservedCount()
{
return pending.get();
return pending.getLo();
}

public int getDemand()
{
return pending.getHi();
}

public int getIdleCount()
Expand Down Expand Up @@ -216,7 +221,9 @@ public final void setMaxUsageCount(int maxUsageCount)
* @return a disabled entry that is contained in the pool,
* or null if the pool is closed or if the pool already contains
* {@link #getMaxEntries()} entries, or the allotment has already been reserved
* @deprecated Use {@link #reserve()}
*/
@Deprecated
public Entry reserve(int allotment)
{
try (Locker.Lock l = locker.lock())
Expand All @@ -231,9 +238,59 @@ public Entry reserve(int allotment)
// The pending count is an AtomicInteger that is only ever incremented here with
// the lock held. Thus the pending count can be reduced immediately after the
// test below, but never incremented. Thus the allotment limit can be enforced.
if (allotment >= 0 && (pending.get() * getMaxMultiplex()) >= allotment)
if (allotment >= 0 && (pending.getLo() * getMaxMultiplex()) >= allotment)
return null;
pending.addAndGetLo(1);

Entry entry = new Entry();
entries.add(entry);
return entry;
}
}

/**
* Create a new disabled slot into the pool.
* The returned entry must ultimately have the {@link Entry#enable(Object, boolean)}
* method called or be removed via {@link Pool.Entry#remove()} or
* {@link Pool#remove(Pool.Entry)}.
* <p>For multiplexed entries, a call to reserve may return null if a previously
* reserved entry has excess capacity, which is determined by each call to
* reserve() incrementing demand. </p>
*
* @return a disabled entry that is contained in the pool,
* or null if the pool is closed or if the pool already contains
* {@link #getMaxEntries()} entries, or the allotment has already been reserved
*/
public Entry reserve()
{
try (Locker.Lock l = locker.lock())
{
if (closed)
return null;
pending.incrementAndGet();

int space = maxEntries - entries.size();
if (space <= 0)
return null;

while (true)
{
long encoded = pending.get();
int reserved = AtomicBiInteger.getLo(encoded);
int demand = AtomicBiInteger.getHi(encoded);

if (reserved * getMaxMultiplex() <= demand)
{
// we need a new connection
if (pending.compareAndSet(encoded, demand + 1, reserved + 1))
break;
}
else
{
// We increment demand on existing reservations
if (pending.compareAndSet(encoded, demand + 1, reserved))
return null;
}
}

Entry entry = new Entry();
entries.add(entry);
Expand Down Expand Up @@ -342,7 +399,7 @@ public Entry acquire(Function<Pool<T>.Entry, T> creator)
if (entry != null)
return entry;

entry = reserve(-1);
entry = reserve();
if (entry == null)
return null;

Expand Down Expand Up @@ -457,12 +514,14 @@ public void dump(Appendable out, String indent) throws IOException
@Override
public String toString()
{
return String.format("%s@%x[size=%d closed=%s pending=%d]",
long encoded = pending.get();
return String.format("%s@%x[size=%d closed=%s reserved=%d, demand=%d]",
getClass().getSimpleName(),
hashCode(),
entries.size(),
closed,
pending.get());
AtomicBiInteger.getLo(encoded),
AtomicBiInteger.getHi(encoded));
}

public class Entry
Expand All @@ -488,7 +547,7 @@ void setUsageCount(int usageCount)
}

/** Enable a reserved entry {@link Entry}.
* An entry returned from the {@link #reserve(int)} method must be enabled with this method,
* An entry returned from the {@link #reserve()} method must be enabled with this method,
* once and only once, before it is usable by the pool.
* The entry may be enabled and not acquired, in which case it is immediately available to be
* acquired, potentially by another thread; or it can be enabled and acquired atomically so that
Expand Down Expand Up @@ -517,7 +576,17 @@ public boolean enable(T pooled, boolean acquire)
return false; // Pool has been closed
throw new IllegalStateException("Entry already enabled: " + this);
}
pending.decrementAndGet();

while (true)
{
long encoded = pending.get();
int reserved = AtomicBiInteger.getLo(encoded);
int demand = AtomicBiInteger.getHi(encoded);
reserved = reserved - 1;
demand = Math.max(0, demand - getMaxMultiplex());
if (pending.compareAndSet(encoded, demand, reserved))
break;
}
return true;
}

Expand Down Expand Up @@ -620,7 +689,7 @@ boolean tryRemove()
if (removed)
{
if (usageCount == Integer.MIN_VALUE)
pending.decrementAndGet();
pending.addAndGetLo(-1);
return newMultiplexCount == 0;
}
}
Expand Down

0 comments on commit 5083864

Please sign in to comment.