diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index fb0ca7b2c023..b56e73e32562 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -62,17 +62,27 @@ public class HttpOutput extends ServletOutputStream implements Runnable private static final String LSTRING_FILE = "javax.servlet.LocalStrings"; private static ResourceBundle lStrings = ResourceBundle.getBundle(LSTRING_FILE); - /* TODO - ACTION OPEN ASYNC READY PENDING UNREADY CLOSING CLOSED - -------------------------------------------------------------------------------------------------- - setWriteListener() READY->owp ise ise ise ise ise ise - write() OPEN ise PENDING wpe wpe eof eof - flush() OPEN ise PENDING wpe wpe eof eof - close() CLOSING CLOSING CLOSING CLOSED CLOSED CLOSING CLOSED - isReady() OPEN:true READY:true READY:true UNREADY:false UNREADY:false CLOSED:true CLOSED:true - write completed - - - ASYNC READY->owp CLOSED - + /* + BLOCKING-------write/flush/close------->BLOCKED + | ^ | + | | | + | +-------- onWriteComplete ----------+ + | + |setWriteListener + | + v + READY ------ write/flush/close ------> PENDING + ^ ^ | | + | | | | + | +----------isReady==true----------+ | | + | | | | + |onWriteComplete | | |onWriteComplete + | | | | + | +----------isReady==false------------+ | + | | | | + | v | v + UNREADY +---ASYNC */ - enum ApiState { BLOCKING, // Open in blocking mode @@ -83,10 +93,25 @@ enum ApiState UNREADY, // write operating in progress, isReady has returned false } + /* + OPEN/BLOCKING ---- close ----> CLOSING/BLOCKED + OPEN/BLOCKED ----- close ----> assign _closedCallback ---onWriteComplete---> CLOSING/BLOCKED + OPEN/ASYNC ------- close ----> CLOSING/PENDING + OPEN/READY ------- close ----> CLOSING/PENDING + OPEN/PENDING ----- close ----> assign _closedCallback --- onWriteComplete --> CLOSING/PENDING + OPEN/UNREADY ----- close ----> assign _closedCallback --- onWriteComplete --> CLOSING/UNREADY + CLOSING/* ---------close ----> combine _closedCallback + CLOSING/BLOCKING ---- onWriteComplete ----> CLOSED/BLOCKING + CLOSING/BLOCKED ----- onWriteComplete ----> CLOSED/BLOCKING + CLOSING/ASYNC ------- onWriteComplete ----> CLOSED/ASYNC + CLOSING/READY ------- onWriteComplete ----> CLOSED/READY + CLOSING/PENDING ----- onWriteComplete ----> CLOSED/ASYNC + CLOSING/UNREADY ----- onWriteComplete ----> CLOSED/UNREADY + */ enum State { - OPEN, // Open in blocking mode - CLOSING, // Close in progress + OPEN, // Open + CLOSING, // Close in progress after close API called CLOSED // Closed } @@ -221,17 +246,6 @@ public void reopen() } } - private boolean isLastContentToWrite(int len) - { - _written += len; - return _channel.getResponse().isAllContentWritten(_written); - } - - public boolean isAllContentWritten() - { - return _channel.getResponse().isAllContentWritten(_written); - } - protected Blocker acquireWriteBlockingCallback() throws IOException { return _writeBlocker.acquire(); @@ -268,8 +282,13 @@ void onWriteComplete(boolean last, Throwable failure) boolean wake = false; Callback callback = null; boolean release = false; + ByteBuffer closeContent = null; synchronized (_channelState) { + // Transition to CLOSED state if + // + we are in state CLOSING because the closed API was called + // + we were the last write + // + we have failed if (_state == State.CLOSING || last || failure != null) { _state = State.CLOSED; @@ -278,41 +297,52 @@ void onWriteComplete(boolean last, Throwable failure) release = true; } - switch (_apiState) + if (_closedCallback != null && _state == State.OPEN) { - case BLOCKED: - _apiState = ApiState.BLOCKING; - break; - - case PENDING: - _apiState = ApiState.ASYNC; - if (failure != null) - { - _onError = failure; - wake = _channel.getState().onWritePossible(); - } - break; + _state = State.CLOSING; + closeContent = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; + } + else + { + switch (_apiState) + { + case BLOCKED: + _apiState = ApiState.BLOCKING; + break; - case UNREADY: - _apiState = ApiState.READY; - if (failure != null) - _onError = failure; - wake = _channel.getState().onWritePossible(); - break; + case PENDING: + _apiState = ApiState.ASYNC; + if (failure != null) + { + _onError = failure; + wake = _channelState.onWritePossible(); + } + break; - default: - if (_state == State.CLOSED) + case UNREADY: + _apiState = ApiState.READY; + if (failure != null) + _onError = failure; + wake = _channelState.onWritePossible(); break; - IllegalStateException ise = new IllegalStateException(stateString()); - if (failure != null) - ise.addSuppressed(failure); - throw ise; + + default: + if (_state == State.CLOSED) + break; + throw new IllegalStateException(stateString()); + } } } if (failure != null) _channel.abort(failure); + if (closeContent != null) + { + channelWrite(closeContent, true, new AsyncCloseCB(Callback.NOOP)); + return; + } + try { if (callback != null) @@ -415,22 +445,7 @@ public void close(Callback callback) if (content != null) channelWrite(content, true, - new Callback.Nested(callback) - { - @Override - public void succeeded() - { - onWriteComplete(true, null); - super.succeeded(); - } - - @Override - public void failed(Throwable x) - { - onWriteComplete(true, x); - super.failed(x); - } - }); + new AsyncCloseCB(callback)); } @Override @@ -457,8 +472,9 @@ public void close() throws IOException } /** - * Called to indicate that the last write has been performed. - * It updates the state and performs cleanup operations. + * Called to indicate that the output has been closed externally + * via other means that may not have involve writing + * the last chunk. */ public void closed() { @@ -581,6 +597,10 @@ public void flush() throws IOException private void checkWritable() throws EofException { + // TODO check this + if (_softClose) + throw new EofException("Closed"); + switch (_state) { case CLOSED: @@ -1034,26 +1054,24 @@ public void sendContent(ByteBuffer content, final Callback callback) if (LOG.isDebugEnabled()) LOG.debug("sendContent(buffer={},{})", BufferUtil.toDetailString(content), callback); - if (!prepareSendContent(content.remaining(), callback)) - return; - - channelWrite(content, true, - new Callback.Nested(callback) - { - @Override - public void succeeded() + if (prepareSendContent(content.remaining(), callback)) + channelWrite(content, true, + new Callback.Nested(callback) { - onWriteComplete(true, null); - super.succeeded(); - } + @Override + public void succeeded() + { + onWriteComplete(true, null); + super.succeeded(); + } - @Override - public void failed(Throwable x) - { - onWriteComplete(true, x); - super.failed(x); - } - }); + @Override + public void failed(Throwable x) + { + onWriteComplete(true, x); + super.failed(x); + } + }); } /** @@ -1068,10 +1086,8 @@ public void sendContent(InputStream in, Callback callback) if (LOG.isDebugEnabled()) LOG.debug("sendContent(stream={},{})", in, callback); - if (!prepareSendContent(0, callback)) - return; - - new InputStreamWritingCB(in, callback).iterate(); + if (prepareSendContent(0, callback)) + new InputStreamWritingCB(in, callback).iterate(); } /** @@ -1086,10 +1102,8 @@ public void sendContent(ReadableByteChannel in, Callback callback) if (LOG.isDebugEnabled()) LOG.debug("sendContent(channel={},{})", in, callback); - if (!prepareSendContent(0, callback)) - return; - - new ReadableByteChannelWritingCB(in, callback).iterate(); + if (prepareSendContent(0, callback)) + new ReadableByteChannelWritingCB(in, callback).iterate(); } private boolean prepareSendContent(int len, Callback callback) @@ -1335,25 +1349,21 @@ public void run() { error = t; } - - if (error != null) + try { - try - { - if (LOG.isDebugEnabled()) - LOG.debug("onError", error); - _writeListener.onError(error); - } - catch (Throwable t) - { - if (LOG.isDebugEnabled()) - LOG.debug(t); - } - finally - { - // Initiate an async close - close(Callback.NOOP); - } + if (LOG.isDebugEnabled()) + LOG.debug("onError", error); + _writeListener.onError(error); + } + catch (Throwable t) + { + if (LOG.isDebugEnabled()) + LOG.debug(t); + } + finally + { + // Initiate an async close + close(Callback.NOOP); } } @@ -1386,29 +1396,6 @@ public InvocationType getInvocationType() return InvocationType.NON_BLOCKING; } - @Override - protected Action process() throws Exception - { - ByteBuffer closeContent = null; - synchronized (_channelState) - { - // If we have a pending close, schedule one more iteration - if (_state == State.OPEN && _closedCallback != null) - { - _state = State.CLOSING; - closeContent = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; - } - } - - if (closeContent != null) - { - channelWrite(closeContent, true, this); - return Action.SCHEDULED; - } - - return Action.SUCCEEDED; - } - @Override protected void onCompleteSuccess() { @@ -1490,7 +1477,7 @@ protected Action process() throws Exception return Action.SCHEDULED; } - return super.process(); + return Action.SUCCEEDED; } } @@ -1579,7 +1566,7 @@ protected Action process() throws Exception if (LOG.isDebugEnabled() && _completed) LOG.debug("EOF of {}", this); - return super.process(); + return Action.SUCCEEDED; } } @@ -1620,7 +1607,8 @@ protected Action process() throws Exception _channel.getByteBufferPool().release(_buffer); IO.close(_in); } - return super.process(); + + return Action.SUCCEEDED; } // Read until buffer full or EOF @@ -1694,7 +1682,7 @@ protected Action process() throws Exception _channel.getByteBufferPool().release(_buffer); IO.close(_in); } - return super.process(); + return Action.SUCCEEDED; } // Read from stream until buffer full or EOF @@ -1739,4 +1727,26 @@ protected long getIdleTimeout() return blockingTimeout; } } + + private class AsyncCloseCB extends Callback.Nested + { + public AsyncCloseCB(Callback callback) + { + super(callback); + } + + @Override + public void succeeded() + { + onWriteComplete(true, null); + super.succeeded(); + } + + @Override + public void failed(Throwable x) + { + onWriteComplete(true, x); + super.failed(x); + } + } }