diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java index f82a194eaf48..2686b22cf95b 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java @@ -65,7 +65,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest private final ProxyConfiguration.Proxy proxy; private final ClientConnectionFactory connectionFactory; private final HttpField hostField; - private final TimeoutTask timeout; + private final RequestTimeouts requestTimeouts; private ConnectionPool connectionPool; public HttpDestination(HttpClient client, Origin origin) @@ -78,7 +78,7 @@ public HttpDestination(HttpClient client, Origin origin) this.requestNotifier = new RequestNotifier(client); this.responseNotifier = new ResponseNotifier(); - this.timeout = new TimeoutTask(client.getScheduler()); + this.requestTimeouts = new RequestTimeouts(client.getScheduler()); ProxyConfiguration proxyConfig = client.getProxyConfiguration(); proxy = proxyConfig.match(origin); @@ -272,7 +272,7 @@ public void send(HttpExchange exchange) { long expiresAt = request.getTimeoutAt(); if (expiresAt != -1) - timeout.schedule(expiresAt); + requestTimeouts.schedule(expiresAt); if (!client.isRunning() && exchanges.remove(exchange)) { @@ -425,7 +425,7 @@ public void close() if (LOG.isDebugEnabled()) LOG.debug("Closed {}", this); connectionPool.close(); - timeout.destroy(); + requestTimeouts.destroy(); } public void release(Connection connection) @@ -547,15 +547,15 @@ public String toString() } /** - * This class enforces the total timeout for exchanges that are still in the queue. - * The total timeout for exchanges that are not in the destination queue is enforced - * by {@link HttpChannel}. + *

Enforces the total timeout for for exchanges that are still in the queue.

+ *

The total timeout for exchanges that are not in the destination queue + * is enforced in {@link HttpChannel} by {@link TimeoutCompleteListener}.

*/ - private class TimeoutTask extends CyclicTimeout + private class RequestTimeouts extends CyclicTimeout { - private final AtomicLong nextTimeout = new AtomicLong(Long.MAX_VALUE); + private final AtomicLong earliestTimeout = new AtomicLong(Long.MAX_VALUE); - private TimeoutTask(Scheduler scheduler) + private RequestTimeouts(Scheduler scheduler) { super(scheduler); } @@ -564,14 +564,18 @@ private TimeoutTask(Scheduler scheduler) public void onTimeoutExpired() { if (LOG.isDebugEnabled()) - LOG.debug("{} timeout expired", this); + LOG.debug("{} timeouts check", this); - nextTimeout.set(Long.MAX_VALUE); long now = System.nanoTime(); - long nextExpiresAt = Long.MAX_VALUE; - - // Check all queued exchanges for those that have expired - // and to determine when the next check must be. + long earliest = Long.MAX_VALUE; + // Reset the earliest timeout so we can expire again. + // A concurrent call to schedule(long) may lose an earliest + // value, but the corresponding exchange is already enqueued + // and will be seen by scanning the exchange queue below. + earliestTimeout.set(earliest); + + // Scan the message queue to abort expired exchanges + // and to find the exchange that expire the earliest. for (HttpExchange exchange : exchanges) { HttpRequest request = exchange.getRequest(); @@ -580,34 +584,27 @@ public void onTimeoutExpired() continue; if (expiresAt <= now) request.abort(new TimeoutException("Total timeout " + request.getTimeout() + " ms elapsed")); - else if (expiresAt < nextExpiresAt) - nextExpiresAt = expiresAt; + else if (expiresAt < earliest) + earliest = expiresAt; } - if (nextExpiresAt < Long.MAX_VALUE && client.isRunning()) - schedule(nextExpiresAt); + if (earliest < Long.MAX_VALUE && client.isRunning()) + schedule(earliest); } private void schedule(long expiresAt) { - // Schedule a timeout for the soonest any known exchange can expire. - // If subsequently that exchange is removed from the queue, the - // timeout is not cancelled, instead the entire queue is swept - // for expired exchanges and a new timeout is set. - long timeoutAt = nextTimeout.getAndUpdate(e -> Math.min(e, expiresAt)); - if (timeoutAt != expiresAt) + // Schedule a timeout for the earliest exchange that may expire. + // When the timeout expires, scan the exchange queue for the next + // earliest exchange that may expire, and reschedule a new timeout. + long prevEarliest = earliestTimeout.getAndUpdate(t -> Math.min(t, expiresAt)); + if (expiresAt < prevEarliest) { - long delay = expiresAt - System.nanoTime(); - if (delay <= 0) - { - onTimeoutExpired(); - } - else - { - schedule(delay, TimeUnit.NANOSECONDS); - if (LOG.isDebugEnabled()) - LOG.debug("{} scheduled timeout in {} ms", this, TimeUnit.NANOSECONDS.toMillis(delay)); - } + // A new request expires earlier than previous requests, schedule it. + long delay = Math.max(0, expiresAt - System.nanoTime()); + if (LOG.isDebugEnabled()) + LOG.debug("{} scheduling timeout in {} ms", this, TimeUnit.NANOSECONDS.toMillis(delay)); + schedule(delay, TimeUnit.NANOSECONDS); } } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/TimeoutCompleteListener.java b/jetty-client/src/main/java/org/eclipse/jetty/client/TimeoutCompleteListener.java index 7457fbd32e1c..464178857797 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/TimeoutCompleteListener.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/TimeoutCompleteListener.java @@ -34,7 +34,7 @@ public class TimeoutCompleteListener extends CyclicTimeout implements Response.C { private static final Logger LOG = Log.getLogger(TimeoutCompleteListener.class); - private final AtomicReference request = new AtomicReference<>(); + private final AtomicReference requestTimeout = new AtomicReference<>(); public TimeoutCompleteListener(Scheduler scheduler) { @@ -44,7 +44,7 @@ public TimeoutCompleteListener(Scheduler scheduler) @Override public void onTimeoutExpired() { - Request request = this.request.getAndSet(null); + Request request = requestTimeout.getAndSet(null); if (LOG.isDebugEnabled()) LOG.debug("Total timeout {} ms elapsed for {} on {}", request.getTimeout(), request, this); if (request != null) @@ -54,7 +54,7 @@ public void onTimeoutExpired() @Override public void onComplete(Result result) { - Request request = this.request.getAndSet(null); + Request request = requestTimeout.getAndSet(null); if (request != null) { boolean cancelled = cancel(); @@ -65,19 +65,12 @@ public void onComplete(Result result) void schedule(HttpRequest request, long timeoutAt) { - if (this.request.compareAndSet(null, request)) + if (requestTimeout.compareAndSet(null, request)) { - long delay = timeoutAt - System.nanoTime(); - if (delay <= 0) - { - onTimeoutExpired(); - } - else - { - schedule(delay, TimeUnit.NANOSECONDS); - if (LOG.isDebugEnabled()) - LOG.debug("Scheduled timeout in {} ms for {} on {}", TimeUnit.NANOSECONDS.toMillis(delay), request, this); - } + long delay = Math.max(0, timeoutAt - System.nanoTime()); + if (LOG.isDebugEnabled()) + LOG.debug("Scheduling timeout in {} ms for {} on {}", TimeUnit.NANOSECONDS.toMillis(delay), request, this); + schedule(delay, TimeUnit.NANOSECONDS); } } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/CyclicTimeout.java b/jetty-io/src/main/java/org/eclipse/jetty/io/CyclicTimeout.java index e4b5f7e6cf01..60a04ec0b39b 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/CyclicTimeout.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/CyclicTimeout.java @@ -33,6 +33,23 @@ *

Subclasses should implement {@link #onTimeoutExpired()}.

*

This implementation is optimised assuming that the timeout * will mostly be cancelled and then reused with a similar value.

+ *

The typical scenario to use this class is when you have events + * that postpone (by re-scheduling), or cancel then re-schedule, a + * timeout for a single entity. + * For example: connection idleness, where for each connection there + * is a CyclicTimeout and a read/write postpones the timeout; when + * the timeout expires, the implementation checks against a timestamp + * if the connection is really idle. + * Another example: HTTP session expiration, where for each HTTP + * session there is a CyclicTimeout and at the beginning of the + * request processing the timeout is canceled (via cancel()), but at + * the end of the request processing the timeout is re-scheduled.

+ *

Another typical scenario is for a parent entity to manage + * the timeouts of many children entities; the timeout is scheduled + * for the child entity that expires the earlier; when the timeout + * expires, the implementation scans the children entities to find + * the expired child entities and to find the next child entity + * that expires the earlier.

*

This implementation has a {@link Timeout} holding the time * at which the scheduled task should fire, and a linked list of * {@link Wakeup}, each holding the actual scheduled task.

@@ -102,9 +119,12 @@ public boolean schedule(long delay, TimeUnit units) if (_timeout.compareAndSet(timeout, new Timeout(newTimeoutAt, wakeup))) { if (LOG.isDebugEnabled()) - LOG.debug("Installed timeout in {} ms, waking up in {} ms", + { + LOG.debug("Installed timeout in {} ms, {} wake up in {} ms", units.toMillis(delay), + newWakeup != null ? "new" : "existing", TimeUnit.NANOSECONDS.toMillis(wakeup._at - now)); + } break; } } diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTimeoutTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTimeoutTest.java index e841e7b90199..ef818d9434a3 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTimeoutTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientTimeoutTest.java @@ -54,12 +54,14 @@ import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.hamcrest.Matchers; import org.junit.jupiter.api.Assumptions; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ArgumentsSource; import org.opentest4j.TestAbortedException; import static org.eclipse.jetty.http.client.Transport.UNIX_SOCKET; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -501,6 +503,83 @@ protected void service(String target, org.eclipse.jetty.server.Request jettyRequ assertTrue(latch.await(5, TimeUnit.SECONDS)); } + @ParameterizedTest + @ArgumentsSource(TransportProvider.class) + public void testRequestQueuedDoesNotCancelTimeoutOfQueuedRequests(Transport transport) throws Exception + { + init(transport); + + CountDownLatch serverLatch = new CountDownLatch(1); + scenario.start(new EmptyServerHandler() + { + @Override + protected void service(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + if (request.getRequestURI().startsWith("/one")) + { + try + { + serverLatch.await(); + } + catch (InterruptedException x) + { + throw new InterruptedIOException(); + } + } + } + }); + + scenario.client.setMaxConnectionsPerDestination(1); + scenario.setMaxRequestsPerConnection(1); + + // Send the first request so that the others get queued. + CountDownLatch latch1 = new CountDownLatch(1); + scenario.client.newRequest(scenario.newURI()) + .path("/one") + .send(result -> + { + assertTrue(result.isSucceeded()); + assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); + latch1.countDown(); + }); + + // Queue a second request, it should expire in the queue. + long timeout = 1000; + CountDownLatch latch2 = new CountDownLatch(1); + scenario.client.newRequest(scenario.newURI()) + .path("/two") + .timeout(2 * timeout, TimeUnit.MILLISECONDS) + .send(result -> + { + assertTrue(result.isFailed()); + assertThat(result.getFailure(), Matchers.instanceOf(TimeoutException.class)); + latch2.countDown(); + }); + + Thread.sleep(timeout); + + // Queue a third request, it should not reset the timeout of the second request. + CountDownLatch latch3 = new CountDownLatch(1); + scenario.client.newRequest(scenario.newURI()) + .path("/three") + .timeout(2 * timeout, TimeUnit.MILLISECONDS) + .send(result -> + { + assertTrue(result.isSucceeded()); + assertEquals(HttpStatus.OK_200, result.getResponse().getStatus()); + latch3.countDown(); + }); + + // We have already slept a timeout, expect the second request to be back in another timeout. + assertTrue(latch2.await(2 * timeout, TimeUnit.MILLISECONDS)); + + // Release the first request so the third can be served as well. + serverLatch.countDown(); + + assertTrue(latch1.await(2 * timeout, TimeUnit.MILLISECONDS)); + assertTrue(latch3.await(2 * timeout, TimeUnit.MILLISECONDS)); + } + private void assumeConnectTimeout(String host, int port, int connectTimeout) { try (Socket socket = new Socket()) @@ -516,7 +595,6 @@ private void assumeConnectTimeout(String host, int port, int connectTimeout) catch (SocketTimeoutException x) { // Expected timeout during connect, continue the test. - return; } catch (Throwable x) { @@ -525,7 +603,7 @@ private void assumeConnectTimeout(String host, int port, int connectTimeout) } } - private class TimeoutHandler extends AbstractHandler + private static class TimeoutHandler extends AbstractHandler { private final long timeout; diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/TransportScenario.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/TransportScenario.java index ee807064dbec..7034315db684 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/TransportScenario.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/TransportScenario.java @@ -286,6 +286,13 @@ public void setServerIdleTimeout(long idleTimeout) setConnectionIdleTimeout(idleTimeout); } + public void setMaxRequestsPerConnection(int maxRequestsPerConnection) + { + AbstractHTTP2ServerConnectionFactory h2 = connector.getConnectionFactory(AbstractHTTP2ServerConnectionFactory.class); + if (h2 != null) + h2.setMaxConcurrentStreams(maxRequestsPerConnection); + } + public void start(Handler handler) throws Exception { start(handler, null);