Skip to content

Commit

Permalink
Issue #5105 - add optional configuration to not wait for suspended re…
Browse files Browse the repository at this point in the history
…quests in StatisticsHandler

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
  • Loading branch information
lachlan-roberts committed Aug 19, 2020
1 parent 32358b1 commit a65f001
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 16 deletions.
6 changes: 5 additions & 1 deletion jetty-server/src/main/config/etc/jetty-stats.xml
Expand Up @@ -5,7 +5,11 @@
<Configure id="Server" class="org.eclipse.jetty.server.Server">
<Call name="insertHandler">
<Arg>
<New id="StatsHandler" class="org.eclipse.jetty.server.handler.StatisticsHandler"></New>
<New id="StatsHandler" class="org.eclipse.jetty.server.handler.StatisticsHandler">
<Set name="waitForSuspendedRequestsOnShutdown">
<Property name="jetty.statistics.waitForSuspendedRequestsOnShutdown" default="true"/>
</Set>
</New>
</Arg>
</Call>
<Call class="org.eclipse.jetty.server.ServerConnectionStatistics" name="addToAllConnectors">
Expand Down
5 changes: 5 additions & 0 deletions jetty-server/src/main/config/modules/stats.mod
Expand Up @@ -15,3 +15,8 @@ etc/jetty-stats.xml

[ini]
jetty.webapp.addServerClasses+=,-org.eclipse.jetty.servlet.StatisticsServlet

[ini-template]

## If the Graceful shutdown should wait for suspended requests as well as dispatched ones.
# jetty.statistics.waitForSuspendedRequestsOnShutdown=true
Expand Up @@ -66,6 +66,8 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
private final LongAdder _responses5xx = new LongAdder();
private final LongAdder _responsesTotalBytes = new LongAdder();

private boolean waitForSuspendedRequestsOnShutdown = true;

private final Graceful.Shutdown _shutdown = new Graceful.Shutdown()
{
@Override
Expand Down Expand Up @@ -98,21 +100,20 @@ public void onError(AsyncEvent event) throws IOException
@Override
public void onComplete(AsyncEvent event) throws IOException
{
System.err.println("On Async Complete for " + event);
HttpChannelState state = ((AsyncContextEvent)event).getHttpChannelState();

Request request = state.getBaseRequest();
final long elapsed = System.currentTimeMillis() - request.getTimeStamp();

long d = _requestStats.decrement();
long numRequests = _requestStats.decrement();
_requestTimeStats.record(elapsed);

updateResponse(request);

_asyncWaitStats.decrement();

// If we have no more dispatches, should we signal shutdown?
if (d == 0)
if (numRequests == 0 && waitForSuspendedRequestsOnShutdown)
{
FutureCallback shutdown = _shutdown.get();
if (shutdown != null)
Expand Down Expand Up @@ -178,8 +179,8 @@ public void handle(String path, Request baseRequest, HttpServletRequest request,
final long now = System.currentTimeMillis();
final long dispatched = now - start;

// TODO: make dispatchedStats optional metric for shutdown
_dispatchedStats.decrement();
long numRequests = -1;
long numDispatches = _dispatchedStats.decrement();
_dispatchedTimeStats.record(dispatched);

if (state.isInitial())
Expand All @@ -191,20 +192,21 @@ public void handle(String path, Request baseRequest, HttpServletRequest request,
}
else
{
long d = _requestStats.decrement();
numRequests = _requestStats.decrement();
_requestTimeStats.record(dispatched);
updateResponse(baseRequest);

// If we have no more dispatches, should we signal shutdown?
FutureCallback shutdown = _shutdown.get();
if (shutdown != null)
{
response.flushBuffer();
if (d == 0)
shutdown.succeeded();
}
}
}

FutureCallback shutdown = _shutdown.get();
if (shutdown != null)
{
response.flushBuffer();

// If we either have no more requests or dispatches, we can complete shutdown.
if (waitForSuspendedRequestsOnShutdown ? (numRequests == 0) : (numDispatches == 0))
shutdown.succeeded();
}
}
}

Expand Down Expand Up @@ -257,6 +259,16 @@ protected void doStop() throws Exception
super.doStop();
}

