From b13249fd14aded9f5e17fa8f30233a51fcadcf03 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 11 May 2021 22:14:45 +0200 Subject: [PATCH] Fixes #6254 - Total timeout not enforced for queued requests. Fixed logic in HttpDestination.RequestTimeouts, where now a timeout is scheduled only when the expiration time is less than the existing one. Various code cleanups. Renamed HttpDestination.TimeoutTask to RequestTimeouts for clarity. Improved javadocs, code comments and logging. Signed-off-by: Simone Bordet (cherry picked from commit 5f23689aa7f44c0660ba2ad92c7c6a15d7c4af15) (cherry picked from commit da50e06b640d448d42e642c842cf9bc647797a49) (cherry picked from commit 88ac10439a8b5ec1c34aaab4ccbf0f590aee33f8) --- .../eclipse/jetty/client/HttpDestination.java | 71 ++++++++--------- .../jetty/client/TimeoutCompleteListener.java | 23 ++---- .../org/eclipse/jetty/io/CyclicTimeout.java | 22 +++++- .../http/client/HttpClientTimeoutTest.java | 79 +++++++++++++++++++ .../jetty/http/client/TransportScenario.java | 7 ++ 5 files changed, 149 insertions(+), 53 deletions(-) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java index f089895aac7e..8a83f0b3047c 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java @@ -60,7 +60,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest private final ProxyConfiguration.Proxy proxy; private final ClientConnectionFactory connectionFactory; private final HttpField hostField; - private final TimeoutTask timeout; + private final RequestTimeouts requestTimeouts; private ConnectionPool connectionPool; public HttpDestination(HttpClient client, Origin origin) @@ -73,7 +73,7 @@ public HttpDestination(HttpClient client, Origin origin) this.requestNotifier = new RequestNotifier(client); this.responseNotifier = new ResponseNotifier(); - this.timeout = new TimeoutTask(client.getScheduler()); + this.requestTimeouts = new RequestTimeouts(client.getScheduler()); String host = HostPort.normalizeHost(getHost()); if (!client.isDefaultPort(getScheme(), getPort())) @@ -257,7 +257,7 @@ public void send(HttpExchange exchange) { long expiresAt = request.getTimeoutAt(); if (expiresAt != -1) - timeout.schedule(expiresAt); + requestTimeouts.schedule(expiresAt); if (!client.isRunning() && exchanges.remove(exchange)) { @@ -409,7 +409,7 @@ public void close() if (LOG.isDebugEnabled()) LOG.debug("Closed {}", this); connectionPool.close(); - timeout.destroy(); + requestTimeouts.destroy(); } public void release(Connection connection) @@ -527,15 +527,15 @@ public interface Multiplexed } /** - * This class enforces the total timeout for exchanges that are still in the queue. - * The total timeout for exchanges that are not in the destination queue is enforced - * by {@link HttpChannel}. + *

Enforces the total timeout for for exchanges that are still in the queue.

+ *

The total timeout for exchanges that are not in the destination queue + * is enforced in {@link HttpChannel} by {@link TimeoutCompleteListener}.

