From c5acf965067478784b54e2d241ec58fdb0b2c9fe Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 19 Dec 2019 12:17:11 +1100 Subject: [PATCH] Jetty 9.4.x 4331 async close complete3 (#4409) * Issue #4376 Async Content Complete Added test harness to reproduce unready completing write. Fixed test by not closing output prior to becoming READY Signed-off-by: Greg Wilkins * Issue #4331 Async Write Complete Test harness to reproduce unready when closing/completing. Signed-off-by: Greg Wilkins * Issue #4331 Async Write Complete test both PENDING and UNREADY Signed-off-by: Greg Wilkins * Issue #4331 Async Write Complete test cleanups Signed-off-by: Greg Wilkins * Issue #4331 Async Close Complete Cleanups of write Signed-off-by: Greg Wilkins * WIP Signed-off-by: Greg Wilkins * Issue #4331 Close Complete Work in progress Signed-off-by: Greg Wilkins * Issue #4331 Close Complete Added async close to HttpWriter and ResponseWriter Always use async close, with blocker if necessary. Signed-off-by: Greg Wilkins * Issue #4331 Close Complete Working async close complete! Signed-off-by: Greg Wilkins * Issue #4331 Close Complete invert test as we can now call complete when not ready! Signed-off-by: Greg Wilkins * Issue #4331 Close Complete fixed transition to ERROR state Signed-off-by: Greg Wilkins * Issue #4331 Close Complete async close after onError Signed-off-by: Greg Wilkins * Issue #4331 Close Complete minor cleanups Signed-off-by: Greg Wilkins * Issue #4331 Close Complete Fix for proxy tests Signed-off-by: Greg Wilkins * Issue #4331 Close Complete Fix write loop to handle clear of p=0,l=0 rather than p=l Signed-off-by: Greg Wilkins * Issue #4331 Close Complete Removed old close on all content mechanism Cleanups and some more TODOs Signed-off-by: Greg Wilkins * Issue #4331 Close Complete a reworking of HttpOutput to separate out API state. Signed-off-by: Greg Wilkins * Issue #4331 Close Complete Soft close for Dispatcher release buffer in onWriteComplete Signed-off-by: Greg Wilkins * Issue #4331 Close Complete Set _onError in onWriteComplete NOOP callback instead of null Signed-off-by: Greg Wilkins * Issue #4331 Close Complete failure closes HttpOutput Signed-off-by: Greg Wilkins * Issue #4331 Close Complete Moved closedCallback handling to onWriteComplete Signed-off-by: Greg Wilkins * Issue #4331 Close Complete Additional test of complete during blocking write. Signed-off-by: Greg Wilkins * Issue #4331 Close Complete reimplemented blocking close to sometimes be async Signed-off-by: Greg Wilkins * Issue #4331 Close Complete ascii "art" Signed-off-by: Greg Wilkins * Issue #4331 Close Complete Code cleanup. Use a CLOSE state rather than non null closedCallback to be clearer that it is a state. Renamed close(Callback) to complete(Callback) Renamed and simplified closed() to completed() Signed-off-by: Greg Wilkins * Issue #4331 Close Complete Do not dispatch Better ascii art improved close impl to be similar to complete Signed-off-by: Greg Wilkins * Issue #4331 Close Complete More test cases Signed-off-by: Greg Wilkins * Issue #4331 Close Complete retain execute behaviour in 9.4. review in 10. Signed-off-by: Greg Wilkins * Improved javadoc and ascii art * Improved CLOSING Switch to CLOSING state as soon as last write is done, even if several non last channelWrites will be done. This allows a subsequent call to close to know that nothing needs to be written and can avoid some EOF exceptions. Now onWriteComplete acts only on the passed in last parameter. Added test for sendContent * WIP Aggregate within lock pipeline test debug * Avoid creating ignored exception when Idle or Failed. * Try a parse without fill to avoid unconsumed input debug * fixed pipeline size * release buffer before callback * turn off debug Signed-off-by: Greg Wilkins * Issue #4331 Close Complete Better javadoc refactored onWriteComplete logic to be simpler fixed bug with flush of last written byte Signed-off-by: Greg Wilkins * Issue #4331 Close Complete Completely reworked test harness for better coverage. Signed-off-by: Greg Wilkins * Issue #4331 Close Complete Reworked order of ifs to match logic above in onWriteComplete Signed-off-by: Greg Wilkins --- .../jetty/http2/client/AsyncIOTest.java | 7 +- .../org/eclipse/jetty/io/WriteFlusher.java | 10 +- .../jetty/server/AsyncContextState.java | 6 +- .../org/eclipse/jetty/server/Dispatcher.java | 53 +- .../jetty/server/EncodingHttpWriter.java | 7 +- .../org/eclipse/jetty/server/HttpChannel.java | 4 +- .../jetty/server/HttpChannelState.java | 7 +- .../eclipse/jetty/server/HttpConnection.java | 29 +- .../org/eclipse/jetty/server/HttpOutput.java | 1498 ++++++++++------- .../org/eclipse/jetty/server/HttpWriter.java | 10 +- .../jetty/server/Iso88591HttpWriter.java | 7 +- .../org/eclipse/jetty/server/Response.java | 19 +- .../eclipse/jetty/server/ResponseWriter.java | 12 +- .../eclipse/jetty/server/Utf8HttpWriter.java | 7 +- .../jetty/server/AsyncCompletionTest.java | 625 ++++++- .../jetty/server/HttpServerTestBase.java | 17 +- .../java/org/eclipse/jetty/util/Callback.java | 52 + .../eclipse/jetty/util/thread/Invocable.java | 14 + 18 files changed, 1600 insertions(+), 784 deletions(-) diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AsyncIOTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AsyncIOTest.java index 1881a10a6d3e..b91cb162655e 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AsyncIOTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AsyncIOTest.java @@ -41,7 +41,6 @@ import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; -import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.http2.frames.SettingsFrame; import org.eclipse.jetty.server.HttpOutput; import org.eclipse.jetty.util.Callback; @@ -243,8 +242,8 @@ public void onWritePossible() throws IOException // The write is too large and will stall. output.write(ByteBuffer.wrap(new byte[2 * clientWindow])); - // We cannot call complete() now before checking for isReady(). - // This will abort the response and the client will receive a reset. + // We can now call complete() now before checking for isReady(). + // This will asynchronously complete when the write is finished. asyncContext.complete(); } @@ -275,7 +274,7 @@ public Map onPreface(Session session) session.newStream(frame, promise, new Stream.Listener.Adapter() { @Override - public void onReset(Stream stream, ResetFrame frame) + public void onClosed(Stream stream) { latch.countDown(); } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java index 743f0a1fcef3..84cabdf5a708 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java @@ -518,7 +518,15 @@ public boolean onFail(Throwable cause) public void onClose() { - onFail(new ClosedChannelException()); + switch (_state.get().getType()) + { + case IDLE: + case FAILED: + return; + + default: + onFail(new ClosedChannelException()); + } } boolean isFailed() diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContextState.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContextState.java index d02f9768e1f4..eb30265ba33c 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContextState.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContextState.java @@ -146,7 +146,11 @@ public void start(final Runnable task) @Override public void run() { - state().getAsyncContextEvent().getContext().getContextHandler().handle(channel.getRequest(), task); + ContextHandler.Context context = state().getAsyncContextEvent().getContext(); + if (context == null) + task.run(); + else + context.getContextHandler().handle(channel.getRequest(), task); } }); } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Dispatcher.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Dispatcher.java index 09249f955ab7..9251b202e97a 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Dispatcher.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Dispatcher.java @@ -220,7 +220,7 @@ protected void forward(ServletRequest request, ServletResponse response, Dispatc _contextHandler.handle(_pathInContext, baseRequest, (HttpServletRequest)request, (HttpServletResponse)response); if (!baseRequest.getHttpChannelState().isAsync()) - commitResponse(response, baseRequest); + baseRequest.getResponse().softClose(); } } finally @@ -242,57 +242,6 @@ public String toString() return String.format("Dispatcher@0x%x{%s,%s}", hashCode(), _named, _uri); } - @SuppressWarnings("Duplicates") - private void commitResponse(ServletResponse response, Request baseRequest) throws IOException, ServletException - { - if (baseRequest.getResponse().isWriting()) - { - try - { - // Try closing Writer first (based on knowledge in Response obj) - response.getWriter().close(); - } - catch (IllegalStateException ex) - { - try - { - // Try closing OutputStream as alternate route - // This path is possible due to badly behaving Response wrappers - response.getOutputStream().close(); - } - catch (IllegalStateException ex2) - { - ServletException servletException = new ServletException("Unable to commit the response", ex2); - servletException.addSuppressed(ex); - throw servletException; - } - } - } - else - { - try - { - // Try closing OutputStream first (based on knowledge in Response obj) - response.getOutputStream().close(); - } - catch (IllegalStateException ex) - { - try - { - // Try closing Writer as alternate route - // This path is possible due to badly behaving Response wrappers - response.getWriter().close(); - } - catch (IllegalStateException ex2) - { - ServletException servletException = new ServletException("Unable to commit the response", ex2); - servletException.addSuppressed(ex); - throw servletException; - } - } - } - } - private class ForwardAttributes implements Attributes { final Attributes _attr; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/EncodingHttpWriter.java b/jetty-server/src/main/java/org/eclipse/jetty/server/EncodingHttpWriter.java index 23a2413903c1..ab101829f9e0 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/EncodingHttpWriter.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/EncodingHttpWriter.java @@ -47,16 +47,11 @@ public EncodingHttpWriter(HttpOutput out, String encoding) public void write(char[] s, int offset, int length) throws IOException { HttpOutput out = _out; - if (length == 0 && out.isAllContentWritten()) - { - out.close(); - return; - } while (length > 0) { _bytes.reset(); - int chars = length > MAX_OUTPUT_CHARS ? MAX_OUTPUT_CHARS : length; + int chars = Math.min(length, MAX_OUTPUT_CHARS); _converter.write(s, offset, chars); _converter.flush(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index 9134588cfd99..42794c91460f 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -509,7 +509,7 @@ public boolean handle() // TODO that is done. // Set a close callback on the HttpOutput to make it an async callback - _response.closeOutput(Callback.from(_state::completed)); + _response.completeOutput(Callback.from(_state::completed)); break; } @@ -1212,7 +1212,7 @@ public void failed(final Throwable x) @Override public void succeeded() { - _response.getHttpOutput().closed(); + _response.getHttpOutput().completed(); super.failed(x); } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java index 74e72098f64f..c2b0dc05952b 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java @@ -889,6 +889,9 @@ public void sendError(int code, String message) if (LOG.isDebugEnabled()) LOG.debug("sendError {}", toStringLocked()); + if (_outputState != OutputState.OPEN) + throw new IllegalStateException(_outputState.toString()); + switch (_state) { case HANDLING: @@ -902,7 +905,7 @@ public void sendError(int code, String message) throw new IllegalStateException("Response is " + _outputState); response.setStatus(code); - response.closedBySendError(); + response.softClose(); request.setAttribute(ErrorHandler.ERROR_CONTEXT, request.getErrorContext()); request.setAttribute(ERROR_REQUEST_URI, request.getRequestURI()); @@ -970,7 +973,7 @@ protected void completed() } // release any aggregate buffer from a closing flush - _channel.getResponse().getHttpOutput().closed(); + _channel.getResponse().getHttpOutput().completed(); if (event != null) { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index e42223a751e7..a73aefbcc75e 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -407,20 +407,25 @@ public void onCompleted() } else if (_parser.inContentState() && _generator.isPersistent()) { - // If we are async, then we have problems to complete neatly - if (_input.isAsync()) + // Try to progress without filling. + parseRequestBuffer(); + if (_parser.inContentState()) { - if (LOG.isDebugEnabled()) - LOG.debug("{}unconsumed input {}", _parser.isChunking() ? "Possible " : "", this); - _channel.abort(new IOException("unconsumed input")); - } - else - { - if (LOG.isDebugEnabled()) - LOG.debug("{}unconsumed input {}", _parser.isChunking() ? "Possible " : "", this); - // Complete reading the request - if (!_input.consumeAll()) + // If we are async, then we have problems to complete neatly + if (_input.isAsync()) + { + if (LOG.isDebugEnabled()) + LOG.debug("{}unconsumed input while async {}", _parser.isChunking() ? "Possible " : "", this); _channel.abort(new IOException("unconsumed input")); + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("{}unconsumed input {}", _parser.isChunking() ? "Possible " : "", this); + // Complete reading the request + if (!_input.consumeAll()) + _channel.abort(new IOException("unconsumed input")); + } } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index f651179862ab..3e98573abce4 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -18,7 +18,6 @@ package org.eclipse.jetty.server; -import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; @@ -31,7 +30,6 @@ import java.nio.charset.CodingErrorAction; import java.util.ResourceBundle; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import javax.servlet.RequestDispatcher; import javax.servlet.ServletOutputStream; import javax.servlet.ServletRequest; @@ -44,7 +42,6 @@ import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IteratingCallback; -import org.eclipse.jetty.util.IteratingNestedCallback; import org.eclipse.jetty.util.SharedBlockingCallback; import org.eclipse.jetty.util.SharedBlockingCallback.Blocker; import org.eclipse.jetty.util.log.Log; @@ -63,29 +60,68 @@ public class HttpOutput extends ServletOutputStream implements Runnable { private static final String LSTRING_FILE = "javax.servlet.LocalStrings"; - private static final Callback BLOCKING_CLOSE_CALLBACK = new Callback() {}; private static ResourceBundle lStrings = ResourceBundle.getBundle(LSTRING_FILE); - /* - ACTION OPEN ASYNC READY PENDING UNREADY CLOSING CLOSED - -------------------------------------------------------------------------------------------------- - setWriteListener() READY->owp ise ise ise ise ise ise - write() OPEN ise PENDING wpe wpe eof eof - flush() OPEN ise PENDING wpe wpe eof eof - close() CLOSING CLOSING CLOSING CLOSED CLOSED CLOSING CLOSED - isReady() OPEN:true READY:true READY:true UNREADY:false UNREADY:false CLOSED:true CLOSED:true - write completed - - - ASYNC READY->owp CLOSED - - */ + /** + * The output state + */ enum State { - OPEN, // Open in blocking mode + OPEN, // Open + CLOSE, // Close needed from onWriteCompletion + CLOSING, // Close in progress after close API called + CLOSED // Closed + } + + /** + * The API State which combines with the output State: + *
+              OPEN/BLOCKING---last----------------------------+                      CLOSED/BLOCKING
+             /   |    ^                                        \                         ^  ^
+            /    w    |                                         \                       /   |
+           /     |   owc   +--owcL------------------->--owcL-----\---------------------+    |
+          |      v    |   /                         /             V                         |
+         swl  OPEN/BLOCKED----last---->CLOSE/BLOCKED----owc----->CLOSING/BLOCKED--owcL------+
+          |
+           \
+            \
+             V
+          +-->OPEN/READY------last---------------------------+
+         /    ^    |                                          \
+        /    /     w                                           \
+       |    /      |       +--owcL------------------->--owcL----\---------------------------+
+       |   /       v      /                         /            V                          |
+       | irt  OPEN/PENDING----last---->CLOSE/PENDING----owc---->CLOSING/PENDING--owcL----+  |
+       |   \  /    |                        |                    ^     |                 |  |
+      owc   \/    owc                      irf                  /     irf                |  |
+       |    /\     |                        |                  /       |                 |  |
+       |   /  \    V                        |                 /        |                 V  V
+       | irf  OPEN/ASYNC------last----------|----------------+         |             CLOSED/ASYNC
+       |   \                                |                          |                 ^  ^
+        \   \                               |                          |                 |  |
+         \   \                              |                          |                 |  |
+          \   v                             v                          v                 |  |
+           +--OPEN/UNREADY----last---->CLOSE/UNREADY----owc----->CLOSING/UNREADY--owcL---+  |
+                          \                         \                                       |
+                           +--owcL------------------->--owcL--------------------------------+
+
+      swl  : setWriteListener
+      w    : write
+      owc  : onWriteComplete last == false
+      owcL : onWriteComplete last == true
+      irf  : isReady() == false
+      irt  : isReady() == true
+      last : close() or complete(Callback) or write of known last content
+     
+ */ + enum ApiState + { + BLOCKING, // Open in blocking mode + BLOCKED, // Blocked in blocking operation ASYNC, // Open in async mode READY, // isReady() has returned true PENDING, // write operating in progress UNREADY, // write operating in progress, isReady has returned false - ERROR, // An error has occured - CLOSING, // Asynchronous close in progress - CLOSED // Closed } /** @@ -153,9 +189,12 @@ default void resetBuffer() throws IllegalStateException private static Logger LOG = Log.getLogger(HttpOutput.class); private static final ThreadLocal _encoder = new ThreadLocal<>(); - private final AtomicReference _state = new AtomicReference<>(State.OPEN); private final HttpChannel _channel; + private final HttpChannelState _channelState; private final SharedBlockingCallback _writeBlocker; + private ApiState _apiState = ApiState.BLOCKING; + private State _state = State.OPEN; + private boolean _softClose = false; private Interceptor _interceptor; private long _written; private long _flushed; @@ -165,11 +204,12 @@ default void resetBuffer() throws IllegalStateException private int _commitSize; private WriteListener _writeListener; private volatile Throwable _onError; - private Callback _closeCallback; + private Callback _closedCallback; public HttpOutput(HttpChannel channel) { _channel = channel; + _channelState = channel.getState(); _interceptor = channel; _writeBlocker = new WriteBlocker(channel); HttpConfiguration config = channel.getHttpConfiguration(); @@ -209,18 +249,10 @@ public long getWritten() public void reopen() { - _state.set(State.OPEN); - } - - private boolean isLastContentToWrite(int len) - { - _written += len; - return _channel.getResponse().isAllContentWritten(_written); - } - - public boolean isAllContentWritten() - { - return _channel.getResponse().isAllContentWritten(_written); + synchronized (_channelState) + { + _softClose = false; + } } protected Blocker acquireWriteBlockingCallback() throws IOException @@ -228,27 +260,16 @@ protected Blocker acquireWriteBlockingCallback() throws IOException return _writeBlocker.acquire(); } - private void write(ByteBuffer content, boolean complete) throws IOException + private void channelWrite(ByteBuffer content, boolean complete) throws IOException { try (Blocker blocker = _writeBlocker.acquire()) { - write(content, complete, blocker); + channelWrite(content, complete, blocker); blocker.block(); - if (complete) - closed(); - } - catch (Exception failure) - { - if (LOG.isDebugEnabled()) - LOG.debug(failure); - abort(failure); - if (failure instanceof IOException) - throw failure; - throw new IOException(failure); } } - protected void write(ByteBuffer content, boolean complete, Callback callback) + private void channelWrite(ByteBuffer content, boolean last, Callback callback) { if (_firstByteTimeStamp == -1) { @@ -258,175 +279,321 @@ protected void write(ByteBuffer content, boolean complete, Callback callback) else _firstByteTimeStamp = Long.MAX_VALUE; } - _interceptor.write(content, complete, callback); - } - private void abort(Throwable failure) - { - closed(); - _channel.abort(failure); + _interceptor.write(content, last, callback); } - public void closedBySendError() + private void onWriteComplete(boolean last, Throwable failure) { - while (true) + String state = null; + boolean wake = false; + Callback closedCallback = null; + ByteBuffer closeContent = null; + synchronized (_channelState) { - State state = _state.get(); - switch (state) - { - case OPEN: - case READY: - case ASYNC: - if (!_state.compareAndSet(state, State.CLOSED)) - continue; - return; + if (LOG.isDebugEnabled()) + state = stateString(); - default: - throw new IllegalStateException(state.toString()); + // Transition to CLOSED state if we were the last write or we have failed + if (last || failure != null) + { + _state = State.CLOSED; + closedCallback = _closedCallback; + _closedCallback = null; + releaseBuffer(); + wake = updateApiState(failure); + } + else if (_state == State.CLOSE) + { + // Somebody called close or complete while we were writing. + // We can now send a (probably empty) last buffer and then when it completes + // onWriteCompletion will be called again to actually execute the _completeCallback + _state = State.CLOSING; + closeContent = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; + } + else + { + wake = updateApiState(null); } } - } - public void close(Closeable wrapper, Callback callback) - { - _closeCallback = callback; + if (LOG.isDebugEnabled()) + LOG.debug("onWriteComplete({},{}) {}->{} c={} cb={} w={}", + last, failure, state, stateString(), BufferUtil.toDetailString(closeContent), closedCallback, wake); + try { - if (wrapper != null) - wrapper.close(); - if (!isClosed()) - close(); + if (failure != null) + _channel.abort(failure); + + if (closedCallback != null) + { + if (failure == null) + closedCallback.succeeded(); + else + closedCallback.failed(failure); + } + else if (closeContent != null) + { + channelWrite(closeContent, true, new WriteCompleteCB()); + } } - catch (Throwable th) + finally { - closed(); - if (_closeCallback == null) - LOG.ignore(th); - else - callback.failed(th); + if (wake) + _channel.execute(_channel); // TODO review in jetty-10 if execute is needed } - finally + } + + private boolean updateApiState(Throwable failure) + { + boolean wake = false; + switch (_apiState) { - if (_closeCallback != null) - callback.succeeded(); - _closeCallback = null; + case BLOCKED: + _apiState = ApiState.BLOCKING; + break; + + case PENDING: + _apiState = ApiState.ASYNC; + if (failure != null) + { + _onError = failure; + wake = _channelState.onWritePossible(); + } + break; + + case UNREADY: + _apiState = ApiState.READY; + if (failure != null) + _onError = failure; + wake = _channelState.onWritePossible(); + break; + + default: + if (_state == State.CLOSED) + break; + throw new IllegalStateException(stateString()); } + return wake; } - @Override - public void close() + public void softClose() { - Callback closeCallback = _closeCallback == null ? BLOCKING_CLOSE_CALLBACK : _closeCallback; + synchronized (_channelState) + { + _softClose = true; + } + } - while (true) + public void complete(Callback callback) + { + // This method is invoked for the COMPLETE action handling in + // HttpChannel.handle. The callback passed typically will call completed + // to finish the request cycle and so may need to asynchronously wait for: + // a pending/blocked operation to finish and then either an async close or + // wait for an application close to complete. + boolean succeeded = false; + Throwable error = null; + ByteBuffer content = null; + synchronized (_channelState) { - State state = _state.get(); - switch (state) + switch (_state) { - case CLOSING: case CLOSED: - { - _closeCallback = null; - closeCallback.succeeded(); - return; - } - case ASYNC: - { - // A close call implies a write operation, thus in asynchronous mode - // a call to isReady() that returned true should have been made. - // However it is desirable to allow a close at any time, specially if - // complete is called. Thus we simulate a call to isReady here, by - // trying to move to READY state. Either way we continue. - _state.compareAndSet(state, State.READY); - continue; - } - case UNREADY: - case PENDING: - { - // A close call implies a write operation, thus in asynchronous mode - // a call to isReady() that returned true should have been made. - // However it is desirable to allow a close at any time, specially if - // complete is called. Because the prior write has not yet completed - // and/or isReady has not been called, this close is allowed, but will - // abort the response. - if (!_state.compareAndSet(state, State.CLOSED)) - continue; - IOException ex = new IOException("Closed while Pending/Unready"); - LOG.warn(ex.toString()); - LOG.debug(ex); - abort(ex); - _closeCallback = null; - closeCallback.failed(ex); - return; - } - default: - { - if (!_state.compareAndSet(state, State.CLOSING)) - continue; + succeeded = true; + break; + + case CLOSE: + case CLOSING: + _closedCallback = Callback.combine(_closedCallback, callback); + break; - // Do a normal close by writing the aggregate buffer or an empty buffer. If we are - // not including, then indicate this is the last write. - try + case OPEN: + if (_onError != null) { - ByteBuffer content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; - if (closeCallback == BLOCKING_CLOSE_CALLBACK) - { - // Do a blocking close - write(content, !_channel.getResponse().isIncluding()); - _closeCallback = null; - closeCallback.succeeded(); - } - else - { - _closeCallback = null; - write(content, !_channel.getResponse().isIncluding(), closeCallback); - } + error = _onError; + break; } - catch (IOException x) + + _closedCallback = Callback.combine(_closedCallback, callback); + + switch (_apiState) { - LOG.ignore(x); // Ignore it, it's been already logged in write(). - _closeCallback = null; - closeCallback.failed(x); + case BLOCKING: + // Output is idle blocking state, but we still do an async close + _apiState = ApiState.BLOCKED; + _state = State.CLOSING; + content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; + break; + + case ASYNC: + case READY: + // Output is idle in async state, so we can do an async close + _apiState = ApiState.PENDING; + _state = State.CLOSING; + content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; + break; + + case BLOCKED: + case UNREADY: + case PENDING: + // An operation is in progress, so we soft close now + _softClose = true; + // then trigger a close from onWriteComplete + _state = State.CLOSE; + break; } - return; - } + break; } } + + if (LOG.isDebugEnabled()) + LOG.debug("complete({}) {} s={} e={}, c={}", callback, stateString(), succeeded, error, BufferUtil.toDetailString(content)); + + if (succeeded) + { + callback.succeeded(); + return; + } + + if (error != null) + { + callback.failed(error); + return; + } + + if (content != null) + channelWrite(content, true, new WriteCompleteCB()); } /** - * Called to indicate that the last write has been performed. - * It updates the state and performs cleanup operations. + * Called to indicate that the request cycle has been completed. */ - public void closed() + public void completed() { - while (true) + synchronized (_channelState) { - State state = _state.get(); - switch (state) + _state = State.CLOSED; + } + releaseBuffer(); + } + + @Override + public void close() throws IOException + { + ByteBuffer content = null; + Blocker blocker = null; + synchronized (_channelState) + { + if (_onError != null) + { + if (_onError instanceof IOException) + throw (IOException)_onError; + if (_onError instanceof RuntimeException) + throw (RuntimeException)_onError; + if (_onError instanceof Error) + throw (Error)_onError; + throw new IOException(_onError); + } + + switch (_state) { case CLOSED: - { - return; - } - case UNREADY: - { - if (_state.compareAndSet(state, State.ERROR)) + break; + + case CLOSE: + case CLOSING: + switch (_apiState) + { + case BLOCKING: + case BLOCKED: + // block until CLOSED state reached. + blocker = _writeBlocker.acquire(); + _closedCallback = Callback.combine(_closedCallback, blocker); + break; + + default: + // async close with no callback, so nothing to do + break; + } + break; + + case OPEN: + switch (_apiState) { - if (_onError == null) - _onError = new EofException("Async closed"); - releaseBuffer(); - return; + case BLOCKING: + // Output is idle blocking state, but we still do an async close + _apiState = ApiState.BLOCKED; + _state = State.CLOSING; + blocker = _writeBlocker.acquire(); + content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; + break; + + case BLOCKED: + // An blocking operation is in progress, so we soft close now + _softClose = true; + // then trigger a close from onWriteComplete + _state = State.CLOSE; + // and block until it is complete + blocker = _writeBlocker.acquire(); + _closedCallback = Callback.combine(_closedCallback, blocker); + break; + + case ASYNC: + case READY: + // Output is idle in async state, so we can do an async close + _apiState = ApiState.PENDING; + _state = State.CLOSING; + content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; + break; + + case UNREADY: + case PENDING: + // An async operation is in progress, so we soft close now + _softClose = true; + // then trigger a close from onWriteComplete + _state = State.CLOSE; + break; } break; + } + } + + if (LOG.isDebugEnabled()) + LOG.debug("close() {} c={} b={}", stateString(), BufferUtil.toDetailString(content), blocker); + + if (content == null) + { + if (blocker == null) + // nothing to do or block for. + return; + + // Just wait for some other close to finish. + try (Blocker b = blocker) + { + b.block(); + } + } + else + { + if (blocker == null) + { + // Do an async close + channelWrite(content, true, new WriteCompleteCB()); + } + else + { + // Do a blocking close + try (Blocker b = blocker) + { + channelWrite(content, true, blocker); + b.block(); + onWriteComplete(true, null); } - default: + catch (Throwable t) { - if (!_state.compareAndSet(state, State.CLOSED)) - break; - - releaseBuffer(); - return; + onWriteComplete(true, t); } } } @@ -455,305 +622,366 @@ private void releaseBuffer() public boolean isClosed() { - switch (_state.get()) + synchronized (_channelState) { - case CLOSING: - case CLOSED: - return true; - default: - return false; + return _softClose || (_state != State.OPEN); } } public boolean isAsync() { - switch (_state.get()) + synchronized (_channelState) { - case ASYNC: - case READY: - case PENDING: - case UNREADY: - return true; - default: - return false; + switch (_apiState) + { + case ASYNC: + case READY: + case PENDING: + case UNREADY: + return true; + default: + return false; + } } } @Override public void flush() throws IOException { - while (true) + ByteBuffer content = null; + synchronized (_channelState) { - State state = _state.get(); - switch (state) + switch (_state) { - case OPEN: - write(BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER, false); + case CLOSED: + case CLOSING: return; - case ASYNC: - throw new IllegalStateException("isReady() not called"); + default: + { + switch (_apiState) + { + case BLOCKING: + _apiState = ApiState.BLOCKED; + content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; + break; - case READY: - if (!_state.compareAndSet(state, State.PENDING)) - continue; - new AsyncFlush().iterate(); - return; + case ASYNC: + case PENDING: + throw new IllegalStateException("isReady() not called: " + stateString()); - case UNREADY: - throw new WritePendingException(); + case READY: + _apiState = ApiState.PENDING; + break; - case ERROR: - throw new EofException(_onError); + case UNREADY: + throw new WritePendingException(); - case PENDING: - case CLOSING: - case CLOSED: - return; + default: + throw new IllegalStateException(stateString()); + } + } + } + } - default: - throw new IllegalStateException(state.toString()); + if (content == null) + new AsyncFlush(false).iterate(); + else + { + try + { + channelWrite(content, false); + onWriteComplete(false, null); + } + catch (Throwable t) + { + onWriteComplete(false, t); + throw t; } } } + private void checkWritable() throws EofException + { + if (_softClose) + throw new EofException("Closed"); + + switch (_state) + { + case CLOSED: + case CLOSING: + throw new EofException("Closed"); + + default: + break; + } + + if (_onError != null) + throw new EofException(_onError); + } + @Override public void write(byte[] b, int off, int len) throws IOException { + boolean last; + boolean aggregate; + boolean flush; + // Async or Blocking ? - while (true) + boolean async; + synchronized (_channelState) { - State state = _state.get(); - switch (state) + checkWritable(); + long written = _written + len; + int space = _aggregate == null ? getBufferSize() : BufferUtil.space(_aggregate); + last = _channel.getResponse().isAllContentWritten(written); + // Write will be aggregated if: + // + it is smaller than the commitSize + // + is not the last one, or is last but will fit in an already allocated aggregate buffer. + aggregate = len <= _commitSize && (!last || BufferUtil.hasContent(_aggregate) && len <= space); + flush = last || !aggregate || len >= space; + + if (last && _state == State.OPEN) + _state = State.CLOSING; + + switch (_apiState) { - case OPEN: - // process blocking below + case BLOCKING: + _apiState = flush ? ApiState.BLOCKED : ApiState.BLOCKING; + async = false; break; case ASYNC: - throw new IllegalStateException("isReady() not called"); + throw new IllegalStateException("isReady() not called: " + stateString()); case READY: - if (!_state.compareAndSet(state, State.PENDING)) - continue; - - // Should we aggregate? - boolean last = isLastContentToWrite(len); - if (!last && len <= _commitSize) - { - acquireBuffer(); - - // YES - fill the aggregate with content from the buffer - int filled = BufferUtil.fill(_aggregate, b, off, len); - - // return if we are not complete, not full and filled all the content - if (filled == len && !BufferUtil.isFull(_aggregate)) - { - if (!_state.compareAndSet(State.PENDING, State.ASYNC)) - throw new IllegalStateException(_state.get().toString()); - return; - } - - // adjust offset/length - off += filled; - len -= filled; - } - - // Do the asynchronous writing from the callback - new AsyncWrite(b, off, len, last).iterate(); - return; + async = true; + _apiState = flush ? ApiState.PENDING : ApiState.ASYNC; + break; case PENDING: case UNREADY: throw new WritePendingException(); - case ERROR: - throw new EofException(_onError); - - case CLOSING: - case CLOSED: - throw new EofException("Closed"); - default: - throw new IllegalStateException(state.toString()); + throw new IllegalStateException(stateString()); } - break; - } - // handle blocking write + _written = written; - // Should we aggregate? - // Yes - if the write is smaller than the commitSize (==aggregate buffer size) - // and the write is not the last one, or is last but will fit in an already allocated aggregate buffer. - boolean last = isLastContentToWrite(len); - if (len <= _commitSize && (!last || len <= BufferUtil.space(_aggregate))) - { - acquireBuffer(); - - // YES - fill the aggregate with content from the buffer - int filled = BufferUtil.fill(_aggregate, b, off, len); + // Should we aggregate? + if (aggregate) + { + acquireBuffer(); + int filled = BufferUtil.fill(_aggregate, b, off, len); - // return if we are not the last write and have aggregated all of the content - if (!last && filled == len && !BufferUtil.isFull(_aggregate)) - return; + // return if we are not complete, not full and filled all the content + if (!flush) + return; - // adjust offset/length - off += filled; - len -= filled; + // adjust offset/length + off += filled; + len -= filled; + } } - // flush any content from the aggregate - if (BufferUtil.hasContent(_aggregate)) + if (async) { - write(_aggregate, last && len == 0); + // Do the asynchronous writing from the callback + new AsyncWrite(b, off, len, last).iterate(); + return; + } - // should we fill aggregate again from the buffer? - if (len > 0 && !last && len <= _commitSize && len <= BufferUtil.space(_aggregate)) + // Blocking write + try + { + // flush any content from the aggregate + if (BufferUtil.hasContent(_aggregate)) { - BufferUtil.append(_aggregate, b, off, len); - return; + channelWrite(_aggregate, last && len == 0); + + // should we fill aggregate again from the buffer? + if (len > 0 && !last && len <= _commitSize && len <= BufferUtil.space(_aggregate)) + { + BufferUtil.append(_aggregate, b, off, len); + return; + } } - } - // write any remaining content in the buffer directly - if (len > 0) - { - // write a buffer capacity at a time to avoid JVM pooling large direct buffers - // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6210541 - ByteBuffer view = ByteBuffer.wrap(b, off, len); - while (len > getBufferSize()) + // write any remaining content in the buffer directly + if (len > 0) { - int p = view.position(); - int l = p + getBufferSize(); - view.limit(p + getBufferSize()); - write(view, false); - len -= getBufferSize(); - view.limit(l + Math.min(len, getBufferSize())); - view.position(l); + // write a buffer capacity at a time to avoid JVM pooling large direct buffers + // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6210541 + ByteBuffer view = ByteBuffer.wrap(b, off, len); + + while (len > getBufferSize()) + { + int p = view.position(); + int l = p + getBufferSize(); + view.limit(l); + channelWrite(view, false); + view.limit(p + len); + view.position(l); + len -= getBufferSize(); + } + channelWrite(view, last); } - write(view, last); + else if (last) + { + channelWrite(BufferUtil.EMPTY_BUFFER, true); + } + + onWriteComplete(last, null); } - else if (last) + catch (Throwable t) { - write(BufferUtil.EMPTY_BUFFER, true); + onWriteComplete(last, t); + throw t; } } public void write(ByteBuffer buffer) throws IOException { // This write always bypasses aggregate buffer + int len = BufferUtil.length(buffer); + boolean flush; + boolean last; // Async or Blocking ? - while (true) + boolean async; + synchronized (_channelState) { - State state = _state.get(); - switch (state) + checkWritable(); + long written = _written + len; + last = _channel.getResponse().isAllContentWritten(_written); + flush = last || len > 0 || BufferUtil.hasContent(_aggregate); + + if (last && _state == State.OPEN) + _state = State.CLOSING; + + switch (_apiState) { - case OPEN: - // process blocking below + case BLOCKING: + async = false; + _apiState = flush ? ApiState.BLOCKED : ApiState.BLOCKING; break; case ASYNC: - throw new IllegalStateException("isReady() not called"); + throw new IllegalStateException("isReady() not called: " + stateString()); case READY: - if (!_state.compareAndSet(state, State.PENDING)) - continue; - - // Do the asynchronous writing from the callback - boolean last = isLastContentToWrite(buffer.remaining()); - new AsyncWrite(buffer, last).iterate(); - return; + async = true; + _apiState = flush ? ApiState.PENDING : ApiState.ASYNC; + break; case PENDING: - case UNREADY: - throw new WritePendingException(); - - case ERROR: - throw new EofException(_onError); - - case CLOSING: - case CLOSED: - throw new EofException("Closed"); + case UNREADY: + throw new WritePendingException(); default: - throw new IllegalStateException(state.toString()); + throw new IllegalStateException(stateString()); } - break; + _written = written; } - // handle blocking write - int len = BufferUtil.length(buffer); - boolean last = isLastContentToWrite(len); - - // flush any content from the aggregate - if (BufferUtil.hasContent(_aggregate)) - write(_aggregate, last && len == 0); + if (!flush) + return; - // write any remaining content in the buffer directly - if (len > 0) - write(buffer, last); - else if (last) - write(BufferUtil.EMPTY_BUFFER, true); + if (async) + { + new AsyncWrite(buffer, last).iterate(); + } + else + { + try + { + // Blocking write + // flush any content from the aggregate + if (BufferUtil.hasContent(_aggregate)) + channelWrite(_aggregate, last && len == 0); + + // write any remaining content in the buffer directly + if (len > 0) + channelWrite(buffer, last); + else if (last) + channelWrite(BufferUtil.EMPTY_BUFFER, true); + + onWriteComplete(last, null); + } + catch (Throwable t) + { + onWriteComplete(last, t); + throw t; + } + } } @Override public void write(int b) throws IOException { - _written += 1; - boolean complete = _channel.getResponse().isAllContentWritten(_written); - + boolean flush; + boolean last; // Async or Blocking ? - while (true) + + boolean async = false; + synchronized (_channelState) { - switch (_state.get()) - { - case OPEN: - acquireBuffer(); - BufferUtil.append(_aggregate, (byte)b); + checkWritable(); + long written = _written + 1; + int space = _aggregate == null ? getBufferSize() : BufferUtil.space(_aggregate); + last = _channel.getResponse().isAllContentWritten(written); + flush = last || space == 1; - // Check if all written or full - if (complete || BufferUtil.isFull(_aggregate)) - write(_aggregate, complete); + if (last && _state == State.OPEN) + _state = State.CLOSING; + + switch (_apiState) + { + case BLOCKING: + _apiState = flush ? ApiState.BLOCKED : ApiState.BLOCKING; break; case ASYNC: - throw new IllegalStateException("isReady() not called"); + throw new IllegalStateException("isReady() not called: " + stateString()); case READY: - if (!_state.compareAndSet(State.READY, State.PENDING)) - continue; - - acquireBuffer(); - BufferUtil.append(_aggregate, (byte)b); - - // Check if all written or full - if (!complete && !BufferUtil.isFull(_aggregate)) - { - if (!_state.compareAndSet(State.PENDING, State.ASYNC)) - throw new IllegalStateException(); - return; - } - - // Do the asynchronous writing from the callback - new AsyncFlush().iterate(); - return; + async = true; + _apiState = flush ? ApiState.PENDING : ApiState.ASYNC; + break; case PENDING: case UNREADY: throw new WritePendingException(); - case ERROR: - throw new EofException(_onError); + default: + throw new IllegalStateException(stateString()); + } + _written = written; + + acquireBuffer(); + BufferUtil.append(_aggregate, (byte)b); + } - case CLOSING: - case CLOSED: - throw new EofException("Closed"); + // Check if all written or full + if (!flush) + return; - default: - throw new IllegalStateException(); + if (async) + // Do the asynchronous writing from the callback + new AsyncFlush(last).iterate(); + else + { + try + { + channelWrite(_aggregate, last); + onWriteComplete(last, null); + } + catch (Throwable t) + { + onWriteComplete(last, t); + throw t; } - break; } } @@ -883,7 +1111,7 @@ public void sendContent(ByteBuffer content) throws IOException LOG.debug("sendContent({})", BufferUtil.toDetailString(content)); _written += content.remaining(); - write(content, true); + channelWrite(content, true); } /** @@ -899,13 +1127,6 @@ public void sendContent(InputStream in) throws IOException new InputStreamWritingCB(in, blocker).iterate(); blocker.block(); } - catch (Throwable failure) - { - if (LOG.isDebugEnabled()) - LOG.debug(failure); - abort(failure); - throw failure; - } } /** @@ -921,13 +1142,6 @@ public void sendContent(ReadableByteChannel in) throws IOException new ReadableByteChannelWritingCB(in, blocker).iterate(); blocker.block(); } - catch (Throwable failure) - { - if (LOG.isDebugEnabled()) - LOG.debug(failure); - abort(failure); - throw failure; - } } /** @@ -943,13 +1157,6 @@ public void sendContent(HttpContent content) throws IOException sendContent(content, blocker); blocker.block(); } - catch (Throwable failure) - { - if (LOG.isDebugEnabled()) - LOG.debug(failure); - abort(failure); - throw failure; - } } /** @@ -963,23 +1170,24 @@ public void sendContent(ByteBuffer content, final Callback callback) if (LOG.isDebugEnabled()) LOG.debug("sendContent(buffer={},{})", BufferUtil.toDetailString(content), callback); - _written += content.remaining(); - write(content, true, new Callback.Nested(callback) - { - @Override - public void succeeded() - { - closed(); - super.succeeded(); - } + if (prepareSendContent(content.remaining(), callback)) + channelWrite(content, true, + new Callback.Nested(callback) + { + @Override + public void succeeded() + { + onWriteComplete(true, null); + super.succeeded(); + } - @Override - public void failed(Throwable x) - { - abort(x); - super.failed(x); - } - }); + @Override + public void failed(Throwable x) + { + onWriteComplete(true, x); + super.failed(x); + } + }); } /** @@ -994,7 +1202,8 @@ public void sendContent(InputStream in, Callback callback) if (LOG.isDebugEnabled()) LOG.debug("sendContent(stream={},{})", in, callback); - new InputStreamWritingCB(in, callback).iterate(); + if (prepareSendContent(0, callback)) + new InputStreamWritingCB(in, callback).iterate(); } /** @@ -1009,54 +1218,62 @@ public void sendContent(ReadableByteChannel in, Callback callback) if (LOG.isDebugEnabled()) LOG.debug("sendContent(channel={},{})", in, callback); - new ReadableByteChannelWritingCB(in, callback).iterate(); + if (prepareSendContent(0, callback)) + new ReadableByteChannelWritingCB(in, callback).iterate(); } - /** - * Asynchronous send of HTTP content. - * - * @param httpContent The HTTP content to send - * @param callback The callback to use to notify success or failure - */ - public void sendContent(HttpContent httpContent, Callback callback) + private boolean prepareSendContent(int len, Callback callback) { - if (LOG.isDebugEnabled()) - LOG.debug("sendContent(http={},{})", httpContent, callback); - - if (BufferUtil.hasContent(_aggregate)) - { - callback.failed(new IOException("cannot sendContent() after write()")); - return; - } - if (_channel.isCommitted()) - { - callback.failed(new IOException("cannot sendContent(), output already committed")); - return; - } - - while (true) + synchronized (_channelState) { - switch (_state.get()) + if (BufferUtil.hasContent(_aggregate)) { - case OPEN: - if (!_state.compareAndSet(State.OPEN, State.PENDING)) - continue; - break; - - case ERROR: - callback.failed(new EofException(_onError)); - return; + callback.failed(new IOException("cannot sendContent() after write()")); + return false; + } + if (_channel.isCommitted()) + { + callback.failed(new IOException("cannot sendContent(), output already committed")); + return false; + } - case CLOSING: + switch (_state) + { case CLOSED: + case CLOSING: callback.failed(new EofException("Closed")); - return; + return false; default: - throw new IllegalStateException(); + _state = State.CLOSING; + break; + } + + if (_onError != null) + { + callback.failed(_onError); + return false; } - break; + + if (_apiState != ApiState.BLOCKING) + throw new IllegalStateException(stateString()); + _apiState = ApiState.PENDING; + if (len > 0) + _written += len; + return true; } + } + + /** + * Asynchronous send of HTTP content. + * + * @param httpContent The HTTP content to send + * @param callback The callback to use to notify success or failure + */ + public void sendContent(HttpContent httpContent, Callback callback) + { + if (LOG.isDebugEnabled()) + LOG.debug("sendContent(http={},{})", httpContent, callback); ByteBuffer buffer = _channel.useDirectBuffers() ? httpContent.getDirectBuffer() : null; if (buffer == null) @@ -1068,30 +1285,40 @@ public void sendContent(HttpContent httpContent, Callback callback) return; } + ReadableByteChannel rbc = null; try { - ReadableByteChannel rbc = httpContent.getReadableByteChannel(); - if (rbc != null) - { - // Close of the rbc is done by the async sendContent - sendContent(rbc, callback); - return; - } - - InputStream in = httpContent.getInputStream(); - if (in != null) - { - sendContent(in, callback); - return; - } + rbc = httpContent.getReadableByteChannel(); + } + catch (Throwable x) + { + LOG.debug(x); + } + if (rbc != null) + { + // Close of the rbc is done by the async sendContent + sendContent(rbc, callback); + return; + } - throw new IllegalArgumentException("unknown content for " + httpContent); + InputStream in = null; + try + { + in = httpContent.getInputStream(); + } + catch (Throwable x) + { + LOG.debug(x); } - catch (Throwable cause) + if (in != null) { - abort(cause); - callback.failed(cause); + sendContent(in, callback); + return; } + + Throwable cause = new IllegalArgumentException("unknown content for " + httpContent); + _channel.abort(cause); + callback.failed(cause); } public int getBufferSize() @@ -1135,20 +1362,25 @@ public void onFlushed(long bytes) throws IOException public void recycle() { - _interceptor = _channel; - HttpConfiguration config = _channel.getHttpConfiguration(); - _bufferSize = config.getOutputBufferSize(); - _commitSize = config.getOutputAggregationSize(); - if (_commitSize > _bufferSize) - _commitSize = _bufferSize; - releaseBuffer(); - _written = 0; - _writeListener = null; - _onError = null; - _firstByteTimeStamp = -1; - _flushed = 0; - _closeCallback = null; - reopen(); + synchronized (_channelState) + { + _state = State.OPEN; + _apiState = ApiState.BLOCKING; + _softClose = false; + _interceptor = _channel; + HttpConfiguration config = _channel.getHttpConfiguration(); + _bufferSize = config.getOutputBufferSize(); + _commitSize = config.getOutputAggregationSize(); + if (_commitSize > _bufferSize) + _commitSize = _bufferSize; + releaseBuffer(); + _written = 0; + _writeListener = null; + _onError = null; + _firstByteTimeStamp = -1; + _flushed = 0; + _closedCallback = null; + } } public void resetBuffer() @@ -1163,47 +1395,45 @@ public void resetBuffer() public void setWriteListener(WriteListener writeListener) { if (!_channel.getState().isAsync()) - throw new IllegalStateException("!ASYNC"); - - if (_state.compareAndSet(State.OPEN, State.READY)) + throw new IllegalStateException("!ASYNC: " + stateString()); + boolean wake; + synchronized (_channelState) { + if (_apiState != ApiState.BLOCKING) + throw new IllegalStateException("!OPEN" + stateString()); + _apiState = ApiState.READY; _writeListener = writeListener; - if (_channel.getState().onWritePossible()) - _channel.execute(_channel); + wake = _channel.getState().onWritePossible(); } - else - throw new IllegalStateException(); + if (wake) + _channel.execute(_channel); } @Override public boolean isReady() { - while (true) + synchronized (_channelState) { - switch (_state.get()) + switch (_apiState) { - case OPEN: + case BLOCKING: case READY: - case ERROR: - case CLOSING: - case CLOSED: return true; case ASYNC: - if (!_state.compareAndSet(State.ASYNC, State.READY)) - continue; + _apiState = ApiState.READY; return true; case PENDING: - if (!_state.compareAndSet(State.PENDING, State.UNREADY)) - continue; + _apiState = ApiState.UNREADY; return false; + case BLOCKED: case UNREADY: return false; default: - throw new IllegalStateException(); + throw new IllegalStateException(stateString()); } } } @@ -1211,84 +1441,67 @@ public boolean isReady() @Override public void run() { - while (true) - { - State state = _state.get(); + Throwable error = null; + synchronized (_channelState) + { if (_onError != null) { - switch (state) - { - case CLOSING: - case CLOSED: - case ERROR: - { - _onError = null; - return; - } - default: - { - if (_state.compareAndSet(state, State.ERROR)) - { - Throwable th = _onError; - _onError = null; - if (LOG.isDebugEnabled()) - LOG.debug("onError", th); - - try - { - _writeListener.onError(th); - } - finally - { - IO.close(this); - } - - return; - } - } - } - continue; + error = _onError; + _onError = null; } + } - // We do not check the state here. Strictly speaking the state should - // always be READY when run is called. However, other async threads or - // a prior call by this thread to onDataAvailable may have called write - // after onWritePossible was called, so the state could be any of the - // write states. - // - // Even if the state is CLOSED, we need to call onWritePossible to tell - // async producer that the last write completed. - // - // We have to trust the scheduling of this run was done - // for good reason, that is protected correctly by HttpChannelState and - // that implementations of onWritePossible will - // themselves check isReady(). If multiple threads are calling write, - // then they must either rely on only a single container thread being - // dispatched or perform their own mutual exclusion. - try + try + { + if (error == null) { + if (LOG.isDebugEnabled()) + LOG.debug("onWritePossible"); _writeListener.onWritePossible(); - break; - } - catch (Throwable e) - { - _onError = e; + return; } } + catch (Throwable t) + { + error = t; + } + try + { + if (LOG.isDebugEnabled()) + LOG.debug("onError", error); + _writeListener.onError(error); + } + catch (Throwable t) + { + if (LOG.isDebugEnabled()) + LOG.debug(t); + } + finally + { + IO.close(this); + } + } + + private String stateString() + { + return String.format("s=%s,api=%s,sc=%b,e=%s", _state, _apiState, _softClose, _onError); } @Override public String toString() { - return String.format("%s@%x{%s}", this.getClass().getSimpleName(), hashCode(), _state.get()); + synchronized (_channelState) + { + return String.format("%s@%x{%s}", this.getClass().getSimpleName(), hashCode(), stateString()); + } } - private abstract class AsyncICB extends IteratingCallback + private abstract class ChannelWriteCB extends IteratingCallback { final boolean _last; - AsyncICB(boolean last) + ChannelWriteCB(boolean last) { _last = last; } @@ -1302,67 +1515,81 @@ public InvocationType getInvocationType() @Override protected void onCompleteSuccess() { - while (true) - { - State last = _state.get(); - switch (last) - { - case PENDING: - if (!_state.compareAndSet(State.PENDING, State.ASYNC)) - continue; - break; + onWriteComplete(_last, null); + } - case UNREADY: - if (!_state.compareAndSet(State.UNREADY, State.READY)) - continue; - if (_last) - closed(); - if (_channel.getState().onWritePossible()) - _channel.execute(_channel); - break; + @Override + public void onCompleteFailure(Throwable e) + { + onWriteComplete(_last, e); + } + } - case CLOSED: - break; + private abstract class NestedChannelWriteCB extends ChannelWriteCB + { + final Callback _callback; - default: - throw new IllegalStateException(); - } - break; + NestedChannelWriteCB(Callback callback, boolean last) + { + super(last); + _callback = callback; + } + + @Override + protected void onCompleteSuccess() + { + try + { + super.onCompleteSuccess(); + } + finally + { + _callback.succeeded(); } } @Override public void onCompleteFailure(Throwable e) { - _onError = e == null ? new IOException() : e; - if (_channel.getState().onWritePossible()) - _channel.execute(_channel); + try + { + super.onCompleteFailure(e); + } + catch (Throwable t) + { + if (t != e) + e.addSuppressed(t); + } + finally + { + _callback.failed(e); + } } } - private class AsyncFlush extends AsyncICB + private class AsyncFlush extends ChannelWriteCB { - protected volatile boolean _flushed; + volatile boolean _flushed; - public AsyncFlush() + AsyncFlush(boolean last) { - super(false); + super(last); } @Override - protected Action process() + protected Action process() throws Exception { if (BufferUtil.hasContent(_aggregate)) { _flushed = true; - write(_aggregate, false, this); + channelWrite(_aggregate, false, this); return Action.SCHEDULED; } if (!_flushed) { _flushed = true; - write(BufferUtil.EMPTY_BUFFER, false, this); + channelWrite(BufferUtil.EMPTY_BUFFER, false, this); return Action.SCHEDULED; } @@ -1370,14 +1597,14 @@ protected Action process() } } - private class AsyncWrite extends AsyncICB + private class AsyncWrite extends ChannelWriteCB { private final ByteBuffer _buffer; private final ByteBuffer _slice; private final int _len; - protected volatile boolean _completed; + volatile boolean _completed; - public AsyncWrite(byte[] b, int off, int len, boolean last) + AsyncWrite(byte[] b, int off, int len, boolean last) { super(last); _buffer = ByteBuffer.wrap(b, off, len); @@ -1386,7 +1613,7 @@ public AsyncWrite(byte[] b, int off, int len, boolean last) _slice = _len < getBufferSize() ? null : _buffer.duplicate(); } - public AsyncWrite(ByteBuffer buffer, boolean last) + AsyncWrite(ByteBuffer buffer, boolean last) { super(last); _buffer = buffer; @@ -1401,13 +1628,13 @@ public AsyncWrite(ByteBuffer buffer, boolean last) } @Override - protected Action process() + protected Action process() throws Exception { // flush any content from the aggregate if (BufferUtil.hasContent(_aggregate)) { _completed = _len == 0; - write(_aggregate, _last && _completed, this); + channelWrite(_aggregate, _last && _completed, this); return Action.SCHEDULED; } @@ -1427,7 +1654,7 @@ protected Action process() if (_slice == null) { _completed = true; - write(_buffer, _last, this); + channelWrite(_buffer, _last, this); return Action.SCHEDULED; } @@ -1439,7 +1666,7 @@ protected Action process() _buffer.position(pl); _slice.position(p); _completed = !_buffer.hasRemaining(); - write(_slice, _last && _completed, this); + channelWrite(_slice, _last && _completed, this); return Action.SCHEDULED; } @@ -1448,12 +1675,13 @@ protected Action process() if (_last && !_completed) { _completed = true; - write(BufferUtil.EMPTY_BUFFER, true, this); + channelWrite(BufferUtil.EMPTY_BUFFER, true, this); return Action.SCHEDULED; } if (LOG.isDebugEnabled() && _completed) LOG.debug("EOF of {}", this); + return Action.SUCCEEDED; } } @@ -1466,15 +1694,16 @@ protected Action process() * be notified as each buffer is written and only once all the input is consumed will the * wrapped {@link Callback#succeeded()} method be called. */ - private class InputStreamWritingCB extends IteratingNestedCallback + private class InputStreamWritingCB extends NestedChannelWriteCB { private final InputStream _in; private final ByteBuffer _buffer; private boolean _eof; + private boolean _closed; - public InputStreamWritingCB(InputStream in, Callback callback) + InputStreamWritingCB(InputStream in, Callback callback) { - super(callback); + super(callback, true); _in = in; _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), false); } @@ -1488,10 +1717,13 @@ protected Action process() throws Exception { if (LOG.isDebugEnabled()) LOG.debug("EOF of {}", this); - // Handle EOF - _in.close(); - closed(); - _channel.getByteBufferPool().release(_buffer); + if (!_closed) + { + _closed = true; + _channel.getByteBufferPool().release(_buffer); + IO.close(_in); + } + return Action.SUCCEEDED; } @@ -1510,17 +1742,21 @@ protected Action process() throws Exception _buffer.position(0); _buffer.limit(len); _written += len; - write(_buffer, _eof, this); + channelWrite(_buffer, _eof, this); return Action.SCHEDULED; } @Override public void onCompleteFailure(Throwable x) { - abort(x); - _channel.getByteBufferPool().release(_buffer); - IO.close(_in); - super.onCompleteFailure(x); + try + { + _channel.getByteBufferPool().release(_buffer); + } + finally + { + super.onCompleteFailure(x); + } } } @@ -1533,15 +1769,16 @@ public void onCompleteFailure(Throwable x) * be notified as each buffer is written and only once all the input is consumed will the * wrapped {@link Callback#succeeded()} method be called. */ - private class ReadableByteChannelWritingCB extends IteratingNestedCallback + private class ReadableByteChannelWritingCB extends NestedChannelWriteCB { private final ReadableByteChannel _in; private final ByteBuffer _buffer; private boolean _eof; + private boolean _closed; - public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback) + ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback) { - super(callback); + super(callback, true); _in = in; _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers()); } @@ -1555,9 +1792,12 @@ protected Action process() throws Exception { if (LOG.isDebugEnabled()) LOG.debug("EOF of {}", this); - _in.close(); - closed(); - _channel.getByteBufferPool().release(_buffer); + if (!_closed) + { + _closed = true; + _channel.getByteBufferPool().release(_buffer); + IO.close(_in); + } return Action.SUCCEEDED; } @@ -1571,7 +1811,7 @@ protected Action process() throws Exception // write what we have BufferUtil.flipToFlush(_buffer, 0); _written += _buffer.remaining(); - write(_buffer, _eof, this); + channelWrite(_buffer, _eof, this); return Action.SCHEDULED; } @@ -1579,7 +1819,6 @@ protected Action process() throws Exception @Override public void onCompleteFailure(Throwable x) { - abort(x); _channel.getByteBufferPool().release(_buffer); IO.close(_in); super.onCompleteFailure(x); @@ -1604,4 +1843,35 @@ protected long getIdleTimeout() return blockingTimeout; } } + + private class WriteCompleteCB implements Callback + { + final Callback _callback; + + WriteCompleteCB() + { + this(null); + } + + WriteCompleteCB(Callback callback) + { + _callback = callback; + } + + @Override + public void succeeded() + { + onWriteComplete(true, null); + if (_callback != null) + _callback.succeeded(); + } + + @Override + public void failed(Throwable x) + { + onWriteComplete(true, x); + if (_callback != null) + _callback.succeeded(); + } + } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpWriter.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpWriter.java index 20b42e207602..d225d89adbe0 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpWriter.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpWriter.java @@ -22,13 +22,14 @@ import java.io.Writer; import org.eclipse.jetty.util.ByteArrayOutputStream2; +import org.eclipse.jetty.util.Callback; /** * */ public abstract class HttpWriter extends Writer { - public static final int MAX_OUTPUT_CHARS = 512; + public static final int MAX_OUTPUT_CHARS = 512; // TODO should this be configurable? super size is 1024 final HttpOutput _out; final ByteArrayOutputStream2 _bytes; @@ -38,7 +39,7 @@ public HttpWriter(HttpOutput out) { _out = out; _chars = new char[MAX_OUTPUT_CHARS]; - _bytes = new ByteArrayOutputStream2(MAX_OUTPUT_CHARS); + _bytes = new ByteArrayOutputStream2(MAX_OUTPUT_CHARS); // TODO should this be pooled - or do we just recycle the writer? } @Override @@ -47,6 +48,11 @@ public void close() throws IOException _out.close(); } + public void complete(Callback callback) + { + _out.complete(callback); + } + @Override public void flush() throws IOException { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Iso88591HttpWriter.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Iso88591HttpWriter.java index 1c8c9b444df8..c755296ea2af 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Iso88591HttpWriter.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Iso88591HttpWriter.java @@ -35,11 +35,6 @@ public Iso88591HttpWriter(HttpOutput out) public void write(char[] s, int offset, int length) throws IOException { HttpOutput out = _out; - if (length == 0 && out.isAllContentWritten()) - { - close(); - return; - } if (length == 1) { @@ -51,7 +46,7 @@ public void write(char[] s, int offset, int length) throws IOException while (length > 0) { _bytes.reset(); - int chars = length > MAX_OUTPUT_CHARS ? MAX_OUTPUT_CHARS : length; + int chars = Math.min(length, MAX_OUTPUT_CHARS); byte[] buffer = _bytes.getBuf(); int bytes = _bytes.getCount(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java index 60b5b10e8b68..027eed491d6d 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java @@ -145,10 +145,10 @@ public void reopen() _out.reopen(); } - public void closedBySendError() + public void softClose() { setErrorSent(true); - _out.closedBySendError(); + _out.softClose(); } /** @@ -496,7 +496,7 @@ public void sendRedirect(int code, String location) throws IOException resetBuffer(); setHeader(HttpHeader.LOCATION, location); setStatus(code); - closeOutput(); + completeOutput(); } @Override @@ -788,7 +788,7 @@ public void setContentLength(int len) { try { - closeOutput(); + completeOutput(); } catch (IOException e) { @@ -826,17 +826,20 @@ public boolean isContentComplete(long written) return (_contentLength < 0 || written >= _contentLength); } - public void closeOutput() throws IOException + public void completeOutput() throws IOException { if (_outputType == OutputType.WRITER) _writer.close(); - if (!_out.isClosed()) + else _out.close(); } - public void closeOutput(Callback callback) + public void completeOutput(Callback callback) { - _out.close((_outputType == OutputType.WRITER) ? _writer : _out, callback); + if (_outputType == OutputType.WRITER) + _writer.complete(callback); + else + _out.complete(callback); } public long getLongContentLength() diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ResponseWriter.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ResponseWriter.java index 094f98f1cbca..7837c9281844 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ResponseWriter.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ResponseWriter.java @@ -27,6 +27,7 @@ import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.RuntimeIOException; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -148,7 +149,7 @@ public void flush() out.flush(); } } - catch (IOException ex) + catch (Throwable ex) { setError(ex); } @@ -171,6 +172,15 @@ public void close() } } + public void complete(Callback callback) + { + synchronized (lock) + { + _isClosed = true; + } + _httpWriter.complete(callback); + } + @Override public void write(int c) { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Utf8HttpWriter.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Utf8HttpWriter.java index ba614f0ddabc..d8cc660dd65f 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Utf8HttpWriter.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Utf8HttpWriter.java @@ -42,16 +42,11 @@ public Utf8HttpWriter(HttpOutput out) public void write(char[] s, int offset, int length) throws IOException { HttpOutput out = _out; - if (length == 0 && out.isAllContentWritten()) - { - close(); - return; - } while (length > 0) { _bytes.reset(); - int chars = length > MAX_OUTPUT_CHARS ? MAX_OUTPUT_CHARS : length; + int chars = Math.min(length, MAX_OUTPUT_CHARS); byte[] buffer = _bytes.getBuf(); int bytes = _bytes.getCount(); diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java index 376b9f33baa0..d4c95aec9f4b 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java @@ -18,7 +18,9 @@ package org.eclipse.jetty.server; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.nio.ByteBuffer; @@ -27,7 +29,9 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Exchanger; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -48,9 +52,12 @@ import org.eclipse.jetty.io.ManagedSelector; import org.eclipse.jetty.io.SocketChannelEndPoint; import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.util.BlockingArrayQueue; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.thread.Scheduler; import org.hamcrest.Matchers; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -65,14 +72,19 @@ */ public class AsyncCompletionTest extends HttpServerTestFixture { - private static final Exchanger X = new Exchanger<>(); - private static final AtomicBoolean COMPLETE = new AtomicBoolean(); + private static final int POLL = 10; // milliseconds + private static final int WAIT = 10; // seconds + private static final String SMALL = "Now is the time for all good men to come to the aid of the party. "; + private static final String LARGE = SMALL + SMALL + SMALL + SMALL + SMALL; + private static final int BUFFER_SIZE = SMALL.length() * 3 / 2; + private static final BlockingQueue __queue = new BlockingArrayQueue<>(); + private static final AtomicBoolean __transportComplete = new AtomicBoolean(); - private static class DelayedCallback extends Callback.Nested + private static class PendingCallback extends Callback.Nested { - private CompletableFuture _delay = new CompletableFuture<>(); + private CompletableFuture _pending = new CompletableFuture<>(); - public DelayedCallback(Callback callback) + public PendingCallback(Callback callback) { super(callback); } @@ -80,20 +92,20 @@ public DelayedCallback(Callback callback) @Override public void succeeded() { - _delay.complete(null); + _pending.complete(null); } @Override public void failed(Throwable x) { - _delay.completeExceptionally(x); + _pending.completeExceptionally(x); } public void proceed() { try { - _delay.get(10, TimeUnit.SECONDS); + _pending.get(WAIT, TimeUnit.SECONDS); getCallback().succeeded(); } catch (Throwable th) @@ -107,13 +119,15 @@ public void proceed() @BeforeEach public void init() throws Exception { - COMPLETE.set(false); + __transportComplete.set(false); startServer(new ServerConnector(_server, new HttpConnectionFactory() { @Override public Connection newConnection(Connector connector, EndPoint endPoint) { + getHttpConfiguration().setOutputBufferSize(BUFFER_SIZE); + getHttpConfiguration().setOutputAggregationSize(BUFFER_SIZE); return configure(new ExtendedHttpConnection(getHttpConfiguration(), connector, endPoint), connector, endPoint); } }) @@ -136,16 +150,9 @@ public ExtendedEndPoint(SocketChannel channel, ManagedSelector selector, Selecti @Override public void write(Callback callback, ByteBuffer... buffers) throws IllegalStateException { - DelayedCallback delay = new DelayedCallback(callback); + PendingCallback delay = new PendingCallback(callback); super.write(delay, buffers); - try - { - X.exchange(delay); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } + __queue.offer(delay); } } @@ -159,24 +166,44 @@ public ExtendedHttpConnection(HttpConfiguration config, Connector connector, End @Override public void onCompleted() { - COMPLETE.compareAndSet(false, true); + __transportComplete.compareAndSet(false, true); super.onCompleted(); } } - // Tests from here use these parameters - public static Stream tests() + enum WriteStyle + {ARRAY, BUFFER, BYTE, BYTE_THEN_ARRAY, PRINT} + + ; + + public static Stream asyncIOWriteTests() { List tests = new ArrayList<>(); - tests.add(new Object[]{new HelloWorldHandler(), 200, "Hello world"}); - tests.add(new Object[]{new SendErrorHandler(499, "Test async sendError"), 499, "Test async sendError"}); - tests.add(new Object[]{new AsyncReadyCompleteHandler(), 200, AsyncReadyCompleteHandler.data}); + for (WriteStyle w : WriteStyle.values()) + { + for (boolean contentLength : new Boolean[]{true, false}) + { + for (boolean isReady : new Boolean[]{true, false}) + { + for (boolean flush : new Boolean[]{true, false}) + { + for (boolean close : new Boolean[]{true, false}) + { + for (String data : new String[]{SMALL, LARGE}) + { + tests.add(new Object[]{new AsyncIOWriteHandler(w, contentLength, isReady, flush, close, data)}); + } + } + } + } + } + } return tests.stream().map(Arguments::of); } @ParameterizedTest - @MethodSource("tests") - public void testAsyncCompletion(Handler handler, int status, String message) throws Exception + @MethodSource("asyncIOWriteTests") + public void testAsyncIOWrite(AsyncIOWriteHandler handler) throws Exception { configureServer(handler); @@ -184,79 +211,242 @@ public void testAsyncCompletion(Handler handler, int status, String message) thr try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort())) { OutputStream os = client.getOutputStream(); + InputStream in = client.getInputStream(); // write the request os.write("GET / HTTP/1.0\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1)); os.flush(); - // The write should happen but the callback is delayed - HttpTester.Response response = HttpTester.parseResponse(client.getInputStream()); - assertThat(response, Matchers.notNullValue()); - assertThat(response.getStatus(), is(status)); - String content = response.getContent(); - assertThat(content, containsString(message)); + // wait for OWP to execute (proves we do not block in write APIs) + boolean completeCalled = handler.waitForOWPExit(); - // Check that a thread is held busy in write - assertThat(_threadPool.getBusyThreads(), Matchers.greaterThan(base)); + while (true) + { + // wait for threads to return to base level (proves we are really async) + long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(WAIT); + while (_threadPool.getBusyThreads() != base) + { + if (System.nanoTime() > end) + throw new TimeoutException(); + Thread.sleep(POLL); + } - // Getting the Delayed callback will free the thread - DelayedCallback delay = X.exchange(null, 10, TimeUnit.SECONDS); + if (completeCalled) + break; - // wait for threads to return to base level - long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(10); - while (_threadPool.getBusyThreads() != base) - { - if (System.nanoTime() > end) - throw new TimeoutException(); - Thread.sleep(10); - } + // We are now asynchronously waiting! + assertThat(__transportComplete.get(), is(false)); - // We are now asynchronously waiting! - assertThat(COMPLETE.get(), is(false)); + // If we are not complete, we must be waiting for one or more writes to complete + while (true) + { + PendingCallback delay = __queue.poll(POLL, TimeUnit.MILLISECONDS); + if (delay != null) + { + delay.proceed(); + continue; + } + // No delay callback found, have we finished OWP again? + Boolean c = handler.pollForOWPExit(); - // proceed with the completion - delay.proceed(); + if (c == null) + // No we haven't, so look for another delay callback + continue; - while (!COMPLETE.get()) + // We have a OWP result, so let's handle it. + completeCalled = c; + break; + } + } + + // Wait for full completion + long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(WAIT); + while (!__transportComplete.get()) { if (System.nanoTime() > end) throw new TimeoutException(); - Thread.sleep(10); + + // proceed with any delayCBs needed for completion + PendingCallback delay = __queue.poll(POLL, TimeUnit.MILLISECONDS); + if (delay != null) + delay.proceed(); } + + // Check we got a response! + HttpTester.Response response = HttpTester.parseResponse(in); + assertThat(response, Matchers.notNullValue()); + assertThat(response.getStatus(), is(200)); + String content = response.getContent(); + assertThat(content, containsString(handler.getExpectedMessage())); } } - private static class AsyncReadyCompleteHandler extends AbstractHandler + private static class AsyncIOWriteHandler extends AbstractHandler { - static String data = "Now is the time for all good men to come to the aid of the party"; + final WriteStyle _write; + final boolean _contentLength; + final boolean _isReady; + final boolean _flush; + final boolean _close; + final String _data; + final Exchanger _ready = new Exchanger<>(); + int _toWrite; + boolean _flushed; + boolean _closed; + + AsyncIOWriteHandler(WriteStyle write, boolean contentLength, boolean isReady, boolean flush, boolean close, String data) + { + _write = write; + _contentLength = contentLength; + _isReady = isReady; + _flush = flush; + _close = close; + _data = data; + _toWrite = data.length(); + } + + public String getExpectedMessage() + { + return SMALL; + } + + boolean waitForOWPExit() + { + try + { + return _ready.exchange(null); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + + Boolean pollForOWPExit() + { + try + { + return _ready.exchange(null, POLL, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + catch (TimeoutException e) + { + return null; + } + } @Override public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { + baseRequest.setHandled(true); AsyncContext context = request.startAsync(); ServletOutputStream out = response.getOutputStream(); + response.setContentType("text/plain"); + byte[] bytes = _data.getBytes(StandardCharsets.ISO_8859_1); + if (_contentLength) + response.setContentLength(bytes.length); + out.setWriteListener(new WriteListener() { - byte[] bytes = data.getBytes(StandardCharsets.ISO_8859_1); - @Override public void onWritePossible() throws IOException { - while (out.isReady()) + try { - if (bytes != null) - { - response.setContentType("text/plain"); - response.setContentLength(bytes.length); - out.write(bytes); - bytes = null; - } - else + if (out.isReady()) { + if (_toWrite > 0) + { + switch (_write) + { + case ARRAY: + _toWrite = 0; + out.write(bytes, 0, bytes.length); + break; + + case BUFFER: + _toWrite = 0; + ((HttpOutput)out).write(BufferUtil.toBuffer(bytes)); + break; + + case BYTE: + for (int i = bytes.length - _toWrite; i < bytes.length; i++) + { + _toWrite--; + out.write(bytes[i]); + boolean ready = out.isReady(); + if (!ready) + { + _ready.exchange(Boolean.FALSE); + return; + } + } + break; + + case BYTE_THEN_ARRAY: + _toWrite = 0; + out.write(bytes[0]); // This should always aggregate + assertThat(out.isReady(), is(true)); + out.write(bytes, 1, bytes.length - 1); + break; + + case PRINT: + _toWrite = 0; + out.print(_data); + break; + } + } + + if (_flush && !_flushed) + { + boolean ready = out.isReady(); + if (!ready) + { + _ready.exchange(Boolean.FALSE); + return; + } + _flushed = true; + out.flush(); + } + + if (_close && !_closed) + { + if (_isReady) + { + boolean ready = out.isReady(); + if (!ready) + { + _ready.exchange(Boolean.FALSE); + return; + } + } + _closed = true; + out.close(); + } + + if (_isReady) + { + boolean ready = out.isReady(); + if (!ready) + { + _ready.exchange(Boolean.FALSE); + return; + } + } context.complete(); - return; + _ready.exchange(Boolean.TRUE); } } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + finally + { + } } @Override @@ -266,5 +456,312 @@ public void onError(Throwable t) } }); } + + @Override + public String toString() + { + return String.format("AWCH{w=%s,cl=%b,ir=%b,f=%b,c=%b,d=%d}", _write, _contentLength, _isReady, _flush, _close, _data.length()); + } + } + + public static Stream blockingWriteTests() + { + List tests = new ArrayList<>(); + for (WriteStyle w : WriteStyle.values()) + { + for (boolean contentLength : new Boolean[]{true, false}) + { + for (boolean flush : new Boolean[]{true, false}) + { + for (boolean close : new Boolean[]{true, false}) + { + for (String data : new String[]{SMALL, LARGE}) + { + tests.add(new Object[]{new BlockingWriteHandler(w, contentLength, flush, close, data)}); + } + } + } + } + } + return tests.stream().map(Arguments::of); + } + + @ParameterizedTest + @MethodSource("blockingWriteTests") + public void testBlockingWrite(BlockingWriteHandler handler) throws Exception + { + configureServer(handler); + + try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort())) + { + OutputStream os = client.getOutputStream(); + InputStream in = client.getInputStream(); + + // write the request + os.write("GET / HTTP/1.0\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1)); + os.flush(); + + handler.wait4handle(); + + // Wait for full completion + long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(WAIT); + while (!__transportComplete.get()) + { + if (System.nanoTime() > end) + throw new TimeoutException(); + + // proceed with any delayCBs needed for completion + try + { + PendingCallback delay = __queue.poll(POLL, TimeUnit.MILLISECONDS); + if (delay != null) + delay.proceed(); + } + catch (Exception e) + { + // ignored + } + } + + // Check we got a response! + HttpTester.Response response = HttpTester.parseResponse(in); + assertThat(response, Matchers.notNullValue()); + assertThat(response.getStatus(), is(200)); + String content = response.getContent(); + assertThat(content, containsString(handler.getExpectedMessage())); + } + } + + private static class BlockingWriteHandler extends AbstractHandler + { + final WriteStyle _write; + final boolean _contentLength; + final boolean _flush; + final boolean _close; + final String _data; + final CountDownLatch _wait = new CountDownLatch(1); + + BlockingWriteHandler(WriteStyle write, boolean contentLength, boolean flush, boolean close, String data) + { + _write = write; + _contentLength = contentLength; + _flush = flush; + _close = close; + _data = data; + } + + public String getExpectedMessage() + { + return SMALL; + } + + public void wait4handle() + { + try + { + Assertions.assertTrue(_wait.await(WAIT, TimeUnit.SECONDS)); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + AsyncContext context = request.startAsync(); + ServletOutputStream out = response.getOutputStream(); + + context.start(() -> + { + try + { + _wait.countDown(); + + response.setContentType("text/plain"); + byte[] bytes = _data.getBytes(StandardCharsets.ISO_8859_1); + if (_contentLength) + response.setContentLength(bytes.length); + + switch (_write) + { + case ARRAY: + out.write(bytes, 0, bytes.length); + break; + + case BUFFER: + ((HttpOutput)out).write(BufferUtil.toBuffer(bytes)); + break; + + case BYTE: + for (byte b : bytes) + { + out.write(b); + } + break; + + case BYTE_THEN_ARRAY: + out.write(bytes[0]); // This should always aggregate + out.write(bytes, 1, bytes.length - 1); + break; + + case PRINT: + out.print(_data); + break; + } + + if (_flush) + out.flush(); + + if (_close) + out.close(); + + context.complete(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + }); + } + + @Override + public String toString() + { + return String.format("BWCH{w=%s,cl=%b,f=%b,c=%b,d=%d}", _write, _contentLength, _flush, _close, _data.length()); + } + } + + public static Stream sendContentTests() + { + List tests = new ArrayList<>(); + for (ContentStyle style : ContentStyle.values()) + { + for (String data : new String[]{SMALL, LARGE}) + { + tests.add(new Object[]{new SendContentHandler(style, data)}); + } + } + return tests.stream().map(Arguments::of); + } + + @ParameterizedTest + @MethodSource("sendContentTests") + public void testSendContent(SendContentHandler handler) throws Exception + { + configureServer(handler); + + int base = _threadPool.getBusyThreads(); + try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort())) + { + OutputStream os = client.getOutputStream(); + InputStream in = client.getInputStream(); + + // write the request + os.write("GET / HTTP/1.0\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1)); + os.flush(); + + handler.wait4handle(); + + long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(WAIT); + while (_threadPool.getBusyThreads() != base) + { + if (System.nanoTime() > end) + throw new TimeoutException(); + Thread.sleep(POLL); + } + + // Wait for full completion + end = System.nanoTime() + TimeUnit.SECONDS.toNanos(WAIT); + while (!__transportComplete.get()) + { + if (System.nanoTime() > end) + throw new TimeoutException(); + + // proceed with any delayCBs needed for completion + try + { + PendingCallback delay = __queue.poll(POLL, TimeUnit.MILLISECONDS); + if (delay != null) + delay.proceed(); + } + catch (Exception e) + { + // ignored + } + } + + // Check we got a response! + HttpTester.Response response = HttpTester.parseResponse(in); + assertThat(response, Matchers.notNullValue()); + assertThat(response.getStatus(), is(200)); + String content = response.getContent(); + assertThat(content, containsString(handler.getExpectedMessage())); + } + } + + enum ContentStyle + {BUFFER, STREAM} // TODO more types needed here + + private static class SendContentHandler extends AbstractHandler + { + final ContentStyle _style; + final String _data; + final CountDownLatch _wait = new CountDownLatch(1); + + SendContentHandler(ContentStyle style, String data) + { + _style = style; + _data = data; + } + + public String getExpectedMessage() + { + return SMALL; + } + + public void wait4handle() + { + try + { + Assertions.assertTrue(_wait.await(WAIT, TimeUnit.SECONDS)); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + AsyncContext context = request.startAsync(); + HttpOutput out = (HttpOutput)response.getOutputStream(); + + response.setContentType("text/plain"); + byte[] bytes = _data.getBytes(StandardCharsets.ISO_8859_1); + + switch (_style) + { + case BUFFER: + out.sendContent(BufferUtil.toBuffer(bytes), Callback.from(context::complete)); + break; + + case STREAM: + out.sendContent(new ByteArrayInputStream(bytes), Callback.from(context::complete)); + break; + } + + _wait.countDown(); + } + + @Override + public String toString() + { + return String.format("SCCH{w=%s,d=%d}", _style, _data.length()); + } } } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestBase.java b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestBase.java index 2584039a25ff..813e836e2b9c 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestBase.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestBase.java @@ -33,6 +33,7 @@ import java.util.Arrays; import java.util.concurrent.Exchanger; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import javax.servlet.AsyncContext; import javax.servlet.ServletException; import javax.servlet.ServletInputStream; @@ -1020,13 +1021,22 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques @Test public void testPipeline() throws Exception { - configureServer(new HelloWorldHandler()); + AtomicInteger served = new AtomicInteger(); + configureServer(new HelloWorldHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + served.incrementAndGet(); + super.handle(target, baseRequest, request, response); + } + }); - //for (int pipeline=1;pipeline<32;pipeline++) - for (int pipeline = 1; pipeline < 32; pipeline++) + int pipeline = 64; { try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort())) { + served.set(0); client.setSoTimeout(5000); OutputStream os = client.getOutputStream(); @@ -1065,6 +1075,7 @@ public void testPipeline() throws Exception count++; line = in.readLine(); } + assertEquals(pipeline, served.get()); assertEquals(pipeline, count); } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java index 8ce876ca377c..102ccad42977 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java @@ -291,6 +291,58 @@ public InvocationType getInvocationType() } } + interface InvocableCallback extends Invocable, Callback + { + } + + static Callback combine(Callback cb1, Callback cb2) + { + if (cb1 == null || cb1 == cb2) + return cb2; + if (cb2 == null) + return cb1; + + return new InvocableCallback() + { + @Override + public void succeeded() + { + try + { + cb1.succeeded(); + } + finally + { + cb2.succeeded(); + } + } + + @Override + public void failed(Throwable x) + { + try + { + cb1.failed(x); + } + catch (Throwable t) + { + if (x != t) + x.addSuppressed(t); + } + finally + { + cb2.failed(x); + } + } + + @Override + public InvocationType getInvocationType() + { + return Invocable.combine(Invocable.getInvocationType(cb1), Invocable.getInvocationType(cb2)); + } + }; + } + /** *

A CompletableFuture that is also a Callback.

*/ diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java index 91e8e3ee13f5..620dccfe738e 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java @@ -74,6 +74,20 @@ static void invokeNonBlocking(Runnable task) } } + static InvocationType combine(InvocationType it1, InvocationType it2) + { + if (it1 != null && it2 != null) + { + if (it1 == it2) + return it1; + if (it1 == InvocationType.EITHER) + return it2; + if (it2 == InvocationType.EITHER) + return it1; + } + return InvocationType.BLOCKING; + } + /** * Get the invocation type of an Object. *