Skip to content

Commit

Permalink
Issue #6603 - HTTP/2 max local stream count exceeded
Browse files Browse the repository at this point in the history
Updates after review.
Updated the maxMultiplex mechanism to always work on Pool.Entry, rather than on Pool.

Updated Pool javadocs.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Aug 27, 2021
1 parent 216616a commit 9caf6fa
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 103 deletions.
Expand Up @@ -142,18 +142,11 @@ protected void setMaxMultiplex(int maxMultiplex)

protected void setMaxMultiplex(Connection connection, int maxMultiplex)
{
if (connection == null)
{
setMaxMultiplex(maxMultiplex);
}
else
if (connection instanceof Attachable)
{
if (connection instanceof Attachable)
{
Object attachment = ((Attachable)connection).getAttachment();
if (attachment instanceof EntryHolder)
((EntryHolder)attachment).entry.setMaxMultiplex(maxMultiplex);
}
Object attachment = ((Attachable)connection).getAttachment();
if (attachment instanceof EntryHolder)
((EntryHolder)attachment).entry.setMaxMultiplex(maxMultiplex);
}
}

Expand Down Expand Up @@ -545,6 +538,8 @@ public void succeeded(Connection connection)
onCreated(connection);
pending.decrementAndGet();
reserved.enable(connection, false);
if (connection instanceof Multiplexable)
setMaxMultiplex(connection, ((ConnectionPool.Multiplexable)connection).getMaxMultiplex());
idle(connection, false);
complete(null);
proceed();
Expand Down
Expand Up @@ -104,27 +104,18 @@ interface Factory
}

/**
* Marks a connection pool as supporting multiplexed connections.
* Marks a connection as supporting multiplexed requests.
*/
interface Multiplexable
{
/**
* @return the default max number of requests multiplexable on a connection
* @return the max number of requests that can be multiplexed on a connection
*/
int getMaxMultiplex();

/**
* @param maxMultiplex the default max number of requests multiplexable on a connection
* @param maxMultiplex the max number of requests that can be multiplexed on a connection
*/
void setMaxMultiplex(int maxMultiplex);

/**
* @param connection the multiplexed connection
* @param maxMultiplex the max number of requests multiplexable on the given connection
*/
default void setMaxMultiplex(Connection connection, int maxMultiplex)
{
setMaxMultiplex(maxMultiplex);
}
}
}
Expand Up @@ -25,7 +25,7 @@
import org.eclipse.jetty.util.annotation.ManagedObject;

@ManagedObject
public class MultiplexConnectionPool extends AbstractConnectionPool implements ConnectionPool.Multiplexable
public class MultiplexConnectionPool extends AbstractConnectionPool
{
public MultiplexConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
Expand Down Expand Up @@ -56,12 +56,6 @@ public void setMaxMultiplex(int maxMultiplex)
super.setMaxMultiplex(maxMultiplex);
}

@Override
public void setMaxMultiplex(Connection connection, int maxMultiplex)
{
super.setMaxMultiplex(connection, maxMultiplex);
}

@Override
@ManagedAttribute(value = "The maximum amount of times a connection is used before it gets closed")
public int getMaxUsageCount()
Expand Down
Expand Up @@ -30,20 +30,22 @@ protected MultiplexHttpDestination(HttpClient client, Origin origin)
public int getMaxRequestsPerConnection()
{
ConnectionPool connectionPool = getConnectionPool();
if (connectionPool instanceof ConnectionPool.Multiplexable)
return ((ConnectionPool.Multiplexable)connectionPool).getMaxMultiplex();
if (connectionPool instanceof AbstractConnectionPool)
return ((AbstractConnectionPool)connectionPool).getMaxMultiplex();
return 1;
}

public void setMaxRequestsPerConnection(int maxRequestsPerConnection)
{
setMaxRequestsPerConnection(null, maxRequestsPerConnection);
ConnectionPool connectionPool = getConnectionPool();
if (connectionPool instanceof AbstractConnectionPool)
((AbstractConnectionPool)connectionPool).setMaxMultiplex(maxRequestsPerConnection);
}

