Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #6603 - HTTP/2 max local stream count exceeded #6639

Merged
merged 13 commits into from Aug 30, 2021
Expand Up @@ -70,14 +70,20 @@ protected AbstractConnectionPool(Destination destination, int maxConnections, Ca

protected AbstractConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester)
{
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester);
this(destination, Pool.StrategyType.FIRST, maxConnections, cache, requester);
}

protected AbstractConnectionPool(HttpDestination destination, Pool.StrategyType strategy, int maxConnections, boolean cache, Callback requester)
{
this(destination, new Pool<>(strategy, maxConnections, cache), requester);
}

protected AbstractConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester)
{
this.destination = destination;
this.requester = requester;
this.pool = pool;
pool.setMaxMultiplex(1); // Force the use of multiplexing.
addBean(pool);
}

Expand Down
Expand Up @@ -29,7 +29,7 @@
public interface ConnectionPool extends Closeable
{
/**
* Optionally pre-create up to <code>connectionCount</code>
* Optionally pre-create up to {@code connectionCount}
* connections so they are immediately ready for use.
* @param connectionCount the number of connections to pre-start.
*/
Expand Down Expand Up @@ -104,7 +104,7 @@ interface Factory
}

/**
* Marks a connection pool as supporting multiplexed connections.
* Marks a connection as supporting multiplexed requests.
*/
interface Multiplexable
{
Expand All @@ -115,7 +115,11 @@ interface Multiplexable

/**
* @param maxMultiplex the max number of requests multiplexable on a single connection
* @deprecated do not use, as the maxMultiplex value is pulled, rather than pushed
*/
void setMaxMultiplex(int maxMultiplex);
@Deprecated
default void setMaxMultiplex(int maxMultiplex)
{
}
}
}
Expand Up @@ -34,9 +34,10 @@ public DuplexConnectionPool(HttpDestination destination, int maxConnections, Cal

public DuplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester)
{
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester);
super(destination, Pool.StrategyType.FIRST, maxConnections, cache, requester);
}

