Skip to content

Commit

Permalink
Fixes #5217 - Review RoundRobinConnectionPool
Browse files Browse the repository at this point in the history
Updates after review.
Fixed broken tests.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Sep 1, 2020
1 parent f1a821f commit d20cea6
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 25 deletions.
Expand Up @@ -42,9 +42,9 @@ public abstract class IndexedConnectionPool extends MultiplexConnectionPool

private final Pool<Connection> pool;

public IndexedConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester, int maxMultiplex)
public IndexedConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, maxConnections, cache, requester, maxMultiplex);
super(destination, maxConnections, false, requester, maxMultiplex);
pool = destination.getBean(Pool.class);
}

Expand Down
Expand Up @@ -30,9 +30,9 @@
@ManagedObject
public class RandomConnectionPool extends IndexedConnectionPool
{
public RandomConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester, int maxMultiplex)
public RandomConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, maxConnections, cache, requester, maxMultiplex);
super(destination, maxConnections, requester, maxMultiplex);
}

@Override
Expand Down
Expand Up @@ -39,6 +39,11 @@
* achieve in a real environment.
* This class will just attempt a best-effort to provide the connections in a sequential order,
* but most likely the order will be quasi-random.</p>
* <p>Applications using this class should {@link #preCreateConnections(int) pre-create}
* the connections to ensure that they are already opened when the application starts to requests
* them, otherwise the first connection that is opened may be used multiple times before the others
* are opened, resulting in a behavior that is more random-like than more round-robin-like (and
* that confirms that round-robin behavior is almost impossible to achieve).</p>
*
* @see RandomConnectionPool
*/
Expand All @@ -54,7 +59,7 @@ public RoundRobinConnectionPool(HttpDestination destination, int maxConnections,

public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, maxConnections, false, requester, maxMultiplex);
super(destination, maxConnections, requester, maxMultiplex);
// If there are queued requests and connections get
// closed due to idle timeout or overuse, we want to
// aggressively try to open new connections to replace
Expand All @@ -65,6 +70,6 @@ public RoundRobinConnectionPool(HttpDestination destination, int maxConnections,
@Override
protected int getIndex(int maxConnections)
{
return Math.abs(offset.getAndIncrement() % maxConnections);
return offset.getAndUpdate(v -> ++v == maxConnections ? 0 : v);
}
}
Expand Up @@ -72,7 +72,7 @@ public static Stream<ConnectionPoolFactory> poolsNoRoundRobin()
return Stream.of(
new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)),
new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)),
new ConnectionPoolFactory("random", destination -> new RandomConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), true, destination, 1))
new ConnectionPoolFactory("random", destination -> new RandomConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1))
);
}

Expand Down
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
Expand Down Expand Up @@ -72,17 +73,21 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r
});

int maxConnections = 3;
scenario.client.getTransport().setConnectionPoolFactory(destination -> new RoundRobinConnectionPool(destination, maxConnections, destination));

// Prime the connections, so that they are all opened
// before we actually test the round robin behavior.
for (int i = 0; i < maxConnections; ++i)
CompletableFuture<Void> setup = new CompletableFuture<>();
scenario.client.getTransport().setConnectionPoolFactory(destination ->
{
ContentResponse response = scenario.client.newRequest(scenario.newURI())
.timeout(5, TimeUnit.SECONDS)
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());
}
RoundRobinConnectionPool pool = new RoundRobinConnectionPool(destination, maxConnections, destination);
pool.preCreateConnections(maxConnections).handle((r, x) -> x != null ? setup.completeExceptionally(x) : setup.complete(null));
return pool;
});

// Send one request to trigger destination creation
// and connection pool pre-creation of connections,
// so we can test reliably the round-robin behavior.
scenario.client.newRequest(scenario.newURI())
.timeout(5, TimeUnit.SECONDS)
.send();
setup.get(5, TimeUnit.SECONDS);

record.set(true);
int requests = 2 * maxConnections - 1;
Expand Down Expand Up @@ -119,6 +124,7 @@ public void testMultiplex(Transport transport) throws Exception
int maxConnections = 3;
int count = maxConnections * maxMultiplex;

AtomicBoolean record = new AtomicBoolean();
List<Integer> remotePorts = new CopyOnWriteArrayList<>();
AtomicReference<CountDownLatch> requestLatch = new AtomicReference<>();
CountDownLatch serverLatch = new CountDownLatch(count);
Expand All @@ -130,10 +136,13 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r
{
try
{
remotePorts.add(request.getRemotePort());
requestLatch.get().countDown();
serverLatch.countDown();
barrier.await();
if (record.get())
{
remotePorts.add(request.getRemotePort());
requestLatch.get().countDown();
serverLatch.countDown();
barrier.await();
}
}
catch (Exception x)
{
Expand All @@ -142,11 +151,23 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r
}
});

scenario.client.getTransport().setConnectionPoolFactory(destination -> new RoundRobinConnectionPool(destination, maxConnections, destination, maxMultiplex));
CompletableFuture<Void> setup = new CompletableFuture<>();
scenario.client.getTransport().setConnectionPoolFactory(destination ->
{
RoundRobinConnectionPool pool = new RoundRobinConnectionPool(destination, maxConnections, destination);
pool.preCreateConnections(maxConnections).handle((r, x) -> x != null ? setup.completeExceptionally(x) : setup.complete(null));
return pool;
});

// Do not prime the connections, to see if the behavior is
// correct even if the connections are not pre-created.
// Send one request to trigger destination creation
// and connection pool pre-creation of connections,
// so we can test reliably the round-robin behavior.
scenario.client.newRequest(scenario.newURI())
.timeout(5, TimeUnit.SECONDS)
.send();
setup.get(5, TimeUnit.SECONDS);

record.set(true);
CountDownLatch clientLatch = new CountDownLatch(count);
AtomicInteger requests = new AtomicInteger();
for (int i = 0; i < count; ++i)
Expand All @@ -172,7 +193,7 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r
barrier.await();

assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
assertThat(remotePorts.size(), Matchers.equalTo(count));
assertThat(remotePorts.toString(), remotePorts.size(), Matchers.equalTo(count));
for (int i = 0; i < count; ++i)
{
int base = i % maxConnections;
Expand Down

0 comments on commit d20cea6

Please sign in to comment.