public void setMaxRequestsPerConnection(Connection connection, int maxRequestsPerConnection)
{
ConnectionPool connectionPool = getConnectionPool();
if (connectionPool instanceof ConnectionPool.Multiplexable)
((ConnectionPool.Multiplexable)connectionPool).setMaxMultiplex(connection, maxRequestsPerConnection);
if (connectionPool instanceof AbstractConnectionPool)
((AbstractConnectionPool)connectionPool).setMaxMultiplex(connection, maxRequestsPerConnection);
}
}
Expand Up @@ -738,7 +738,7 @@ protected IStream createLocalStream(int streamId, Promise<Stream> promise)
int maxCount = getMaxLocalStreams();
if (maxCount >= 0 && localCount >= maxCount)
{
IllegalStateException failure = new IllegalStateException("Max local stream count " + maxCount + " exceeded");
IllegalStateException failure = new IllegalStateException("Max local stream count " + maxCount + " exceeded: " + localCount);
if (LOG.isDebugEnabled())
LOG.debug("Could not create local stream #{} for {}", streamId, this, failure);
promise.failed(failure);
Expand Down Expand Up @@ -789,8 +789,9 @@ protected IStream createRemoteStream(int streamId)
int maxCount = getMaxRemoteStreams();
if (maxCount >= 0 && remoteCount - remoteClosing >= maxCount)
{
IllegalStateException failure = new IllegalStateException("Max remote stream count " + maxCount + " exceeded: " + remoteCount + "+" + remoteClosing);
if (LOG.isDebugEnabled())
LOG.debug("Could not create remote stream #{} for {}", streamId, this);
LOG.debug("Could not create remote stream #{} for {}", streamId, this, failure);
reset(null, new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.from(() -> onStreamDestroyed(streamId)));
return null;
}
Expand Down
Expand Up @@ -60,7 +60,7 @@ public HttpClientTransportOverHTTP2(HTTP2Client client)
setConnectionPoolFactory(destination ->
{
HttpClient httpClient = getHttpClient();
return new MultiplexConnectionPool(destination, httpClient.getMaxConnectionsPerDestination(), destination, httpClient.getMaxRequestsQueuedPerDestination());
return new MultiplexConnectionPool(destination, httpClient.getMaxConnectionsPerDestination(), destination, 1);
});
}

Expand Down Expand Up @@ -213,15 +213,24 @@ public void onSettings(Session session, SettingsFrame frame)
{
Map<Integer, Integer> settings = frame.getSettings();
Integer maxConcurrentStreams = settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS);
if (maxConcurrentStreams != null)
destination().setMaxRequestsPerConnection(connection.getReference(), maxConcurrentStreams);
if (!connection.isMarked())
onServerPreface(session);
boolean[] initialized = new boolean[1];
Connection connection = this.connection.get(initialized);
if (initialized[0])
{
if (maxConcurrentStreams != null && connection != null)
destination().setMaxRequestsPerConnection(connection, maxConcurrentStreams);
}
else
{
onServerPreface(session, maxConcurrentStreams);
}
}

private void onServerPreface(Session session)
private void onServerPreface(Session session, Integer maxConcurrentStreams)
{
HttpConnectionOverHTTP2 connection = newHttpConnection(destination(), session);
if (maxConcurrentStreams != null)
connection.setMaxMultiplex(maxConcurrentStreams);
if (this.connection.compareAndSet(null, connection, false, true))
connectionPromise().succeeded(connection);
}
Expand Down
Expand Up @@ -28,6 +28,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.jetty.client.ConnectionPool;
import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpConnection;
import org.eclipse.jetty.client.HttpDestination;
Expand All @@ -42,7 +43,7 @@
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Sweeper;

public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable
public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable, ConnectionPool.Multiplexable
{
private static final Logger LOG = Log.getLogger(HttpConnection.class);

Expand All @@ -52,6 +53,7 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S
private final AtomicInteger sweeps = new AtomicInteger();
private final Session session;
private boolean recycleHttpChannels = true;
private int maxMultiplex = 1;

public HttpConnectionOverHTTP2(HttpDestination destination, Session session)
{
Expand All @@ -74,6 +76,16 @@ public void setRecycleHttpChannels(boolean recycleHttpChannels)
this.recycleHttpChannels = recycleHttpChannels;
}

public int getMaxMultiplex()
{
return maxMultiplex;
}

public void setMaxMultiplex(int maxMultiplex)
{
this.maxMultiplex = maxMultiplex;
}

@Override
protected Iterator<HttpChannel> getHttpChannels()
{
Expand Down

0 comments on commit 9caf6fa

Please sign in to comment.