@Deprecated
public DuplexConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester)
{
super(destination, pool, requester);
Expand Down
Expand Up @@ -25,7 +25,7 @@
import org.eclipse.jetty.util.annotation.ManagedObject;

@ManagedObject
public class MultiplexConnectionPool extends AbstractConnectionPool implements ConnectionPool.Multiplexable
public class MultiplexConnectionPool extends AbstractConnectionPool
{
public MultiplexConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
Expand All @@ -34,9 +34,26 @@ public MultiplexConnectionPool(HttpDestination destination, int maxConnections,

public MultiplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester, int maxMultiplex)
{
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester, maxMultiplex);
this(destination, Pool.StrategyType.FIRST, maxConnections, cache, requester, maxMultiplex);
}

public MultiplexConnectionPool(HttpDestination destination, Pool.StrategyType strategy, int maxConnections, boolean cache, Callback requester, int maxMultiplex)
{
super(destination, new Pool<Connection>(strategy, maxConnections, cache)
{
@Override
protected int getMaxMultiplex(Connection connection)
{
int multiplex = (connection instanceof Multiplexable)
? ((Multiplexable)connection).getMaxMultiplex()
: super.getMaxMultiplex(connection);
return multiplex > 0 ? multiplex : 1;
}
}, requester);
setMaxMultiplex(maxMultiplex);
}

@Deprecated
public MultiplexConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester, int maxMultiplex)
{
super(destination, pool, requester);
Expand Down
Expand Up @@ -28,15 +28,15 @@ protected MultiplexHttpDestination(HttpClient client, Origin origin)
public int getMaxRequestsPerConnection()
{
ConnectionPool connectionPool = getConnectionPool();
if (connectionPool instanceof ConnectionPool.Multiplexable)
return ((ConnectionPool.Multiplexable)connectionPool).getMaxMultiplex();
if (connectionPool instanceof AbstractConnectionPool)
return ((AbstractConnectionPool)connectionPool).getMaxMultiplex();
return 1;
}

public void setMaxRequestsPerConnection(int maxRequestsPerConnection)
{
ConnectionPool connectionPool = getConnectionPool();
if (connectionPool instanceof ConnectionPool.Multiplexable)
((ConnectionPool.Multiplexable)connectionPool).setMaxMultiplex(maxRequestsPerConnection);
if (connectionPool instanceof AbstractConnectionPool)
((AbstractConnectionPool)connectionPool).setMaxMultiplex(maxRequestsPerConnection);
}
}
Expand Up @@ -31,6 +31,6 @@ public class RandomConnectionPool extends MultiplexConnectionPool
{
public RandomConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, new Pool<>(Pool.StrategyType.RANDOM, maxConnections, false), requester, maxMultiplex);
super(destination, Pool.StrategyType.RANDOM, maxConnections, false, requester, maxMultiplex);
}
}
Expand Up @@ -56,7 +56,7 @@ public RoundRobinConnectionPool(HttpDestination destination, int maxConnections,

public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, new Pool<>(Pool.StrategyType.ROUND_ROBIN, maxConnections, false), requester, maxMultiplex);
super(destination, Pool.StrategyType.ROUND_ROBIN, maxConnections, false, requester, maxMultiplex);
// If there are queued requests and connections get
// closed due to idle timeout or overuse, we want to
// aggressively try to open new connections to replace
Expand Down
Expand Up @@ -738,7 +738,10 @@ protected IStream createLocalStream(int streamId, Promise<Stream> promise)
int maxCount = getMaxLocalStreams();
if (maxCount >= 0 && localCount >= maxCount)
{
promise.failed(new IllegalStateException("Max local stream count " + maxCount + " exceeded"));
IllegalStateException failure = new IllegalStateException("Max local stream count " + maxCount + " exceeded: " + localCount);
if (LOG.isDebugEnabled())
LOG.debug("Could not create local stream #{} for {}", streamId, this, failure);
promise.failed(failure);
return null;
}
if (localStreamCount.compareAndSet(localCount, localCount + 1))
Expand All @@ -751,7 +754,7 @@ protected IStream createLocalStream(int streamId, Promise<Stream> promise)
stream.setIdleTimeout(getStreamIdleTimeout());
flowControl.onStreamCreated(stream);
if (LOG.isDebugEnabled())
LOG.debug("Created local {}", stream);
LOG.debug("Created local {} for {}", stream, this);
return stream;
}
else
Expand Down Expand Up @@ -786,6 +789,9 @@ protected IStream createRemoteStream(int streamId)
int maxCount = getMaxRemoteStreams();
if (maxCount >= 0 && remoteCount - remoteClosing >= maxCount)
{
IllegalStateException failure = new IllegalStateException("Max remote stream count " + maxCount + " exceeded: " + remoteCount + "+" + remoteClosing);
if (LOG.isDebugEnabled())
LOG.debug("Could not create remote stream #{} for {}", streamId, this, failure);
reset(null, new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.from(() -> onStreamDestroyed(streamId)));
return null;
}
Expand All @@ -799,7 +805,7 @@ protected IStream createRemoteStream(int streamId)
stream.setIdleTimeout(getStreamIdleTimeout());
flowControl.onStreamCreated(stream);
if (LOG.isDebugEnabled())
LOG.debug("Created remote {}", stream);
LOG.debug("Created remote {} for {}", stream, this);
return stream;
}
else
Expand Down Expand Up @@ -945,7 +951,7 @@ public void onFrame(Frame frame)
private void onStreamCreated(int streamId)
{
if (LOG.isDebugEnabled())
LOG.debug("Created stream #{} for {}", streamId, this);
LOG.debug("Creating stream #{} for {}", streamId, this);
streamsState.onStreamCreated();
}

Expand Down
Expand Up @@ -60,7 +60,7 @@ public HttpClientTransportOverHTTP2(HTTP2Client client)
setConnectionPoolFactory(destination ->
{
HttpClient httpClient = getHttpClient();
return new MultiplexConnectionPool(destination, httpClient.getMaxConnectionsPerDestination(), destination, httpClient.getMaxRequestsQueuedPerDestination());
return new MultiplexConnectionPool(destination, httpClient.getMaxConnectionsPerDestination(), destination, 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm assuming the 1 is there until overridden by the settings frame? Eitherway a comment would be good explaining why 1.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

});
}

