Skip to content

Commit

Permalink
Fixes #5217 - Review RoundRobinConnectionPool
Browse files Browse the repository at this point in the history
Introduced IndexedConnectionPool and RandomConnectionPool.
Clarified semantic of RoundRobinConnectionPool.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Sep 1, 2020
1 parent 001def4 commit f1a821f
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 44 deletions.
Expand Up @@ -48,6 +48,7 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
private final HttpDestination destination;
private final Callback requester;
private final Pool<Connection> pool;
private boolean maximizeConnections;

/**
* @deprecated use {@link #AbstractConnectionPool(HttpDestination, int, boolean, Callback)} instead
Expand Down Expand Up @@ -151,11 +152,28 @@ public boolean isEmpty()
}

@Override
@ManagedAttribute("Whether this pool is closed")
public boolean isClosed()
{
return pool.isClosed();
}

@ManagedAttribute("Whether the pool tries to maximize the number of connections used")
public boolean isMaximizeConnections()
{
return maximizeConnections;
}

/**
* <p>Sets whether the number of connections should be maximized.</p>
*
* @param maximizeConnections whether the number of connections should be maximized
*/
public void setMaximizeConnections(boolean maximizeConnections)
{
this.maximizeConnections = maximizeConnections;
}

@Override
public Connection acquire()
{
Expand All @@ -164,7 +182,8 @@ public Connection acquire()

/**
* <p>Returns an idle connection, if available;
* if an idle connection is not available, and the given {@code create} parameter is {@code true},
* if an idle connection is not available, and the given {@code create} parameter is {@code true}
* or {@link #isMaximizeConnections()} is {@code true},
* then schedules the opening of a new connection, if possible within the configuration of this
* connection pool (for example, if it does not exceed the max connection count);
* otherwise returns {@code null}.</p>
Expand All @@ -178,7 +197,7 @@ protected Connection acquire(boolean create)
if (LOG.isDebugEnabled())
LOG.debug("Acquiring create={} on {}", create, this);
Connection connection = activate();
if (connection == null && create)
if (connection == null && (create || isMaximizeConnections()))
{
tryCreate(destination.getQueuedRequestCount());
connection = activate();
Expand Down Expand Up @@ -357,8 +376,8 @@ protected void removed(Connection connection)
}

/**
* @deprecated Relying on this method indicates a reliance on the implementation details.
* @return an unmodifiable queue working as a view of the idle connections.
* @deprecated Relying on this method indicates a reliance on the implementation details.
*/
@Deprecated
public Queue<Connection> getIdleConnections()
Expand All @@ -371,8 +390,8 @@ public Queue<Connection> getIdleConnections()
}

/**
* @deprecated Relying on this method indicates a reliance on the implementation details.
* @return an unmodifiable collection working as a view of the active connections.
* @deprecated Relying on this method indicates a reliance on the implementation details.
*/
@Deprecated
public Collection<Connection> getActiveConnections()
Expand Down
@@ -0,0 +1,79 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//

package org.eclipse.jetty.client;

import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;

/**
* <p>A {@link MultiplexConnectionPool} that picks connections at a particular
* index between {@code 0} and {@link #getMaxConnectionCount()}.</p>
* <p>The algorithm that decides the index value is decided by subclasses.</p>
* <p>To acquire a connection, this class obtains the index value and attempts
* to activate the pool entry at that index.
* If this activation fails, another attempt to activate an alternative pool
* entry is performed, to avoid stalling connection acquisition if there is
* an available entry at a different index.</p>
*/
@ManagedObject
public abstract class IndexedConnectionPool extends MultiplexConnectionPool
{
private static final Logger LOG = Log.getLogger(IndexedConnectionPool.class);

private final Pool<Connection> pool;

public IndexedConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester, int maxMultiplex)
{
super(destination, maxConnections, cache, requester, maxMultiplex);
pool = destination.getBean(Pool.class);
}

/**
* <p>Must return an index between 0 (inclusive) and {@code maxConnections} (exclusive)
* used to attempt to acquire the connection at that index in the pool.</p>
*
* @param maxConnections the upper bound of the index (exclusive)
* @return an index between 0 (inclusive) and {@code maxConnections} (exclusive)
*/
protected abstract int getIndex(int maxConnections);

@Override
protected Connection activate()
{
int index = getIndex(getMaxConnectionCount());
Pool<Connection>.Entry entry = pool.acquireAt(index);
if (LOG.isDebugEnabled())
LOG.debug("activating at index={} entry={}", index, entry);
if (entry == null)
{
entry = pool.acquire();
if (LOG.isDebugEnabled())
LOG.debug("activating alternative entry={}", entry);
}
if (entry == null)
return null;
Connection connection = entry.getPooled();
acquired(connection);
return connection;
}
}
@@ -0,0 +1,43 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//

package org.eclipse.jetty.client;

import java.util.concurrent.ThreadLocalRandom;

import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.annotation.ManagedObject;

