Skip to content

Commit

Permalink
Fixes #6603 - HTTP/2 max local stream count exceeded (#6639)
Browse files Browse the repository at this point in the history
Made MAX_CONCURRENT_STREAMS setting work on a per-connection basis.
Updated Pool javadocs.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
Co-authored-by: Greg Wilkins <gregw@webtide.com>
(cherry picked from commit 525fcb3)
  • Loading branch information
sbordet committed Aug 30, 2021
1 parent 4be1e63 commit 864e59e
Show file tree
Hide file tree
Showing 14 changed files with 603 additions and 210 deletions.
Expand Up @@ -55,14 +55,20 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen

protected AbstractConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester)
{
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester);
this(destination, Pool.StrategyType.FIRST, maxConnections, cache, requester);
}

protected AbstractConnectionPool(HttpDestination destination, Pool.StrategyType strategy, int maxConnections, boolean cache, Callback requester)
{
this(destination, new Pool<>(strategy, maxConnections, cache), requester);
}

protected AbstractConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester)
{
this.destination = destination;
this.requester = requester;
this.pool = pool;
pool.setMaxMultiplex(1); // Force the use of multiplexing.
addBean(pool);
}

Expand Down
Expand Up @@ -24,7 +24,7 @@
public interface ConnectionPool extends Closeable
{
/**
* Optionally pre-create up to <code>connectionCount</code>
* Optionally pre-create up to {@code connectionCount}
* connections so they are immediately ready for use.
* @param connectionCount the number of connections to pre-start.
*/
Expand Down Expand Up @@ -106,7 +106,7 @@ interface Factory
}

/**
* Marks a connection pool as supporting multiplexed connections.
* Marks a connection as supporting multiplexed requests.
*/
interface Multiplexable
{
Expand All @@ -117,7 +117,11 @@ interface Multiplexable

/**
* @param maxMultiplex the max number of requests multiplexable on a single connection
* @deprecated do not use, as the maxMultiplex value is pulled, rather than pushed
*/
void setMaxMultiplex(int maxMultiplex);
@Deprecated
default void setMaxMultiplex(int maxMultiplex)
{
}
}
}
Expand Up @@ -29,9 +29,10 @@ public DuplexConnectionPool(HttpDestination destination, int maxConnections, Cal

public DuplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester)
{
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester);
super(destination, Pool.StrategyType.FIRST, maxConnections, cache, requester);
}

