From e3690c645319302d3873aa751ba0be7cf02500f5 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 5 Jan 2021 23:13:07 +0100 Subject: [PATCH] Fixes #5855 - HttpClient may not send queued requests. Changed the AbstractConnectionPool.acquire() logic to call tryCreate() even when create=false. This is necessary when e.g. a sender thread T2 with create=true steals a connection whose creation was triggered by another sender thread T1. In the old code, T2 did not trigger the creation of a connection, possibly leaving a request queued. In the new code, T2 would call tryCreate(queuedRequests), possibly triggering the creation of a connection. This change re-introduces the fact that when sending e.g. 20 requests concurrently, 20+ connections may be created. However, it is better to err on creating more than creating less and leaving requests queued. Signed-off-by: Simone Bordet --- .../jetty/client/AbstractConnectionPool.java | 63 ++++++++----- .../eclipse/jetty/client/HttpDestination.java | 44 +++++----- .../jetty/client/ConnectionPoolTest.java | 88 +++++++++++++++++-- .../http/HttpDestinationOverHTTPTest.java | 10 ++- 4 files changed, 150 insertions(+), 55 deletions(-) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java index 61a725adc895..84e9b3790dbf 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java @@ -190,11 +190,15 @@ public Connection acquire() *

Returns an idle connection, if available; * if an idle connection is not available, and the given {@code create} parameter is {@code true} * or {@link #isMaximizeConnections()} is {@code true}, - * then schedules the opening of a new connection, if possible within the configuration of this + * then attempts to open a new connection, if possible within the configuration of this * connection pool (for example, if it does not exceed the max connection count); - * otherwise returns {@code null}.

+ * otherwise it attempts to open a new connection, if the number of queued requests is + * greater than the number of pending connections; + * if no connection is available even after the attempts to open, return {@code null}.

+ *

The {@code create} parameter is just a hint: the connection may be created even if + * {@code false}, or may not be created even if {@code true}.

* - * @param create whether to schedule the opening of a connection if no idle connections are available + * @param create a hint to attempt to open a new connection if no idle connections are available * @return an idle connection or {@code null} if no idle connections are available * @see #tryCreate(int) */ @@ -203,9 +207,23 @@ protected Connection acquire(boolean create) if (LOG.isDebugEnabled()) LOG.debug("Acquiring create={} on {}", create, this); Connection connection = activate(); - if (connection == null && (create || isMaximizeConnections())) + if (connection == null) { - tryCreate(destination.getQueuedRequestCount()); + if (create || isMaximizeConnections()) + { + // Try to forcibly create a connection if none is available. + tryCreate(-1); + } + else + { + // QueuedRequests may be stale and different from pool.pending. + // So tryCreate() may be a no-operation (when queuedRequests < pool.pending); + // or tryCreate() may create more connections than necessary, when + // queuedRequests read below is stale and some request has just been + // dequeued to be processed causing queuedRequests > pool.pending. + int queuedRequests = destination.getQueuedRequestCount(); + tryCreate(queuedRequests); + } connection = activate(); } return connection; @@ -226,7 +244,7 @@ protected void tryCreate(int maxPending) tryCreateAsync(maxPending); } - private CompletableFuture tryCreateAsync(int maxPending) + private CompletableFuture tryCreateAsync(int maxPending) { int connectionCount = getConnectionCount(); if (LOG.isDebugEnabled()) @@ -237,39 +255,42 @@ private CompletableFuture tryCreateAsync(int maxPending) return CompletableFuture.completedFuture(null); if (LOG.isDebugEnabled()) - LOG.debug("Creating connection {}/{}", connectionCount, getMaxConnectionCount()); + LOG.debug("Creating connection {}/{} at {}", connectionCount, getMaxConnectionCount(), entry); - CompletableFuture future = new CompletableFuture<>(); - destination.newConnection(new Promise() + Promise.Completable future = new Promise.Completable() { @Override public void succeeded(Connection connection) { if (LOG.isDebugEnabled()) - LOG.debug("Connection {}/{} creation succeeded {}", connectionCount, getMaxConnectionCount(), connection); - if (!(connection instanceof Attachable)) + LOG.debug("Connection {}/{} creation succeeded at {}: {}", connectionCount, getMaxConnectionCount(), entry, connection); + if (connection instanceof Attachable) + { + ((Attachable)connection).setAttachment(entry); + onCreated(connection); + entry.enable(connection, false); + idle(connection, false); + complete(null); + proceed(); + } + else { failed(new IllegalArgumentException("Invalid connection object: " + connection)); - return; } - ((Attachable)connection).setAttachment(entry); - onCreated(connection); - entry.enable(connection, false); - idle(connection, false); - future.complete(null); - proceed(); } @Override public void failed(Throwable x) { if (LOG.isDebugEnabled()) - LOG.debug("Connection {}/{} creation failed", connectionCount, getMaxConnectionCount(), x); + LOG.debug("Connection {}/{} creation failed at {}", connectionCount, getMaxConnectionCount(), entry, x); entry.remove(); - future.completeExceptionally(x); + completeExceptionally(x); requester.failed(x); } - }); + }; + + destination.newConnection(future); return future; } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java index c8866e6dc00e..cfcc06e181bd 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java @@ -109,7 +109,7 @@ public HttpDestination(HttpClient client, Origin origin) protected void doStart() throws Exception { this.connectionPool = newConnectionPool(client); - addBean(connectionPool); + addBean(connectionPool, true); super.doStart(); Sweeper sweeper = client.getBean(Sweeper.class); if (sweeper != null && connectionPool instanceof Sweeper.Sweepable) @@ -311,9 +311,8 @@ public void send() private void send(boolean create) { - if (getHttpExchanges().isEmpty()) - return; - process(create); + if (!getHttpExchanges().isEmpty()) + process(create); } private void process(boolean create) @@ -321,7 +320,10 @@ private void process(boolean create) // The loop is necessary in case of a new multiplexed connection, // when a single thread notified of the connection opening must // process all queued exchanges. - // In other cases looping is a work-stealing optimization. + // It is also necessary when thread T1 cannot acquire a connection + // (for example, it has been stolen by thread T2 and the pool has + // enough pending reservations). T1 returns without doing anything + // and therefore it is T2 that must send both request R1 and R2. while (true) { Connection connection; @@ -331,14 +333,15 @@ private void process(boolean create) connection = connectionPool.acquire(); if (connection == null) break; - ProcessResult result = process(connection); - if (result == ProcessResult.FINISH) + boolean proceed = process(connection); + if (proceed) + create = false; + else break; - create = result == ProcessResult.RESTART; } } - private ProcessResult process(Connection connection) + private boolean process(Connection connection) { HttpClient client = getHttpClient(); HttpExchange exchange = getHttpExchanges().poll(); @@ -354,7 +357,7 @@ private ProcessResult process(Connection connection) LOG.debug("{} is stopping", client); connection.close(); } - return ProcessResult.FINISH; + return false; } else { @@ -372,9 +375,7 @@ private ProcessResult process(Connection connection) // is created. Aborting the exchange a second time will result in // a no-operation, so we just abort here to cover that edge case. exchange.abort(cause); - return getHttpExchanges().size() > 0 - ? (released ? ProcessResult.CONTINUE : ProcessResult.RESTART) - : ProcessResult.FINISH; + return getQueuedRequestCount() > 0; } SendFailure failure = send(connection, exchange); @@ -382,7 +383,7 @@ private ProcessResult process(Connection connection) { // Aggressively send other queued requests // in case connections are multiplexed. - return getQueuedRequestCount() > 0 ? ProcessResult.CONTINUE : ProcessResult.FINISH; + return getQueuedRequestCount() > 0; } if (LOG.isDebugEnabled()) @@ -392,10 +393,10 @@ private ProcessResult process(Connection connection) // Resend this exchange, likely on another connection, // and return false to avoid to re-enter this method. send(exchange); - return ProcessResult.FINISH; + return false; } request.abort(failure.failure); - return getHttpExchanges().size() > 0 ? ProcessResult.RESTART : ProcessResult.FINISH; + return getQueuedRequestCount() > 0; } } @@ -474,7 +475,7 @@ else if (removed) // Process queued requests that may be waiting. // We may create a connection that is not // needed, but it will eventually idle timeout. - process(true); + send(true); } return removed; } @@ -541,8 +542,8 @@ public String toString() asString(), hashCode(), proxy == null ? "" : "(via " + proxy + ")", - exchanges.size(), - connectionPool); + getQueuedRequestCount(), + getConnectionPool()); } /** @@ -610,9 +611,4 @@ private void schedule(long expiresAt) } } } - - private enum ProcessResult - { - RESTART, CONTINUE, FINISH - } } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java index 0a679a1b155e..e6afe43e15c1 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java @@ -23,10 +23,12 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import java.util.stream.Stream; +import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -244,9 +246,12 @@ else if (serverClose) } @ParameterizedTest - @MethodSource("pools") + @MethodSource("poolsNoRoundRobin") public void testQueuedRequestsDontOpenTooManyConnections(ConnectionPoolFactory factory) throws Exception { + // Round robin connection pool does open a few more + // connections than expected, exclude it from this test. + startServer(new EmptyServerHandler()); HttpClientTransport transport = new HttpClientTransportOverHTTP(1); @@ -300,11 +305,10 @@ public void resolve(String host, int port, Promise> prom } @ParameterizedTest - @MethodSource("poolsNoRoundRobin") - public void testConcurrentRequestsDontOpenTooManyConnections(ConnectionPoolFactory factory) throws Exception + @MethodSource("pools") + public void testConcurrentRequestsWithSlowAddressResolver(ConnectionPoolFactory factory) throws Exception { - // Round robin connection pool does open a few more - // connections than expected, exclude it from this test. + // ConnectionPools may open a few more connections than expected. startServer(new EmptyServerHandler()); @@ -351,9 +355,81 @@ public void resolve(String host, int port, Promise> prom assertTrue(latch.await(count, TimeUnit.SECONDS)); List destinations = client.getDestinations(); assertEquals(1, destinations.size()); + } + + @ParameterizedTest + @MethodSource("pools") + public void testConcurrentRequestsAllBlockedOnServerWithLargeConnectionPool(ConnectionPoolFactory factory) throws Exception + { + int count = 50; + testConcurrentRequestsAllBlockedOnServer(factory, count, 2 * count); + } + + @ParameterizedTest + @MethodSource("pools") + public void testConcurrentRequestsAllBlockedOnServerWithExactConnectionPool(ConnectionPoolFactory factory) throws Exception + { + int count = 50; + testConcurrentRequestsAllBlockedOnServer(factory, count, count); + } + + private void testConcurrentRequestsAllBlockedOnServer(ConnectionPoolFactory factory, int count, int maxConnections) throws Exception + { + CyclicBarrier barrier = new CyclicBarrier(count); + + QueuedThreadPool serverThreads = new QueuedThreadPool(2 * count); + serverThreads.setName("server"); + server = new Server(serverThreads); + connector = new ServerConnector(server); + server.addConnector(connector); + server.setHandler(new EmptyServerHandler() + { + @Override + protected void service(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + try + { + barrier.await(); + } + catch (Exception x) + { + throw new ServletException(x); + } + } + }); + server.start(); + + QueuedThreadPool clientThreads = new QueuedThreadPool(2 * count); + clientThreads.setName("client"); + HttpClientTransport transport = new HttpClientTransportOverHTTP(1); + transport.setConnectionPoolFactory(factory.factory); + client = new HttpClient(transport, null); + client.setExecutor(clientThreads); + client.setMaxConnectionsPerDestination(maxConnections); + client.start(); + + // Send N requests to the server, all waiting on the server. + // This should open N connections, and the test verifies that + // all N are sent (i.e. the client does not keep any queued). + CountDownLatch latch = new CountDownLatch(count); + for (int i = 0; i < count; ++i) + { + int id = i; + clientThreads.execute(() -> client.newRequest("localhost", connector.getLocalPort()) + .path("/" + id) + .send(result -> + { + if (result.isSucceeded()) + latch.countDown(); + })); + } + + assertTrue(latch.await(5, TimeUnit.SECONDS), "server requests " + barrier.getNumberWaiting() + "<" + count + " - client: " + client.dump()); + List destinations = client.getDestinations(); + assertEquals(1, destinations.size()); HttpDestination destination = (HttpDestination)destinations.get(0); AbstractConnectionPool connectionPool = (AbstractConnectionPool)destination.getConnectionPool(); - assertThat(connectionPool.getConnectionCount(), Matchers.lessThanOrEqualTo(count)); + assertThat(connectionPool.getConnectionCount(), Matchers.greaterThanOrEqualTo(count)); } @ParameterizedTest diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java index edc150c5fd87..c5e1cc1462e7 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java @@ -64,10 +64,12 @@ public void testAcquireWithEmptyQueue(Scenario scenario) throws Exception destination.start(); DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool(); Connection connection = connectionPool.acquire(); - assertNull(connection); - // There are no queued requests, so no connection should be created. - connection = peekIdleConnection(connectionPool, 1, TimeUnit.SECONDS); - assertNull(connection); + if (connection == null) + { + // There are no queued requests, so the newly created connection will be idle. + connection = peekIdleConnection(connectionPool, 5, TimeUnit.SECONDS); + } + assertNotNull(connection); } }