diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index b56e73e32562..59d338ed55d8 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -72,26 +72,18 @@ public class HttpOutput extends ServletOutputStream implements Runnable | v READY ------ write/flush/close ------> PENDING - ^ ^ | | - | | | | + ^ ^ | | | + | | | | | | +----------isReady==true----------+ | | - | | | | - |onWriteComplete | | |onWriteComplete - | | | | + | +----------------------------+ | | | + |onWriteComplete | | | |onWriteComplete + | | | | | | +----------isReady==false------------+ | - | | | | - | v | v - UNREADY +---ASYNC + | | | | | + | v | | v + UNREADY | +---ASYNC + */ - enum ApiState - { - BLOCKING, // Open in blocking mode - BLOCKED, // Blocked in blocking operation - ASYNC, // Open in async mode - READY, // isReady() has returned true - PENDING, // write operating in progress - UNREADY, // write operating in progress, isReady has returned false - } /* OPEN/BLOCKING ---- close ----> CLOSING/BLOCKED @@ -107,7 +99,47 @@ enum ApiState CLOSING/READY ------- onWriteComplete ----> CLOSED/READY CLOSING/PENDING ----- onWriteComplete ----> CLOSED/ASYNC CLOSING/UNREADY ----- onWriteComplete ----> CLOSED/UNREADY + + + + + + OPEN/BLOCKING -- close -----------+ CLOSING/BLOCKING + / | ^ \ / | ^ + / | | \ / | | + / v | \ / v | + | OPEN/BLOCKED ----close ---+ +--|->CLOSING/BLOCKED + \ ^ | \ + \ +----------------+ \ + v v + OPEN/READY -----close-----------+ CLOSING/READY CLOSED/READY + ^^ | \ \ ^ | | + // | \ \ / | | + // v \ \ / v | + /| OPEN/PENDING-|---close ---+ +----|->CLOSING/PENDING | + | \ / | ^ / | / \ / | | + | / | +--/-------------+ / / +--------------------+ | + | / \ v v / / \ v v + \| OPEN/ASYNC -----close ----------+ | CLOSING/ASYNC CLOSED/ASYNC + \\ \ + \\ \ + \v v + OPEN/UNREADY ----close --+ CLOSING/UNREADY + ^ | + +---------------+ + + */ + enum ApiState + { + BLOCKING, // Open in blocking mode + BLOCKED, // Blocked in blocking operation + ASYNC, // Open in async mode + READY, // isReady() has returned true + PENDING, // write operating in progress + UNREADY, // write operating in progress, isReady has returned false + } + enum State { OPEN, // Open @@ -297,6 +329,7 @@ void onWriteComplete(boolean last, Throwable failure) release = true; } + // Did somebody call close(Callback) while we were writing? if (_closedCallback != null && _state == State.OPEN) { _state = State.CLOSING; @@ -431,6 +464,9 @@ public void close(Callback callback) } } + if (LOG.isDebugEnabled()) + LOG.debug("close({}) {} s={} e={}, c={}", callback, stateString(), succeeded, error, BufferUtil.toDetailString(content)); + if (succeeded) { callback.succeeded(); @@ -444,30 +480,124 @@ public void close(Callback callback) } if (content != null) - channelWrite(content, true, - new AsyncCloseCB(callback)); + channelWrite(content, true, new AsyncCloseCB(callback)); } @Override public void close() throws IOException { + // This close is not implemented as a call to close(Callback) with + // a blocking callback because in some cases we need to make this + // call async - ie it returns immediately and the close happens in + // the background + if (_channel.getResponse().isIncluding()) { + if (LOG.isDebugEnabled()) + LOG.debug("close() include softclose"); _softClose = true; flush(); return; } - try (Blocker blocker = _writeBlocker.acquire()) + ByteBuffer content = null; + Throwable error = null; + Blocker blocker = null; + synchronized (_channelState) { - close(blocker); - blocker.block(); - onWriteComplete(true, null); + if (_onError != null) + { + error = _onError; + } + else if (_state != State.CLOSED) + { + switch (_apiState) + { + case UNREADY: + case PENDING: + // An async operation is in progress, so we soft close now + _softClose = true; + + // If we are OPEN and we will not close in onWriteComplete, + if (_state == State.OPEN && _closedCallback == null) + // then use a NOOP to trigger a close from onWriteComplete + _closedCallback = Callback.NOOP; + break; + + case ASYNC: + case READY: + // We are async, but with no outstanding operation, so we close asynchronously + _apiState = ApiState.PENDING; + _state = State.CLOSING; + content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; + break; + + case BLOCKED: + // A blocking operation is in progress. Let's just block until it is complete + blocker = _writeBlocker.acquire(); + _closedCallback = Callback.combine(_closedCallback, blocker); + break; + + case BLOCKING: + // Do a blocking close + blocker = _writeBlocker.acquire(); + content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; + break; + + default: + throw new IllegalStateException(stateString()); + } + } + } + + if (LOG.isDebugEnabled()) + LOG.debug("close() {} e={}, c={}, b={}", stateString(), error, BufferUtil.toDetailString(content), blocker); + + // Throw any error + if (error != null) + { + if (error instanceof IOException) + throw (IOException)error; + if (error instanceof RuntimeException) + throw (RuntimeException)error; + if (error instanceof Error) + throw (Error)error; + throw new IOException(error); } - catch (Throwable failure) + + if (content == null) { - onWriteComplete(true, failure); - throw failure; + if (blocker == null) + // nothing to do or block for. + return; + + // Just wait for another close to finish. + try (Blocker b = blocker) + { + b.block(); + } + } + else + { + if (blocker == null) + { + // Do an async close + channelWrite(content, true, new AsyncCloseCB(Callback.NOOP)); + } + else + { + // Do a blocking close + try (Blocker b = blocker) + { + channelWrite(content, true, blocker); + b.block(); + onWriteComplete(true, null); + } + catch (Throwable t) + { + onWriteComplete(true, t); + } + } } } @@ -478,12 +608,13 @@ public void close() throws IOException */ public void closed() { + // TODO do we really need this - if so document why!!!!! Callback callback = null; synchronized (_channelState) { if (_state != State.CLOSED) { - callback = _closedCallback; + callback = _closedCallback; // TODO is this ever non null???? _closedCallback = null; _state = State.CLOSED; } @@ -1369,7 +1500,7 @@ public void run() private String stateString() { - return String.format("s=%s,api=%s", _state, _apiState); + return String.format("s=%s,api=%s,sc=%b,e=%s", _state, _apiState, _softClose, _onError); } @Override diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java index 5c7c1ed5d935..1932837f8ffd 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java @@ -175,9 +175,9 @@ public static Stream tests() tests.add(new Object[]{new SendErrorHandler(499, "Test async sendError"), false, 499, "Test async sendError"}); tests.add(new Object[]{new AsyncReadyCompleteHandler(), false, 200, __data}); tests.add(new Object[]{new AsyncWriteCompleteHandler(false, false), false, 200, __data}); - tests.add(new Object[]{new AsyncWriteCompleteHandler(false, true), true, 200, __data}); + tests.add(new Object[]{new AsyncWriteCompleteHandler(false, true), false, 200, __data}); tests.add(new Object[]{new AsyncWriteCompleteHandler(true, false), false, 200, __data}); - tests.add(new Object[]{new AsyncWriteCompleteHandler(true, true), true, 200, __data}); + tests.add(new Object[]{new AsyncWriteCompleteHandler(true, true), false, 200, __data}); tests.add(new Object[]{new BlockingWriteCompleteHandler(), true, 200, __data}); return tests.stream().map(Arguments::of); } @@ -351,7 +351,7 @@ public void onError(Throwable t) @Override public String toString() { - return String.format("%s@%x{ur=%b,c=%b}", this.getClass().getSimpleName(), hashCode(), _unReady, _close); + return String.format("AWCH@%x{ur=%b,c=%b}", hashCode(), _unReady, _close); } }