@Deprecated
public DuplexConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester)
{
super(destination, pool, requester);
Expand Down
Expand Up @@ -20,7 +20,7 @@
import org.eclipse.jetty.util.annotation.ManagedObject;

@ManagedObject
public class MultiplexConnectionPool extends AbstractConnectionPool implements ConnectionPool.Multiplexable
public class MultiplexConnectionPool extends AbstractConnectionPool
{
public MultiplexConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
Expand All @@ -29,9 +29,26 @@ public MultiplexConnectionPool(HttpDestination destination, int maxConnections,

public MultiplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester, int maxMultiplex)
{
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester, maxMultiplex);
this(destination, Pool.StrategyType.FIRST, maxConnections, cache, requester, maxMultiplex);
}

public MultiplexConnectionPool(HttpDestination destination, Pool.StrategyType strategy, int maxConnections, boolean cache, Callback requester, int maxMultiplex)
{
super(destination, new Pool<Connection>(strategy, maxConnections, cache)
{
@Override
protected int getMaxMultiplex(Connection connection)
{
int multiplex = (connection instanceof Multiplexable)
? ((Multiplexable)connection).getMaxMultiplex()
: super.getMaxMultiplex(connection);
return multiplex > 0 ? multiplex : 1;
}
}, requester);
setMaxMultiplex(maxMultiplex);
}

@Deprecated
public MultiplexConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester, int maxMultiplex)
{
super(destination, pool, requester);
Expand Down
Expand Up @@ -35,15 +35,15 @@ public MultiplexHttpDestination(HttpClient client, Origin origin)
public int getMaxRequestsPerConnection()
{
ConnectionPool connectionPool = getConnectionPool();
if (connectionPool instanceof ConnectionPool.Multiplexable)
return ((ConnectionPool.Multiplexable)connectionPool).getMaxMultiplex();
if (connectionPool instanceof AbstractConnectionPool)
return ((AbstractConnectionPool)connectionPool).getMaxMultiplex();
return 1;
}

public void setMaxRequestsPerConnection(int maxRequestsPerConnection)
{
ConnectionPool connectionPool = getConnectionPool();
if (connectionPool instanceof ConnectionPool.Multiplexable)
((ConnectionPool.Multiplexable)connectionPool).setMaxMultiplex(maxRequestsPerConnection);
if (connectionPool instanceof AbstractConnectionPool)
((AbstractConnectionPool)connectionPool).setMaxMultiplex(maxRequestsPerConnection);
}
}
Expand Up @@ -26,6 +26,6 @@ public class RandomConnectionPool extends MultiplexConnectionPool
{
public RandomConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, new Pool<>(Pool.StrategyType.RANDOM, maxConnections, false), requester, maxMultiplex);
super(destination, Pool.StrategyType.RANDOM, maxConnections, false, requester, maxMultiplex);
}
}
Expand Up @@ -51,7 +51,7 @@ public RoundRobinConnectionPool(HttpDestination destination, int maxConnections,

public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, new Pool<>(Pool.StrategyType.ROUND_ROBIN, maxConnections, false), requester, maxMultiplex);
super(destination, Pool.StrategyType.ROUND_ROBIN, maxConnections, false, requester, maxMultiplex);
// If there are queued requests and connections get
// closed due to idle timeout or overuse, we want to
// aggressively try to open new connections to replace
Expand Down
Expand Up @@ -786,7 +786,10 @@ protected IStream createLocalStream(int streamId, MetaData.Request request, Cons
int maxCount = getMaxLocalStreams();
if (maxCount >= 0 && localCount >= maxCount)
{
failFn.accept(new IllegalStateException("Max local stream count " + maxCount + " exceeded"));
IllegalStateException failure = new IllegalStateException("Max local stream count " + maxCount + " exceeded: " + localCount);
if (LOG.isDebugEnabled())
LOG.debug("Could not create local stream #{} for {}", streamId, this, failure);
failFn.accept(failure);
return null;
}
if (localStreamCount.compareAndSet(localCount, localCount + 1))
Expand All @@ -799,7 +802,7 @@ protected IStream createLocalStream(int streamId, MetaData.Request request, Cons
stream.setIdleTimeout(getStreamIdleTimeout());
flowControl.onStreamCreated(stream);
if (LOG.isDebugEnabled())
LOG.debug("Created local {}", stream);
LOG.debug("Created local {} for {}", stream, this);
return stream;
}
else
Expand Down Expand Up @@ -834,6 +837,9 @@ protected IStream createRemoteStream(int streamId, MetaData.Request request)
int maxCount = getMaxRemoteStreams();
if (maxCount >= 0 && remoteCount - remoteClosing >= maxCount)
{
IllegalStateException failure = new IllegalStateException("Max remote stream count " + maxCount + " exceeded: " + remoteCount + "+" + remoteClosing);
if (LOG.isDebugEnabled())
LOG.debug("Could not create remote stream #{} for {}", streamId, this, failure);
reset(null, new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.from(() -> onStreamDestroyed(streamId)));
return null;
}
Expand All @@ -847,7 +853,7 @@ protected IStream createRemoteStream(int streamId, MetaData.Request request)
stream.setIdleTimeout(getStreamIdleTimeout());
flowControl.onStreamCreated(stream);
if (LOG.isDebugEnabled())
LOG.debug("Created remote {}", stream);
LOG.debug("Created remote {} for {}", stream, this);
return stream;
}
else
Expand Down Expand Up @@ -1019,7 +1025,7 @@ void scheduleTimeout(HTTP2Stream stream)
private void onStreamCreated(int streamId)
{
if (LOG.isDebugEnabled())
LOG.debug("Created stream #{} for {}", streamId, this);
LOG.debug("Creating stream #{} for {}", streamId, this);
streamsState.onStreamCreated();
}

Expand Down
Expand Up @@ -65,14 +65,6 @@ private Promise<Connection> httpConnectionPromise()
@Override
public void onSettings(Session session, SettingsFrame frame)
{
Map<Integer, Integer> settings = frame.getSettings();
if (settings.containsKey(SettingsFrame.MAX_CONCURRENT_STREAMS))
{
HttpDestination destination = destination();
if (destination instanceof HttpDestination.Multiplexed)
((HttpDestination.Multiplexed)destination).setMaxRequestsPerConnection(settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS));
}
// The first SETTINGS frame is the server preface reply.
if (!connection.isMarked())
onServerPreface(session);
}
Expand Down
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.jetty.client.ConnectionPool;
import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
Expand All @@ -46,7 +47,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable
public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable, ConnectionPool.Multiplexable
{
private static final Logger LOG = LoggerFactory.getLogger(HttpConnection.class);

Expand Down Expand Up @@ -78,6 +79,12 @@ public void setRecycleHttpChannels(boolean recycleHttpChannels)
this.recycleHttpChannels = recycleHttpChannels;
}

@Override
public int getMaxMultiplex()
{
return ((HTTP2Session)session).getMaxLocalStreams();
}

