Skip to content

Commit

Permalink
Work In Progress
Browse files Browse the repository at this point in the history
  • Loading branch information
gregw committed Aug 27, 2021
1 parent 5834e5f commit 7b78b90
Show file tree
Hide file tree
Showing 10 changed files with 324 additions and 242 deletions.
Expand Up @@ -70,14 +70,20 @@ protected AbstractConnectionPool(Destination destination, int maxConnections, Ca

protected AbstractConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester)
{
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester);
this(destination, Pool.StrategyType.FIRST, maxConnections, cache, requester);
}

protected AbstractConnectionPool(HttpDestination destination, Pool.StrategyType strategy, int maxConnections, boolean cache, Callback requester)
{
this(destination, new Pool<>(strategy, maxConnections, cache), requester);
}

protected AbstractConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester)
{
this.destination = destination;
this.requester = requester;
this.pool = pool;
pool.setMaxMultiplex(1); // Force the use of MultiUseEntry
addBean(pool);
}

Expand Down
Expand Up @@ -102,20 +102,4 @@ interface Factory
*/
ConnectionPool newConnectionPool(HttpDestination destination);
}

/**
* Marks a connection as supporting multiplexed requests.
*/
interface Multiplexable
{
/**
* @return the max number of requests that can be multiplexed on a connection
*/
int getMaxMultiplex();

/**
* @param maxMultiplex the max number of requests that can be multiplexed on a connection
*/
void setMaxMultiplex(int maxMultiplex);
}
}
Expand Up @@ -34,9 +34,10 @@ public DuplexConnectionPool(HttpDestination destination, int maxConnections, Cal

public DuplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester)
{
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester);
super(destination, Pool.StrategyType.FIRST, maxConnections, cache, requester);
}

@Deprecated
public DuplexConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester)
{
super(destination, pool, requester);
Expand Down
Expand Up @@ -34,12 +34,21 @@ public MultiplexConnectionPool(HttpDestination destination, int maxConnections,

public MultiplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester, int maxMultiplex)
{
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester, maxMultiplex);
this(destination, Pool.StrategyType.FIRST, maxConnections, cache, requester, maxMultiplex);
}

public MultiplexConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester, int maxMultiplex)
public MultiplexConnectionPool(HttpDestination destination, Pool.StrategyType strategy, int maxConnections, boolean cache, Callback requester, int maxMultiplex)
{
super(destination, pool, requester);
super(destination, new Pool<Connection>(strategy, maxConnections, cache)
{
@Override
protected int getMaxMultiplex(Connection connection)
{
if (connection instanceof Multiplexable)
return ((Multiplexable)connection).getMaxMultiplex();
return super.getMaxMultiplex(connection);
}
}, requester);
setMaxMultiplex(maxMultiplex);
}

Expand Down Expand Up @@ -68,4 +77,15 @@ public void setMaxUsageCount(int maxUsageCount)
{
super.setMaxUsageCount(maxUsageCount);
}

/**
* Marks a connection as supporting multiplexed requests.
*/
public interface Multiplexable extends Connection
{
/**
* @return the max number of requests that can be multiplexed on a connection
*/
int getMaxMultiplex();
}
}
Expand Up @@ -31,6 +31,6 @@ public class RandomConnectionPool extends MultiplexConnectionPool
{
public RandomConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, new Pool<>(Pool.StrategyType.RANDOM, maxConnections, false), requester, maxMultiplex);
super(destination, Pool.StrategyType.RANDOM, maxConnections, false, requester, maxMultiplex);
}
}
Expand Up @@ -56,7 +56,7 @@ public RoundRobinConnectionPool(HttpDestination destination, int maxConnections,

public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, new Pool<>(Pool.StrategyType.ROUND_ROBIN, maxConnections, false), requester, maxMultiplex);
super(destination, Pool.StrategyType.ROUND_ROBIN, maxConnections, false, requester, maxMultiplex);
// If there are queued requests and connections get
// closed due to idle timeout or overuse, we want to
// aggressively try to open new connections to replace
Expand Down
Expand Up @@ -28,22 +28,23 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.jetty.client.ConnectionPool;
import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.MultiplexConnectionPool;
import org.eclipse.jetty.client.SendFailure;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Sweeper;

public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable, ConnectionPool.Multiplexable
public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable, MultiplexConnectionPool.Multiplexable
{
private static final Logger LOG = Log.getLogger(HttpConnection.class);

Expand All @@ -53,7 +54,6 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
private final AtomicInteger sweeps = new AtomicInteger();
private final Session session;
private boolean recycleHttpChannels = true;
private int maxMultiplex = 1;

public HttpConnectionOverHTTP2(HttpDestination destination, Session session)
{
Expand All @@ -76,14 +76,10 @@ public void setRecycleHttpChannels(boolean recycleHttpChannels)
this.recycleHttpChannels = recycleHttpChannels;
}

@Override
public int getMaxMultiplex()
{
return maxMultiplex;
}

public void setMaxMultiplex(int maxMultiplex)
{
this.maxMultiplex = maxMultiplex;
return ((HTTP2Session)session).getMaxLocalStreams();
}

@Override
Expand Down
Expand Up @@ -100,10 +100,12 @@ public void testMaxDurationConnectionsWithMultiplexedPool() throws Exception
ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination ->
{
int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination();
Pool<Connection> pool = new Pool<>(Pool.StrategyType.FIRST, maxConnections, false);
poolRef.set(pool);
MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, pool, destination, MAX_MULTIPLEX)
MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, Pool.StrategyType.FIRST, maxConnections, false, destination, MAX_MULTIPLEX)
{
{
poolRef.set(getBean(Pool.class));
}

@Override
protected void onCreated(Connection connection)
{
Expand Down Expand Up @@ -161,22 +163,27 @@ public void testMaxDurationConnectionsWithMultiplexedPoolClosesExpiredConnection
ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination ->
{
int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination();
Pool<Connection> pool = new Pool<>(Pool.StrategyType.FIRST, maxConnections, false);
poolRef.set(pool);
MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, pool, destination, MAX_MULTIPLEX)
{
@Override
protected void onCreated(Connection connection)
MultiplexConnectionPool connectionPool =
new MultiplexConnectionPool(destination, Pool.StrategyType.FIRST, maxConnections, false, destination, MAX_MULTIPLEX)
{
poolCreateCounter.incrementAndGet();
}
{
poolRef.set(getBean(Pool.class));
}

@Override
protected void onCreated(Connection connection)
{
poolCreateCounter.incrementAndGet();
}

@Override
protected void removed(Connection connection)
{
poolRemoveCounter.incrementAndGet();
}
};


@Override
protected void removed(Connection connection)
{
poolRemoveCounter.incrementAndGet();
}
};
connectionPool.setMaxDuration(maxDuration);
return connectionPool;
});
Expand Down

0 comments on commit 7b78b90

Please sign in to comment.