/**
* <p>An indexed {@link ConnectionPool} that provides connections
* randomly among the ones that are available.</p>
*/
@ManagedObject
public class RandomConnectionPool extends IndexedConnectionPool
{
public RandomConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester, int maxMultiplex)
{
super(destination, maxConnections, cache, requester, maxMultiplex);
}

@Override
protected int getIndex(int maxConnections)
{
return ThreadLocalRandom.current().nextInt(maxConnections);
}
}
Expand Up @@ -18,22 +18,34 @@

package org.eclipse.jetty.client;

import org.eclipse.jetty.client.api.Connection;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Locker;

/**
* <p>A {@link ConnectionPool} that attempts to provide connections using a round-robin algorithm.</p>
* <p>The round-robin behavior is almost impossible to achieve for several reasons:</p>
* <ul>
* <li>the server takes different times to serve different requests; if a request takes a long
* time to be processed by the server, it would be a performance penalty to stall sending requests
* waiting for that connection to be available - better skip it and try another connection</li>
* <li>connections may be closed by the client or by the server, so it should be a performance
* penalty to stall sending requests waiting for a new connection to be opened</li>
* <li>thread scheduling on both client and server may temporarily penalize a connection</li>
* </ul>
* <p>Do not expect this class to provide connections in a perfect recurring sequence such as
* {@code c0, c1, ..., cN-1, c0, c1, ..., cN-1, c0, c1, ...} because that is impossible to
* achieve in a real environment.
* This class will just attempt a best-effort to provide the connections in a sequential order,
* but most likely the order will be quasi-random.</p>
*
* @see RandomConnectionPool
*/
@ManagedObject
public class RoundRobinConnectionPool extends MultiplexConnectionPool
public class RoundRobinConnectionPool extends IndexedConnectionPool
{
private static final Logger LOG = Log.getLogger(RoundRobinConnectionPool.class);

private final Locker lock = new Locker();
private final Pool<Connection> pool;
private int offset;
private final AtomicInteger offset = new AtomicInteger();

public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
{
Expand All @@ -43,36 +55,16 @@ public RoundRobinConnectionPool(HttpDestination destination, int maxConnections,
public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, maxConnections, false, requester, maxMultiplex);
pool = destination.getBean(Pool.class);
}

@Override
protected Connection acquire(boolean create)
{
// If there are queued requests and connections get
// closed due to idle timeout or overuse, we want to
// aggressively try to open new connections to replace
// those that were closed to process queued requests.
return super.acquire(true);
setMaximizeConnections(true);
}

@Override
protected Connection activate()
protected int getIndex(int maxConnections)
{
Pool<Connection>.Entry entry;
try (Locker.Lock l = lock.lock())
{
int index = Math.abs(offset % pool.getMaxEntries());
entry = pool.acquireAt(index);
if (LOG.isDebugEnabled())
LOG.debug("activated at index={} entry={}", index, entry);
if (entry != null)
++offset;
}
if (entry == null)
return null;
Connection connection = entry.getPooled();
acquired(connection);
return connection;
return Math.abs(offset.getAndIncrement() % maxConnections);
}
}
Expand Up @@ -71,7 +71,8 @@ public static Stream<ConnectionPoolFactory> poolsNoRoundRobin()
{
return Stream.of(
new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)),
new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1))
new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)),
new ConnectionPoolFactory("random", destination -> new RandomConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), true, destination, 1))
);
}

Expand Down
Expand Up @@ -42,6 +42,7 @@
import org.junit.jupiter.params.provider.ArgumentsSource;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down Expand Up @@ -194,9 +195,9 @@ public void testMultiplexWithMaxUsage(Transport transport) throws Exception
multiplex = 2;
int maxMultiplex = multiplex;

int maxUsage = 2;
int maxConnections = 2;
int count = maxConnections * maxMultiplex * maxUsage;
int maxUsage = 3;
int maxConnections = 4;
int count = 2 * maxConnections * maxMultiplex * maxUsage;

List<Integer> remotePorts = new CopyOnWriteArrayList<>();
scenario.start(new EmptyServerHandler()
Expand Down Expand Up @@ -229,9 +230,14 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r
assertTrue(clientLatch.await(count, TimeUnit.SECONDS));
assertEquals(count, remotePorts.size());

// Maps {remote_port -> number_of_times_port_was_used}.
Map<Integer, Long> results = remotePorts.stream()
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
assertEquals(count / maxUsage, results.size(), remotePorts.toString());
assertEquals(1, results.values().stream().distinct().count(), remotePorts.toString());
// RoundRobinConnectionPool may open more connections than expected.
// For example with maxUsage=2, requests could be sent to these ports:
// [p1, p2, p3 | p1, p2, p3 | p4, p4, p5 | p6, p5, p7]
// Opening p5 and p6 was delayed, so the opening of p7 was triggered
// to replace p4 while p5 and p6 were busy sending their requests.
assertThat(remotePorts.toString(), count / maxUsage, lessThanOrEqualTo(results.size()));
}
}

0 comments on commit f1a821f

Please sign in to comment.