Skip to content

Commit

Permalink
Merge pull request #5175 from eclipse/jetty-9.4.x-5105-StatisticsHandler
Browse files Browse the repository at this point in the history
Issue #5105 - StatisticsHandler Graceful Shutdown of Async Requests
  • Loading branch information
lachlan-roberts committed Aug 28, 2020
2 parents 7cf6058 + a9c90d3 commit 001def4
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 48 deletions.
4 changes: 3 additions & 1 deletion jetty-server/src/main/config/etc/jetty-stats.xml
Expand Up @@ -5,7 +5,9 @@
<Configure id="Server" class="org.eclipse.jetty.server.Server">
<Call name="insertHandler">
<Arg>
<New id="StatsHandler" class="org.eclipse.jetty.server.handler.StatisticsHandler"></New>
<New id="StatsHandler" class="org.eclipse.jetty.server.handler.StatisticsHandler">
<Set name="gracefulShutdownWaitsForRequests"><Property name="jetty.statistics.gracefulShutdownWaitsForRequests" default="true"/></Set>
</New>
</Arg>
</Call>
<Call name="addBeanToAllConnectors">
Expand Down
5 changes: 5 additions & 0 deletions jetty-server/src/main/config/modules/stats.mod
Expand Up @@ -15,3 +15,8 @@ etc/jetty-stats.xml

[ini]
jetty.webapp.addServerClasses+=,-org.eclipse.jetty.servlet.StatisticsServlet

[ini-template]

## If the Graceful shutdown should wait for async requests as well as the currently dispatched ones.
# jetty.statistics.gracefulShutdownWaitsForRequests=true
Expand Up @@ -61,7 +61,6 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
final Thread thread = Thread.currentThread();
final String old_name = thread.getName();

boolean suspend = false;
boolean retry = false;
String name = (String)request.getAttribute("org.eclipse.jetty.thread.name");
if (name == null)
Expand Down Expand Up @@ -103,11 +102,10 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
finally
{
thread.setName(old_name);
suspend = baseRequest.getHttpChannelState().isSuspended();
if (suspend)
if (baseRequest.getHttpChannelState().isAsyncStarted())
{
request.setAttribute("org.eclipse.jetty.thread.name", name);
print(name, "SUSPEND");
print(name, "ASYNC");
}
else
print(name, "RESPONSE " + base_response.getStatus() + (ex == null ? "" : ("/" + ex)) + " " + base_response.getContentType());
Expand Down
Expand Up @@ -20,7 +20,6 @@

import java.io.IOException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import javax.servlet.AsyncEvent;
Expand Down Expand Up @@ -59,6 +58,7 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful

private final LongAdder _asyncDispatches = new LongAdder();
private final LongAdder _expires = new LongAdder();
private final LongAdder _errors = new LongAdder();

private final LongAdder _responses1xx = new LongAdder();
private final LongAdder _responses2xx = new LongAdder();
Expand All @@ -67,6 +67,8 @@ public class StatisticsHandler extends HandlerWrapper implements Graceful
private final LongAdder _responses5xx = new LongAdder();
private final LongAdder _responsesTotalBytes = new LongAdder();

private boolean _gracefulShutdownWaitsForRequests = true;

private final Graceful.Shutdown _shutdown = new Graceful.Shutdown()
{
@Override
Expand All @@ -76,44 +78,42 @@ protected FutureCallback newShutdownCallback()
}
};

private final AtomicBoolean _wrapWarning = new AtomicBoolean();

