Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #6603 - HTTP/2 max local stream count exceeded #6639

Merged
merged 13 commits into from Aug 30, 2021
Expand Up @@ -140,6 +140,16 @@ protected void setMaxMultiplex(int maxMultiplex)
pool.setMaxMultiplex(maxMultiplex);
}

protected void setMaxMultiplex(Connection connection, int maxMultiplex)
{
if (connection instanceof Attachable)
{
Object attachment = ((Attachable)connection).getAttachment();
if (attachment instanceof EntryHolder)
((EntryHolder)attachment).entry.setMaxMultiplex(maxMultiplex);
}
}

protected int getMaxUsageCount()
{
return pool.getMaxUsageCount();
Expand Down Expand Up @@ -528,6 +538,8 @@ public void succeeded(Connection connection)
onCreated(connection);
pending.decrementAndGet();
reserved.enable(connection, false);
if (connection instanceof Multiplexable)
setMaxMultiplex(connection, ((ConnectionPool.Multiplexable)connection).getMaxMultiplex());
idle(connection, false);
complete(null);
proceed();
Expand Down
Expand Up @@ -104,17 +104,17 @@ interface Factory
}

/**
* Marks a connection pool as supporting multiplexed connections.
* Marks a connection as supporting multiplexed requests.
*/
interface Multiplexable
{
/**
* @return the max number of requests multiplexable on a single connection
* @return the max number of requests that can be multiplexed on a connection
*/
int getMaxMultiplex();

/**
* @param maxMultiplex the max number of requests multiplexable on a single connection
* @param maxMultiplex the max number of requests that can be multiplexed on a connection
*/
void setMaxMultiplex(int maxMultiplex);
}
Expand Down
Expand Up @@ -25,7 +25,7 @@
import org.eclipse.jetty.util.annotation.ManagedObject;

@ManagedObject
public class MultiplexConnectionPool extends AbstractConnectionPool implements ConnectionPool.Multiplexable
public class MultiplexConnectionPool extends AbstractConnectionPool
{
public MultiplexConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
Expand Down
Expand Up @@ -18,6 +18,8 @@

package org.eclipse.jetty.client;

import org.eclipse.jetty.client.api.Connection;

public abstract class MultiplexHttpDestination extends HttpDestination
{
protected MultiplexHttpDestination(HttpClient client, Origin origin)
Expand All @@ -28,15 +30,22 @@ protected MultiplexHttpDestination(HttpClient client, Origin origin)
public int getMaxRequestsPerConnection()
{
ConnectionPool connectionPool = getConnectionPool();
if (connectionPool instanceof ConnectionPool.Multiplexable)
return ((ConnectionPool.Multiplexable)connectionPool).getMaxMultiplex();
if (connectionPool instanceof AbstractConnectionPool)
return ((AbstractConnectionPool)connectionPool).getMaxMultiplex();
return 1;
}

public void setMaxRequestsPerConnection(int maxRequestsPerConnection)
{
ConnectionPool connectionPool = getConnectionPool();
if (connectionPool instanceof ConnectionPool.Multiplexable)
((ConnectionPool.Multiplexable)connectionPool).setMaxMultiplex(maxRequestsPerConnection);
if (connectionPool instanceof AbstractConnectionPool)
((AbstractConnectionPool)connectionPool).setMaxMultiplex(maxRequestsPerConnection);
}

public void setMaxRequestsPerConnection(Connection connection, int maxRequestsPerConnection)
{
ConnectionPool connectionPool = getConnectionPool();
if (connectionPool instanceof AbstractConnectionPool)
((AbstractConnectionPool)connectionPool).setMaxMultiplex(connection, maxRequestsPerConnection);
}
}
Expand Up @@ -737,7 +737,10 @@ protected IStream createLocalStream(int streamId, Promise<Stream> promise)
int maxCount = getMaxLocalStreams();
if (maxCount >= 0 && localCount >= maxCount)
{
promise.failed(new IllegalStateException("Max local stream count " + maxCount + " exceeded"));
IllegalStateException failure = new IllegalStateException("Max local stream count " + maxCount + " exceeded: " + localCount);
if (LOG.isDebugEnabled())
LOG.debug("Could not create local stream #{} for {}", streamId, this, failure);
promise.failed(failure);
return null;
}
if (localStreamCount.compareAndSet(localCount, localCount + 1))
Expand All @@ -750,7 +753,7 @@ protected IStream createLocalStream(int streamId, Promise<Stream> promise)
stream.setIdleTimeout(getStreamIdleTimeout());
flowControl.onStreamCreated(stream);
if (LOG.isDebugEnabled())
LOG.debug("Created local {}", stream);
LOG.debug("Created local {} for {}", stream, this);
return stream;
}
else
Expand Down Expand Up @@ -785,6 +788,9 @@ protected IStream createRemoteStream(int streamId)
int maxCount = getMaxRemoteStreams();
if (maxCount >= 0 && remoteCount - remoteClosing >= maxCount)
{
IllegalStateException failure = new IllegalStateException("Max remote stream count " + maxCount + " exceeded: " + remoteCount + "+" + remoteClosing);
if (LOG.isDebugEnabled())
LOG.debug("Could not create remote stream #{} for {}", streamId, this, failure);
reset(null, new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.from(() -> onStreamDestroyed(streamId)));
return null;
}
Expand All @@ -798,7 +804,7 @@ protected IStream createRemoteStream(int streamId)
stream.setIdleTimeout(getStreamIdleTimeout());
flowControl.onStreamCreated(stream);
if (LOG.isDebugEnabled())
LOG.debug("Created remote {}", stream);
LOG.debug("Created remote {} for {}", stream, this);
return stream;
}
else
Expand Down Expand Up @@ -944,7 +950,7 @@ public void onFrame(Frame frame)
private void onStreamCreated(int streamId)
{
if (LOG.isDebugEnabled())
LOG.debug("Created stream #{} for {}", streamId, this);
LOG.debug("Creating stream #{} for {}", streamId, this);
streamsState.onStreamCreated();
}

