diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java index 093cc37a8ff5..cb22c753113e 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java @@ -140,6 +140,23 @@ protected void setMaxMultiplex(int maxMultiplex) pool.setMaxMultiplex(maxMultiplex); } + protected void setMaxMultiplex(Connection connection, int maxMultiplex) + { + if (connection == null) + { + setMaxMultiplex(maxMultiplex); + } + else + { + if (connection instanceof Attachable) + { + Object attachment = ((Attachable)connection).getAttachment(); + if (attachment instanceof EntryHolder) + ((EntryHolder)attachment).entry.setMaxMultiplex(maxMultiplex); + } + } + } + protected int getMaxUsageCount() { return pool.getMaxUsageCount(); diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java index 4ae507896777..27ee72fb82d0 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java @@ -109,13 +109,22 @@ interface Factory interface Multiplexable { /** - * @return the max number of requests multiplexable on a single connection + * @return the default max number of requests multiplexable on a connection */ int getMaxMultiplex(); /** - * @param maxMultiplex the max number of requests multiplexable on a single connection + * @param maxMultiplex the default max number of requests multiplexable on a connection */ void setMaxMultiplex(int maxMultiplex); + + /** + * @param connection the multiplexed connection + * @param maxMultiplex the max number of requests multiplexable on the given connection + */ + default void setMaxMultiplex(Connection connection, int maxMultiplex) + { + setMaxMultiplex(maxMultiplex); + } } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java index 9f4bfd7ee7fd..f52ca50b54f9 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexConnectionPool.java @@ -56,6 +56,12 @@ public void setMaxMultiplex(int maxMultiplex) super.setMaxMultiplex(maxMultiplex); } + @Override + public void setMaxMultiplex(Connection connection, int maxMultiplex) + { + super.setMaxMultiplex(connection, maxMultiplex); + } + @Override @ManagedAttribute(value = "The maximum amount of times a connection is used before it gets closed") public int getMaxUsageCount() diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java index bfa1dad3b514..925e66c79d8b 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java @@ -18,6 +18,8 @@ package org.eclipse.jetty.client; +import org.eclipse.jetty.client.api.Connection; + public abstract class MultiplexHttpDestination extends HttpDestination { protected MultiplexHttpDestination(HttpClient client, Origin origin) @@ -34,9 +36,14 @@ public int getMaxRequestsPerConnection() } public void setMaxRequestsPerConnection(int maxRequestsPerConnection) + { + setMaxRequestsPerConnection(null, maxRequestsPerConnection); + } + + public void setMaxRequestsPerConnection(Connection connection, int maxRequestsPerConnection) { ConnectionPool connectionPool = getConnectionPool(); if (connectionPool instanceof ConnectionPool.Multiplexable) - ((ConnectionPool.Multiplexable)connectionPool).setMaxMultiplex(maxRequestsPerConnection); + ((ConnectionPool.Multiplexable)connectionPool).setMaxMultiplex(connection, maxRequestsPerConnection); } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 793982a3c38d..b6680b9a2ce8 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -737,7 +737,10 @@ protected IStream createLocalStream(int streamId, Promise promise) int maxCount = getMaxLocalStreams(); if (maxCount >= 0 && localCount >= maxCount) { - promise.failed(new IllegalStateException("Max local stream count " + maxCount + " exceeded")); + IllegalStateException failure = new IllegalStateException("Max local stream count " + maxCount + " exceeded"); + if (LOG.isDebugEnabled()) + LOG.debug("Could not create local stream #{} for {}", streamId, this, failure); + promise.failed(failure); return null; } if (localStreamCount.compareAndSet(localCount, localCount + 1)) @@ -750,7 +753,7 @@ protected IStream createLocalStream(int streamId, Promise promise) stream.setIdleTimeout(getStreamIdleTimeout()); flowControl.onStreamCreated(stream); if (LOG.isDebugEnabled()) - LOG.debug("Created local {}", stream); + LOG.debug("Created local {} for {}", stream, this); return stream; } else @@ -785,6 +788,8 @@ protected IStream createRemoteStream(int streamId) int maxCount = getMaxRemoteStreams(); if (maxCount >= 0 && remoteCount - remoteClosing >= maxCount) { + if (LOG.isDebugEnabled()) + LOG.debug("Could not create remote stream #{} for {}", streamId, this); reset(null, new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.from(() -> onStreamDestroyed(streamId))); return null; } @@ -798,7 +803,7 @@ protected IStream createRemoteStream(int streamId) stream.setIdleTimeout(getStreamIdleTimeout()); flowControl.onStreamCreated(stream); if (LOG.isDebugEnabled()) - LOG.debug("Created remote {}", stream); + LOG.debug("Created remote {} for {}", stream, this); return stream; } else @@ -944,7 +949,7 @@ public void onFrame(Frame frame) private void onStreamCreated(int streamId) { if (LOG.isDebugEnabled()) - LOG.debug("Created stream #{} for {}", streamId, this); + LOG.debug("Creating stream #{} for {}", streamId, this); streamsState.onStreamCreated(); } diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java index 182ca869f098..77356249a0eb 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpClientTransportOverHTTP2.java @@ -212,8 +212,9 @@ private Promise connectionPromise() public void onSettings(Session session, SettingsFrame frame) { Map settings = frame.getSettings(); - if (settings.containsKey(SettingsFrame.MAX_CONCURRENT_STREAMS)) - destination().setMaxRequestsPerConnection(settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS)); + Integer maxConcurrentStreams = settings.get(SettingsFrame.MAX_CONCURRENT_STREAMS); + if (maxConcurrentStreams != null) + destination().setMaxRequestsPerConnection(connection.getReference(), maxConcurrentStreams); if (!connection.isMarked()) onServerPreface(session); } diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java index 636fd75108f7..b75c293cf0cf 100644 --- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Queue; @@ -40,6 +41,7 @@ import org.eclipse.jetty.client.HttpDestination; import org.eclipse.jetty.client.HttpResponseException; import org.eclipse.jetty.client.MultiplexConnectionPool; +import org.eclipse.jetty.client.Origin; import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; @@ -76,6 +78,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; public class MaxConcurrentStreamsTest extends AbstractTest { @@ -545,6 +548,109 @@ public Connection newConnection(EndPoint endPoint, Map context) assertTrue(response3Latch.await(5, TimeUnit.SECONDS)); } + @Test + public void testDifferentMaxConcurrentStreamsForDifferentConnections() throws Exception + { + long processing = 125; + RawHTTP2ServerConnectionFactory http2 = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), new ServerSessionListener.Adapter() + { + private Session session1; + private Session session2; + + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + MetaData.Request request = (MetaData.Request)frame.getMetaData(); + switch (request.getURI().getPath()) + { + case "/prime": + { + session1 = stream.getSession(); + // Send another request from here to force the opening of the 2nd connection. + client.newRequest("localhost", connector.getLocalPort()).path("/prime2").send(result -> + { + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, result.getResponse().getStatus(), new HttpFields()); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + }); + break; + } + case "/prime2": + { + session2 = stream.getSession(); + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + break; + } + case "/update_max_streams": + { + Session session = stream.getSession() == session1 ? session2 : session1; + Map settings = new HashMap<>(); + settings.put(SettingsFrame.MAX_CONCURRENT_STREAMS, 2); + session.settings(new SettingsFrame(settings, false), Callback.NOOP); + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + break; + } + default: + { + sleep(processing); + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + break; + } + } + return null; + } + }); + http2.setMaxConcurrentStreams(1); + prepareServer(http2); + server.start(); + prepareClient(); + client.setMaxConnectionsPerDestination(2); + client.start(); + + // Prime the 2 connections. + primeConnection(); + + String host = "localhost"; + int port = connector.getLocalPort(); + + AbstractConnectionPool pool = (AbstractConnectionPool)client.resolveDestination(new Origin("http", host, port)).getConnectionPool(); + assertEquals(2, pool.getConnectionCount()); + + // Send a request on one connection, which sends back a SETTINGS frame on the other connection. + ContentResponse response = client.newRequest(host, port) + .path("/update_max_streams") + .send(); + assertEquals(HttpStatus.OK_200, response.getStatus()); + + // Send 4 requests at once: 1 should go on one connection, 2 on the other connection, and 1 queued. + int count = 4; + CountDownLatch latch = new CountDownLatch(count); + for (int i = 0; i < count; ++i) + { + client.newRequest(host, port) + .path("/" + i) + .send(result -> + { + if (result.isSucceeded()) + { + int status = result.getResponse().getStatus(); + if (status == HttpStatus.OK_200) + latch.countDown(); + else + fail("unexpected status " + status); + } + else + { + fail(result.getFailure()); + } + }); + } + + assertTrue(awaitLatch(latch, count * processing * 10, TimeUnit.MILLISECONDS)); + } + private void primeConnection() throws Exception { // Prime the connection so that the maxConcurrentStream setting arrives to the client. diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java index 183b169249a3..e7138b297abd 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java @@ -131,7 +131,7 @@ public Pool(StrategyType strategyType, int maxEntries, boolean cache) this.maxEntries = maxEntries; this.strategyType = strategyType; this.cache = cache ? new ThreadLocal<>() : null; - nextIndex = strategyType == StrategyType.ROUND_ROBIN ? new AtomicInteger() : null; + this.nextIndex = strategyType == StrategyType.ROUND_ROBIN ? new AtomicInteger() : null; } public int getReservedCount() @@ -169,6 +169,14 @@ public final void setMaxMultiplex(int maxMultiplex) if (maxMultiplex < 1) throw new IllegalArgumentException("Max multiplex must be >= 1"); this.maxMultiplex = maxMultiplex; + + try (Locker.Lock l = locker.lock()) + { + if (closed) + return; + + entries.forEach(entry -> entry.setMaxMultiplex(maxMultiplex)); + } } /** @@ -507,15 +515,28 @@ public class Entry // hi: positive=open/maxUsage counter; negative=closed; MIN_VALUE pending // lo: multiplexing counter private final AtomicBiInteger state; - // The pooled item. This is not volatile as it is set once and then never changed. // Other threads accessing must check the state field above first, so a good before/after // relationship exists to make a memory barrier. private T pooled; + private volatile int maxMultiplex; Entry() { this.state = new AtomicBiInteger(Integer.MIN_VALUE, 0); + this.maxMultiplex = Pool.this.maxMultiplex; + } + + public int getMaxMultiplex() + { + return maxMultiplex; + } + + public void setMaxMultiplex(int maxMultiplex) + { + if (maxMultiplex < 1) + throw new IllegalArgumentException("Max multiplex must be >= 1"); + this.maxMultiplex = maxMultiplex; } // for testing only