private final AsyncListener _onCompletion = new AsyncListener()
{
@Override
public void onTimeout(AsyncEvent event) throws IOException
public void onStartAsync(AsyncEvent event)
{
_expires.increment();
event.getAsyncContext().addListener(this);
}

@Override
public void onStartAsync(AsyncEvent event) throws IOException
public void onTimeout(AsyncEvent event)
{
event.getAsyncContext().addListener(this);
_expires.increment();
}

@Override
public void onError(AsyncEvent event) throws IOException
public void onError(AsyncEvent event)
{
_errors.increment();
}

@Override
public void onComplete(AsyncEvent event) throws IOException
public void onComplete(AsyncEvent event)
{
HttpChannelState state = ((AsyncContextEvent)event).getHttpChannelState();

Request request = state.getBaseRequest();
final long elapsed = System.currentTimeMillis() - request.getTimeStamp();

long d = _requestStats.decrement();
long numRequests = _requestStats.decrement();
_requestTimeStats.record(elapsed);

updateResponse(request);

_asyncWaitStats.decrement();

// If we have no more dispatches, should we signal shutdown?
if (d == 0)
if (numRequests == 0 && _gracefulShutdownWaitsForRequests)
{
FutureCallback shutdown = _shutdown.get();
if (shutdown != null)
Expand Down Expand Up @@ -149,6 +149,14 @@ public void statsReset()
@Override
public void handle(String path, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
Handler handler = getHandler();
if (handler == null || !isStarted() || isShutdown())
{
if (!baseRequest.getResponse().isCommitted())
response.sendError(HttpStatus.SERVICE_UNAVAILABLE_503);
return;
}

_dispatchedStats.increment();

final long start;
Expand All @@ -168,51 +176,39 @@ public void handle(String path, Request baseRequest, HttpServletRequest request,

try
{
Handler handler = getHandler();
if (handler != null && !_shutdown.isShutdown() && isStarted())
handler.handle(path, baseRequest, request, response);
else
{
if (!baseRequest.isHandled())
baseRequest.setHandled(true);
else if (_wrapWarning.compareAndSet(false, true))
LOG.warn("Bad statistics configuration. Latencies will be incorrect in {}", this);
if (!baseRequest.getResponse().isCommitted())
response.sendError(HttpStatus.SERVICE_UNAVAILABLE_503);
}
handler.handle(path, baseRequest, request, response);
}
finally
{
final long now = System.currentTimeMillis();
final long dispatched = now - start;

_dispatchedStats.decrement();
long numRequests = -1;
long numDispatches = _dispatchedStats.decrement();
_dispatchedTimeStats.record(dispatched);

if (state.isSuspended())
if (state.isInitial())
{
if (state.isInitial())
if (state.isAsyncStarted())
{
state.addListener(_onCompletion);
_asyncWaitStats.increment();
}
}
else if (state.isInitial())
{
long d = _requestStats.decrement();
_requestTimeStats.record(dispatched);
updateResponse(baseRequest);

// If we have no more dispatches, should we signal shutdown?
FutureCallback shutdown = _shutdown.get();
if (shutdown != null)
else
{
response.flushBuffer();
if (d == 0)
shutdown.succeeded();
numRequests = _requestStats.decrement();
_requestTimeStats.record(dispatched);
updateResponse(baseRequest);
}
}
// else onCompletion will handle it.

FutureCallback shutdown = _shutdown.get();
if (shutdown != null)
{
response.flushBuffer();
if (_gracefulShutdownWaitsForRequests ? (numRequests == 0) : (numDispatches == 0))
shutdown.succeeded();
}
}
}

Expand Down Expand Up @@ -251,6 +247,8 @@ protected void updateResponse(Request request)
@Override
protected void doStart() throws Exception
{
if (getHandler() == null)
throw new IllegalStateException("StatisticsHandler has no Wrapped Handler");
_shutdown.cancel();
super.doStart();
statsReset();
Expand All @@ -263,6 +261,29 @@ protected void doStop() throws Exception
super.doStop();
}

/**
* Set whether the graceful shutdown should wait for all requests to complete including
* async requests which are not currently dispatched, or whether it should only wait for all the
* actively dispatched requests to complete.
* @param gracefulShutdownWaitsForRequests true to wait for async requests on graceful shutdown.
*/
public void setGracefulShutdownWaitsForRequests(boolean gracefulShutdownWaitsForRequests)
{
_gracefulShutdownWaitsForRequests = gracefulShutdownWaitsForRequests;
}

/**
* @return whether the graceful shutdown will wait for all requests to complete including
* async requests which are not currently dispatched, or whether it will only wait for all the
* actively dispatched requests to complete.
* @see #getAsyncDispatches()
*/
@ManagedAttribute("if graceful shutdown will wait for all requests")
public boolean getGracefulShutdownWaitsForRequests()
{
return _gracefulShutdownWaitsForRequests;
}

/**
* @return the number of requests handled by this handler
* since {@link #statsReset()} was last called, excluding
Expand Down Expand Up @@ -467,6 +488,16 @@ public int getExpires()
return _expires.intValue();
}

/**
* @return the number of async errors that occurred.
* @see #getAsyncDispatches()
*/
@ManagedAttribute("number of async errors that occurred")
public int getErrors()
{
return _errors.intValue();
}

/**
* @return the number of responses with a 1xx status returned by this context
* since {@link #statsReset()} was last called.
Expand Down
Expand Up @@ -330,8 +330,9 @@ public void run()
assertThat(response, containsString(" 200 OK"));
assertThat(response, containsString("read 10/10"));

assertThat(stats.getRequests(), is(2));
assertThat(stats.getResponses5xx(), is(1));
// The StatisticsHandler was shutdown when it received the second request so does not contribute to the stats.
assertThat(stats.getRequests(), is(1));
assertThat(stats.getResponses4xx(), is(0));
}
}

Expand Down

0 comments on commit 001def4

Please sign in to comment.