Skip to content

Commit

Permalink
Fixes #6254 - Total timeout not enforced for queued requests.
Browse files Browse the repository at this point in the history
Fixed logic in HttpDestination.RequestTimeouts, where now a timeout
is scheduled only when the expiration time is less than the existing one.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed May 11, 2021
1 parent 5f23689 commit da50e06
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 3 deletions.
Expand Up @@ -597,7 +597,7 @@ private void schedule(long expiresAt)
// When the timeout expires, scan the exchange queue for the next
// earliest exchange that may expire, and reschedule a new timeout.
long earliest = earliestTimeout.getAndUpdate(t -> Math.min(t, expiresAt));
if (expiresAt != earliest)
if (expiresAt < earliest)
{
// A new request expires earlier than previous requests, schedule it.
long delay = Math.max(0, expiresAt - System.nanoTime());
Expand Down
Expand Up @@ -54,12 +54,14 @@
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.opentest4j.TestAbortedException;

import static org.eclipse.jetty.http.client.Transport.UNIX_SOCKET;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -501,6 +503,83 @@ protected void service(String target, org.eclipse.jetty.server.Request jettyRequ
assertTrue(latch.await(5, TimeUnit.SECONDS));
}

@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
public void testRequestQueuedDoesNotCancelTimeoutOfQueuedRequests(Transport transport) throws Exception
{
init(transport);

CountDownLatch serverLatch = new CountDownLatch(1);
scenario.start(new EmptyServerHandler()
{
@Override
protected void service(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
if (request.getRequestURI().startsWith("/one"))
{
try
{
serverLatch.await();
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
}
}
});

scenario.client.setMaxConnectionsPerDestination(1);
scenario.setMaxRequestsPerConnection(1);

// Send the first request so that the others get queued.
CountDownLatch latch1 = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
.path("/one")
.send(result ->
{
assertTrue(result.isSucceeded());
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
latch1.countDown();
});

// Queue a second request, it should expire in the queue.
long timeout = 1000;
CountDownLatch latch2 = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
.path("/two")
.timeout(2 * timeout, TimeUnit.MILLISECONDS)
.send(result ->
{
assertTrue(result.isFailed());
assertThat(result.getFailure(), Matchers.instanceOf(TimeoutException.class));
latch2.countDown();
});

Thread.sleep(timeout);

// Queue a third request, it should not reset the timeout of the second request.
CountDownLatch latch3 = new CountDownLatch(1);
scenario.client.newRequest(scenario.newURI())
.path("/three")
.timeout(2 * timeout, TimeUnit.MILLISECONDS)
.send(result ->
{
assertTrue(result.isSucceeded());
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
latch3.countDown();
});

// We have already slept a timeout, expect the second request to be back in another timeout.
assertTrue(latch2.await(2 * timeout, TimeUnit.MILLISECONDS));

// Release the first request so the third can be served as well.
serverLatch.countDown();

assertTrue(latch1.await(2 * timeout, TimeUnit.MILLISECONDS));
assertTrue(latch3.await(2 * timeout, TimeUnit.MILLISECONDS));
}

private void assumeConnectTimeout(String host, int port, int connectTimeout)
{
try (Socket socket = new Socket())
Expand All @@ -516,7 +595,6 @@ private void assumeConnectTimeout(String host, int port, int connectTimeout)
catch (SocketTimeoutException x)
{
// Expected timeout during connect, continue the test.
return;
}
catch (Throwable x)
{
Expand All @@ -525,7 +603,7 @@ private void assumeConnectTimeout(String host, int port, int connectTimeout)
}
}

private class TimeoutHandler extends AbstractHandler
private static class TimeoutHandler extends AbstractHandler
{
private final long timeout;

Expand Down
Expand Up @@ -286,6 +286,13 @@ public void setServerIdleTimeout(long idleTimeout)
setConnectionIdleTimeout(idleTimeout);
}

public void setMaxRequestsPerConnection(int maxRequestsPerConnection)
{
AbstractHTTP2ServerConnectionFactory h2 = connector.getConnectionFactory(AbstractHTTP2ServerConnectionFactory.class);
if (h2 != null)
h2.setMaxConcurrentStreams(maxRequestsPerConnection);
}

public void start(Handler handler) throws Exception
{
start(handler, null);
Expand Down

0 comments on commit da50e06

Please sign in to comment.