diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 6d9f2179e9fc..f681e2dd1404 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -153,9 +153,10 @@ default void resetBuffer() throws IllegalStateException private static Logger LOG = Log.getLogger(HttpOutput.class); private static final ThreadLocal _encoder = new ThreadLocal<>(); - private final AtomicReference _state = new AtomicReference<>(State.OPEN); private final HttpChannel _channel; + private final HttpChannelState _channelState; private final SharedBlockingCallback _writeBlocker; + private State _state = State.OPEN; private Interceptor _interceptor; private long _written; private long _flushed; @@ -165,11 +166,13 @@ default void resetBuffer() throws IllegalStateException private int _commitSize; private WriteListener _writeListener; private volatile Throwable _onError; + private Callback _completeCallback; private Callback _closeCallback; public HttpOutput(HttpChannel channel) { _channel = channel; + _channelState = channel.getState(); _interceptor = channel; _writeBlocker = new WriteBlocker(channel); HttpConfiguration config = channel.getHttpConfiguration(); @@ -209,7 +212,10 @@ public long getWritten() public void reopen() { - _state.set(State.OPEN); + synchronized (_channelState) + { + _state = State.OPEN; + } } private boolean isLastContentToWrite(int len) @@ -267,128 +273,123 @@ private void abort(Throwable failure) public void closedBySendError() { - while (true) + synchronized (_channelState) { - State state = _state.get(); - switch (state) + + switch (_state) { case OPEN: case READY: case ASYNC: - if (!_state.compareAndSet(state, State.CLOSED)) - continue; + _state = State.CLOSED; return; default: - throw new IllegalStateException(state.toString()); + throw new IllegalStateException(_state.toString()); } } } - public void close(Closeable wrapper, Callback callback) + public void complete(Closeable wrapper, Callback callback) { - _closeCallback = callback; + if (wrapper == this || wrapper == null) + { + // If there is no wrapper, then complete is just an normal async close + close(callback); + return; + } + + // otherwise we must remember the callback and call the wrappers close. + synchronized (_channelState) + { + if (_completeCallback != null) + throw new IllegalStateException(); + _completeCallback = callback; + } + try { if (wrapper != null) wrapper.close(); - if (!isClosed()) - close(); } catch (Throwable th) { - closed(); - if (_closeCallback == null) - LOG.ignore(th); - else - callback.failed(th); + LOG.ignore(th); } - finally + + // Was our close method wasn't actually called, then do a normal async close + synchronized (_channelState) { - if (_closeCallback != null) - callback.succeeded(); - _closeCallback = null; + if (_completeCallback != null) + callback = null; } + if (callback != null) + close(callback); } - @Override - public void close() + public void close(Callback callback) { - Callback closeCallback = _closeCallback == null ? BLOCKING_CLOSE_CALLBACK : _closeCallback; - - while (true) + synchronized (_channelState) { - State state = _state.get(); - switch (state) + switch (_state) { - case CLOSING: case CLOSED: - case ERROR: - { - _closeCallback = null; - closeCallback.succeeded(); + callback.succeeded(); return; - } - case ASYNC: - { - // A close call implies a write operation, thus in asynchronous mode - // a call to isReady() that returned true should have been made. - // However it is desirable to allow a close at any time, specially if - // complete is called. Thus we simulate a call to isReady here, by - // trying to move to READY state. Either way we continue. - _state.compareAndSet(state, State.READY); - continue; - } - case UNREADY: - case PENDING: - { - // A close call implies a write operation, thus in asynchronous mode - // a call to isReady() that returned true should have been made. - // However it is desirable to allow a close at any time, specially if - // complete is called. Because the prior write has not yet completed - // and/or isReady has not been called, this close is allowed, but will - // abort the response. - if (!_state.compareAndSet(state, State.CLOSED)) - continue; - IOException ex = new IOException("Closed while Pending/Unready"); - LOG.warn(ex.toString()); - LOG.debug(ex); - abort(ex); - _closeCallback = null; - closeCallback.failed(ex); + + case CLOSING: + // TODO merge the callbacks return; - } - default: - { - if (!_state.compareAndSet(state, State.CLOSING)) - continue; - // Do a normal close by writing the aggregate buffer or an empty buffer. If we are - // not including, then indicate this is the last write. - try - { - ByteBuffer content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; - if (closeCallback == BLOCKING_CLOSE_CALLBACK) - { - // Do a blocking close - channelWrite(content, !_channel.getResponse().isIncluding()); - _closeCallback = null; - closeCallback.succeeded(); - } - else - { - _closeCallback = null; - channelWrite(content, !_channel.getResponse().isIncluding(), closeCallback); - } - } - catch (IOException x) - { - LOG.ignore(x); // Ignore it, it's been already logged in write(). - _closeCallback = null; - closeCallback.failed(x); - } + case ERROR: + callback.failed(_onError); + // TODO State change??? return; - } + + default: + _state = State.CLOSING; + } + } + + ByteBuffer content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; + channelWrite(content, !_channel.getResponse().isIncluding(), callback); + } + + @Override + public void close() throws IOException + { + Callback callback = null; + synchronized (_channelState) + { + if (_completeCallback != null) + { + // This is a completion close, + callback = _completeCallback; + _completeCallback = null; + } + // Else handle with blocking unless already closed. + else if (_state == State.CLOSED) + return; + } + + // This is a completion close, so we will handle without blocking. + if (callback != null) + { + close(callback); + } + else + { + try (Blocker blocker = _writeBlocker.acquire()) + { + close(blocker); + blocker.block(); + } + catch (Throwable failure) + { + if (LOG.isDebugEnabled()) + LOG.debug(failure); + abort(failure); + throw failure; } } } @@ -399,33 +400,26 @@ public void close() */ public void closed() { - while (true) + synchronized (_channelState) { - State state = _state.get(); - switch (state) + switch (_state) { case CLOSED: { - return; + break; } case UNREADY: { - if (_state.compareAndSet(state, State.ERROR)) - { - if (_onError == null) - _onError = new EofException("Async closed"); - releaseBuffer(); - return; - } + _state = State.ERROR; + if (_onError == null) + _onError = new EofException("Async closed"); + releaseBuffer(); break; } default: { - if (!_state.compareAndSet(state, State.CLOSED)) - break; - + _state = State.CLOSED; releaseBuffer(); - return; } } } @@ -454,37 +448,43 @@ private void releaseBuffer() public boolean isClosed() { - switch (_state.get()) + synchronized (_channelState) { - case CLOSING: - case CLOSED: - return true; - default: - return false; + switch (_state) + { + case CLOSING: + case CLOSED: + return true; + default: + return false; + } } } public boolean isAsync() { - switch (_state.get()) - { - case ASYNC: - case READY: - case PENDING: - case UNREADY: - return true; - default: - return false; + synchronized (_channelState) + { + switch (_state) + { + case ASYNC: + case READY: + case PENDING: + case UNREADY: + return true; + default: + return false; + } } } @Override public void flush() throws IOException { - while (true) + synchronized (_channelState) { - State state = _state.get(); - switch (state) + + switch (_state) { case OPEN: channelWrite(BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER, false); @@ -494,8 +494,7 @@ public void flush() throws IOException throw new IllegalStateException("isReady() not called"); case READY: - if (!_state.compareAndSet(state, State.PENDING)) - continue; + _state = State.PENDING; new AsyncFlush().iterate(); return; @@ -511,7 +510,7 @@ public void flush() throws IOException return; default: - throw new IllegalStateException(state.toString()); + throw new IllegalStateException(_state.toString()); } } } @@ -519,52 +518,35 @@ public void flush() throws IOException @Override public void write(byte[] b, int off, int len) throws IOException { - long written = _written + len; - boolean last = _channel.getResponse().isAllContentWritten(written); - int space = _aggregate == null ? getBufferSize() : BufferUtil.space(_aggregate); - // Write will be aggregated if: - // + it is smaller than the commitSize - // + is not the last one, or is last but will fit in an already allocated aggregate buffer. - boolean aggregate = len <= _commitSize && (!last || BufferUtil.hasContent(_aggregate) && len <= space); - boolean flush = last || !aggregate || len >= space; + boolean last; + boolean aggregate; + boolean flush; // Async or Blocking ? - while (true) + boolean async = false; + synchronized (_channelState) { - State state = _state.get(); - switch (state) + long written = _written + len; + int space = _aggregate == null ? getBufferSize() : BufferUtil.space(_aggregate); + last = _channel.getResponse().isAllContentWritten(written); + // Write will be aggregated if: + // + it is smaller than the commitSize + // + is not the last one, or is last but will fit in an already allocated aggregate buffer. + aggregate = len <= _commitSize && (!last || BufferUtil.hasContent(_aggregate) && len <= space); + flush = last || !aggregate || len >= space; + switch (_state) { case OPEN: // process blocking write below - _written = written; break; case ASYNC: throw new IllegalStateException("isReady() not called"); case READY: - if (!_state.compareAndSet(state, flush ? State.PENDING : State.ASYNC)) - continue; - _written = written; - - // Should we aggregate? - if (aggregate) - { - acquireBuffer(); - int filled = BufferUtil.fill(_aggregate, b, off, len); - - // return if we are not complete, not full and filled all the content - if (!flush) - return; - - // adjust offset/length - off += filled; - len -= filled; - } - - // Do the asynchronous writing from the callback - new AsyncWrite(b, off, len, last).iterate(); - return; + async = true; + _state = flush ? State.PENDING : State.ASYNC; + break; case PENDING: case UNREADY: @@ -578,12 +560,12 @@ public void write(byte[] b, int off, int len) throws IOException throw new EofException("Closed"); default: - throw new IllegalStateException(state.toString()); + throw new IllegalStateException(_state.toString()); } - break; + _written = written; } - // handle blocking write + // Should we aggregate? if (aggregate) { acquireBuffer(); @@ -598,6 +580,13 @@ public void write(byte[] b, int off, int len) throws IOException len -= filled; } + if (async) + { + // Do the asynchronous writing from the callback + new AsyncWrite(b, off, len, last).iterate(); + return; + } + // flush any content from the aggregate if (BufferUtil.hasContent(_aggregate)) { @@ -637,28 +626,27 @@ else if (last) public void write(ByteBuffer buffer) throws IOException { // This write always bypasses aggregate buffer + int len = BufferUtil.length(buffer); + boolean last; // Async or Blocking ? - while (true) + boolean async = false; + synchronized (_channelState) { - State state = _state.get(); - switch (state) + long written = _written + len; + last = _channel.getResponse().isAllContentWritten(_written); + switch (_state) { case OPEN: - // process blocking below break; case ASYNC: throw new IllegalStateException("isReady() not called"); case READY: - if (!_state.compareAndSet(state, State.PENDING)) - continue; - - // Do the asynchronous writing from the callback - boolean last = isLastContentToWrite(buffer.remaining()); - new AsyncWrite(buffer, last).iterate(); - return; + async = true; + _state = State.PENDING; + break; case PENDING: case UNREADY: @@ -672,67 +660,57 @@ public void write(ByteBuffer buffer) throws IOException throw new EofException("Closed"); default: - throw new IllegalStateException(state.toString()); + throw new IllegalStateException(_state.toString()); } - break; + _written = written; } - // handle blocking write - int len = BufferUtil.length(buffer); - boolean last = isLastContentToWrite(len); - - // flush any content from the aggregate - if (BufferUtil.hasContent(_aggregate)) - channelWrite(_aggregate, last && len == 0); + if (async) + { + new AsyncWrite(buffer, last).iterate(); + } + else + { + // Blocking write + // flush any content from the aggregate + if (BufferUtil.hasContent(_aggregate)) + channelWrite(_aggregate, last && len == 0); - // write any remaining content in the buffer directly - if (len > 0) - channelWrite(buffer, last); - else if (last) - channelWrite(BufferUtil.EMPTY_BUFFER, true); + // write any remaining content in the buffer directly + if (len > 0) + channelWrite(buffer, last); + else if (last) + channelWrite(BufferUtil.EMPTY_BUFFER, true); + } } @Override public void write(int b) throws IOException { - long written = _written + 1; - boolean last = _channel.getResponse().isAllContentWritten(written); - int space = _aggregate == null ? getBufferSize() : BufferUtil.space(_aggregate); - boolean flush = last || space == 1; - + boolean flush; + boolean last; // Async or Blocking ? - while (true) + + boolean async = false; + synchronized (_channelState) { - switch (_state.get()) + long written = _written + 1; + int space = _aggregate == null ? getBufferSize() : BufferUtil.space(_aggregate); + last = _channel.getResponse().isAllContentWritten(written); + flush = last || space == 1; + + switch (_state) { case OPEN: - _written = written; - acquireBuffer(); - BufferUtil.append(_aggregate, (byte)b); - - // Check if all written or full - if (flush) - channelWrite(_aggregate, last); break; case ASYNC: throw new IllegalStateException("isReady() not called"); case READY: - if (!_state.compareAndSet(State.READY, flush ? State.PENDING : State.ASYNC)) - continue; - - _written = written; - acquireBuffer(); - BufferUtil.append(_aggregate, (byte)b); - - // Check if all written or full - if (flush) - { - // Do the asynchronous writing from the callback - new AsyncFlush().iterate(); - } - return; + async = true; + _state = flush ? State.PENDING : State.ASYNC; + break; case PENDING: case UNREADY: @@ -748,8 +726,20 @@ public void write(int b) throws IOException default: throw new IllegalStateException(); } - break; + _written = written; } + acquireBuffer(); + BufferUtil.append(_aggregate, (byte)b); + + // Check if all written or full + if (!flush) + return; + + if (async) + // Do the asynchronous writing from the callback + new AsyncFlush().iterate(); + else + channelWrite(_aggregate, last); } @Override @@ -958,7 +948,9 @@ public void sendContent(ByteBuffer content, final Callback callback) if (LOG.isDebugEnabled()) LOG.debug("sendContent(buffer={},{})", BufferUtil.toDetailString(content), callback); - _written += content.remaining(); + if (!prepareSendContent(content.remaining(), callback)) + return; + channelWrite(content, true, new Callback.Nested(callback) { @Override @@ -989,6 +981,9 @@ public void sendContent(InputStream in, Callback callback) if (LOG.isDebugEnabled()) LOG.debug("sendContent(stream={},{})", in, callback); + if (!prepareSendContent(0, callback)) + return; + new InputStreamWritingCB(in, callback).iterate(); } @@ -1004,54 +999,60 @@ public void sendContent(ReadableByteChannel in, Callback callback) if (LOG.isDebugEnabled()) LOG.debug("sendContent(channel={},{})", in, callback); + if (!prepareSendContent(0, callback)) + return; + new ReadableByteChannelWritingCB(in, callback).iterate(); } - /** - * Asynchronous send of HTTP content. - * - * @param httpContent The HTTP content to send - * @param callback The callback to use to notify success or failure - */ - public void sendContent(HttpContent httpContent, Callback callback) + private boolean prepareSendContent(int len, Callback callback) { - if (LOG.isDebugEnabled()) - LOG.debug("sendContent(http={},{})", httpContent, callback); - - if (BufferUtil.hasContent(_aggregate)) - { - callback.failed(new IOException("cannot sendContent() after write()")); - return; - } - if (_channel.isCommitted()) + synchronized (_channelState) { - callback.failed(new IOException("cannot sendContent(), output already committed")); - return; - } + if (BufferUtil.hasContent(_aggregate)) + { + callback.failed(new IOException("cannot sendContent() after write()")); + return false; + } + if (_channel.isCommitted()) + { + callback.failed(new IOException("cannot sendContent(), output already committed")); + return false; + } - while (true) - { - switch (_state.get()) + switch (_state) { case OPEN: - if (!_state.compareAndSet(State.OPEN, State.PENDING)) - continue; - break; + _state = State.PENDING; + if (len > 0) + _written += len; + return true; case ERROR: callback.failed(new EofException(_onError)); - return; + return false; case CLOSING: case CLOSED: callback.failed(new EofException("Closed")); - return; + return false; default: throw new IllegalStateException(); } - break; } + } + + /** + * Asynchronous send of HTTP content. + * + * @param httpContent The HTTP content to send + * @param callback The callback to use to notify success or failure + */ + public void sendContent(HttpContent httpContent, Callback callback) + { + if (LOG.isDebugEnabled()) + LOG.debug("sendContent(http={},{})", httpContent, callback); ByteBuffer buffer = _channel.useDirectBuffers() ? httpContent.getDirectBuffer() : null; if (buffer == null) @@ -1063,30 +1064,40 @@ public void sendContent(HttpContent httpContent, Callback callback) return; } + ReadableByteChannel rbc = null; try { - ReadableByteChannel rbc = httpContent.getReadableByteChannel(); - if (rbc != null) - { - // Close of the rbc is done by the async sendContent - sendContent(rbc, callback); - return; - } - - InputStream in = httpContent.getInputStream(); - if (in != null) - { - sendContent(in, callback); - return; - } + rbc = httpContent.getReadableByteChannel(); + } + catch (Throwable x) + { + LOG.debug(x); + } + if (rbc != null) + { + // Close of the rbc is done by the async sendContent + sendContent(rbc, callback); + return; + } - throw new IllegalArgumentException("unknown content for " + httpContent); + InputStream in = null; + try + { + in = httpContent.getInputStream(); } - catch (Throwable cause) + catch (Throwable x) { - abort(cause); - callback.failed(cause); + LOG.debug(x); + } + if (in != null) + { + sendContent(in, callback); + return; } + + Throwable cause = new IllegalArgumentException("unknown content for " + httpContent); + abort(cause); + callback.failed(cause); } public int getBufferSize() @@ -1159,23 +1170,24 @@ public void setWriteListener(WriteListener writeListener) { if (!_channel.getState().isAsync()) throw new IllegalStateException("!ASYNC"); - - if (_state.compareAndSet(State.OPEN, State.READY)) + boolean wake; + synchronized (_channelState) { + if (_state != State.OPEN) + throw new IllegalStateException("!OPEN"); _writeListener = writeListener; - if (_channel.getState().onWritePossible()) - _channel.execute(_channel); + wake = _channel.getState().onWritePossible(); } - else - throw new IllegalStateException(); + if (wake) + _channel.execute(_channel); } @Override public boolean isReady() { - while (true) + synchronized (_channelState) { - switch (_state.get()) + switch (_state) { case OPEN: case READY: @@ -1185,20 +1197,18 @@ public boolean isReady() return true; case ASYNC: - if (!_state.compareAndSet(State.ASYNC, State.READY)) - continue; + _state = State.READY; return true; case PENDING: - if (!_state.compareAndSet(State.PENDING, State.UNREADY)) - continue; + _state = State.UNREADY; return false; case UNREADY: return false; default: - throw new IllegalStateException(); + throw new IllegalStateException(_state.toString()); } } } @@ -1206,77 +1216,59 @@ public boolean isReady() @Override public void run() { - while (true) + Throwable error = null; + + synchronized (_channelState) { - State state = _state.get(); + if (_state == State.ERROR) + { + error = _onError; + _onError = null; + // TODO change state? + } + } - if (_onError != null) + try + { + if (error == null) { - switch (state) - { - case CLOSING: - case CLOSED: - case ERROR: - { - _onError = null; - return; - } - default: - { - if (_state.compareAndSet(state, State.ERROR)) - { - Throwable th = _onError; - _onError = null; - if (LOG.isDebugEnabled()) - LOG.debug("onError", th); - - try - { - _writeListener.onError(th); - } - finally - { - IO.close(this); - } - - return; - } - } - } - continue; + if (LOG.isDebugEnabled()) + LOG.debug("onWritePossible"); + _writeListener.onWritePossible(); + return; } + } + catch (Throwable t) + { + error = t; + } - // We do not check the state here. Strictly speaking the state should - // always be READY when run is called. However, other async threads or - // a prior call by this thread to onDataAvailable may have called write - // after onWritePossible was called, so the state could be any of the - // write states. - // - // Even if the state is CLOSED, we need to call onWritePossible to tell - // async producer that the last write completed. - // - // We have to trust the scheduling of this run was done - // for good reason, that is protected correctly by HttpChannelState and - // that implementations of onWritePossible will - // themselves check isReady(). If multiple threads are calling write, - // then they must either rely on only a single container thread being - // dispatched or perform their own mutual exclusion. + if (error != null) + { try { - _writeListener.onWritePossible(); - break; + if (LOG.isDebugEnabled()) + LOG.debug("onError", error); + _writeListener.onError(error); + } + catch(Throwable t) + { + if (LOG.isDebugEnabled()) + LOG.debug(t); } - catch (Throwable e) + finally { - _onError = e; + // TODO is this needed? + IO.close(this); } } + } @Override public String toString() { - return String.format("%s@%x{%s}", this.getClass().getSimpleName(), hashCode(), _state.get()); + return String.format("%s@%x{%s}", this.getClass().getSimpleName(), hashCode(), _state); } private abstract class AsyncICB extends IteratingCallback @@ -1297,23 +1289,18 @@ public InvocationType getInvocationType() @Override protected void onCompleteSuccess() { - while (true) + boolean wake = false; + synchronized (_channelState) { - State last = _state.get(); - switch (last) + switch (_state) { case PENDING: - if (!_state.compareAndSet(State.PENDING, State.ASYNC)) - continue; + _state = State.ASYNC; break; case UNREADY: - if (!_state.compareAndSet(State.UNREADY, State.READY)) - continue; - if (_last) - closed(); - if (_channel.getState().onWritePossible()) - _channel.execute(_channel); // TODO can we call directly? Why execute? + _state = _last ? State.CLOSED : State.READY; + wake = _channel.getState().onWritePossible(); break; case CLOSED: @@ -1322,8 +1309,10 @@ protected void onCompleteSuccess() default: throw new IllegalStateException(); } - break; } + + if (wake) + _channel.execute(_channel); // TODO can we call directly? Why execute? } @Override @@ -1337,9 +1326,9 @@ public void onCompleteFailure(Throwable e) private class AsyncFlush extends AsyncICB { - protected volatile boolean _flushed; + volatile boolean _flushed; - public AsyncFlush() + AsyncFlush() { super(false); } @@ -1370,9 +1359,9 @@ private class AsyncWrite extends AsyncICB private final ByteBuffer _buffer; private final ByteBuffer _slice; private final int _len; - protected volatile boolean _completed; + volatile boolean _completed; - public AsyncWrite(byte[] b, int off, int len, boolean last) + AsyncWrite(byte[] b, int off, int len, boolean last) { super(last); _buffer = ByteBuffer.wrap(b, off, len); @@ -1381,7 +1370,7 @@ public AsyncWrite(byte[] b, int off, int len, boolean last) _slice = _len < getBufferSize() ? null : _buffer.duplicate(); } - public AsyncWrite(ByteBuffer buffer, boolean last) + AsyncWrite(ByteBuffer buffer, boolean last) { super(last); _buffer = buffer; @@ -1467,7 +1456,7 @@ private class InputStreamWritingCB extends IteratingNestedCallback private final ByteBuffer _buffer; private boolean _eof; - public InputStreamWritingCB(InputStream in, Callback callback) + InputStreamWritingCB(InputStream in, Callback callback) { super(callback); _in = in; @@ -1534,7 +1523,7 @@ private class ReadableByteChannelWritingCB extends IteratingNestedCallback private final ByteBuffer _buffer; private boolean _eof; - public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback) + ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback) { super(callback); _in = in; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java index 60b5b10e8b68..c33775d75869 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Response.java @@ -836,7 +836,7 @@ public void closeOutput() throws IOException public void closeOutput(Callback callback) { - _out.close((_outputType == OutputType.WRITER) ? _writer : _out, callback); + _out.complete((_outputType == OutputType.WRITER) ? _writer : _out, callback); } public long getLongContentLength()