diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AsyncIOTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AsyncIOTest.java index 1881a10a6d3e..b91cb162655e 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AsyncIOTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/AsyncIOTest.java @@ -41,7 +41,6 @@ import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.frames.DataFrame; import org.eclipse.jetty.http2.frames.HeadersFrame; -import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.http2.frames.SettingsFrame; import org.eclipse.jetty.server.HttpOutput; import org.eclipse.jetty.util.Callback; @@ -243,8 +242,8 @@ public void onWritePossible() throws IOException // The write is too large and will stall. output.write(ByteBuffer.wrap(new byte[2 * clientWindow])); - // We cannot call complete() now before checking for isReady(). - // This will abort the response and the client will receive a reset. + // We can now call complete() now before checking for isReady(). + // This will asynchronously complete when the write is finished. asyncContext.complete(); } @@ -275,7 +274,7 @@ public Map onPreface(Session session) session.newStream(frame, promise, new Stream.Listener.Adapter() { @Override - public void onReset(Stream stream, ResetFrame frame) + public void onClosed(Stream stream) { latch.countDown(); } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java index 743f0a1fcef3..84cabdf5a708 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java @@ -518,7 +518,15 @@ public boolean onFail(Throwable cause) public void onClose() { - onFail(new ClosedChannelException()); + switch (_state.get().getType()) + { + case IDLE: + case FAILED: + return; + + default: + onFail(new ClosedChannelException()); + } } boolean isFailed() diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContextState.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContextState.java index d02f9768e1f4..eb30265ba33c 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContextState.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContextState.java @@ -146,7 +146,11 @@ public void start(final Runnable task) @Override public void run() { - state().getAsyncContextEvent().getContext().getContextHandler().handle(channel.getRequest(), task); + ContextHandler.Context context = state().getAsyncContextEvent().getContext(); + if (context == null) + task.run(); + else + context.getContextHandler().handle(channel.getRequest(), task); } }); } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Dispatcher.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Dispatcher.java index 09249f955ab7..9251b202e97a 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Dispatcher.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Dispatcher.java @@ -220,7 +220,7 @@ protected void forward(ServletRequest request, ServletResponse response, Dispatc _contextHandler.handle(_pathInContext, baseRequest, (HttpServletRequest)request, (HttpServletResponse)response); if (!baseRequest.getHttpChannelState().isAsync()) - commitResponse(response, baseRequest); + baseRequest.getResponse().softClose(); } } finally @@ -242,57 +242,6 @@ public String toString() return String.format("Dispatcher@0x%x{%s,%s}", hashCode(), _named, _uri); } - @SuppressWarnings("Duplicates") - private void commitResponse(ServletResponse response, Request baseRequest) throws IOException, ServletException - { - if (baseRequest.getResponse().isWriting()) - { - try - { - // Try closing Writer first (based on knowledge in Response obj) - response.getWriter().close(); - } - catch (IllegalStateException ex) - { - try - { - // Try closing OutputStream as alternate route - // This path is possible due to badly behaving Response wrappers - response.getOutputStream().close(); - } - catch (IllegalStateException ex2) - { - ServletException servletException = new ServletException("Unable to commit the response", ex2); - servletException.addSuppressed(ex); - throw servletException; - } - } - } - else - { - try - { - // Try closing OutputStream first (based on knowledge in Response obj) - response.getOutputStream().close(); - } - catch (IllegalStateException ex) - { - try - { - // Try closing Writer as alternate route - // This path is possible due to badly behaving Response wrappers - response.getWriter().close(); - } - catch (IllegalStateException ex2) - { - ServletException servletException = new ServletException("Unable to commit the response", ex2); - servletException.addSuppressed(ex); - throw servletException; - } - } - } - } - private class ForwardAttributes implements Attributes { final Attributes _attr; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/EncodingHttpWriter.java b/jetty-server/src/main/java/org/eclipse/jetty/server/EncodingHttpWriter.java index 23a2413903c1..ab101829f9e0 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/EncodingHttpWriter.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/EncodingHttpWriter.java @@ -47,16 +47,11 @@ public EncodingHttpWriter(HttpOutput out, String encoding) public void write(char[] s, int offset, int length) throws IOException { HttpOutput out = _out; - if (length == 0 && out.isAllContentWritten()) - { - out.close(); - return; - } while (length > 0) { _bytes.reset(); - int chars = length > MAX_OUTPUT_CHARS ? MAX_OUTPUT_CHARS : length; + int chars = Math.min(length, MAX_OUTPUT_CHARS); _converter.write(s, offset, chars); _converter.flush(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index 9134588cfd99..42794c91460f 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -509,7 +509,7 @@ public boolean handle() // TODO that is done. // Set a close callback on the HttpOutput to make it an async callback - _response.closeOutput(Callback.from(_state::completed)); + _response.completeOutput(Callback.from(_state::completed)); break; } @@ -1212,7 +1212,7 @@ public void failed(final Throwable x) @Override public void succeeded() { - _response.getHttpOutput().closed(); + _response.getHttpOutput().completed(); super.failed(x); } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java index 74e72098f64f..c2b0dc05952b 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java @@ -889,6 +889,9 @@ public void sendError(int code, String message) if (LOG.isDebugEnabled()) LOG.debug("sendError {}", toStringLocked()); + if (_outputState != OutputState.OPEN) + throw new IllegalStateException(_outputState.toString()); + switch (_state) { case HANDLING: @@ -902,7 +905,7 @@ public void sendError(int code, String message) throw new IllegalStateException("Response is " + _outputState); response.setStatus(code); - response.closedBySendError(); + response.softClose(); request.setAttribute(ErrorHandler.ERROR_CONTEXT, request.getErrorContext()); request.setAttribute(ERROR_REQUEST_URI, request.getRequestURI()); @@ -970,7 +973,7 @@ protected void completed() } // release any aggregate buffer from a closing flush - _channel.getResponse().getHttpOutput().closed(); + _channel.getResponse().getHttpOutput().completed(); if (event != null) { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index e42223a751e7..a73aefbcc75e 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -407,20 +407,25 @@ public void onCompleted() } else if (_parser.inContentState() && _generator.isPersistent()) { - // If we are async, then we have problems to complete neatly - if (_input.isAsync()) + // Try to progress without filling. + parseRequestBuffer(); + if (_parser.inContentState()) { - if (LOG.isDebugEnabled()) - LOG.debug("{}unconsumed input {}", _parser.isChunking() ? "Possible " : "", this); - _channel.abort(new IOException("unconsumed input")); - } - else - { - if (LOG.isDebugEnabled()) - LOG.debug("{}unconsumed input {}", _parser.isChunking() ? "Possible " : "", this); - // Complete reading the request - if (!_input.consumeAll()) + // If we are async, then we have problems to complete neatly + if (_input.isAsync()) + { + if (LOG.isDebugEnabled()) + LOG.debug("{}unconsumed input while async {}", _parser.isChunking() ? "Possible " : "", this); _channel.abort(new IOException("unconsumed input")); + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("{}unconsumed input {}", _parser.isChunking() ? "Possible " : "", this); + // Complete reading the request + if (!_input.consumeAll()) + _channel.abort(new IOException("unconsumed input")); + } } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index f651179862ab..3e98573abce4 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -18,7 +18,6 @@ package org.eclipse.jetty.server; -import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; @@ -31,7 +30,6 @@ import java.nio.charset.CodingErrorAction; import java.util.ResourceBundle; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import javax.servlet.RequestDispatcher; import javax.servlet.ServletOutputStream; import javax.servlet.ServletRequest; @@ -44,7 +42,6 @@ import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IteratingCallback; -import org.eclipse.jetty.util.IteratingNestedCallback; import org.eclipse.jetty.util.SharedBlockingCallback; import org.eclipse.jetty.util.SharedBlockingCallback.Blocker; import org.eclipse.jetty.util.log.Log; @@ -63,29 +60,68 @@ public class HttpOutput extends ServletOutputStream implements Runnable { private static final String LSTRING_FILE = "javax.servlet.LocalStrings"; - private static final Callback BLOCKING_CLOSE_CALLBACK = new Callback() {}; private static ResourceBundle lStrings = ResourceBundle.getBundle(LSTRING_FILE); - /* - ACTION OPEN ASYNC READY PENDING UNREADY CLOSING CLOSED - -------------------------------------------------------------------------------------------------- - setWriteListener() READY->owp ise ise ise ise ise ise - write() OPEN ise PENDING wpe wpe eof eof - flush() OPEN ise PENDING wpe wpe eof eof - close() CLOSING CLOSING CLOSING CLOSED CLOSED CLOSING CLOSED - isReady() OPEN:true READY:true READY:true UNREADY:false UNREADY:false CLOSED:true CLOSED:true - write completed - - - ASYNC READY->owp CLOSED - - */ + /** + * The output state + */ enum State { - OPEN, // Open in blocking mode + OPEN, // Open + CLOSE, // Close needed from onWriteCompletion + CLOSING, // Close in progress after close API called + CLOSED // Closed + } + + /** + * The API State which combines with the output State: + *
+              OPEN/BLOCKING---last----------------------------+                      CLOSED/BLOCKING
+             /   |    ^                                        \                         ^  ^
+            /    w    |                                         \                       /   |
+           /     |   owc   +--owcL------------------->--owcL-----\---------------------+    |
+          |      v    |   /                         /             V                         |
+         swl  OPEN/BLOCKED----last---->CLOSE/BLOCKED----owc----->CLOSING/BLOCKED--owcL------+
+          |
+           \
+            \
+             V
+          +-->OPEN/READY------last---------------------------+
+         /    ^    |                                          \
+        /    /     w                                           \
+       |    /      |       +--owcL------------------->--owcL----\---------------------------+
+       |   /       v      /                         /            V                          |
+       | irt  OPEN/PENDING----last---->CLOSE/PENDING----owc---->CLOSING/PENDING--owcL----+  |
+       |   \  /    |                        |                    ^     |                 |  |
+      owc   \/    owc                      irf                  /     irf                |  |
+       |    /\     |                        |                  /       |                 |  |
+       |   /  \    V                        |                 /        |                 V  V
+       | irf  OPEN/ASYNC------last----------|----------------+         |             CLOSED/ASYNC
+       |   \                                |                          |                 ^  ^
+        \   \                               |                          |                 |  |
+         \   \                              |                          |                 |  |
+          \   v                             v                          v                 |  |
+           +--OPEN/UNREADY----last---->CLOSE/UNREADY----owc----->CLOSING/UNREADY--owcL---+  |
+                          \                         \                                       |
+                           +--owcL------------------->--owcL--------------------------------+
+
+      swl  : setWriteListener
+      w    : write
+      owc  : onWriteComplete last == false
+      owcL : onWriteComplete last == true
+      irf  : isReady() == false
+      irt  : isReady() == true
+      last : close() or complete(Callback) or write of known last content
+     
+ */ + enum ApiState + { + BLOCKING, // Open in blocking mode + BLOCKED, // Blocked in blocking operation ASYNC, // Open in async mode READY, // isReady() has returned true PENDING, // write operating in progress UNREADY, // write operating in progress, isReady has returned false - ERROR, // An error has occured - CLOSING, // Asynchronous close in progress - CLOSED // Closed } /** @@ -153,9 +189,12 @@ default void resetBuffer() throws IllegalStateException private static Logger LOG = Log.getLogger(HttpOutput.class); private static final ThreadLocal _encoder = new ThreadLocal<>(); - private final AtomicReference _state = new AtomicReference<>(State.OPEN); private final HttpChannel _channel; + private final HttpChannelState _channelState; private final SharedBlockingCallback _writeBlocker; + private ApiState _apiState = ApiState.BLOCKING; + private State _state = State.OPEN; + private boolean _softClose = false; private Interceptor _interceptor; private long _written; private long _flushed; @@ -165,11 +204,12 @@ default void resetBuffer() throws IllegalStateException private int _commitSize; private WriteListener _writeListener; private volatile Throwable _onError; - private Callback _closeCallback; + private Callback _closedCallback; public HttpOutput(HttpChannel channel) { _channel = channel; + _channelState = channel.getState(); _interceptor = channel; _writeBlocker = new WriteBlocker(channel); HttpConfiguration config = channel.getHttpConfiguration(); @@ -209,18 +249,10 @@ public long getWritten() public void reopen() { - _state.set(State.OPEN); - } - - private boolean isLastContentToWrite(int len) - { - _written += len; - return _channel.getResponse().isAllContentWritten(_written); - } - - public boolean isAllContentWritten() - { - return _channel.getResponse().isAllContentWritten(_written); + synchronized (_channelState) + { + _softClose = false; + } } protected Blocker acquireWriteBlockingCallback() throws IOException @@ -228,27 +260,16 @@ protected Blocker acquireWriteBlockingCallback() throws IOException return _writeBlocker.acquire(); } - private void write(ByteBuffer content, boolean complete) throws IOException + private void channelWrite(ByteBuffer content, boolean complete) throws IOException { try (Blocker blocker = _writeBlocker.acquire()) { - write(content, complete, blocker); + channelWrite(content, complete, blocker); blocker.block(); - if (complete) - closed(); - } - catch (Exception failure) - { - if (LOG.isDebugEnabled()) - LOG.debug(failure); - abort(failure); - if (failure instanceof IOException) - throw failure; - throw new IOException(failure); } } - protected void write(ByteBuffer content, boolean complete, Callback callback) + private void channelWrite(ByteBuffer content, boolean last, Callback callback) { if (_firstByteTimeStamp == -1) { @@ -258,175 +279,321 @@ protected void write(ByteBuffer content, boolean complete, Callback callback) else _firstByteTimeStamp = Long.MAX_VALUE; } - _interceptor.write(content, complete, callback); - } - private void abort(Throwable failure) - { - closed(); - _channel.abort(failure); + _interceptor.write(content, last, callback); } - public void closedBySendError() + private void onWriteComplete(boolean last, Throwable failure) { - while (true) + String state = null; + boolean wake = false; + Callback closedCallback = null; + ByteBuffer closeContent = null; + synchronized (_channelState) { - State state = _state.get(); - switch (state) - { - case OPEN: - case READY: - case ASYNC: - if (!_state.compareAndSet(state, State.CLOSED)) - continue; - return; + if (LOG.isDebugEnabled()) + state = stateString(); - default: - throw new IllegalStateException(state.toString()); + // Transition to CLOSED state if we were the last write or we have failed + if (last || failure != null) + { + _state = State.CLOSED; + closedCallback = _closedCallback; + _closedCallback = null; + releaseBuffer(); + wake = updateApiState(failure); + } + else if (_state == State.CLOSE) + { + // Somebody called close or complete while we were writing. + // We can now send a (probably empty) last buffer and then when it completes + // onWriteCompletion will be called again to actually execute the _completeCallback + _state = State.CLOSING; + closeContent = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; + } + else + { + wake = updateApiState(null); } } - } - public void close(Closeable wrapper, Callback callback) - { - _closeCallback = callback; + if (LOG.isDebugEnabled()) + LOG.debug("onWriteComplete({},{}) {}->{} c={} cb={} w={}", + last, failure, state, stateString(), BufferUtil.toDetailString(closeContent), closedCallback, wake); + try { - if (wrapper != null) - wrapper.close(); - if (!isClosed()) - close(); + if (failure != null) + _channel.abort(failure); + + if (closedCallback != null) + { + if (failure == null) + closedCallback.succeeded(); + else + closedCallback.failed(failure); + } + else if (closeContent != null) + { + channelWrite(closeContent, true, new WriteCompleteCB()); + } } - catch (Throwable th) + finally { - closed(); - if (_closeCallback == null) - LOG.ignore(th); - else - callback.failed(th); + if (wake) + _channel.execute(_channel); // TODO review in jetty-10 if execute is needed } - finally + } + + private boolean updateApiState(Throwable failure) + { + boolean wake = false; + switch (_apiState) { - if (_closeCallback != null) - callback.succeeded(); - _closeCallback = null; + case BLOCKED: + _apiState = ApiState.BLOCKING; + break; + + case PENDING: + _apiState = ApiState.ASYNC; + if (failure != null) + { + _onError = failure; + wake = _channelState.onWritePossible(); + } + break; + + case UNREADY: + _apiState = ApiState.READY; + if (failure != null) + _onError = failure; + wake = _channelState.onWritePossible(); + break; + + default: + if (_state == State.CLOSED) + break; + throw new IllegalStateException(stateString()); } + return wake; } - @Override - public void close() + public void softClose() { - Callback closeCallback = _closeCallback == null ? BLOCKING_CLOSE_CALLBACK : _closeCallback; + synchronized (_channelState) + { + _softClose = true; + } + } - while (true) + public void complete(Callback callback) + { + // This method is invoked for the COMPLETE action handling in + // HttpChannel.handle. The callback passed typically will call completed + // to finish the request cycle and so may need to asynchronously wait for: + // a pending/blocked operation to finish and then either an async close or + // wait for an application close to complete. + boolean succeeded = false; + Throwable error = null; + ByteBuffer content = null; + synchronized (_channelState) { - State state = _state.get(); - switch (state) + switch (_state) { - case CLOSING: case CLOSED: - { - _closeCallback = null; - closeCallback.succeeded(); - return; - } - case ASYNC: - { - // A close call implies a write operation, thus in asynchronous mode - // a call to isReady() that returned true should have been made. - // However it is desirable to allow a close at any time, specially if - // complete is called. Thus we simulate a call to isReady here, by - // trying to move to READY state. Either way we continue. - _state.compareAndSet(state, State.READY); - continue; - } - case UNREADY: - case PENDING: - { - // A close call implies a write operation, thus in asynchronous mode - // a call to isReady() that returned true should have been made. - // However it is desirable to allow a close at any time, specially if - // complete is called. Because the prior write has not yet completed - // and/or isReady has not been called, this close is allowed, but will - // abort the response. - if (!_state.compareAndSet(state, State.CLOSED)) - continue; - IOException ex = new IOException("Closed while Pending/Unready"); - LOG.warn(ex.toString()); - LOG.debug(ex); - abort(ex); - _closeCallback = null; - closeCallback.failed(ex); - return; - } - default: - { - if (!_state.compareAndSet(state, State.CLOSING)) - continue; + succeeded = true; + break; + + case CLOSE: + case CLOSING: + _closedCallback = Callback.combine(_closedCallback, callback); + break; - // Do a normal close by writing the aggregate buffer or an empty buffer. If we are - // not including, then indicate this is the last write. - try + case OPEN: + if (_onError != null) { - ByteBuffer content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; - if (closeCallback == BLOCKING_CLOSE_CALLBACK) - { - // Do a blocking close - write(content, !_channel.getResponse().isIncluding()); - _closeCallback = null; - closeCallback.succeeded(); - } - else - { - _closeCallback = null; - write(content, !_channel.getResponse().isIncluding(), closeCallback); - } + error = _onError; + break; } - catch (IOException x) + + _closedCallback = Callback.combine(_closedCallback, callback); + + switch (_apiState) { - LOG.ignore(x); // Ignore it, it's been already logged in write(). - _closeCallback = null; - closeCallback.failed(x); + case BLOCKING: + // Output is idle blocking state, but we still do an async close + _apiState = ApiState.BLOCKED; + _state = State.CLOSING; + content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; + break; + + case ASYNC: + case READY: + // Output is idle in async state, so we can do an async close + _apiState = ApiState.PENDING; + _state = State.CLOSING; + content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; + break; + + case BLOCKED: + case UNREADY: + case PENDING: + // An operation is in progress, so we soft close now + _softClose = true; + // then trigger a close from onWriteComplete + _state = State.CLOSE; + break; } - return; - } + break; } } + + if (LOG.isDebugEnabled()) + LOG.debug("complete({}) {} s={} e={}, c={}", callback, stateString(), succeeded, error, BufferUtil.toDetailString(content)); + + if (succeeded) + { + callback.succeeded(); + return; + } + + if (error != null) + { + callback.failed(error); + return; + } + + if (content != null) + channelWrite(content, true, new WriteCompleteCB()); } /** - * Called to indicate that the last write has been performed. - * It updates the state and performs cleanup operations. + * Called to indicate that the request cycle has been completed. */ - public void closed() + public void completed() { - while (true) + synchronized (_channelState) { - State state = _state.get(); - switch (state) + _state = State.CLOSED; + } + releaseBuffer(); + } + + @Override + public void close() throws IOException + { + ByteBuffer content = null; + Blocker blocker = null; + synchronized (_channelState) + { + if (_onError != null) + { + if (_onError instanceof IOException) + throw (IOException)_onError; + if (_onError instanceof RuntimeException) + throw (RuntimeException)_onError; + if (_onError instanceof Error) + throw (Error)_onError; + throw new IOException(_onError); + } + + switch (_state) { case CLOSED: - { - return; - } - case UNREADY: - { - if (_state.compareAndSet(state, State.ERROR)) + break; + + case CLOSE: + case CLOSING: + switch (_apiState) + { + case BLOCKING: + case BLOCKED: + // block until CLOSED state reached. + blocker = _writeBlocker.acquire(); + _closedCallback = Callback.combine(_closedCallback, blocker); + break; + + default: + // async close with no callback, so nothing to do + break; + } + break; + + case OPEN: + switch (_apiState) { - if (_onError == null) - _onError = new EofException("Async closed"); - releaseBuffer(); - return; + case BLOCKING: + // Output is idle blocking state, but we still do an async close + _apiState = ApiState.BLOCKED; + _state = State.CLOSING; + blocker = _writeBlocker.acquire(); + content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; + break; + + case BLOCKED: + // An blocking operation is in progress, so we soft close now + _softClose = true; + // then trigger a close from onWriteComplete + _state = State.CLOSE; + // and block until it is complete + blocker = _writeBlocker.acquire(); + _closedCallback = Callback.combine(_closedCallback, blocker); + break; + + case ASYNC: + case READY: + // Output is idle in async state, so we can do an async close + _apiState = ApiState.PENDING; + _state = State.CLOSING; + content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; + break; + + case UNREADY: + case PENDING: + // An async operation is in progress, so we soft close now + _softClose = true; + // then trigger a close from onWriteComplete + _state = State.CLOSE; + break; } break; + } + } + + if (LOG.isDebugEnabled()) + LOG.debug("close() {} c={} b={}", stateString(), BufferUtil.toDetailString(content), blocker); + + if (content == null) + { + if (blocker == null) + // nothing to do or block for. + return; + + // Just wait for some other close to finish. + try (Blocker b = blocker) + { + b.block(); + } + } + else + { + if (blocker == null) + { + // Do an async close + channelWrite(content, true, new WriteCompleteCB()); + } + else + { + // Do a blocking close + try (Blocker b = blocker) + { + channelWrite(content, true, blocker); + b.block(); + onWriteComplete(true, null); } - default: + catch (Throwable t) { - if (!_state.compareAndSet(state, State.CLOSED)) - break; - - releaseBuffer(); - return; + onWriteComplete(true, t); } } } @@ -455,305 +622,366 @@ private void releaseBuffer() public boolean isClosed() { - switch (_state.get()) + synchronized (_channelState) { - case CLOSING: - case CLOSED: - return true; - default: - return false; + return _softClose || (_state != State.OPEN); } } public boolean isAsync() { - switch (_state.get()) + synchronized (_channelState) { - case ASYNC: - case READY: - case PENDING: - case UNREADY: - return true; - default: - return false; + switch (_apiState) + { + case ASYNC: + case READY: + case PENDING: + case UNREADY: + return true; + default: + return false; + } } } @Override public void flush() throws IOException { - while (true) + ByteBuffer content = null; + synchronized (_channelState) { - State state = _state.get(); - switch (state) + switch (_state) { - case OPEN: - write(BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER, false); + case CLOSED: + case CLOSING: return; - case ASYNC: - throw new IllegalStateException("isReady() not called"); + default: + { + switch (_apiState) + { + case BLOCKING: + _apiState = ApiState.BLOCKED; + content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; + break; - case READY: - if (!_state.compareAndSet(state, State.PENDING)) - continue; - new AsyncFlush().iterate(); - return; + case ASYNC: + case PENDING: + throw new IllegalStateException("isReady() not called: " + stateString()); - case UNREADY: - throw new WritePendingException(); + case READY: + _apiState = ApiState.PENDING; + break; - case ERROR: - throw new EofException(_onError); + case UNREADY: + throw new WritePendingException(); - case PENDING: - case CLOSING: - case CLOSED: - return; + default: + throw new IllegalStateException(stateString()); + } + } + } + } - default: - throw new IllegalStateException(state.toString()); + if (content == null) + new AsyncFlush(false).iterate(); + else + { + try + { + channelWrite(content, false); + onWriteComplete(false, null); + } + catch (Throwable t) + { + onWriteComplete(false, t); + throw t; } } } + private void checkWritable() throws EofException + { + if (_softClose) + throw new EofException("Closed"); + + switch (_state) + { + case CLOSED: + case CLOSING: + throw new EofException("Closed"); + + default: + break; + } + + if (_onError != null) + throw new EofException(_onError); + } + @Override public void write(byte[] b, int off, int len) throws IOException { + boolean last; + boolean aggregate; + boolean flush; + // Async or Blocking ? - while (true) + boolean async; + synchronized (_channelState) { - State state = _state.get(); - switch (state) + checkWritable(); + long written = _written + len; + int space = _aggregate == null ? getBufferSize() : BufferUtil.space(_aggregate); + last = _channel.getResponse().isAllContentWritten(written); + // Write will be aggregated if: + // + it is smaller than the commitSize + // + is not the last one, or is last but will fit in an already allocated aggregate buffer. + aggregate = len <= _commitSize && (!last || BufferUtil.hasContent(_aggregate) && len <= space); + flush = last || !aggregate || len >= space; + + if (last && _state == State.OPEN) + _state = State.CLOSING; + + switch (_apiState) { - case OPEN: - // process blocking below + case BLOCKING: + _apiState = flush ? ApiState.BLOCKED : ApiState.BLOCKING; + async = false; break; case ASYNC: - throw new IllegalStateException("isReady() not called"); + throw new IllegalStateException("isReady() not called: " + stateString()); case READY: - if (!_state.compareAndSet(state, State.PENDING)) - continue; - - // Should we aggregate? - boolean last = isLastContentToWrite(len); - if (!last && len <= _commitSize) - { - acquireBuffer(); - - // YES - fill the aggregate with content from the buffer - int filled = BufferUtil.fill(_aggregate, b, off, len); - - // return if we are not complete, not full and filled all the content - if (filled == len && !BufferUtil.isFull(_aggregate)) - { - if (!_state.compareAndSet(State.PENDING, State.ASYNC)) - throw new IllegalStateException(_state.get().toString()); - return; - } - - // adjust offset/length - off += filled; - len -= filled; - } - - // Do the asynchronous writing from the callback - new AsyncWrite(b, off, len, last).iterate(); - return; + async = true; + _apiState = flush ? ApiState.PENDING : ApiState.ASYNC; + break; case PENDING: case UNREADY: throw new WritePendingException(); - case ERROR: - throw new EofException(_onError); - - case CLOSING: - case CLOSED: - throw new EofException("Closed"); - default: - throw new IllegalStateException(state.toString()); + throw new IllegalStateException(stateString()); } - break; - } - // handle blocking write + _written = written; - // Should we aggregate? - // Yes - if the write is smaller than the commitSize (==aggregate buffer size) - // and the write is not the last one, or is last but will fit in an already allocated aggregate buffer. - boolean last = isLastContentToWrite(len); - if (len <= _commitSize && (!last || len <= BufferUtil.space(_aggregate))) - { - acquireBuffer(); - - // YES - fill the aggregate with content from the buffer - int filled = BufferUtil.fill(_aggregate, b, off, len); + // Should we aggregate? + if (aggregate) + { + acquireBuffer(); + int filled = BufferUtil.fill(_aggregate, b, off, len); - // return if we are not the last write and have aggregated all of the content - if (!last && filled == len && !BufferUtil.isFull(_aggregate)) - return; + // return if we are not complete, not full and filled all the content + if (!flush) + return; - // adjust offset/length - off += filled; - len -= filled; + // adjust offset/length + off += filled; + len -= filled; + } } - // flush any content from the aggregate - if (BufferUtil.hasContent(_aggregate)) + if (async) { - write(_aggregate, last && len == 0); + // Do the asynchronous writing from the callback + new AsyncWrite(b, off, len, last).iterate(); + return; + } - // should we fill aggregate again from the buffer? - if (len > 0 && !last && len <= _commitSize && len <= BufferUtil.space(_aggregate)) + // Blocking write + try + { + // flush any content from the aggregate + if (BufferUtil.hasContent(_aggregate)) { - BufferUtil.append(_aggregate, b, off, len); - return; + channelWrite(_aggregate, last && len == 0); + + // should we fill aggregate again from the buffer? + if (len > 0 && !last && len <= _commitSize && len <= BufferUtil.space(_aggregate)) + { + BufferUtil.append(_aggregate, b, off, len); + return; + } } - } - // write any remaining content in the buffer directly - if (len > 0) - { - // write a buffer capacity at a time to avoid JVM pooling large direct buffers - // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6210541 - ByteBuffer view = ByteBuffer.wrap(b, off, len); - while (len > getBufferSize()) + // write any remaining content in the buffer directly + if (len > 0) { - int p = view.position(); - int l = p + getBufferSize(); - view.limit(p + getBufferSize()); - write(view, false); - len -= getBufferSize(); - view.limit(l + Math.min(len, getBufferSize())); - view.position(l); + // write a buffer capacity at a time to avoid JVM pooling large direct buffers + // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6210541 + ByteBuffer view = ByteBuffer.wrap(b, off, len); + + while (len > getBufferSize()) + { + int p = view.position(); + int l = p + getBufferSize(); + view.limit(l); + channelWrite(view, false); + view.limit(p + len); + view.position(l); + len -= getBufferSize(); + } + channelWrite(view, last); } - write(view, last); + else if (last) + { + channelWrite(BufferUtil.EMPTY_BUFFER, true); + } + + onWriteComplete(last, null); } - else if (last) + catch (Throwable t) { - write(BufferUtil.EMPTY_BUFFER, true); + onWriteComplete(last, t); + throw t; } } public void write(ByteBuffer buffer) throws IOException { // This write always bypasses aggregate buffer + int len = BufferUtil.length(buffer); + boolean flush; + boolean last; // Async or Blocking ? - while (true) + boolean async; + synchronized (_channelState) { - State state = _state.get(); - switch (state) + checkWritable(); + long written = _written + len; + last = _channel.getResponse().isAllContentWritten(_written); + flush = last || len > 0 || BufferUtil.hasContent(_aggregate); + + if (last && _state == State.OPEN) + _state = State.CLOSING; + + switch (_apiState) { - case OPEN: - // process blocking below + case BLOCKING: + async = false; + _apiState = flush ? ApiState.BLOCKED : ApiState.BLOCKING; break; case ASYNC: - throw new IllegalStateException("isReady() not called"); + throw new IllegalStateException("isReady() not called: " + stateString()); case READY: - if (!_state.compareAndSet(state, State.PENDING)) - continue; - - // Do the asynchronous writing from the callback - boolean last = isLastContentToWrite(buffer.remaining()); - new AsyncWrite(buffer, last).iterate(); - return; + async = true; + _apiState = flush ? ApiState.PENDING : ApiState.ASYNC; + break; case PENDING: - case UNREADY: - throw new WritePendingException(); - - case ERROR: - throw new EofException(_onError); - - case CLOSING: - case CLOSED: - throw new EofException("Closed"); + case UNREADY: + throw new WritePendingException(); default: - throw new IllegalStateException(state.toString()); + throw new IllegalStateException(stateString()); } - break; + _written = written; } - // handle blocking write - int len = BufferUtil.length(buffer); - boolean last = isLastContentToWrite(len); - - // flush any content from the aggregate - if (BufferUtil.hasContent(_aggregate)) - write(_aggregate, last && len == 0); + if (!flush) + return; - // write any remaining content in the buffer directly - if (len > 0) - write(buffer, last); - else if (last) - write(BufferUtil.EMPTY_BUFFER, true); + if (async) + { + new AsyncWrite(buffer, last).iterate(); + } + else + { + try + { + // Blocking write + // flush any content from the aggregate + if (BufferUtil.hasContent(_aggregate)) + channelWrite(_aggregate, last && len == 0); + + // write any remaining content in the buffer directly + if (len > 0) + channelWrite(buffer, last); + else if (last) + channelWrite(BufferUtil.EMPTY_BUFFER, true); + + onWriteComplete(last, null); + } + catch (Throwable t) + { + onWriteComplete(last, t); + throw t; + } + } } @Override public void write(int b) throws IOException { - _written += 1; - boolean complete = _channel.getResponse().isAllContentWritten(_written); - + boolean flush; + boolean last; // Async or Blocking ? - while (true) + + boolean async = false; + synchronized (_channelState) { - switch (_state.get()) - { - case OPEN: - acquireBuffer(); - BufferUtil.append(_aggregate, (byte)b); + checkWritable(); + long written = _written + 1; + int space = _aggregate == null ? getBufferSize() : BufferUtil.space(_aggregate); + last = _channel.getResponse().isAllContentWritten(written); + flush = last || space == 1; - // Check if all written or full - if (complete || BufferUtil.isFull(_aggregate)) - write(_aggregate, complete); + if (last && _state == State.OPEN) + _state = State.CLOSING; + + switch (_apiState) + { + case BLOCKING: + _apiState = flush ? ApiState.BLOCKED : ApiState.BLOCKING; break; case ASYNC: - throw new IllegalStateException("isReady() not called"); + throw new IllegalStateException("isReady() not called: " + stateString()); case READY: - if (!_state.compareAndSet(State.READY, State.PENDING)) - continue; - - acquireBuffer(); - BufferUtil.append(_aggregate, (byte)b); - - // Check if all written or full - if (!complete && !BufferUtil.isFull(_aggregate)) - { - if (!_state.compareAndSet(State.PENDING, State.ASYNC)) - throw new IllegalStateException(); - return; - } - - // Do the asynchronous writing from the callback - new AsyncFlush().iterate(); - return; + async = true; + _apiState = flush ? ApiState.PENDING : ApiState.ASYNC; + break; case PENDING: case UNREADY: throw new WritePendingException(); - case ERROR: - throw new EofException(_onError); + default: + throw new IllegalStateException(stateString()); + } + _written = written; + + acquireBuffer(); + BufferUtil.append(_aggregate, (byte)b); + } - case CLOSING: - case CLOSED: - throw new EofException("Closed"); + // Check if all written or full + if (!flush) + return; - default: - throw new IllegalStateException(); + if (async) + // Do the asynchronous writing from the callback + new AsyncFlush(last).iterate(); + else + { + try + { + channelWrite(_aggregate, last); + onWriteComplete(last, null); + } + catch (Throwable t) + { + onWriteComplete(last, t); + throw t; } - break; } } @@ -883,7 +1111,7 @@ public void sendContent(ByteBuffer content) throws IOException LOG.debug("sendContent({})", BufferUtil.toDetailString(content)); _written += content.remaining(); - write(content, true); + channelWrite(content, true); } /** @@ -899,13 +1127,6 @@ public void sendContent(InputStream in) throws IOException new InputStreamWritingCB(in, blocker).iterate(); blocker.block(); } - catch (Throwable failure) - { - if (LOG.isDebugEnabled()) - LOG.debug(failure); - abort(failure); - throw failure; - } } /** @@ -921,13 +1142,6 @@ public void sendContent(ReadableByteChannel in) throws IOException new ReadableByteChannelWritingCB(in, blocker).iterate(); blocker.block(); } - catch (Throwable failure) - { - if (LOG.isDebugEnabled()) - LOG.debug(failure); - abort(failure); - throw failure; - } } /** @@ -943,13 +1157,6 @@ public void sendContent(HttpContent content) throws IOException sendContent(content, blocker); blocker.block(); } - catch (Throwable failure) - { - if (LOG.isDebugEnabled()) - LOG.debug(failure); - abort(failure); - throw failure; - } } /** @@ -963,23 +1170,24 @@ public void sendContent(ByteBuffer content, final Callback callback) if (LOG.isDebugEnabled()) LOG.debug("sendContent(buffer={},{})", BufferUtil.toDetailString(content), callback); - _written += content.remaining(); - write(content, true, new Callback.Nested(callback) - { - @Override - public void succeeded() - { - closed(); - super.succeeded(); - } + if (prepareSendContent(content.remaining(), callback)) + channelWrite(content, true, + new Callback.Nested(callback) + { + @Override + public void succeeded() + { + onWriteComplete(true, null); + super.succeeded(); + } - @Override - public void failed(Throwable x) - { - abort(x); - super.failed(x); - } - }); + @Override + public void failed(Throwable x) + { + onWriteComplete(true, x); + super.failed(x); + } + }); } /** @@ -994,7 +1202,8 @@ public void sendContent(InputStream in, Callback callback) if (LOG.isDebugEnabled()) LOG.debug("sendContent(stream={},{})", in, callback); - new InputStreamWritingCB(in, callback).iterate(); + if (prepareSendContent(0, callback)) + new InputStreamWritingCB(in, callback).iterate(); } /** @@ -1009,54 +1218,62 @@ public void sendContent(ReadableByteChannel in, Callback callback) if (LOG.isDebugEnabled()) LOG.debug("sendContent(channel={},{})", in, callback); - new ReadableByteChannelWritingCB(in, callback).iterate(); + if (prepareSendContent(0, callback)) + new ReadableByteChannelWritingCB(in, callback).iterate(); } - /** - * Asynchronous send of HTTP content. - * - * @param httpContent The HTTP content to send - * @param callback The callback to use to notify success or failure - */ - public void sendContent(HttpContent httpContent, Callback callback) + private boolean prepareSendContent(int len, Callback callback) { - if (LOG.isDebugEnabled()) - LOG.debug("sendContent(http={},{})", httpContent, callback); - - if (BufferUtil.hasContent(_aggregate)) - { - callback.failed(new IOException("cannot sendContent() after write()")); - return; - } - if (_channel.isCommitted()) - { - callback.failed(new IOException("cannot sendContent(), output already committed")); - return; - } - - while (true) + synchronized (_channelState) { - switch (_state.get()) + if (BufferUtil.hasContent(_aggregate)) { - case OPEN: - if (!_state.compareAndSet(State.OPEN, State.PENDING)) - continue; - break; - - case ERROR: - callback.failed(new EofException(_onError)); - return; + callback.failed(new IOException("cannot sendContent() after write()")); + return false; + } + if (_channel.isCommitted()) + { + callback.failed(new IOException("cannot sendContent(), output already committed")); + return false; + } - case CLOSING: + switch (_state) + { case CLOSED: + case CLOSING: callback.failed(new EofException("Closed")); - return; + return false; default: - throw new IllegalStateException(); + _state = State.CLOSING; + break; + } + + if (_onError != null) + { + callback.failed(_onError); + return false; } - break; + + if (_apiState != ApiState.BLOCKING) + throw new IllegalStateException(stateString()); + _apiState = ApiState.PENDING; + if (len > 0) + _written += len; + return true; } + } + + /** + * Asynchronous send of HTTP content. + * + * @param httpContent The HTTP content to send + * @param callback The callback to use to notify success or failure + */ + public void sendContent(HttpContent httpContent, Callback callback) + { + if (LOG.isDebugEnabled()) + LOG.debug("sendContent(http={},{})", httpContent, callback); ByteBuffer buffer = _channel.useDirectBuffers() ? httpContent.getDirectBuffer() : null; if (buffer == null) @@ -1068,30 +1285,40 @@ public void sendContent(HttpContent httpContent, Callback callback) return; } + ReadableByteChannel rbc = null; try { - ReadableByteChannel rbc = httpContent.getReadableByteChannel(); - if (rbc != null) - { - // Close of the rbc is done by the async sendContent - sendContent(rbc, callback); - return; - } - - InputStream in = httpContent.getInputStream(); - if (in != null) - { - sendContent(in, callback); - return; - } + rbc = httpContent.getReadableByteChannel(); + } + catch (Throwable x) + { + LOG.debug(x); + } + if (rbc != null) + { + // Close of the rbc is done by the async sendContent + sendContent(rbc, callback); + return; + } - throw new IllegalArgumentException("unknown content for " + httpContent); + InputStream in = null; + try + { + in = httpContent.getInputStream(); + } + catch (Throwable x) + { + LOG.debug(x); } - catch (Throwable cause) + if (in != null) { - abort(cause); - callback.failed(cause); + sendContent(in, callback); + return; } + + Throwable cause = new IllegalArgumentException("unknown content for " + httpContent); + _channel.abort(cause); + callback.failed(cause); } public int getBufferSize() @@ -1135,20 +1362,25 @@ public void onFlushed(long bytes) throws IOException public void recycle() { - _interceptor = _channel; - HttpConfiguration config = _channel.getHttpConfiguration(); - _bufferSize = config.getOutputBufferSize(); - _commitSize = config.getOutputAggregationSize(); - if (_commitSize > _bufferSize) - _commitSize = _bufferSize; - releaseBuffer(); - _written = 0; - _writeListener = null; - _onError = null; - _firstByteTimeStamp = -1; - _flushed = 0; - _closeCallback = null; - reopen(); + synchronized (_channelState) + { + _state = State.OPEN; + _apiState = ApiState.BLOCKING; + _softClose = false; + _interceptor = _channel; + HttpConfiguration config = _channel.getHttpConfiguration(); + _bufferSize = config.getOutputBufferSize(); + _commitSize = config.getOutputAggregationSize(); + if (_commitSize > _bufferSize) + _commitSize = _bufferSize; + releaseBuffer(); + _written = 0; + _writeListener = null; + _onError = null; + _firstByteTimeStamp = -1; + _flushed = 0; + _closedCallback = null; + } } public void resetBuffer() @@ -1163,47 +1395,45 @@ public void resetBuffer() public void setWriteListener(WriteListener writeListener) { if (!_channel.getState().isAsync()) - throw new IllegalStateException("!ASYNC"); - - if (_state.compareAndSet(State.OPEN, State.READY)) + throw new IllegalStateException("!ASYNC: " + stateString()); + boolean wake; + synchronized (_channelState) { + if (_apiState != ApiState.BLOCKING) + throw new IllegalStateException("!OPEN" + stateString()); + _apiState = ApiState.READY; _writeListener = writeListener; - if (_channel.getState().onWritePossible()) - _channel.execute(_channel); + wake = _channel.getState().onWritePossible(); } - else - throw new IllegalStateException(); + if (wake) + _channel.execute(_channel); } @Override public boolean isReady() { - while (true) + synchronized (_channelState) { - switch (_state.get()) + switch (_apiState) { - case OPEN: + case BLOCKING: case READY: - case ERROR: - case CLOSING: - case CLOSED: return true; case ASYNC: - if (!_state.compareAndSet(State.ASYNC, State.READY)) - continue; + _apiState = ApiState.READY; return true; case PENDING: - if (!_state.compareAndSet(State.PENDING, State.UNREADY)) - continue; + _apiState = ApiState.UNREADY; return false; + case BLOCKED: case UNREADY: return false; default: - throw new IllegalStateException(); + throw new IllegalStateException(stateString()); } } } @@ -1211,84 +1441,67 @@ public boolean isReady() @Override public void run() { - while (true) - { - State state = _state.get(); + Throwable error = null; + synchronized (_channelState) + { if (_onError != null) { - switch (state) - { - case CLOSING: - case CLOSED: - case ERROR: - { - _onError = null; - return; - } - default: - { - if (_state.compareAndSet(state, State.ERROR)) - { - Throwable th = _onError; - _onError = null; - if (LOG.isDebugEnabled()) - LOG.debug("onError", th); - - try - { - _writeListener.onError(th); - } - finally - { - IO.close(this); - } - - return; - } - } - } - continue; + error = _onError; + _onError = null; } + } - // We do not check the state here. Strictly speaking the state should - // always be READY when run is called. However, other async threads or - // a prior call by this thread to onDataAvailable may have called write - // after onWritePossible was called, so the state could be any of the - // write states. - // - // Even if the state is CLOSED, we need to call onWritePossible to tell - // async producer that the last write completed. - // - // We have to trust the scheduling of this run was done - // for good reason, that is protected correctly by HttpChannelState and - // that implementations of onWritePossible will - // themselves check isReady(). If multiple threads are calling write, - // then they must either rely on only a single container thread being - // dispatched or perform their own mutual exclusion. - try + try + { + if (error == null) { + if (LOG.isDebugEnabled()) + LOG.debug("onWritePossible"); _writeListener.onWritePossible(); - break; - } - catch (Throwable e) - { - _onError = e; + return; } } + catch (Throwable t) + { + error = t; + } + try + { + if (LOG.isDebugEnabled()) + LOG.debug("onError", error); + _writeListener.onError(error); + } + catch (Throwable t) + { + if (LOG.isDebugEnabled()) + LOG.debug(t); + } + finally + { + IO.close(this); + } + } + + private String stateString() + { + return String.format("s=%s,api=%s,sc=%b,e=%s", _state, _apiState, _softClose, _onError); } @Override public String toString() { - return String.format("%s@%x{%s}", this.getClass().getSimpleName(), hashCode(), _state.get()); + synchronized (_channelState) + { + return String.format("%s@%x{%s}", this.getClass().getSimpleName(), hashCode(), stateString()); + } } - private abstract class AsyncICB extends IteratingCallback + private abstract class ChannelWriteCB extends IteratingCallback { final boolean _last; - AsyncICB(boolean last) + ChannelWriteCB(boolean last) { _last = last; } @@ -1302,67 +1515,81 @@ public InvocationType getInvocationType() @Override protected void onCompleteSuccess() { - while (true) - { - State last = _state.get(); - switch (last) - { - case PENDING: - if (!_state.compareAndSet(State.PENDING, State.ASYNC)) - continue; - break; + onWriteComplete(_last, null); + } - case UNREADY: - if (!_state.compareAndSet(State.UNREADY, State.READY)) - continue; - if (_last) - closed(); - if (_channel.getState().onWritePossible()) - _channel.execute(_channel); - break; + @Override + public void onCompleteFailure(Throwable e) + { + onWriteComplete(_last, e); + } + } - case CLOSED: - break; + private abstract class NestedChannelWriteCB extends ChannelWriteCB + { + final Callback _callback; - default: - throw new IllegalStateException(); - } - break; + NestedChannelWriteCB(Callback callback, boolean last) + { + super(last); + _callback = callback; + } + + @Override + protected void onCompleteSuccess() + { + try + { + super.onCompleteSuccess(); + } + finally + { + _callback.succeeded(); } } @Override public void onCompleteFailure(Throwable e) { - _onError = e == null ? new IOException() : e; - if (_channel.getState().onWritePossible()) - _channel.execute(_channel); + try + { + super.onCompleteFailure(e); + } + catch (Throwable t) + { + if (t != e) + e.addSuppressed(t); + } + finally + { + _callback.failed(e); + } } } - private class AsyncFlush extends AsyncICB + private class AsyncFlush extends ChannelWriteCB { - protected volatile boolean _flushed; + volatile boolean _flushed; - public AsyncFlush() + AsyncFlush(boolean last) { - super(false); + super(last); } @Override - protected Action process() + protected Action process() throws Exception { if (BufferUtil.hasContent(_aggregate)) { _flushed = true; - write(_aggregate, false, this); + channelWrite(_aggregate, false, this); return Action.SCHEDULED; } if (!_flushed) { _flushed = true; - write(BufferUtil.EMPTY_BUFFER, false, this); + channelWrite(BufferUtil.EMPTY_BUFFER, false, this); return Action.SCHEDULED; } @@ -1370,14 +1597,14 @@ protected Action process() } } - private class AsyncWrite extends AsyncICB + private class AsyncWrite extends ChannelWriteCB { private final ByteBuffer _buffer; private final ByteBuffer _slice; private final int _len; - protected volatile boolean _completed; + volatile boolean _completed; - public AsyncWrite(byte[] b, int off, int len, boolean last) + AsyncWrite(byte[] b, int off, int len, boolean last) { super(last); _buffer = ByteBuffer.wrap(b, off, len); @@ -1386,7 +1613,7 @@ public AsyncWrite(byte[] b, int off, int len, boolean last) _slice = _len < getBufferSize() ? null : _buffer.duplicate(); } - public AsyncWrite(ByteBuffer buffer, boolean last) + AsyncWrite(ByteBuffer buffer, boolean last) { super(last); _buffer = buffer; @@ -1401,13 +1628,13 @@ public AsyncWrite(ByteBuffer buffer, boolean last) } @Override - protected Action process() + protected Action process() throws Exception { // flush any content from the aggregate if (BufferUtil.hasContent(_aggregate)) { _completed = _len == 0; - write(_aggregate, _last && _completed, this); + channelWrite(_aggregate, _last && _completed, this); return Action.SCHEDULED; } @@ -1427,7 +1654,7 @@ protected Action process() if (_slice == null) { _completed = true; - write(_buffer, _last, this); + channelWrite(_buffer, _last, this); return Action.SCHEDULED; } @@ -1439,7 +1666,7 @@ protected Action process() _buffer.position(pl); _slice.position(p); _completed = !_buffer.hasRemaining(); - write(_slice, _last && _completed, this); + channelWrite(_slice, _last && _completed, this); return Action.SCHEDULED; } @@ -1448,12 +1675,13 @@ protected Action process() if (_last && !_completed) { _completed = true; - write(BufferUtil.EMPTY_BUFFER, true, this); + channelWrite(BufferUtil.EMPTY_BUFFER, true, this); return Action.SCHEDULED; } if (LOG.isDebugEnabled() && _completed) LOG.debug("EOF of {}", this); + return Action.SUCCEEDED; } } @@ -1466,15 +1694,16 @@ protected Action process() * be notified as each buffer is written and only once all the input is consumed will the * wrapped {@link Callback#succeeded()} method be called. */ - private class InputStreamWritingCB extends IteratingNestedCallback + private class InputStreamWritingCB extends NestedChannelWriteCB { private final InputStream _in; private final ByteBuffer _buffer; private boolean _eof; + private boolean _closed; - public InputStreamWritingCB(InputStream in, Callback callback) + InputStreamWritingCB(InputStream in, Callback callback) { - super(callback); + super(callback, true); _in = in; _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), false); } @@ -1488,10 +1717,13 @@ protected Action process() throws Exception { if (LOG.isDebugEnabled()) LOG.debug("EOF of {}", this); - // Handle EOF - _in.close(); - closed(); - _channel.getByteBufferPool().release(_buffer); + if (!_closed) + { + _closed = true; + _channel.getByteBufferPool().release(_buffer); + IO.close(_in); + } + return Action.SUCCEEDED; } @@ -1510,17 +1742,21 @@ protected Action process() throws Exception _buffer.position(0); _buffer.limit(len); _written += len; - write(_buffer, _eof, this); + channelWrite(_buffer, _eof, this); return Action.SCHEDULED; } @Override public void onCompleteFailure(Throwable x) { - abort(x); - _channel.getByteBufferPool().release(_buffer); - IO.close(_in); - super.onCompleteFailure(x); + try + { + _channel.getByteBufferPool().release(_buffer); + } + finally + { + super.onCompleteFailure(x); + } } } @@ -1533,15 +1769,16 @@ public void onCompleteFailure(Throwable x) * be notified as each buffer is written and only once all the input is consumed will the * wrapped {@link Callback#succeeded()} method be called. */ - private class ReadableByteChannelWritingCB extends IteratingNestedCallback + private class ReadableByteChannelWritingCB extends NestedChannelWriteCB { private final ReadableByteChannel _in; private final ByteBuffer _buffer; private boolean _eof; + private boolean _closed; - public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback) + ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback) { - super(callback); + super(callback, true); _in = in; _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers()); } @@ -1555,9 +1792,12 @@ protected Action process() throws Exception { if (LOG.isDebugEnabled()) LOG.debug("EOF of {}", this); - _in.close(); - closed(); - _channel.getByteBufferPool().release(_buffer); + if (!_closed) + { + _closed = true; + _channel.getByteBufferPool().release(_buffer); + IO.close(_in); + } return Action.SUCCEEDED; } @@ -1571,7 +1811,7 @@ protected Action process() throws Exception // write what we have BufferUtil.flipToFlush(_buffer, 0); _written += _buffer.remaining(); - write(_buffer, _eof, this); + channelWrite(_buffer, _eof, this); return Action.SCHEDULED; } @@ -1579,7 +1819,6 @@ protected Action process() throws Exception @Override public void onCompleteFailure(Throwable x) { - abort(x); _channel.getByteBufferPool().release(_buffer); IO.close(_in); super.onCompleteFailure(x); @@ -1604,4 +1843,35 @@ protected long getIdleTimeout() return blockingTimeout; } } + + private class WriteCompleteCB implements Callback + { + final Callback _callback; + + WriteCompleteCB() + { + this(null); + } + + WriteCompleteCB(Callback callback) + { + _callback = callback; + } + + @Override + public void succeeded() + { + onWriteComplete(true, null); + if (_callback != null) + _callback.succeeded(); + } + + @Override + public void failed(Throwable x) + { + onWriteComplete(true, x); + if (_callback != null) + _callback.succeeded(); + } + } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpWriter.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpWriter.java index 20b42e207602..d225d89adbe0 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpWriter.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpWriter.java @@ -22,13 +22,14 @@ import java.io.Writer; import org.eclipse.jetty.util.ByteArrayOutputStream2; +import org.eclipse.jetty.util.Callback; /** * */ public abstract class HttpWriter extends Writer { - public static final int MAX_OUTPUT_CHARS = 512; + public static final int MAX_OUTPUT_CHARS = 512; // TODO should this be configurable? super size is 1024 final HttpOutput _out; final ByteArrayOutputStream2 _bytes; @@ -38,7 +39,7 @@ public HttpWriter(HttpOutput out) { _out = out; _chars = new char[MAX_OUTPUT_CHARS]; - _bytes = new ByteArrayOutputStream2(MAX_OUTPUT_CHARS); + _bytes = new ByteArrayOutputStream2(MAX_OUTPUT_CHARS); // TODO should this be pooled - or do we just recycle the writer? } @Override @@ -47,6 +48,11 @@ public void close() throws IOException _out.close(); } + public void complete(Callback callback) + { + _out.complete(callback); + } + @Override public void flush() throws IOException { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Iso88591HttpWriter.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Iso88591HttpWriter.java index 1c8c9b444df8..c755296ea2af 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Iso88591HttpWriter.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Iso88591HttpWriter.java @@ -35,11 +35,6 @@ public Iso88591HttpWriter(HttpOutput out) public void write(char[] s, int offset, int length) throws IOException { HttpOutput out = _out; - if (length == 0 && out.isAllContentWritten()) - { - close(); - return; - } if (length == 1) { @@ -51,7 +46,7 @@ public void write(char[] s, int offset, int length) throws IOException while (length > 0) { _bytes.reset(); - int chars = length > MAX_OUTPUT_CHARS ? MAX_OUTPUT_CHARS : length; + int chars = Math.min(length, MAX_OUTPUT_CHARS); byte[] buffer = _bytes.getBuf(); int bytes = _bytes.getCount(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java index 60b5b10e8b68..027eed491d6d 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java @@ -145,10 +145,10 @@ public void reopen() _out.reopen(); } - public void closedBySendError() + public void softClose() { setErrorSent(true); - _out.closedBySendError(); + _out.softClose(); } /** @@ -496,7 +496,7 @@ public void sendRedirect(int code, String location) throws IOException resetBuffer(); setHeader(HttpHeader.LOCATION, location); setStatus(code); - closeOutput(); + completeOutput(); } @Override @@ -788,7 +788,7 @@ public void setContentLength(int len) { try { - closeOutput(); + completeOutput(); } catch (IOException e) { @@ -826,17 +826,20 @@ public boolean isContentComplete(long written) return (_contentLength < 0 || written >= _contentLength); } - public void closeOutput() throws IOException + public void completeOutput() throws IOException { if (_outputType == OutputType.WRITER) _writer.close(); - if (!_out.isClosed()) + else _out.close(); } - public void closeOutput(Callback callback) + public void completeOutput(Callback callback) { - _out.close((_outputType == OutputType.WRITER) ? _writer : _out, callback); + if (_outputType == OutputType.WRITER) + _writer.complete(callback); + else + _out.complete(callback); } public long getLongContentLength() diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ResponseWriter.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ResponseWriter.java index 094f98f1cbca..7837c9281844 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ResponseWriter.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ResponseWriter.java @@ -27,6 +27,7 @@ import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.RuntimeIOException; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -148,7 +149,7 @@ public void flush() out.flush(); } } - catch (IOException ex) + catch (Throwable ex) { setError(ex); } @@ -171,6 +172,15 @@ public void close() } } + public void complete(Callback callback) + { + synchronized (lock) + { + _isClosed = true; + } + _httpWriter.complete(callback); + } + @Override public void write(int c) { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Utf8HttpWriter.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Utf8HttpWriter.java index ba614f0ddabc..d8cc660dd65f 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Utf8HttpWriter.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Utf8HttpWriter.java @@ -42,16 +42,11 @@ public Utf8HttpWriter(HttpOutput out) public void write(char[] s, int offset, int length) throws IOException { HttpOutput out = _out; - if (length == 0 && out.isAllContentWritten()) - { - close(); - return; - } while (length > 0) { _bytes.reset(); - int chars = length > MAX_OUTPUT_CHARS ? MAX_OUTPUT_CHARS : length; + int chars = Math.min(length, MAX_OUTPUT_CHARS); byte[] buffer = _bytes.getBuf(); int bytes = _bytes.getCount(); diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java index 376b9f33baa0..d4c95aec9f4b 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java @@ -18,7 +18,9 @@ package org.eclipse.jetty.server; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.nio.ByteBuffer; @@ -27,7 +29,9 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Exchanger; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -48,9 +52,12 @@ import org.eclipse.jetty.io.ManagedSelector; import org.eclipse.jetty.io.SocketChannelEndPoint; import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.util.BlockingArrayQueue; +import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.thread.Scheduler; import org.hamcrest.Matchers; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -65,14 +72,19 @@ */ public class AsyncCompletionTest extends HttpServerTestFixture { - private static final Exchanger X = new Exchanger<>(); - private static final AtomicBoolean COMPLETE = new AtomicBoolean(); + private static final int POLL = 10; // milliseconds + private static final int WAIT = 10; // seconds + private static final String SMALL = "Now is the time for all good men to come to the aid of the party. "; + private static final String LARGE = SMALL + SMALL + SMALL + SMALL + SMALL; + private static final int BUFFER_SIZE = SMALL.length() * 3 / 2; + private static final BlockingQueue __queue = new BlockingArrayQueue<>(); + private static final AtomicBoolean __transportComplete = new AtomicBoolean(); - private static class DelayedCallback extends Callback.Nested + private static class PendingCallback extends Callback.Nested { - private CompletableFuture _delay = new CompletableFuture<>(); + private CompletableFuture _pending = new CompletableFuture<>(); - public DelayedCallback(Callback callback) + public PendingCallback(Callback callback) { super(callback); } @@ -80,20 +92,20 @@ public DelayedCallback(Callback callback) @Override public void succeeded() { - _delay.complete(null); + _pending.complete(null); } @Override public void failed(Throwable x) { - _delay.completeExceptionally(x); + _pending.completeExceptionally(x); } public void proceed() { try { - _delay.get(10, TimeUnit.SECONDS); + _pending.get(WAIT, TimeUnit.SECONDS); getCallback().succeeded(); } catch (Throwable th) @@ -107,13 +119,15 @@ public void proceed() @BeforeEach public void init() throws Exception { - COMPLETE.set(false); + __transportComplete.set(false); startServer(new ServerConnector(_server, new HttpConnectionFactory() { @Override public Connection newConnection(Connector connector, EndPoint endPoint) { + getHttpConfiguration().setOutputBufferSize(BUFFER_SIZE); + getHttpConfiguration().setOutputAggregationSize(BUFFER_SIZE); return configure(new ExtendedHttpConnection(getHttpConfiguration(), connector, endPoint), connector, endPoint); } }) @@ -136,16 +150,9 @@ public ExtendedEndPoint(SocketChannel channel, ManagedSelector selector, Selecti @Override public void write(Callback callback, ByteBuffer... buffers) throws IllegalStateException { - DelayedCallback delay = new DelayedCallback(callback); + PendingCallback delay = new PendingCallback(callback); super.write(delay, buffers); - try - { - X.exchange(delay); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } + __queue.offer(delay); } } @@ -159,24 +166,44 @@ public ExtendedHttpConnection(HttpConfiguration config, Connector connector, End @Override public void onCompleted() { - COMPLETE.compareAndSet(false, true); + __transportComplete.compareAndSet(false, true); super.onCompleted(); } } - // Tests from here use these parameters - public static Stream tests() + enum WriteStyle + {ARRAY, BUFFER, BYTE, BYTE_THEN_ARRAY, PRINT} + + ; + + public static Stream asyncIOWriteTests() { List tests = new ArrayList<>(); - tests.add(new Object[]{new HelloWorldHandler(), 200, "Hello world"}); - tests.add(new Object[]{new SendErrorHandler(499, "Test async sendError"), 499, "Test async sendError"}); - tests.add(new Object[]{new AsyncReadyCompleteHandler(), 200, AsyncReadyCompleteHandler.data}); + for (WriteStyle w : WriteStyle.values()) + { + for (boolean contentLength : new Boolean[]{true, false}) + { + for (boolean isReady : new Boolean[]{true, false}) + { + for (boolean flush : new Boolean[]{true, false}) + { + for (boolean close : new Boolean[]{true, false}) + { + for (String data : new String[]{SMALL, LARGE}) + { + tests.add(new Object[]{new AsyncIOWriteHandler(w, contentLength, isReady, flush, close, data)}); + } + } + } + } + } + } return tests.stream().map(Arguments::of); } @ParameterizedTest - @MethodSource("tests") - public void testAsyncCompletion(Handler handler, int status, String message) throws Exception + @MethodSource("asyncIOWriteTests") + public void testAsyncIOWrite(AsyncIOWriteHandler handler) throws Exception { configureServer(handler); @@ -184,79 +211,242 @@ public void testAsyncCompletion(Handler handler, int status, String message) thr try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort())) { OutputStream os = client.getOutputStream(); + InputStream in = client.getInputStream(); // write the request os.write("GET / HTTP/1.0\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1)); os.flush(); - // The write should happen but the callback is delayed - HttpTester.Response response = HttpTester.parseResponse(client.getInputStream()); - assertThat(response, Matchers.notNullValue()); - assertThat(response.getStatus(), is(status)); - String content = response.getContent(); - assertThat(content, containsString(message)); + // wait for OWP to execute (proves we do not block in write APIs) + boolean completeCalled = handler.waitForOWPExit(); - // Check that a thread is held busy in write - assertThat(_threadPool.getBusyThreads(), Matchers.greaterThan(base)); + while (true) + { + // wait for threads to return to base level (proves we are really async) + long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(WAIT); + while (_threadPool.getBusyThreads() != base) + { + if (System.nanoTime() > end) + throw new TimeoutException(); + Thread.sleep(POLL); + } - // Getting the Delayed callback will free the thread - DelayedCallback delay = X.exchange(null, 10, TimeUnit.SECONDS); + if (completeCalled) + break; - // wait for threads to return to base level - long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(10); - while (_threadPool.getBusyThreads() != base) - { - if (System.nanoTime() > end) - throw new TimeoutException(); - Thread.sleep(10); - } + // We are now asynchronously waiting! + assertThat(__transportComplete.get(), is(false)); - // We are now asynchronously waiting! - assertThat(COMPLETE.get(), is(false)); + // If we are not complete, we must be waiting for one or more writes to complete + while (true) + { + PendingCallback delay = __queue.poll(POLL, TimeUnit.MILLISECONDS); + if (delay != null) + { + delay.proceed(); + continue; + } + // No delay callback found, have we finished OWP again? + Boolean c = handler.pollForOWPExit(); - // proceed with the completion - delay.proceed(); + if (c == null) + // No we haven't, so look for another delay callback + continue; - while (!COMPLETE.get()) + // We have a OWP result, so let's handle it. + completeCalled = c; + break; + } + } + + // Wait for full completion + long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(WAIT); + while (!__transportComplete.get()) { if (System.nanoTime() > end) throw new TimeoutException(); - Thread.sleep(10); + + // proceed with any delayCBs needed for completion + PendingCallback delay = __queue.poll(POLL, TimeUnit.MILLISECONDS); + if (delay != null) + delay.proceed(); } + + // Check we got a response! + HttpTester.Response response = HttpTester.parseResponse(in); + assertThat(response, Matchers.notNullValue()); + assertThat(response.getStatus(), is(200)); + String content = response.getContent(); + assertThat(content, containsString(handler.getExpectedMessage())); } } - private static class AsyncReadyCompleteHandler extends AbstractHandler + private static class AsyncIOWriteHandler extends AbstractHandler { - static String data = "Now is the time for all good men to come to the aid of the party"; + final WriteStyle _write; + final boolean _contentLength; + final boolean _isReady; + final boolean _flush; + final boolean _close; + final String _data; + final Exchanger _ready = new Exchanger<>(); + int _toWrite; + boolean _flushed; + boolean _closed; + + AsyncIOWriteHandler(WriteStyle write, boolean contentLength, boolean isReady, boolean flush, boolean close, String data) + { + _write = write; + _contentLength = contentLength; + _isReady = isReady; + _flush = flush; + _close = close; + _data = data; + _toWrite = data.length(); + } + + public String getExpectedMessage() + { + return SMALL; + } + + boolean waitForOWPExit() + { + try + { + return _ready.exchange(null); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + + Boolean pollForOWPExit() + { + try + { + return _ready.exchange(null, POLL, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + catch (TimeoutException e) + { + return null; + } + } @Override public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { + baseRequest.setHandled(true); AsyncContext context = request.startAsync(); ServletOutputStream out = response.getOutputStream(); + response.setContentType("text/plain"); + byte[] bytes = _data.getBytes(StandardCharsets.ISO_8859_1); + if (_contentLength) + response.setContentLength(bytes.length); + out.setWriteListener(new WriteListener() { - byte[] bytes = data.getBytes(StandardCharsets.ISO_8859_1); - @Override public void onWritePossible() throws IOException { - while (out.isReady()) + try { - if (bytes != null) - { - response.setContentType("text/plain"); - response.setContentLength(bytes.length); - out.write(bytes); - bytes = null; - } - else + if (out.isReady()) { + if (_toWrite > 0) + { + switch (_write) + { + case ARRAY: + _toWrite = 0; + out.write(bytes, 0, bytes.length); + break; + + case BUFFER: + _toWrite = 0; + ((HttpOutput)out).write(BufferUtil.toBuffer(bytes)); + break; + + case BYTE: + for (int i = bytes.length - _toWrite; i < bytes.length; i++) + { + _toWrite--; + out.write(bytes[i]); + boolean ready = out.isReady(); + if (!ready) + { + _ready.exchange(Boolean.FALSE); + return; + } + } + break; + + case BYTE_THEN_ARRAY: + _toWrite = 0; + out.write(bytes[0]); // This should always aggregate + assertThat(out.isReady(), is(true)); + out.write(bytes, 1, bytes.length - 1); + break; + + case PRINT: + _toWrite = 0; + out.print(_data); + break; + } + } + + if (_flush && !_flushed) + { + boolean ready = out.isReady(); + if (!ready) + { + _ready.exchange(Boolean.FALSE); + return; + } + _flushed = true; + out.flush(); + } + + if (_close && !_closed) + { + if (_isReady) + { + boolean ready = out.isReady(); + if (!ready) + { + _ready.exchange(Boolean.FALSE); + return; + } + } + _closed = true; + out.close(); + } + + if (_isReady) + { + boolean ready = out.isReady(); + if (!ready) + { + _ready.exchange(Boolean.FALSE); + return; + } + } context.complete(); - return; + _ready.exchange(Boolean.TRUE); } } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + finally + { + } } @Override @@ -266,5 +456,312 @@ public void onError(Throwable t) } }); } + + @Override + public String toString() + { + return String.format("AWCH{w=%s,cl=%b,ir=%b,f=%b,c=%b,d=%d}", _write, _contentLength, _isReady, _flush, _close, _data.length()); + } + } + + public static Stream blockingWriteTests() + { + List tests = new ArrayList<>(); + for (WriteStyle w : WriteStyle.values()) + { + for (boolean contentLength : new Boolean[]{true, false}) + { + for (boolean flush : new Boolean[]{true, false}) + { + for (boolean close : new Boolean[]{true, false}) + { + for (String data : new String[]{SMALL, LARGE}) + { + tests.add(new Object[]{new BlockingWriteHandler(w, contentLength, flush, close, data)}); + } + } + } + } + } + return tests.stream().map(Arguments::of); + } + + @ParameterizedTest + @MethodSource("blockingWriteTests") + public void testBlockingWrite(BlockingWriteHandler handler) throws Exception + { + configureServer(handler); + + try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort())) + { + OutputStream os = client.getOutputStream(); + InputStream in = client.getInputStream(); + + // write the request + os.write("GET / HTTP/1.0\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1)); + os.flush(); + + handler.wait4handle(); + + // Wait for full completion + long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(WAIT); + while (!__transportComplete.get()) + { + if (System.nanoTime() > end) + throw new TimeoutException(); + + // proceed with any delayCBs needed for completion + try + { + PendingCallback delay = __queue.poll(POLL, TimeUnit.MILLISECONDS); + if (delay != null) + delay.proceed(); + } + catch (Exception e) + { + // ignored + } + } + + // Check we got a response! + HttpTester.Response response = HttpTester.parseResponse(in); + assertThat(response, Matchers.notNullValue()); + assertThat(response.getStatus(), is(200)); + String content = response.getContent(); + assertThat(content, containsString(handler.getExpectedMessage())); + } + } + + private static class BlockingWriteHandler extends AbstractHandler + { + final WriteStyle _write; + final boolean _contentLength; + final boolean _flush; + final boolean _close; + final String _data; + final CountDownLatch _wait = new CountDownLatch(1); + + BlockingWriteHandler(WriteStyle write, boolean contentLength, boolean flush, boolean close, String data) + { + _write = write; + _contentLength = contentLength; + _flush = flush; + _close = close; + _data = data; + } + + public String getExpectedMessage() + { + return SMALL; + } + + public void wait4handle() + { + try + { + Assertions.assertTrue(_wait.await(WAIT, TimeUnit.SECONDS)); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + AsyncContext context = request.startAsync(); + ServletOutputStream out = response.getOutputStream(); + + context.start(() -> + { + try + { + _wait.countDown(); + + response.setContentType("text/plain"); + byte[] bytes = _data.getBytes(StandardCharsets.ISO_8859_1); + if (_contentLength) + response.setContentLength(bytes.length); + + switch (_write) + { + case ARRAY: + out.write(bytes, 0, bytes.length); + break; + + case BUFFER: + ((HttpOutput)out).write(BufferUtil.toBuffer(bytes)); + break; + + case BYTE: + for (byte b : bytes) + { + out.write(b); + } + break; + + case BYTE_THEN_ARRAY: + out.write(bytes[0]); // This should always aggregate + out.write(bytes, 1, bytes.length - 1); + break; + + case PRINT: + out.print(_data); + break; + } + + if (_flush) + out.flush(); + + if (_close) + out.close(); + + context.complete(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + }); + } + + @Override + public String toString() + { + return String.format("BWCH{w=%s,cl=%b,f=%b,c=%b,d=%d}", _write, _contentLength, _flush, _close, _data.length()); + } + } + + public static Stream sendContentTests() + { + List tests = new ArrayList<>(); + for (ContentStyle style : ContentStyle.values()) + { + for (String data : new String[]{SMALL, LARGE}) + { + tests.add(new Object[]{new SendContentHandler(style, data)}); + } + } + return tests.stream().map(Arguments::of); + } + + @ParameterizedTest + @MethodSource("sendContentTests") + public void testSendContent(SendContentHandler handler) throws Exception + { + configureServer(handler); + + int base = _threadPool.getBusyThreads(); + try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort())) + { + OutputStream os = client.getOutputStream(); + InputStream in = client.getInputStream(); + + // write the request + os.write("GET / HTTP/1.0\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1)); + os.flush(); + + handler.wait4handle(); + + long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(WAIT); + while (_threadPool.getBusyThreads() != base) + { + if (System.nanoTime() > end) + throw new TimeoutException(); + Thread.sleep(POLL); + } + + // Wait for full completion + end = System.nanoTime() + TimeUnit.SECONDS.toNanos(WAIT); + while (!__transportComplete.get()) + { + if (System.nanoTime() > end) + throw new TimeoutException(); + + // proceed with any delayCBs needed for completion + try + { + PendingCallback delay = __queue.poll(POLL, TimeUnit.MILLISECONDS); + if (delay != null) + delay.proceed(); + } + catch (Exception e) + { + // ignored + } + } + + // Check we got a response! + HttpTester.Response response = HttpTester.parseResponse(in); + assertThat(response, Matchers.notNullValue()); + assertThat(response.getStatus(), is(200)); + String content = response.getContent(); + assertThat(content, containsString(handler.getExpectedMessage())); + } + } + + enum ContentStyle + {BUFFER, STREAM} // TODO more types needed here + + private static class SendContentHandler extends AbstractHandler + { + final ContentStyle _style; + final String _data; + final CountDownLatch _wait = new CountDownLatch(1); + + SendContentHandler(ContentStyle style, String data) + { + _style = style; + _data = data; + } + + public String getExpectedMessage() + { + return SMALL; + } + + public void wait4handle() + { + try + { + Assertions.assertTrue(_wait.await(WAIT, TimeUnit.SECONDS)); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } + + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + AsyncContext context = request.startAsync(); + HttpOutput out = (HttpOutput)response.getOutputStream(); + + response.setContentType("text/plain"); + byte[] bytes = _data.getBytes(StandardCharsets.ISO_8859_1); + + switch (_style) + { + case BUFFER: + out.sendContent(BufferUtil.toBuffer(bytes), Callback.from(context::complete)); + break; + + case STREAM: + out.sendContent(new ByteArrayInputStream(bytes), Callback.from(context::complete)); + break; + } + + _wait.countDown(); + } + + @Override + public String toString() + { + return String.format("SCCH{w=%s,d=%d}", _style, _data.length()); + } } } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestBase.java b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestBase.java index 2584039a25ff..813e836e2b9c 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestBase.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestBase.java @@ -33,6 +33,7 @@ import java.util.Arrays; import java.util.concurrent.Exchanger; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import javax.servlet.AsyncContext; import javax.servlet.ServletException; import javax.servlet.ServletInputStream; @@ -1020,13 +1021,22 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques @Test public void testPipeline() throws Exception { - configureServer(new HelloWorldHandler()); + AtomicInteger served = new AtomicInteger(); + configureServer(new HelloWorldHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + served.incrementAndGet(); + super.handle(target, baseRequest, request, response); + } + }); - //for (int pipeline=1;pipeline<32;pipeline++) - for (int pipeline = 1; pipeline < 32; pipeline++) + int pipeline = 64; { try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort())) { + served.set(0); client.setSoTimeout(5000); OutputStream os = client.getOutputStream(); @@ -1065,6 +1075,7 @@ public void testPipeline() throws Exception count++; line = in.readLine(); } + assertEquals(pipeline, served.get()); assertEquals(pipeline, count); } } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java index 8ce876ca377c..102ccad42977 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java @@ -291,6 +291,58 @@ public InvocationType getInvocationType() } } + interface InvocableCallback extends Invocable, Callback + { + } + + static Callback combine(Callback cb1, Callback cb2) + { + if (cb1 == null || cb1 == cb2) + return cb2; + if (cb2 == null) + return cb1; + + return new InvocableCallback() + { + @Override + public void succeeded() + { + try + { + cb1.succeeded(); + } + finally + { + cb2.succeeded(); + } + } + + @Override + public void failed(Throwable x) + { + try + { + cb1.failed(x); + } + catch (Throwable t) + { + if (x != t) + x.addSuppressed(t); + } + finally + { + cb2.failed(x); + } + } + + @Override + public InvocationType getInvocationType() + { + return Invocable.combine(Invocable.getInvocationType(cb1), Invocable.getInvocationType(cb2)); + } + }; + } + /** *

A CompletableFuture that is also a Callback.

*/ diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java index 91e8e3ee13f5..620dccfe738e 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java @@ -74,6 +74,20 @@ static void invokeNonBlocking(Runnable task) } } + static InvocationType combine(InvocationType it1, InvocationType it2) + { + if (it1 != null && it2 != null) + { + if (it1 == it2) + return it1; + if (it1 == InvocationType.EITHER) + return it2; + if (it2 == InvocationType.EITHER) + return it1; + } + return InvocationType.BLOCKING; + } + /** * Get the invocation type of an Object. *