diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java index 6ca7165b350c..fd8ae0bb52b5 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java @@ -440,9 +440,14 @@ public void release(Connection connection) { // Trigger the next request after releasing the connection. if (connectionPool.release(connection)) + { send(false); + } else + { connection.close(); + send(true); + } } else { diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java index 57486da0360b..7b909f01d10a 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java @@ -18,22 +18,22 @@ package org.eclipse.jetty.client; -import java.util.concurrent.atomic.AtomicInteger; - import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Pool; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.Locker; @ManagedObject public class RoundRobinConnectionPool extends MultiplexConnectionPool { private static final Logger LOG = Log.getLogger(RoundRobinConnectionPool.class); - private final AtomicInteger offset = new AtomicInteger(); + private final Locker lock = new Locker(); private final Pool pool; + private int offset; public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester) { @@ -47,26 +47,32 @@ public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, } @Override - protected Connection activate() + protected Connection acquire(boolean create) { - int offset = this.offset.get(); - Connection connection = activate(offset); - if (connection != null) - this.offset.getAndIncrement(); - return connection; + // If there are queued requests and connections get + // closed due to idle timeout or overuse, we want to + // aggressively try to open new connections to replace + // those that were closed to process queued requests. + return super.acquire(true); } - private Connection activate(int offset) + @Override + protected Connection activate() { - Pool.Entry entry = pool.acquireAt(Math.abs(offset % pool.getMaxEntries())); - if (LOG.isDebugEnabled()) - LOG.debug("activated '{}'", entry); - if (entry != null) + Pool.Entry entry; + try (Locker.Lock l = lock.lock()) { - Connection connection = entry.getPooled(); - acquired(connection); - return connection; + int index = Math.abs(offset % pool.getMaxEntries()); + entry = pool.acquireAt(index); + if (LOG.isDebugEnabled()) + LOG.debug("activated at index={} entry={}", index, entry); + if (entry != null) + ++offset; } - return null; + if (entry == null) + return null; + Connection connection = entry.getPooled(); + acquired(connection); + return connection; } } diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java index 6d79e0bedd94..90881bd69eaf 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java @@ -25,7 +25,6 @@ import org.eclipse.jetty.client.HttpSender; import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.http2.ErrorCode; -import org.eclipse.jetty.http2.IStream; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.frames.ResetFrame; @@ -99,12 +98,9 @@ public void send(HttpExchange exchange) @Override public void release() { + setStream(null); connection.release(this); - } - - void onStreamClosed(IStream stream) - { - connection.onStreamClosed(stream, this); + getHttpDestination().release(getHttpConnection()); } @Override diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java index 2258aea802ec..c138b059a6b2 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java @@ -35,7 +35,6 @@ import org.eclipse.jetty.client.SendFailure; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http2.ErrorCode; -import org.eclipse.jetty.http2.IStream; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; @@ -119,16 +118,6 @@ else if (isRecycleHttpChannels()) } } - void onStreamClosed(IStream stream, HttpChannelOverHTTP2 channel) - { - if (LOG.isDebugEnabled()) - LOG.debug("{} closed for {}", stream, channel); - channel.setStream(null); - // Only non-push channels are released. - if (stream.isLocal()) - getHttpDestination().release(this); - } - @Override public boolean onIdleTimeout(long idleTimeout) { diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java index 854b6380538f..17c2477cb414 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java @@ -38,7 +38,6 @@ import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http2.ErrorCode; -import org.eclipse.jetty.http2.IStream; import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; @@ -201,12 +200,6 @@ public void onFailure(Stream stream, int error, String reason, Throwable failure callback.succeeded(); } - @Override - public void onClosed(Stream stream) - { - getHttpChannel().onStreamClosed((IStream)stream); - } - private void notifyContent(HttpExchange exchange, DataFrame frame, Callback callback) { contentNotifier.offer(exchange, frame, callback); diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/RoundRobinConnectionPoolTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/RoundRobinConnectionPoolTest.java index 45b0a512df78..ad63a8e2d292 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/RoundRobinConnectionPoolTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/RoundRobinConnectionPoolTest.java @@ -28,6 +28,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -180,4 +182,56 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r assertThat(remotePorts.get(i - 1), Matchers.not(Matchers.equalTo(candidate))); } } + + @ParameterizedTest + @ArgumentsSource(TransportProvider.class) + public void testMultiplexWithMaxUsage(Transport transport) throws Exception + { + init(transport); + + int multiplex = 1; + if (scenario.transport.isHttp2Based()) + multiplex = 2; + int maxMultiplex = multiplex; + + int maxUsage = 2; + int maxConnections = 2; + int count = maxConnections * maxMultiplex * maxUsage; + + List remotePorts = new CopyOnWriteArrayList<>(); + scenario.start(new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) + { + remotePorts.add(request.getRemotePort()); + } + }); + scenario.client.getTransport().setConnectionPoolFactory(destination -> + { + RoundRobinConnectionPool pool = new RoundRobinConnectionPool(destination, maxConnections, destination, maxMultiplex); + pool.setMaxUsageCount(maxUsage); + return pool; + }); + + CountDownLatch clientLatch = new CountDownLatch(count); + for (int i = 0; i < count; ++i) + { + scenario.client.newRequest(scenario.newURI()) + .path("/" + i) + .timeout(5, TimeUnit.SECONDS) + .send(result -> + { + if (result.getResponse().getStatus() == HttpStatus.OK_200) + clientLatch.countDown(); + }); + } + assertTrue(clientLatch.await(count, TimeUnit.SECONDS)); + assertEquals(count, remotePorts.size()); + + Map results = remotePorts.stream() + .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); + assertEquals(count / maxUsage, results.size(), remotePorts.toString()); + assertEquals(1, results.values().stream().distinct().count(), remotePorts.toString()); + } }