Skip to content

Commit

Permalink
Fixes #6603 - HTTP/2 max local stream count exceeded
Browse files Browse the repository at this point in the history
Made MAX_CONCURRENT_STREAMS setting work on a per-connection basis.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Aug 18, 2021
1 parent ac73b3a commit 6ea6bda
Show file tree
Hide file tree
Showing 8 changed files with 183 additions and 11 deletions.
Expand Up @@ -140,6 +140,23 @@ protected void setMaxMultiplex(int maxMultiplex)
pool.setMaxMultiplex(maxMultiplex);
}

protected void setMaxMultiplex(Connection connection, int maxMultiplex)
{
if (connection == null)
{
setMaxMultiplex(maxMultiplex);
}
else
{
if (connection instanceof Attachable)
{
Object attachment = ((Attachable)connection).getAttachment();
if (attachment instanceof EntryHolder)
((EntryHolder)attachment).entry.setMaxMultiplex(maxMultiplex);
}
}
}

protected int getMaxUsageCount()
{
return pool.getMaxUsageCount();
Expand Down
Expand Up @@ -109,13 +109,22 @@ interface Factory
interface Multiplexable
{
/**
* @return the max number of requests multiplexable on a single connection
* @return the default max number of requests multiplexable on a connection
*/
int getMaxMultiplex();

/**
* @param maxMultiplex the max number of requests multiplexable on a single connection
* @param maxMultiplex the default max number of requests multiplexable on a connection
*/
void setMaxMultiplex(int maxMultiplex);

/**
* @param connection the multiplexed connection
* @param maxMultiplex the max number of requests multiplexable on the given connection
*/
default void setMaxMultiplex(Connection connection, int maxMultiplex)
{
setMaxMultiplex(maxMultiplex);
}
}
}
Expand Up @@ -56,6 +56,12 @@ public void setMaxMultiplex(int maxMultiplex)
super.setMaxMultiplex(maxMultiplex);
}

@Override
public void setMaxMultiplex(Connection connection, int maxMultiplex)
{
super.setMaxMultiplex(connection, maxMultiplex);
}

@Override
@ManagedAttribute(value = "The maximum amount of times a connection is used before it gets closed")
public int getMaxUsageCount()
Expand Down
Expand Up @@ -18,6 +18,8 @@

package org.eclipse.jetty.client;

import org.eclipse.jetty.client.api.Connection;

public abstract class MultiplexHttpDestination extends HttpDestination
{
protected MultiplexHttpDestination(HttpClient client, Origin origin)
Expand All @@ -34,9 +36,14 @@ public int getMaxRequestsPerConnection()
}

public void setMaxRequestsPerConnection(int maxRequestsPerConnection)
{
setMaxRequestsPerConnection(null, maxRequestsPerConnection);
}

public void setMaxRequestsPerConnection(Connection connection, int maxRequestsPerConnection)
{
ConnectionPool connectionPool = getConnectionPool();
if (connectionPool instanceof ConnectionPool.Multiplexable)
((ConnectionPool.Multiplexable)connectionPool).setMaxMultiplex(maxRequestsPerConnection);
((ConnectionPool.Multiplexable)connectionPool).setMaxMultiplex(connection, maxRequestsPerConnection);
}
}
Expand Up @@ -737,7 +737,10 @@ protected IStream createLocalStream(int streamId, Promise<Stream> promise)
int maxCount = getMaxLocalStreams();
if (maxCount >= 0 && localCount >= maxCount)
{
promise.failed(new IllegalStateException("Max local stream count " + maxCount + " exceeded"));
IllegalStateException failure = new IllegalStateException("Max local stream count " + maxCount + " exceeded");
if (LOG.isDebugEnabled())
LOG.debug("Could not create local stream #{} for {}", streamId, this, failure);
promise.failed(failure);
return null;
}
if (localStreamCount.compareAndSet(localCount, localCount + 1))
Expand All @@ -750,7 +753,7 @@ protected IStream createLocalStream(int streamId, Promise<Stream> promise)
stream.setIdleTimeout(getStreamIdleTimeout());
flowControl.onStreamCreated(stream);
if (LOG.isDebugEnabled())
LOG.debug("Created local {}", stream);
LOG.debug("Created local {} for {}", stream, this);
return stream;
}
else
Expand Down Expand Up @@ -785,6 +788,8 @@ protected IStream createRemoteStream(int streamId)
int maxCount = getMaxRemoteStreams();
if (maxCount >= 0 && remoteCount - remoteClosing >= maxCount)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not create remote stream #{} for {}", streamId, this);
reset(null, new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.from(() -> onStreamDestroyed(streamId)));
return null;
}
Expand All @@ -798,7 +803,7 @@ protected IStream createRemoteStream(int streamId)
stream.setIdleTimeout(getStreamIdleTimeout());
flowControl.onStreamCreated(stream);
if (LOG.isDebugEnabled())
LOG.debug("Created remote {}", stream);
LOG.debug("Created remote {} for {}", stream, this);
return stream;
}
else
Expand Down Expand Up @@ -944,7 +949,7 @@ public void onFrame(Frame frame)
private void onStreamCreated(int streamId)
{
if (LOG.isDebugEnabled())
LOG.debug("Created stream #{} for {}", streamId, this);
LOG.debug("Creating stream #{} for {}", streamId, this);
streamsState.onStreamCreated();
}