Expand Down Expand Up @@ -211,9 +211,6 @@ private Promise<Connection> connectionPromise()
@Override
public void onSettings(Session session, SettingsFrame frame)
{
Map<Integer, Integer> settings = frame.getSettings();
if (settings.containsKey(SettingsFrame.MAX_CONCURRENT_STREAMS))
destination().setMaxRequestsPerConnection(settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS));
if (!connection.isMarked())
onServerPreface(session);
}
Expand Down
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.jetty.client.ConnectionPool;
import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
Expand All @@ -36,13 +37,14 @@
import org.eclipse.jetty.client.SendFailure;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.HTTP2Session;
import org.eclipse.jetty.http2.api.Session;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Sweeper;

public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable
public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable, ConnectionPool.Multiplexable
{
private static final Logger LOG = Log.getLogger(HttpConnection.class);

Expand Down Expand Up @@ -74,6 +76,12 @@ public void setRecycleHttpChannels(boolean recycleHttpChannels)
this.recycleHttpChannels = recycleHttpChannels;
}

@Override
public int getMaxMultiplex()
{
return ((HTTP2Session)session).getMaxLocalStreams();
}

@Override
protected Iterator<HttpChannel> getHttpChannels()
{
Expand Down
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
Expand All @@ -40,6 +41,7 @@
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpResponseException;
import org.eclipse.jetty.client.MultiplexConnectionPool;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
Expand Down Expand Up @@ -76,6 +78,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

public class MaxConcurrentStreamsTest extends AbstractTest
{
Expand Down Expand Up @@ -545,6 +548,109 @@ public Connection newConnection(EndPoint endPoint, Map<String, Object> context)
assertTrue(response3Latch.await(5, TimeUnit.SECONDS));
}

@Test
public void testDifferentMaxConcurrentStreamsForDifferentConnections() throws Exception
{
long processing = 125;
RawHTTP2ServerConnectionFactory http2 = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), new ServerSessionListener.Adapter()
{
private Session session1;
private Session session2;

@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Request request = (MetaData.Request)frame.getMetaData();
switch (request.getURI().getPath())
{
case "/prime":
{
session1 = stream.getSession();
// Send another request from here to force the opening of the 2nd connection.
client.newRequest("localhost", connector.getLocalPort()).path("/prime2").send(result ->
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, result.getResponse().getStatus(), new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
});
break;
}
case "/prime2":
{
session2 = stream.getSession();
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
break;
}
case "/update_max_streams":
{
Session session = stream.getSession() == session1 ? session2 : session1;
Map<Integer, Integer> settings = new HashMap<>();
settings.put(SettingsFrame.MAX_CONCURRENT_STREAMS, 2);
session.settings(new SettingsFrame(settings, false), Callback.NOOP);
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
break;
}
default:
{
sleep(processing);
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
break;
}
}
return null;
}
});
http2.setMaxConcurrentStreams(1);
prepareServer(http2);
server.start();
prepareClient();
client.setMaxConnectionsPerDestination(2);
client.start();

// Prime the 2 connections.
primeConnection();

String host = "localhost";
int port = connector.getLocalPort();

AbstractConnectionPool pool = (AbstractConnectionPool)client.resolveDestination(new Origin("http", host, port)).getConnectionPool();
assertEquals(2, pool.getConnectionCount());

// Send a request on one connection, which sends back a SETTINGS frame on the other connection.
ContentResponse response = client.newRequest(host, port)
.path("/update_max_streams")
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());

// Send 4 requests at once: 1 should go on one connection, 2 on the other connection, and 1 queued.
int count = 4;
CountDownLatch latch = new CountDownLatch(count);
for (int i = 0; i < count; ++i)
{
client.newRequest(host, port)
.path("/" + i)
.send(result ->
{
if (result.isSucceeded())
{
int status = result.getResponse().getStatus();
if (status == HttpStatus.OK_200)
latch.countDown();
else
fail("unexpected status " + status);
}
else
{
fail(result.getFailure());
}
});
}

assertTrue(awaitLatch(latch, count * processing * 10, TimeUnit.MILLISECONDS));
}

private void primeConnection() throws Exception
{
// Prime the connection so that the maxConcurrentStream setting arrives to the client.
Expand Down