@Override
protected Iterator<HttpChannel> getHttpChannels()
{
Expand Down
Expand Up @@ -16,6 +16,7 @@
import java.io.IOException;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
Expand Down Expand Up @@ -71,6 +72,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

public class MaxConcurrentStreamsTest extends AbstractTest
{
Expand Down Expand Up @@ -538,6 +540,111 @@ public Connection newConnection(EndPoint endPoint, Map<String, Object> context)
assertTrue(response3Latch.await(5, TimeUnit.SECONDS));
}

@Test
public void testDifferentMaxConcurrentStreamsForDifferentConnections() throws Exception
{
long processing = 125;
RawHTTP2ServerConnectionFactory http2 = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), new ServerSessionListener.Adapter()
{
private Session session1;
private Session session2;

@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Request request = (MetaData.Request)frame.getMetaData();
switch (request.getURI().getPath())
{
case "/prime":
{
session1 = stream.getSession();
// Send another request from here to force the opening of the 2nd connection.
client.newRequest("localhost", connector.getLocalPort()).path("/prime2").send(result ->
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, result.getResponse().getStatus(), HttpFields.EMPTY);
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
});
break;
}
case "/prime2":
{
session2 = stream.getSession();
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY);
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
break;
}
case "/update_max_streams":
{
Session session = stream.getSession() == session1 ? session2 : session1;
Map<Integer, Integer> settings = new HashMap<>();
settings.put(SettingsFrame.MAX_CONCURRENT_STREAMS, 2);
session.settings(new SettingsFrame(settings, false), Callback.NOOP);
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY);
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
break;
}
default:
{
sleep(processing);
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, HttpFields.EMPTY);
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
break;
}
}
return null;
}
});
http2.setMaxConcurrentStreams(1);
prepareServer(http2);
server.start();
prepareClient();
client.setMaxConnectionsPerDestination(2);
client.start();

// Prime the 2 connections.
primeConnection();

String host = "localhost";
int port = connector.getLocalPort();

assertEquals(1, client.getDestinations().size());
HttpDestination destination = (HttpDestination)client.getDestinations().get(0);
AbstractConnectionPool pool = (AbstractConnectionPool)destination.getConnectionPool();
assertEquals(2, pool.getConnectionCount());

// Send a request on one connection, which sends back a SETTINGS frame on the other connection.
ContentResponse response = client.newRequest(host, port)
.path("/update_max_streams")
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());

// Send 4 requests at once: 1 should go on one connection, 2 on the other connection, and 1 queued.
int count = 4;
CountDownLatch latch = new CountDownLatch(count);
for (int i = 0; i < count; ++i)
{
client.newRequest(host, port)
.path("/" + i)
.send(result ->
{
if (result.isSucceeded())
{
int status = result.getResponse().getStatus();
if (status == HttpStatus.OK_200)
latch.countDown();
else
fail("unexpected status " + status);
}
else
{
fail(result.getFailure());
}
});
}

assertTrue(awaitLatch(latch, count * processing * 10, TimeUnit.MILLISECONDS));
}

private void primeConnection() throws Exception
{
// Prime the connection so that the maxConcurrentStream setting arrives to the client.
Expand Down
Expand Up @@ -95,9 +95,7 @@ public void testMaxDurationConnectionsWithMultiplexedPool() throws Exception
ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination ->
{
int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination();
Pool<Connection> pool = new Pool<>(Pool.StrategyType.FIRST, maxConnections, false);
poolRef.set(pool);
MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, pool, destination, MAX_MULTIPLEX)
MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, Pool.StrategyType.FIRST, maxConnections, false, destination, MAX_MULTIPLEX)
{
@Override
protected void onCreated(Connection connection)
Expand All @@ -111,6 +109,7 @@ protected void removed(Connection connection)
poolRemoveCounter.incrementAndGet();
}
};
poolRef.set(connectionPool.getBean(Pool.class));
connectionPool.setMaxDuration(maxDuration);
return connectionPool;
});
Expand Down Expand Up @@ -156,9 +155,7 @@ public void testMaxDurationConnectionsWithMultiplexedPoolClosesExpiredConnection
ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination ->
{
int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination();
Pool<Connection> pool = new Pool<>(Pool.StrategyType.FIRST, maxConnections, false);
poolRef.set(pool);
MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, pool, destination, MAX_MULTIPLEX)
MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, Pool.StrategyType.FIRST, maxConnections, false, destination, MAX_MULTIPLEX)
{
@Override
protected void onCreated(Connection connection)
Expand All @@ -172,6 +169,7 @@ protected void removed(Connection connection)
poolRemoveCounter.incrementAndGet();
}
};
poolRef.set(connectionPool.getBean(Pool.class));
connectionPool.setMaxDuration(maxDuration);
return connectionPool;
});
Expand Down

0 comments on commit 864e59e

Please sign in to comment.