/**
* Set whether the graceful shutdown should wait for all requests to complete (including suspended requests)
* or whether it should only wait for all the actively dispatched requests to complete.
* @param waitForSuspendedRequests true to wait for suspended requests on graceful shutdown.
*/
public void waitForSuspendedRequestsOnShutdown(boolean waitForSuspendedRequests)
{
this.waitForSuspendedRequestsOnShutdown = waitForSuspendedRequests;
}

/**
* @return the number of requests handled by this handler
* since {@link #statsReset()} was last called, excluding
Expand Down
Expand Up @@ -19,10 +19,12 @@
package org.eclipse.jetty.server.handler;

import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
Expand All @@ -45,6 +47,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class StatisticsHandlerTest
Expand Down Expand Up @@ -422,6 +425,114 @@ public void handle(String path, Request request, HttpServletRequest httpRequest,
barrier[3].await();
}

@Test
public void waitForSuspendedRequestTest() throws Exception
{
CyclicBarrier barrier = new CyclicBarrier(3);
final AtomicReference<AsyncContext> asyncHolder = new AtomicReference<>();
final CountDownLatch dispatched = new CountDownLatch(1);
_statsHandler.waitForSuspendedRequestsOnShutdown(true);
_statsHandler.setHandler(new AbstractHandler()
{
@Override
public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws ServletException
{
request.setHandled(true);

try
{
if (path.contains("async"))
{
asyncHolder.set(request.startAsync());
barrier.await();
}
else
{
barrier.await();
dispatched.await();
}
}
catch (Exception e)
{
throw new ServletException(e);
}
}
});
_server.start();

// One request to block while dispatched other will go async.
_connector.executeRequest("GET / HTTP/1.1\r\nHost: localhost\r\n\r\n");
_connector.executeRequest("GET /async HTTP/1.1\r\nHost: localhost\r\n\r\n");

// Ensure the requests have been dispatched and async started.
barrier.await();
AsyncContext asyncContext = Objects.requireNonNull(asyncHolder.get());

// Shutdown should timeout as there are two active requests.
Future<Void> shutdown = _statsHandler.shutdown();
assertThrows(TimeoutException.class, () -> shutdown.get(1, TimeUnit.SECONDS));

// When the dispatched thread exits we should still be waiting on the async request.
dispatched.countDown();
assertThrows(TimeoutException.class, () -> shutdown.get(1, TimeUnit.SECONDS));

// Shutdown should complete only now the AsyncContext is completed.
asyncContext.complete();
shutdown.get(5, TimeUnit.MILLISECONDS);
}

@Test
public void doNotWaitForSuspendedRequestTest() throws Exception
{
CyclicBarrier barrier = new CyclicBarrier(3);
final AtomicReference<AsyncContext> asyncHolder = new AtomicReference<>();
final CountDownLatch dispatched = new CountDownLatch(1);
_statsHandler.waitForSuspendedRequestsOnShutdown(false);
_statsHandler.setHandler(new AbstractHandler()
{
@Override
public void handle(String path, Request request, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws ServletException
{
request.setHandled(true);

try
{
if (path.contains("async"))
{
asyncHolder.set(request.startAsync());
barrier.await();
}
else
{
barrier.await();
dispatched.await();
}
}
catch (Exception e)
{
throw new ServletException(e);
}
}
});
_server.start();

// One request to block while dispatched other will go async.
_connector.executeRequest("GET / HTTP/1.1\r\nHost: localhost\r\n\r\n");
_connector.executeRequest("GET /async HTTP/1.1\r\nHost: localhost\r\n\r\n");

// Ensure the requests have been dispatched and async started.
barrier.await();
assertNotNull(asyncHolder.get());

// Shutdown should timeout as there is a request dispatched.
Future<Void> shutdown = _statsHandler.shutdown();
assertThrows(TimeoutException.class, () -> shutdown.get(1, TimeUnit.SECONDS));

// When the dispatched thread exits we should shutdown even though we have a waiting async request.
dispatched.countDown();
shutdown.get(5, TimeUnit.MILLISECONDS);
}

@Test
public void testSuspendExpire() throws Exception
{
Expand Down

0 comments on commit a65f001

Please sign in to comment.