Expand Down
Expand Up @@ -60,7 +60,7 @@ public HttpClientTransportOverHTTP2(HTTP2Client client)
setConnectionPoolFactory(destination ->
{
HttpClient httpClient = getHttpClient();
return new MultiplexConnectionPool(destination, httpClient.getMaxConnectionsPerDestination(), destination, httpClient.getMaxRequestsQueuedPerDestination());
return new MultiplexConnectionPool(destination, httpClient.getMaxConnectionsPerDestination(), destination, 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm assuming the 1 is there until overridden by the settings frame? Eitherway a comment would be good explaining why 1.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

});
}

Expand Down Expand Up @@ -212,15 +212,25 @@ private Promise<Connection> connectionPromise()
public void onSettings(Session session, SettingsFrame frame)
{
Map<Integer, Integer> settings = frame.getSettings();
if (settings.containsKey(SettingsFrame.MAX_CONCURRENT_STREAMS))
destination().setMaxRequestsPerConnection(settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS));
if (!connection.isMarked())
onServerPreface(session);
Integer maxConcurrentStreams = settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS);
boolean[] initialized = new boolean[1];
Connection connection = this.connection.get(initialized);
if (initialized[0])
{
if (maxConcurrentStreams != null && connection != null)
destination().setMaxRequestsPerConnection(connection, maxConcurrentStreams);
}
else
{
onServerPreface(session, maxConcurrentStreams);
}
}

private void onServerPreface(Session session)
private void onServerPreface(Session session, Integer maxConcurrentStreams)
{
HttpConnectionOverHTTP2 connection = newHttpConnection(destination(), session);
if (maxConcurrentStreams != null)
connection.setMaxMultiplex(maxConcurrentStreams);
if (this.connection.compareAndSet(null, connection, false, true))
connectionPromise().succeeded(connection);
}
Expand Down
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.jetty.client.ConnectionPool;
import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
Expand All @@ -42,7 +43,7 @@
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Sweeper;

