From 0af5f676cd401c693cef52c96c2d26fb277379e3 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Sat, 22 Aug 2020 22:10:08 +0200 Subject: [PATCH] Issue #5147 - HTTP2 RoundRobinConnectionPool with maxUsage Reworked HTTP/2 release after an exchange is terminated. Previously, the release was bound to 2 events: onStreamClosed(), introduced for #2796, and exchangeTerminated(). Unfortunately, if the former happens before the latter and closes the connection, the latter will see the exchange as aborted, while in fact it was successful, causing what reported in #5147, an AsynchronousCloseException. Now, the release is always performed by the exchangeTerminated() event. With respect to #2796, the stream is always already closed by the time the exchangeTerminated() event fires (it was not before). Reworked the implementation of RoundRobinConnectionPool using a lock and aggressively trying to open new connections. A second fix is related to HttpDestination.release(Connection). If the connection is closed for e.g. overuse, we need to trigger the processing of queued requests via send(create: true). Signed-off-by: Simone Bordet --- .../eclipse/jetty/client/HttpDestination.java | 5 ++ .../client/RoundRobinConnectionPool.java | 42 ++++++++------- .../client/http/HttpChannelOverHTTP2.java | 8 +-- .../client/http/HttpConnectionOverHTTP2.java | 11 ---- .../client/http/HttpReceiverOverHTTP2.java | 7 --- .../client/RoundRobinConnectionPoolTest.java | 54 +++++++++++++++++++ 6 files changed, 85 insertions(+), 42 deletions(-) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java index 6ca7165b350c..fd8ae0bb52b5 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java @@ -440,9 +440,14 @@ public void release(Connection connection) { // Trigger the next request after releasing the connection. if (connectionPool.release(connection)) + { send(false); + } else + { connection.close(); + send(true); + } } else { diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java index 57486da0360b..7b909f01d10a 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java @@ -18,22 +18,22 @@ package org.eclipse.jetty.client; -import java.util.concurrent.atomic.AtomicInteger; - import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Pool; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.Locker; @ManagedObject public class RoundRobinConnectionPool extends MultiplexConnectionPool { private static final Logger LOG = Log.getLogger(RoundRobinConnectionPool.class); - private final AtomicInteger offset = new AtomicInteger(); + private final Locker lock = new Locker(); private final Pool pool; + private int offset; public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester) { @@ -47,26 +47,32 @@ public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, } @Override - protected Connection activate() + protected Connection acquire(boolean create) { - int offset = this.offset.get(); - Connection connection = activate(offset); - if (connection != null) - this.offset.getAndIncrement(); - return connection; + // If there are queued requests and connections get + // closed due to idle timeout or overuse, we want to + // aggressively try to open new connections to replace + // those that were closed to process queued requests. + return super.acquire(true); } - private Connection activate(int offset) + @Override + protected Connection activate() { - Pool.Entry entry = pool.acquireAt(Math.abs(offset % pool.getMaxEntries())); - if (LOG.isDebugEnabled()) - LOG.debug("activated '{}'", entry); - if (entry != null) + Pool.Entry entry; + try (Locker.Lock l = lock.lock()) { - Connection connection = entry.getPooled(); - acquired(connection); - return connection; + int index = Math.abs(offset % pool.getMaxEntries()); + entry = pool.acquireAt(index); + if (LOG.isDebugEnabled()) + LOG.debug("activated at index={} entry={}", index, entry); + if (entry != null) + ++offset; } - return null; + if (entry == null) + return null; + Connection connection = entry.getPooled(); + acquired(connection); + return connection; } } diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java index 6d79e0bedd94..90881bd69eaf 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java @@ -25,7 +25,6 @@ import org.eclipse.jetty.client.HttpSender; import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.http2.ErrorCode; -import org.eclipse.jetty.http2.IStream; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.frames.ResetFrame; @@ -99,12 +98,9 @@ public void send(HttpExchange exchange) @Override public void release() { + setStream(null); connection.release(this); - } - - void onStreamClosed(IStream stream) - { - connection.onStreamClosed(stream, this); + getHttpDestination().release(getHttpConnection()); } @Override diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java index 2258aea802ec..c138b059a6b2 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpConnectionOverHTTP2.java @@ -35,7 +35,6 @@ import org.eclipse.jetty.client.SendFailure; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http2.ErrorCode; -import org.eclipse.jetty.http2.IStream; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; @@ -119,16 +118,6 @@ else if (isRecycleHttpChannels()) } } - void onStreamClosed(IStream stream, HttpChannelOverHTTP2 channel) - { - if (LOG.isDebugEnabled()) - LOG.debug("{} closed for {}", stream, channel); - channel.setStream(null); - // Only non-push channels are released. - if (stream.isLocal()) - getHttpDestination().release(this); - } - @Override public boolean onIdleTimeout(long idleTimeout) { diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java index 854b6380538f..17c2477cb414 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpReceiverOverHTTP2.java @@ -38,7 +38,6 @@ import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http2.ErrorCode; -import org.eclipse.jetty.http2.IStream; import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; @@ -201,12 +200,6 @@ public void onFailure(Stream stream, int error, String reason, Throwable failure callback.succeeded(); } - @Override - public void onClosed(Stream stream) - { - getHttpChannel().onStreamClosed((IStream)stream); - } - private void notifyContent(HttpExchange exchange, DataFrame frame, Callback callback) { contentNotifier.offer(exchange, frame, callback); diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/RoundRobinConnectionPoolTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/RoundRobinConnectionPoolTest.java index 45b0a512df78..ad63a8e2d292 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/RoundRobinConnectionPoolTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/RoundRobinConnectionPoolTest.java @@ -28,6 +28,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -180,4 +182,56 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r assertThat(remotePorts.get(i - 1), Matchers.not(Matchers.equalTo(candidate))); } } + + @ParameterizedTest + @ArgumentsSource(TransportProvider.class) + public void testMultiplexWithMaxUsage(Transport transport) throws Exception + { + init(transport); + + int multiplex = 1; + if (scenario.transport.isHttp2Based()) + multiplex = 2; + int maxMultiplex = multiplex; + + int maxUsage = 2; + int maxConnections = 2; + int count = maxConnections * maxMultiplex * maxUsage; + + List remotePorts = new CopyOnWriteArrayList<>(); + scenario.start(new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) + { + remotePorts.add(request.getRemotePort()); + } + }); + scenario.client.getTransport().setConnectionPoolFactory(destination -> + { + RoundRobinConnectionPool pool = new RoundRobinConnectionPool(destination, maxConnections, destination, maxMultiplex); + pool.setMaxUsageCount(maxUsage); + return pool; + }); + + CountDownLatch clientLatch = new CountDownLatch(count); + for (int i = 0; i < count; ++i) + { + scenario.client.newRequest(scenario.newURI()) + .path("/" + i) + .timeout(5, TimeUnit.SECONDS) + .send(result -> + { + if (result.getResponse().getStatus() == HttpStatus.OK_200) + clientLatch.countDown(); + }); + } + assertTrue(clientLatch.await(count, TimeUnit.SECONDS)); + assertEquals(count, remotePorts.size()); + + Map results = remotePorts.stream() + .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); + assertEquals(count / maxUsage, results.size(), remotePorts.toString()); + assertEquals(1, results.values().stream().distinct().count(), remotePorts.toString()); + } }