From a6723a21eb783d7feede0971ff69b5901af214cb Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Fri, 13 Dec 2019 11:52:25 +1100 Subject: [PATCH] Issue #4331 Close Complete Code cleanup. Use a CLOSE state rather than non null closedCallback to be clearer that it is a state. Renamed close(Callback) to complete(Callback) Renamed and simplified closed() to completed() Signed-off-by: Greg Wilkins --- .../org/eclipse/jetty/server/HttpChannel.java | 4 +- .../jetty/server/HttpChannelState.java | 5 +- .../org/eclipse/jetty/server/HttpOutput.java | 135 +++++++++--------- .../org/eclipse/jetty/server/HttpWriter.java | 4 +- .../org/eclipse/jetty/server/Response.java | 14 +- .../eclipse/jetty/server/ResponseWriter.java | 4 +- 6 files changed, 87 insertions(+), 79 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index 9134588cfd99..42794c91460f 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -509,7 +509,7 @@ public boolean handle() // TODO that is done. // Set a close callback on the HttpOutput to make it an async callback - _response.closeOutput(Callback.from(_state::completed)); + _response.completeOutput(Callback.from(_state::completed)); break; } @@ -1212,7 +1212,7 @@ public void failed(final Throwable x) @Override public void succeeded() { - _response.getHttpOutput().closed(); + _response.getHttpOutput().completed(); super.failed(x); } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java index 337a803df60d..a6593f8c734e 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java @@ -888,6 +888,9 @@ public void sendError(int code, String message) if (LOG.isDebugEnabled()) LOG.debug("sendError {}", toStringLocked()); + if (_outputState != OutputState.OPEN) + throw new IllegalStateException(_outputState.toString()); + switch (_state) { case HANDLING: @@ -969,7 +972,7 @@ protected void completed() } // release any aggregate buffer from a closing flush - _channel.getResponse().getHttpOutput().closed(); + _channel.getResponse().getHttpOutput().completed(); if (event != null) { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 0294c2b346f1..828beeaccb2e 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -83,6 +83,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable | v | v UNREADY +---ASYNC +TODO rework this for the CLOSE state: OPEN/BLOCKING----close---------+ CLOSED/BLOCKING / | ^ \ ^ @@ -109,9 +110,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable swl = setWriteListener w = write - <>close = close sets _closeCallback only. - owc = onWriteComplete(false,null),_closeCallback==null - owcC = onWriteComplete(false,null),_closeCallback!=null + <>close = close sets _completeCallback only. + owc = onWriteComplete(false,null),_completeCallback==null + owcC = onWriteComplete(false,null),_completeCallback!=null isf = isReady()==false ist = osReady()==true */ @@ -128,6 +129,7 @@ enum ApiState enum State { OPEN, // Open + CLOSE, // Close needed from onWriteCompletion CLOSING, // Close in progress after close API called CLOSED // Closed } @@ -297,7 +299,7 @@ void onWriteComplete(boolean last, Throwable failure) LOG.debug("onWriteComplete", failure); boolean wake = false; - Callback callback = null; + Callback closedCallback = null; boolean release = false; ByteBuffer closeContent = null; synchronized (_channelState) @@ -309,14 +311,16 @@ void onWriteComplete(boolean last, Throwable failure) if (_state == State.CLOSING || last || failure != null) { _state = State.CLOSED; - callback = _closedCallback; + closedCallback = _closedCallback; _closedCallback = null; release = true; } - // Did somebody call close(Callback) while we were writing? - if (_closedCallback != null && _state == State.OPEN) + // Did somebody require a close while we were writing? + if (_state == State.CLOSE) { + // We can now send a (probably empty) last buffer and then when it completes + // onWriteCompletion will be called again to actually execute the _completeCallback _state = State.CLOSING; closeContent = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; } @@ -357,18 +361,18 @@ void onWriteComplete(boolean last, Throwable failure) if (closeContent != null) { - channelWrite(closeContent, true, new AsyncCloseCB(Callback.NOOP)); + channelWrite(closeContent, true, new WriteCompleteCB()); return; } try { - if (callback != null) + if (closedCallback != null) { if (failure == null) - callback.succeeded(); + closedCallback.succeeded(); else - callback.failed(failure); + closedCallback.failed(failure); } } finally @@ -378,30 +382,23 @@ void onWriteComplete(boolean last, Throwable failure) if (wake) _channel.execute(_channel); // TODO can we call directly? Why execute? } - } - public void closedBySendError() + public void softClose() { synchronized (_channelState) { - switch (_apiState) - { - case BLOCKING: - case BLOCKED: - case READY: - case ASYNC: - _softClose = true; - return; - - default: - throw new IllegalStateException(stateString()); - } + _softClose = true; } } - public void close(Callback callback) + public void complete(Callback callback) { + // This method is invoked for the COMPLETE action handling in + // HttpChannel.handle. The callback passed typically will call completed + // to finish the request cycle and so may need to asynchronously wait for: + // a pending/blocked operation to finish and then either an async close or + // wait for an application close to complete. boolean succeeded = false; Throwable error = null; ByteBuffer content = null; @@ -413,6 +410,7 @@ public void close(Callback callback) succeeded = true; break; + case CLOSE: case CLOSING: _closedCallback = Callback.combine(_closedCallback, callback); break; @@ -424,9 +422,12 @@ public void close(Callback callback) break; } + _closedCallback = Callback.combine(_closedCallback, callback); + switch (_apiState) { case BLOCKING: + // Output is idle blocking state, but we still do an async close _apiState = ApiState.BLOCKED; _state = State.CLOSING; content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; @@ -434,6 +435,7 @@ public void close(Callback callback) case ASYNC: case READY: + // Output is idle in async state, so we can do an async close _apiState = ApiState.PENDING; _state = State.CLOSING; content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; @@ -442,7 +444,9 @@ public void close(Callback callback) case BLOCKED: case UNREADY: case PENDING: - _closedCallback = Callback.combine(_closedCallback, callback); + // Output is currently doing an operation, so we just move to state to trigger close when + // that operation completes + _state = State.CLOSE; break; } break; @@ -465,7 +469,19 @@ public void close(Callback callback) } if (content != null) - channelWrite(content, true, new AsyncCloseCB(callback)); + channelWrite(content, true, new WriteCompleteCB()); + } + + /** + * Called to indicate that the request cycle has been completed. + */ + public void completed() + { + synchronized (_channelState) + { + _state = State.CLOSED; + } + releaseBuffer(); } @Override @@ -503,10 +519,10 @@ else if (_state != State.CLOSED) // An async operation is in progress, so we soft close now _softClose = true; - // If we are OPEN and we will not close in onWriteComplete, - if (_state == State.OPEN && _closedCallback == null) - // then use a NOOP to trigger a close from onWriteComplete - _closedCallback = Callback.NOOP; + // If we are OPEN, + if (_state == State.OPEN) + // then trigger a close from onWriteComplete + _state = State.CLOSE; break; case ASYNC: @@ -518,7 +534,12 @@ else if (_state != State.CLOSED) break; case BLOCKED: - // A blocking operation is in progress. Let's just block until it is complete + // A blocking operation is in progress. + // If we are still OPEN + if (_state == State.OPEN) + // we will close from onWriteComplete + _state = State.CLOSE; + // and block until CLOSED blocker = _writeBlocker.acquire(); _closedCallback = Callback.combine(_closedCallback, blocker); break; @@ -567,7 +588,7 @@ else if (_state != State.CLOSED) if (blocker == null) { // Do an async close - channelWrite(content, true, new AsyncCloseCB(Callback.NOOP)); + channelWrite(content, true, new WriteCompleteCB()); } else { @@ -586,29 +607,6 @@ else if (_state != State.CLOSED) } } - /** - * Called to indicate that the output has been closed externally - * via other means that may not have involve writing - * the last chunk. - */ - public void closed() - { - // TODO do we really need this - if so document why!!!!! - Callback callback = null; - synchronized (_channelState) - { - if (_state != State.CLOSED) - { - callback = _closedCallback; // TODO is this ever non null???? - _closedCallback = null; - _state = State.CLOSED; - } - } - releaseBuffer(); - if (callback != null) - callback.succeeded(); - } - public ByteBuffer getBuffer() { return _aggregate; @@ -713,7 +711,6 @@ public void flush() throws IOException private void checkWritable() throws EofException { - // TODO check this if (_softClose) throw new EofException("Closed"); @@ -1478,8 +1475,7 @@ public void run() } finally { - // Initiate an async close - close(Callback.NOOP); + IO.close(this); } } @@ -1844,25 +1840,34 @@ protected long getIdleTimeout() } } - private class AsyncCloseCB extends Callback.Nested + private class WriteCompleteCB implements Callback { - public AsyncCloseCB(Callback callback) + final Callback _callback; + + public WriteCompleteCB() { - super(callback); + this(null); + } + + public WriteCompleteCB(Callback callback) + { + _callback = callback; } @Override public void succeeded() { onWriteComplete(true, null); - super.succeeded(); + if (_callback != null) + _callback.succeeded(); } @Override public void failed(Throwable x) { onWriteComplete(true, x); - super.failed(x); + if (_callback != null) + _callback.succeeded(); } } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpWriter.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpWriter.java index 7a1c4dde6e32..d225d89adbe0 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpWriter.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpWriter.java @@ -48,9 +48,9 @@ public void close() throws IOException _out.close(); } - public void close(Callback callback) + public void complete(Callback callback) { - _out.close(callback); + _out.complete(callback); } @Override diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java index 935a695baa50..027eed491d6d 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java @@ -148,7 +148,7 @@ public void reopen() public void softClose() { setErrorSent(true); - _out.closedBySendError(); + _out.softClose(); } /** @@ -496,7 +496,7 @@ public void sendRedirect(int code, String location) throws IOException resetBuffer(); setHeader(HttpHeader.LOCATION, location); setStatus(code); - closeOutput(); + completeOutput(); } @Override @@ -788,7 +788,7 @@ public void setContentLength(int len) { try { - closeOutput(); + completeOutput(); } catch (IOException e) { @@ -826,7 +826,7 @@ public boolean isContentComplete(long written) return (_contentLength < 0 || written >= _contentLength); } - public void closeOutput() throws IOException + public void completeOutput() throws IOException { if (_outputType == OutputType.WRITER) _writer.close(); @@ -834,12 +834,12 @@ public void closeOutput() throws IOException _out.close(); } - public void closeOutput(Callback callback) + public void completeOutput(Callback callback) { if (_outputType == OutputType.WRITER) - _writer.close(callback); + _writer.complete(callback); else - _out.close(callback); + _out.complete(callback); } public long getLongContentLength() diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ResponseWriter.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ResponseWriter.java index 65ae95825db0..7837c9281844 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ResponseWriter.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ResponseWriter.java @@ -172,13 +172,13 @@ public void close() } } - public void close(Callback callback) + public void complete(Callback callback) { synchronized (lock) { _isClosed = true; } - _httpWriter.close(callback); + _httpWriter.complete(callback); } @Override