public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable
public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable, ConnectionPool.Multiplexable
{
private static final Logger LOG = Log.getLogger(HttpConnection.class);

Expand All @@ -52,6 +53,7 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
private final AtomicInteger sweeps = new AtomicInteger();
private final Session session;
private boolean recycleHttpChannels = true;
private int maxMultiplex = 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is duplicated state... which is almost always bad. See below:

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Argh!!! it id worse!!! This is triple state as it is also stored in Http2Session, but at least there is is kept correctly as the local and remote value.


public HttpConnectionOverHTTP2(HttpDestination destination, Session session)
{
Expand All @@ -74,6 +76,16 @@ public void setRecycleHttpChannels(boolean recycleHttpChannels)
this.recycleHttpChannels = recycleHttpChannels;
}

public int getMaxMultiplex()
{
return maxMultiplex;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Avoid duplicate state:

Suggested change
return maxMultiplex;
Object attachment = getAttachment();
return (attachment instanceof ConnectionPool.Multiplexable)
? ((ConnectionPool.Multiplexable)attachment).getMaxMultiplex()
: 0;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sbordet I think this could at least remove some state duplication.... but might not be needed if we can delegate all the way down to HTTP2Session.

}

public void setMaxMultiplex(int maxMultiplex)
{
this.maxMultiplex = maxMultiplex;
}

@Override
protected Iterator<HttpChannel> getHttpChannels()
{
Expand Down
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
Expand All @@ -40,6 +41,7 @@
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpResponseException;
import org.eclipse.jetty.client.MultiplexConnectionPool;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
Expand Down Expand Up @@ -76,6 +78,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

public class MaxConcurrentStreamsTest extends AbstractTest
{
Expand Down Expand Up @@ -545,6 +548,109 @@ public Connection newConnection(EndPoint endPoint, Map<String, Object> context)
assertTrue(response3Latch.await(5, TimeUnit.SECONDS));
}

@Test
public void testDifferentMaxConcurrentStreamsForDifferentConnections() throws Exception
{
long processing = 125;
RawHTTP2ServerConnectionFactory http2 = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), new ServerSessionListener.Adapter()
{
private Session session1;
private Session session2;

@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Request request = (MetaData.Request)frame.getMetaData();
switch (request.getURI().getPath())
{
case "/prime":
{
session1 = stream.getSession();
// Send another request from here to force the opening of the 2nd connection.
client.newRequest("localhost", connector.getLocalPort()).path("/prime2").send(result ->
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, result.getResponse().getStatus(), new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
});
break;
}
case "/prime2":
{
session2 = stream.getSession();
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
break;
}
case "/update_max_streams":
{
Session session = stream.getSession() == session1 ? session2 : session1;
Map<Integer, Integer> settings = new HashMap<>();
settings.put(SettingsFrame.MAX_CONCURRENT_STREAMS, 2);
session.settings(new SettingsFrame(settings, false), Callback.NOOP);
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
break;
}
default:
{
sleep(processing);
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
break;
}
}
return null;
}
});
http2.setMaxConcurrentStreams(1);
prepareServer(http2);
server.start();
prepareClient();
client.setMaxConnectionsPerDestination(2);
client.start();

// Prime the 2 connections.
primeConnection();

String host = "localhost";
int port = connector.getLocalPort();

AbstractConnectionPool pool = (AbstractConnectionPool)client.resolveDestination(new Origin("http", host, port)).getConnectionPool();
assertEquals(2, pool.getConnectionCount());

// Send a request on one connection, which sends back a SETTINGS frame on the other connection.
ContentResponse response = client.newRequest(host, port)
.path("/update_max_streams")
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());

// Send 4 requests at once: 1 should go on one connection, 2 on the other connection, and 1 queued.
int count = 4;
CountDownLatch latch = new CountDownLatch(count);
for (int i = 0; i < count; ++i)
{
client.newRequest(host, port)
.path("/" + i)
.send(result ->
{
if (result.isSucceeded())
{
int status = result.getResponse().getStatus();
if (status == HttpStatus.OK_200)
latch.countDown();
else
fail("unexpected status " + status);
}
else
{
fail(result.getFailure());
}
});
}

assertTrue(awaitLatch(latch, count * processing * 10, TimeUnit.MILLISECONDS));
}

private void primeConnection() throws Exception
{
// Prime the connection so that the maxConcurrentStream setting arrives to the client.
Expand Down