Expand Down
Expand Up @@ -212,8 +212,9 @@ private Promise<Connection> connectionPromise()
public void onSettings(Session session, SettingsFrame frame)
{
Map<Integer, Integer> settings = frame.getSettings();
if (settings.containsKey(SettingsFrame.MAX_CONCURRENT_STREAMS))
destination().setMaxRequestsPerConnection(settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS));
Integer maxConcurrentStreams = settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS);
if (maxConcurrentStreams != null)
destination().setMaxRequestsPerConnection(connection.getReference(), maxConcurrentStreams);
if (!connection.isMarked())
onServerPreface(session);
}
Expand Down
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
Expand All @@ -40,6 +41,7 @@
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpResponseException;
import org.eclipse.jetty.client.MultiplexConnectionPool;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
Expand Down Expand Up @@ -76,6 +78,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

public class MaxConcurrentStreamsTest extends AbstractTest
{
Expand Down Expand Up @@ -545,6 +548,109 @@ public Connection newConnection(EndPoint endPoint, Map<String, Object> context)
assertTrue(response3Latch.await(5, TimeUnit.SECONDS));
}

@Test
public void testDifferentMaxConcurrentStreamsForDifferentConnections() throws Exception
{
long processing = 125;
RawHTTP2ServerConnectionFactory http2 = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), new ServerSessionListener.Adapter()
{
private Session session1;
private Session session2;

@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
MetaData.Request request = (MetaData.Request)frame.getMetaData();
switch (request.getURI().getPath())
{
case "/prime":
{
session1 = stream.getSession();
// Send another request from here to force the opening of the 2nd connection.
client.newRequest("localhost", connector.getLocalPort()).path("/prime2").send(result ->
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, result.getResponse().getStatus(), new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
});
break;
}
case "/prime2":
{
session2 = stream.getSession();
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
break;
}
case "/update_max_streams":
{
Session session = stream.getSession() == session1 ? session2 : session1;
Map<Integer, Integer> settings = new HashMap<>();
settings.put(SettingsFrame.MAX_CONCURRENT_STREAMS, 2);
session.settings(new SettingsFrame(settings, false), Callback.NOOP);
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
break;
}
default:
{
sleep(processing);
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
break;
}
}
return null;
}
});
http2.setMaxConcurrentStreams(1);
prepareServer(http2);
server.start();
prepareClient();
client.setMaxConnectionsPerDestination(2);
client.start();

// Prime the 2 connections.
primeConnection();

String host = "localhost";
int port = connector.getLocalPort();

AbstractConnectionPool pool = (AbstractConnectionPool)client.resolveDestination(new Origin("http", host, port)).getConnectionPool();
assertEquals(2, pool.getConnectionCount());

// Send a request on one connection, which sends back a SETTINGS frame on the other connection.
ContentResponse response = client.newRequest(host, port)
.path("/update_max_streams")
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());

// Send 4 requests at once: 1 should go on one connection, 2 on the other connection, and 1 queued.
int count = 4;
CountDownLatch latch = new CountDownLatch(count);
for (int i = 0; i < count; ++i)
{
client.newRequest(host, port)
.path("/" + i)
.send(result ->
{
if (result.isSucceeded())
{
int status = result.getResponse().getStatus();
if (status == HttpStatus.OK_200)
latch.countDown();
else
fail("unexpected status " + status);
}
else
{
fail(result.getFailure());
}
});
}

assertTrue(awaitLatch(latch, count * processing * 10, TimeUnit.MILLISECONDS));
}

private void primeConnection() throws Exception
{
// Prime the connection so that the maxConcurrentStream setting arrives to the client.
Expand Down
25 changes: 23 additions & 2 deletions jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java
Expand Up @@ -131,7 +131,7 @@ public Pool(StrategyType strategyType, int maxEntries, boolean cache)
this.maxEntries = maxEntries;
this.strategyType = strategyType;
this.cache = cache ? new ThreadLocal<>() : null;
nextIndex = strategyType == StrategyType.ROUND_ROBIN ? new AtomicInteger() : null;
this.nextIndex = strategyType == StrategyType.ROUND_ROBIN ? new AtomicInteger() : null;
}

public int getReservedCount()
Expand Down Expand Up @@ -169,6 +169,14 @@ public final void setMaxMultiplex(int maxMultiplex)
if (maxMultiplex < 1)
throw new IllegalArgumentException("Max multiplex must be >= 1");
this.maxMultiplex = maxMultiplex;

try (Locker.Lock l = locker.lock())
{
if (closed)
return;

entries.forEach(entry -> entry.setMaxMultiplex(maxMultiplex));
}
}

/**
Expand Down Expand Up @@ -507,15 +515,28 @@ public class Entry
// hi: positive=open/maxUsage counter; negative=closed; MIN_VALUE pending
// lo: multiplexing counter
private final AtomicBiInteger state;

// The pooled item. This is not volatile as it is set once and then never changed.
// Other threads accessing must check the state field above first, so a good before/after
// relationship exists to make a memory barrier.
private T pooled;
private volatile int maxMultiplex;

Entry()
{
this.state = new AtomicBiInteger(Integer.MIN_VALUE, 0);
this.maxMultiplex = Pool.this.maxMultiplex;
}

public int getMaxMultiplex()
{
return maxMultiplex;
}

public void setMaxMultiplex(int maxMultiplex)
{
if (maxMultiplex < 1)
throw new IllegalArgumentException("Max multiplex must be >= 1");
this.maxMultiplex = maxMultiplex;
}

// for testing only
Expand Down

0 comments on commit 6ea6bda

Please sign in to comment.