*/ - private class TimeoutTask extends CyclicTimeout + private class RequestTimeouts extends CyclicTimeout { - private final AtomicLong nextTimeout = new AtomicLong(Long.MAX_VALUE); + private final AtomicLong earliestTimeout = new AtomicLong(Long.MAX_VALUE); - private TimeoutTask(Scheduler scheduler) + private RequestTimeouts(Scheduler scheduler) { super(scheduler); } @@ -544,14 +544,18 @@ private TimeoutTask(Scheduler scheduler) public void onTimeoutExpired() { if (LOG.isDebugEnabled()) - LOG.debug("{} timeout expired", this); + LOG.debug("{} timeouts check", this); - nextTimeout.set(Long.MAX_VALUE); long now = System.nanoTime(); - long nextExpiresAt = Long.MAX_VALUE; - - // Check all queued exchanges for those that have expired - // and to determine when the next check must be. + long earliest = Long.MAX_VALUE; + // Reset the earliest timeout so we can expire again. + // A concurrent call to schedule(long) may lose an earliest + // value, but the corresponding exchange is already enqueued + // and will be seen by scanning the exchange queue below. + earliestTimeout.set(earliest); + + // Scan the message queue to abort expired exchanges + // and to find the exchange that expire the earliest. for (HttpExchange exchange : exchanges) { HttpRequest request = exchange.getRequest(); @@ -560,34 +564,27 @@ public void onTimeoutExpired() continue; if (expiresAt <= now) request.abort(new TimeoutException("Total timeout " + request.getTimeout() + " ms elapsed")); - else if (expiresAt < nextExpiresAt) - nextExpiresAt = expiresAt; + else if (expiresAt < earliest) + earliest = expiresAt; } - if (nextExpiresAt < Long.MAX_VALUE && client.isRunning()) - schedule(nextExpiresAt); + if (earliest < Long.MAX_VALUE && client.isRunning()) + schedule(earliest); } private void schedule(long expiresAt) { - // Schedule a timeout for the soonest any known exchange can expire. - // If subsequently that exchange is removed from the queue, the - // timeout is not cancelled, instead the entire queue is swept - // for expired exchanges and a new timeout is set. - long timeoutAt = nextTimeout.getAndUpdate(e -> Math.min(e, expiresAt)); - if (timeoutAt != expiresAt) + // Schedule a timeout for the earliest exchange that may expire. + // When the timeout expires, scan the exchange queue for the next + // earliest exchange that may expire, and reschedule a new timeout. + long prevEarliest = earliestTimeout.getAndUpdate(t -> Math.min(t, expiresAt)); + if (expiresAt < prevEarliest) { - long delay = expiresAt - System.nanoTime(); - if (delay <= 0) - { - onTimeoutExpired(); - } - else - { - schedule(delay, TimeUnit.NANOSECONDS); - if (LOG.isDebugEnabled()) - LOG.debug("{} scheduled timeout in {} ms", this, TimeUnit.NANOSECONDS.toMillis(delay)); - } + // A new request expires earlier than previous requests, schedule it. + long delay = Math.max(0, expiresAt - System.nanoTime()); + if (LOG.isDebugEnabled()) + LOG.debug("{} scheduling timeout in {} ms", this, TimeUnit.NANOSECONDS.toMillis(delay)); + schedule(delay, TimeUnit.NANOSECONDS); } } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/TimeoutCompleteListener.java b/jetty-client/src/main/java/org/eclipse/jetty/client/TimeoutCompleteListener.java index 2f3e7978fe5f..d8c2ca140369 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/TimeoutCompleteListener.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/TimeoutCompleteListener.java @@ -29,7 +29,7 @@ public class TimeoutCompleteListener extends CyclicTimeout implements Response.C { private static final Logger LOG = LoggerFactory.getLogger(TimeoutCompleteListener.class); - private final AtomicReference request = new AtomicReference<>(); + private final AtomicReference requestTimeout = new AtomicReference<>(); public TimeoutCompleteListener(Scheduler scheduler) { @@ -39,7 +39,7 @@ public TimeoutCompleteListener(Scheduler scheduler) @Override public void onTimeoutExpired() { - Request request = this.request.getAndSet(null); + Request request = requestTimeout.getAndSet(null); if (LOG.isDebugEnabled()) LOG.debug("Total timeout {} ms elapsed for {} on {}", request.getTimeout(), request, this); if (request != null) @@ -49,7 +49,7 @@ public void onTimeoutExpired() @Override public void onComplete(Result result) { - Request request = this.request.getAndSet(null); + Request request = requestTimeout.getAndSet(null); if (request != null) { boolean cancelled = cancel(); @@ -60,19 +60,12 @@ public void onComplete(Result result) void schedule(HttpRequest request, long timeoutAt) { - if (this.request.compareAndSet(null, request)) + if (requestTimeout.compareAndSet(null, request)) { - long delay = timeoutAt - System.nanoTime(); - if (delay <= 0) - { - onTimeoutExpired(); - } - else - { - schedule(delay, TimeUnit.NANOSECONDS); - if (LOG.isDebugEnabled()) - LOG.debug("Scheduled timeout in {} ms for {} on {}", TimeUnit.NANOSECONDS.toMillis(delay), request, this); - } + long delay = Math.max(0, timeoutAt - System.nanoTime()); + if (LOG.isDebugEnabled()) + LOG.debug("Scheduling timeout in {} ms for {} on {}", TimeUnit.NANOSECONDS.toMillis(delay), request, this); + schedule(delay, TimeUnit.NANOSECONDS); } } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/CyclicTimeout.java b/jetty-io/src/main/java/org/eclipse/jetty/io/CyclicTimeout.java index 76e1fc9dda98..5b88dc46e3d0 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/CyclicTimeout.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/CyclicTimeout.java @@ -28,6 +28,23 @@ *

Subclasses should implement {@link #onTimeoutExpired()}.

*

This implementation is optimised assuming that the timeout * will mostly be cancelled and then reused with a similar value.

+ *

The typical scenario to use this class is when you have events + * that postpone (by re-scheduling), or cancel then re-schedule, a + * timeout for a single entity. + * For example: connection idleness, where for each connection there + * is a CyclicTimeout and a read/write postpones the timeout; when + * the timeout expires, the implementation checks against a timestamp + * if the connection is really idle. + * Another example: HTTP session expiration, where for each HTTP + * session there is a CyclicTimeout and at the beginning of the + * request processing the timeout is canceled (via cancel()), but at + * the end of the request processing the timeout is re-scheduled.

+ *

Another typical scenario is for a parent entity to manage + * the timeouts of many children entities; the timeout is scheduled + * for the child entity that expires the earlier; when the timeout + * expires, the implementation scans the children entities to find + * the expired child entities and to find the next child entity + * that expires the earlier.

*

This implementation has a {@link Timeout} holding the time * at which the scheduled task should fire, and a linked list of * {@link Wakeup}, each holding the actual scheduled task.

@@ -97,9 +114,12 @@ public boolean schedule(long delay, TimeUnit units) if (_timeout.compareAndSet(timeout, new Timeout(newTimeoutAt, wakeup))) { if (LOG.isDebugEnabled()) - LOG.debug("Installed timeout in {} ms, waking up in {} ms", + { + LOG.debug("Installed timeout in {} ms, {} wake up in {} ms", units.toMillis(delay), + newWakeup != null ? "new" : "existing", TimeUnit.NANOSECONDS.toMillis(wakeup._at - now)); + } break; } } diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTimeoutTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTimeoutTest.java index b763c0fa32a6..8525f1a30f16 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTimeoutTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTimeoutTest.java @@ -49,12 +49,14 @@ import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.hamcrest.Matchers; import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ArgumentsSource; import org.opentest4j.TestAbortedException; import static org.eclipse.jetty.http.client.Transport.UNIX_SOCKET; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -494,6 +496,83 @@ protected void service(String target, org.eclipse.jetty.server.Request jettyRequ assertTrue(latch.await(5, TimeUnit.SECONDS)); } + @ParameterizedTest + @ArgumentsSource(TransportProvider.class) + public void testRequestQueuedDoesNotCancelTimeoutOfQueuedRequests(Transport transport) throws Exception + { + init(transport); + + CountDownLatch serverLatch = new CountDownLatch(1); + scenario.start(new EmptyServerHandler() + { + @Override + protected void service(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + if (request.getRequestURI().startsWith("/one")) + { + try + { + serverLatch.await(); + } + catch (InterruptedException x) + { + throw new InterruptedIOException(); + } + } + } + }); + + scenario.client.setMaxConnectionsPerDestination(1); + scenario.setMaxRequestsPerConnection(1); + + // Send the first request so that the others get queued. + CountDownLatch latch1 = new CountDownLatch(1); + scenario.client.newRequest(scenario.newURI()) + .path("/one") + .send(result -> + { + assertTrue(result.isSucceeded()); + assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); + latch1.countDown(); + }); + + // Queue a second request, it should expire in the queue. + long timeout = 1000; + CountDownLatch latch2 = new CountDownLatch(1); + scenario.client.newRequest(scenario.newURI()) + .path("/two") + .timeout(2 * timeout, TimeUnit.MILLISECONDS) + .send(result -> + { + assertTrue(result.isFailed()); + assertThat(result.getFailure(), Matchers.instanceOf(TimeoutException.class)); + latch2.countDown(); + }); + + Thread.sleep(timeout); + + // Queue a third request, it should not reset the timeout of the second request. + CountDownLatch latch3 = new CountDownLatch(1); + scenario.client.newRequest(scenario.newURI()) + .path("/three") + .timeout(2 * timeout, TimeUnit.MILLISECONDS) + .send(result -> + { + assertTrue(result.isSucceeded()); + assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); + latch3.countDown(); + }); + + // We have already slept a timeout, expect the second request to be back in another timeout. + assertTrue(latch2.await(2 * timeout, TimeUnit.MILLISECONDS)); + + // Release the first request so the third can be served as well. + serverLatch.countDown(); + + assertTrue(latch1.await(2 * timeout, TimeUnit.MILLISECONDS)); + assertTrue(latch3.await(2 * timeout, TimeUnit.MILLISECONDS)); + } + private void assumeConnectTimeout(String host, int port, int connectTimeout) { try (Socket socket = new Socket()) diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/TransportScenario.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/TransportScenario.java index 11e79649551a..a0d49548f065 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/TransportScenario.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/TransportScenario.java @@ -266,6 +266,13 @@ public void setServerIdleTimeout(long idleTimeout) setConnectionIdleTimeout(idleTimeout); } + public void setMaxRequestsPerConnection(int maxRequestsPerConnection) + { + AbstractHTTP2ServerConnectionFactory h2 = connector.getConnectionFactory(AbstractHTTP2ServerConnectionFactory.class); + if (h2 != null) + h2.setMaxConcurrentStreams(maxRequestsPerConnection); + } + public void start(Handler handler) throws Exception { start(handler, null);