From 7612bb5d42f74ec440dac92766ebc9a075322350 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Tue, 10 Dec 2019 16:00:37 +1100 Subject: [PATCH] Issue #4331 Close Complete a reworking of HttpOutput to separate out API state. Signed-off-by: Greg Wilkins --- .../org/eclipse/jetty/server/HttpOutput.java | 796 +++++++++++------- 1 file changed, 469 insertions(+), 327 deletions(-) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index c4092d0be900..fac8357980ac 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -42,7 +42,6 @@ import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IteratingCallback; -import org.eclipse.jetty.util.IteratingNestedCallback; import org.eclipse.jetty.util.SharedBlockingCallback; import org.eclipse.jetty.util.SharedBlockingCallback.Blocker; import org.eclipse.jetty.util.log.Log; @@ -63,7 +62,7 @@ public class HttpOutput extends ServletOutputStream implements Runnable private static final String LSTRING_FILE = "javax.servlet.LocalStrings"; private static ResourceBundle lStrings = ResourceBundle.getBundle(LSTRING_FILE); - /* + /* TODO ACTION OPEN ASYNC READY PENDING UNREADY CLOSING CLOSED -------------------------------------------------------------------------------------------------- setWriteListener() READY->owp ise ise ise ise ise ise @@ -73,15 +72,21 @@ public class HttpOutput extends ServletOutputStream implements Runnable isReady() OPEN:true READY:true READY:true UNREADY:false UNREADY:false CLOSED:true CLOSED:true write completed - - - ASYNC READY->owp CLOSED - */ - enum State + + enum ApiState { - OPEN, // Open in blocking mode + BLOCKING, // Open in blocking mode + BLOCKED, // Blocked in blocking operation ASYNC, // Open in async mode READY, // isReady() has returned true PENDING, // write operating in progress UNREADY, // write operating in progress, isReady has returned false - ERROR, // onError needs to be called - CLOSING, // Asynchronous close in progress + } + + enum State + { + OPEN, // Open in blocking mode + CLOSING, // Close in progress CLOSED // Closed } @@ -153,7 +158,9 @@ default void resetBuffer() throws IllegalStateException private final HttpChannel _channel; private final HttpChannelState _channelState; private final SharedBlockingCallback _writeBlocker; + private ApiState _apiState = ApiState.BLOCKING; private State _state = State.OPEN; + private boolean _softClose = false; private Interceptor _interceptor; private long _written; private long _flushed; @@ -163,7 +170,7 @@ default void resetBuffer() throws IllegalStateException private int _commitSize; private WriteListener _writeListener; private volatile Throwable _onError; - private Callback _closeCallback; + private Callback _closedCallback; public HttpOutput(HttpChannel channel) { @@ -210,7 +217,7 @@ public void reopen() { synchronized (_channelState) { - _state = State.OPEN; + _softClose = false; } } @@ -236,19 +243,10 @@ private void channelWrite(ByteBuffer content, boolean complete) throws IOExcepti { channelWrite(content, complete, blocker); blocker.block(); - if (complete) - closed(); - } - catch (Throwable failure) - { - if (LOG.isDebugEnabled()) - LOG.debug(failure); - abort(failure); - throw failure; } } - protected void channelWrite(ByteBuffer content, boolean complete, Callback callback) + private void channelWrite(ByteBuffer content, boolean last, Callback callback) { if (_firstByteTimeStamp == -1) { @@ -258,108 +256,191 @@ protected void channelWrite(ByteBuffer content, boolean complete, Callback callb else _firstByteTimeStamp = Long.MAX_VALUE; } - _interceptor.write(content, complete, callback); + + _interceptor.write(content, last, callback); } - private void abort(Throwable failure) + void onWriteComplete(boolean last, Throwable failure) { - closed(); - _channel.abort(failure); + if (LOG.isDebugEnabled()) + LOG.debug("onWriteComplete", failure); + + boolean wake = false; + Callback callback = null; + synchronized (_channelState) + { + if (_state == State.CLOSING || last) + { + _state = State.CLOSED; + callback = _closedCallback; + _closedCallback = null; + } + + switch (_apiState) + { + case BLOCKED: + _apiState = ApiState.BLOCKING; + break; + + case PENDING: + _apiState = ApiState.ASYNC; + break; + + case UNREADY: + _apiState = ApiState.READY; + wake = _channel.getState().onWritePossible(); + break; + + default: + if (_state == State.CLOSED) + break; + IllegalStateException ise = new IllegalStateException(stateString()); + if (failure != null) + ise.addSuppressed(failure); + throw ise; + } + } + + if (failure != null) + _channel.abort(failure); + + try + { + if (callback != null) + { + if (failure == null) + callback.succeeded(); + else + callback.failed(failure); + } + } + finally + { + if (wake) + _channel.execute(_channel); // TODO can we call directly? Why execute? + } + } public void closedBySendError() { synchronized (_channelState) { - - switch (_state) + switch (_apiState) { - case OPEN: + case BLOCKING: + case BLOCKED: case READY: case ASYNC: - _state = State.CLOSED; + _softClose = true; return; default: - throw new IllegalStateException(_state.toString()); + throw new IllegalStateException(stateString()); } } } public void close(Callback callback) { + boolean succeeded = false; + Throwable error = null; + ByteBuffer content = null; synchronized (_channelState) { switch (_state) { case CLOSED: - callback.succeeded(); - return; - - case ERROR: - // TODO is this right? Perhaps the close should wait - // TODO until after onError callback? - Callback cb = Callback.combine(_closeCallback, callback); - _closeCallback = null; - cb.failed(_onError); - _state = State.CLOSED; - return; + succeeded = true; + break; - case CLOSING: // Close already initiated, so just add callback - case PENDING: // Add the callback and close when write is complete. - case UNREADY: // Add the callback and close when write is complete. - // Let's just add the callback so it get's noticed once write is possible. - _closeCallback = Callback.combine(_closeCallback, callback); - return; + case CLOSING: + _closedCallback = Callback.combine(_closedCallback, callback); + break; - default: - _state = State.CLOSING; - _closeCallback = Callback.combine(_closeCallback, callback); + case OPEN: + if (_onError != null) + { + error = _onError; + break; + } + + switch (_apiState) + { + case BLOCKING: + _apiState = ApiState.BLOCKED; + _state = State.CLOSING; + content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; + break; + + case ASYNC: + case READY: + _apiState = ApiState.PENDING; + _state = State.CLOSING; + content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; + break; + + case BLOCKED: + case UNREADY: + case PENDING: + _closedCallback = Callback.combine(_closedCallback, callback); + break; + } + break; } } - ByteBuffer content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; - channelWrite(content, !_channel.getResponse().isIncluding(), new Callback() + if (succeeded) { - @Override - public void succeeded() - { - callback().succeeded(); - } + callback.succeeded(); + return; + } - @Override - public void failed(Throwable x) - { - callback().failed(x); - } + if (error != null) + { + callback.failed(error); + return; + } - public Callback callback() - { - Callback closeCallback; - synchronized (_channelState) + if (content != null) + channelWrite(content, true, + new Callback.Nested(callback) { - _state = State.CLOSED; - closeCallback = _closeCallback; - _closeCallback = null; - } - return closeCallback == null ? Callback.NOOP : closeCallback; - } - }); + @Override + public void succeeded() + { + onWriteComplete(true, null); + super.succeeded(); + } + + @Override + public void failed(Throwable x) + { + onWriteComplete(true, x); + super.failed(x); + } + }); } @Override public void close() throws IOException { + if (_channel.getResponse().isIncluding()) + { + _softClose = true; + flush(); + return; + } + try (Blocker blocker = _writeBlocker.acquire()) { close(blocker); blocker.block(); + onWriteComplete(true, null); } catch (Throwable failure) { - if (LOG.isDebugEnabled()) - LOG.debug(failure); - abort(failure); + onWriteComplete(true, failure); throw failure; } } @@ -370,29 +451,18 @@ public void close() throws IOException */ public void closed() { + Callback callback = null; synchronized (_channelState) { - switch (_state) + if (_state != State.CLOSED) { - case CLOSED: - { - break; - } - case UNREADY: - { - _state = State.ERROR; - if (_onError == null) - _onError = new EofException("Async closed"); - releaseBuffer(); - break; - } - default: - { - _state = State.CLOSED; - releaseBuffer(); - } + callback = _closedCallback; + _closedCallback = null; + _state = State.CLOSED; } } + if (callback != null) + callback.succeeded(); } public ByteBuffer getBuffer() @@ -420,14 +490,7 @@ public boolean isClosed() { synchronized (_channelState) { - switch (_state) - { - case CLOSING: - case CLOSED: - return true; - default: - return false; - } + return _softClose || (_state != State.OPEN); } } @@ -435,7 +498,7 @@ public boolean isAsync() { synchronized (_channelState) { - switch (_state) + switch (_apiState) { case ASYNC: case READY: @@ -451,38 +514,73 @@ public boolean isAsync() @Override public void flush() throws IOException { + ByteBuffer content = null; synchronized (_channelState) { - switch (_state) { - case OPEN: - channelWrite(BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER, false); - return; - - case ASYNC: - throw new IllegalStateException("isReady() not called"); - - case READY: - _state = State.PENDING; - new AsyncFlush().iterate(); - return; - - case UNREADY: - throw new WritePendingException(); - - case ERROR: - throw new EofException(_onError); - - case PENDING: - case CLOSING: case CLOSED: + case CLOSING: return; default: - throw new IllegalStateException(_state.toString()); + { + switch (_apiState) + { + case BLOCKING: + _apiState = ApiState.BLOCKED; + content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; + break; + + case ASYNC: + case PENDING: + throw new IllegalStateException("isReady() not called: " + stateString()); + + case READY: + _apiState = ApiState.PENDING; + break; + + case UNREADY: + throw new WritePendingException(); + + default: + throw new IllegalStateException(stateString()); + } + } } } + + if (content == null) + new AsyncFlush().iterate(); + else + { + try + { + channelWrite(content, false); + onWriteComplete(false, null); + } + catch (Throwable t) + { + onWriteComplete(false, t); + throw t; + } + } + } + + private void checkWritable() throws EofException + { + switch (_state) + { + case CLOSED: + case CLOSING: + throw new EofException("Closed"); + + default: + break; + } + + if (_onError != null) + throw new EofException(_onError); } @Override @@ -493,9 +591,10 @@ public void write(byte[] b, int off, int len) throws IOException boolean flush; // Async or Blocking ? - boolean async = false; + boolean async; synchronized (_channelState) { + checkWritable(); long written = _written + len; int space = _aggregate == null ? getBufferSize() : BufferUtil.space(_aggregate); last = _channel.getResponse().isAllContentWritten(written); @@ -504,34 +603,30 @@ public void write(byte[] b, int off, int len) throws IOException // + is not the last one, or is last but will fit in an already allocated aggregate buffer. aggregate = len <= _commitSize && (!last || BufferUtil.hasContent(_aggregate) && len <= space); flush = last || !aggregate || len >= space; - switch (_state) + + switch (_apiState) { - case OPEN: - // process blocking write below + case BLOCKING: + _apiState = flush ? ApiState.BLOCKED : ApiState.BLOCKING; + async = false; break; case ASYNC: - throw new IllegalStateException("isReady() not called"); + throw new IllegalStateException("isReady() not called: " + stateString()); case READY: async = true; - _state = flush ? State.PENDING : State.ASYNC; + _apiState = flush ? ApiState.PENDING : ApiState.ASYNC; break; case PENDING: case UNREADY: throw new WritePendingException(); - case ERROR: - throw new EofException(_onError); - - case CLOSING: - case CLOSED: - throw new EofException("Closed"); - default: - throw new IllegalStateException(_state.toString()); + throw new IllegalStateException(stateString()); } + _written = written; } @@ -557,41 +652,52 @@ public void write(byte[] b, int off, int len) throws IOException return; } - // flush any content from the aggregate - if (BufferUtil.hasContent(_aggregate)) + // Blocking write + try { - channelWrite(_aggregate, last && len == 0); - - // should we fill aggregate again from the buffer? - if (len > 0 && !last && len <= _commitSize && len <= BufferUtil.space(_aggregate)) + // flush any content from the aggregate + if (BufferUtil.hasContent(_aggregate)) { - BufferUtil.append(_aggregate, b, off, len); - return; + channelWrite(_aggregate, last && len == 0); + + // should we fill aggregate again from the buffer? + if (len > 0 && !last && len <= _commitSize && len <= BufferUtil.space(_aggregate)) + { + BufferUtil.append(_aggregate, b, off, len); + return; + } } - } - // write any remaining content in the buffer directly - if (len > 0) - { - // write a buffer capacity at a time to avoid JVM pooling large direct buffers - // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6210541 - ByteBuffer view = ByteBuffer.wrap(b, off, len); + // write any remaining content in the buffer directly + if (len > 0) + { + // write a buffer capacity at a time to avoid JVM pooling large direct buffers + // http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6210541 + ByteBuffer view = ByteBuffer.wrap(b, off, len); - while (len > getBufferSize()) + while (len > getBufferSize()) + { + int p = view.position(); + int l = p + getBufferSize(); + view.limit(l); + channelWrite(view, false); + view.limit(p + len); + view.position(l); + len -= getBufferSize(); + } + channelWrite(view, last); + } + else if (last) { - int p = view.position(); - int l = p + getBufferSize(); - view.limit(l); - channelWrite(view, false); - view.limit(p + len); - view.position(l); - len -= getBufferSize(); + channelWrite(BufferUtil.EMPTY_BUFFER, true); } - channelWrite(view, last); + + onWriteComplete(last, null); } - else if (last) + catch (Throwable t) { - channelWrite(BufferUtil.EMPTY_BUFFER, true); + onWriteComplete(last, t); + throw t; } } @@ -599,60 +705,71 @@ public void write(ByteBuffer buffer) throws IOException { // This write always bypasses aggregate buffer int len = BufferUtil.length(buffer); + boolean flush; boolean last; // Async or Blocking ? - boolean async = false; + boolean async; synchronized (_channelState) { + checkWritable(); long written = _written + len; last = _channel.getResponse().isAllContentWritten(_written); - switch (_state) + flush = last || len > 0 || BufferUtil.hasContent(_aggregate); + switch (_apiState) { - case OPEN: + case BLOCKING: + async = false; + _apiState = flush ? ApiState.BLOCKED : ApiState.BLOCKING; break; case ASYNC: - throw new IllegalStateException("isReady() not called"); + throw new IllegalStateException("isReady() not called: " + stateString()); case READY: async = true; - _state = State.PENDING; + _apiState = flush ? ApiState.PENDING : ApiState.ASYNC; break; case PENDING: case UNREADY: throw new WritePendingException(); - case ERROR: - throw new EofException(_onError); - - case CLOSING: - case CLOSED: - throw new EofException("Closed"); - default: - throw new IllegalStateException(_state.toString()); + throw new IllegalStateException(stateString()); } _written = written; } + if (!flush) + return; + if (async) { new AsyncWrite(buffer, last).iterate(); } else { - // Blocking write - // flush any content from the aggregate - if (BufferUtil.hasContent(_aggregate)) - channelWrite(_aggregate, last && len == 0); - - // write any remaining content in the buffer directly - if (len > 0) - channelWrite(buffer, last); - else if (last) - channelWrite(BufferUtil.EMPTY_BUFFER, true); + try + { + // Blocking write + // flush any content from the aggregate + if (BufferUtil.hasContent(_aggregate)) + channelWrite(_aggregate, last && len == 0); + + // write any remaining content in the buffer directly + if (len > 0) + channelWrite(buffer, last); + else if (last) + channelWrite(BufferUtil.EMPTY_BUFFER, true); + + onWriteComplete(last, null); + } + catch (Throwable t) + { + onWriteComplete(last, t); + throw t; + } } } @@ -666,37 +783,32 @@ public void write(int b) throws IOException boolean async = false; synchronized (_channelState) { + checkWritable(); long written = _written + 1; int space = _aggregate == null ? getBufferSize() : BufferUtil.space(_aggregate); last = _channel.getResponse().isAllContentWritten(written); flush = last || space == 1; - switch (_state) + switch (_apiState) { - case OPEN: + case BLOCKING: + _apiState = flush ? ApiState.BLOCKED : ApiState.BLOCKING; break; case ASYNC: - throw new IllegalStateException("isReady() not called"); + throw new IllegalStateException("isReady() not called: " + stateString()); case READY: async = true; - _state = flush ? State.PENDING : State.ASYNC; + _apiState = flush ? ApiState.PENDING : ApiState.ASYNC; break; case PENDING: case UNREADY: throw new WritePendingException(); - case ERROR: - throw new EofException(_onError); - - case CLOSING: - case CLOSED: - throw new EofException("Closed"); - default: - throw new IllegalStateException(); + throw new IllegalStateException(stateString()); } _written = written; } @@ -711,7 +823,18 @@ public void write(int b) throws IOException // Do the asynchronous writing from the callback new AsyncFlush().iterate(); else - channelWrite(_aggregate, last); + { + try + { + channelWrite(_aggregate, last); + onWriteComplete(last, null); + } + catch (Throwable t) + { + onWriteComplete(last, t); + throw t; + } + } } @Override @@ -856,13 +979,6 @@ public void sendContent(InputStream in) throws IOException new InputStreamWritingCB(in, blocker).iterate(); blocker.block(); } - catch (Throwable failure) - { - if (LOG.isDebugEnabled()) - LOG.debug(failure); - abort(failure); - throw failure; - } } /** @@ -878,13 +994,6 @@ public void sendContent(ReadableByteChannel in) throws IOException new ReadableByteChannelWritingCB(in, blocker).iterate(); blocker.block(); } - catch (Throwable failure) - { - if (LOG.isDebugEnabled()) - LOG.debug(failure); - abort(failure); - throw failure; - } } /** @@ -900,13 +1009,6 @@ public void sendContent(HttpContent content) throws IOException sendContent(content, blocker); blocker.block(); } - catch (Throwable failure) - { - if (LOG.isDebugEnabled()) - LOG.debug(failure); - abort(failure); - throw failure; - } } /** @@ -923,22 +1025,23 @@ public void sendContent(ByteBuffer content, final Callback callback) if (!prepareSendContent(content.remaining(), callback)) return; - channelWrite(content, true, new Callback.Nested(callback) - { - @Override - public void succeeded() + channelWrite(content, true, + new Callback.Nested(callback) { - closed(); - super.succeeded(); - } + @Override + public void succeeded() + { + onWriteComplete(true, null); + super.succeeded(); + } - @Override - public void failed(Throwable x) - { - abort(x); - super.failed(x); - } - }); + @Override + public void failed(Throwable x) + { + onWriteComplete(true, x); + super.failed(x); + } + }); } /** @@ -994,24 +1097,27 @@ private boolean prepareSendContent(int len, Callback callback) switch (_state) { - case OPEN: - _state = State.PENDING; - if (len > 0) - _written += len; - return true; - - case ERROR: - callback.failed(new EofException(_onError)); - return false; - - case CLOSING: case CLOSED: + case CLOSING: callback.failed(new EofException("Closed")); return false; default: - throw new IllegalStateException(); + break; + } + + if (_onError != null) + { + callback.failed(_onError); + return false; } + + if (_apiState != ApiState.BLOCKING) + throw new IllegalStateException(stateString()); + _apiState = ApiState.PENDING; + if (len > 0) + _written += len; + return true; } } @@ -1068,7 +1174,7 @@ public void sendContent(HttpContent httpContent, Callback callback) } Throwable cause = new IllegalArgumentException("unknown content for " + httpContent); - abort(cause); + _channel.abort(cause); callback.failed(cause); } @@ -1116,6 +1222,8 @@ public void recycle() synchronized (_channelState) { _state = State.OPEN; + _apiState = ApiState.BLOCKING; + _softClose = false; _interceptor = _channel; HttpConfiguration config = _channel.getHttpConfiguration(); _bufferSize = config.getOutputBufferSize(); @@ -1128,7 +1236,7 @@ public void recycle() _onError = null; _firstByteTimeStamp = -1; _flushed = 0; - _closeCallback = null; + _closedCallback = null; } } @@ -1144,13 +1252,13 @@ public void resetBuffer() public void setWriteListener(WriteListener writeListener) { if (!_channel.getState().isAsync()) - throw new IllegalStateException("!ASYNC"); + throw new IllegalStateException("!ASYNC: " + stateString()); boolean wake; synchronized (_channelState) { - if (_state != State.OPEN) - throw new IllegalStateException("!OPEN"); - _state = State.READY; + if (_apiState != ApiState.BLOCKING) + throw new IllegalStateException("!OPEN" + stateString()); + _apiState = ApiState.READY; _writeListener = writeListener; wake = _channel.getState().onWritePossible(); } @@ -1163,28 +1271,26 @@ public boolean isReady() { synchronized (_channelState) { - switch (_state) + switch (_apiState) { - case OPEN: + case BLOCKING: case READY: - case ERROR: - case CLOSING: - case CLOSED: return true; case ASYNC: - _state = State.READY; + _apiState = ApiState.READY; return true; case PENDING: - _state = State.UNREADY; + _apiState = ApiState.UNREADY; return false; + case BLOCKED: case UNREADY: return false; default: - throw new IllegalStateException(_state.toString()); + throw new IllegalStateException(stateString()); } } } @@ -1196,8 +1302,7 @@ public void run() synchronized (_channelState) { - // TODO does this need to be a state or just non null _onError - if (_state == State.ERROR) + if (_onError != null) { error = _onError; _onError = null; @@ -1240,17 +1345,25 @@ public void run() } } + private String stateString() + { + return String.format("s=%s,api=%s", _state, _apiState); + } + @Override public String toString() { - return String.format("%s@%x{%s}", this.getClass().getSimpleName(), hashCode(), _state); + synchronized (_channelState) + { + return String.format("%s@%x{%s}", this.getClass().getSimpleName(), hashCode(), stateString()); + } } - private abstract class AsyncICB extends IteratingCallback + private abstract class ChannelWriteCB extends IteratingCallback { final boolean _last; - AsyncICB(boolean last) + ChannelWriteCB(boolean last) { _last = last; } @@ -1262,66 +1375,84 @@ public InvocationType getInvocationType() } @Override - protected void onCompleteSuccess() + protected Action process() throws Exception { - boolean close = false; - boolean wake = false; + ByteBuffer closeContent = null; synchronized (_channelState) { - switch (_state) + // If we have a pending close, schedule one more iteration + if (_state == State.OPEN && _closedCallback != null) { - case PENDING: - _state = State.ASYNC; - if (_closeCallback != null) - close = true; - break; + _state = State.CLOSING; + closeContent = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; + } + } - case UNREADY: - if (_last) - { - _state = State.CLOSED; - close = true; - } - else - { - _state = State.READY; - } - wake = _channel.getState().onWritePossible(); - break; + if (closeContent != null) + { + channelWrite(closeContent, true, this); + return Action.SCHEDULED; + } - case CLOSED: - break; + return Action.SUCCEEDED; + } - default: - throw new IllegalStateException(); - } - } + @Override + protected void onCompleteSuccess() + { + onWriteComplete(_last, null); + } + + @Override + public void onCompleteFailure(Throwable e) + { + onWriteComplete(_last, e); + } + } + + private abstract class NestedChannelWriteCB extends ChannelWriteCB + { + final Callback _callback; + + NestedChannelWriteCB(Callback callback, boolean last) + { + super(last); + _callback = callback; + } - if (close) + @Override + protected void onCompleteSuccess() + { + try { - if (wake) - HttpOutput.this.close(Callback.from(() -> _channel.execute(_channel))); // TODO can we call directly? Why execute? - else - HttpOutput.this.close(null); + super.onCompleteSuccess(); + } + finally + { + _callback.succeeded(); } - else if (wake) - _channel.execute(_channel); // TODO can we call directly? Why execute? } @Override public void onCompleteFailure(Throwable e) { - synchronized (_channelState) + try + { + super.onCompleteFailure(e); + } + catch (Throwable t) + { + if (t != e) + e.addSuppressed(t); + } + finally { - _onError = e == null ? new IOException() : e; - _state = State.ERROR; + _callback.failed(e); } - if (_channel.getState().onWritePossible()) - _channel.execute(_channel); } } - private class AsyncFlush extends AsyncICB + private class AsyncFlush extends ChannelWriteCB { volatile boolean _flushed; @@ -1331,7 +1462,7 @@ private class AsyncFlush extends AsyncICB } @Override - protected Action process() + protected Action process() throws Exception { if (BufferUtil.hasContent(_aggregate)) { @@ -1347,11 +1478,11 @@ protected Action process() return Action.SCHEDULED; } - return Action.SUCCEEDED; + return super.process(); } } - private class AsyncWrite extends AsyncICB + private class AsyncWrite extends ChannelWriteCB { private final ByteBuffer _buffer; private final ByteBuffer _slice; @@ -1382,7 +1513,7 @@ private class AsyncWrite extends AsyncICB } @Override - protected Action process() + protected Action process() throws Exception { // flush any content from the aggregate if (BufferUtil.hasContent(_aggregate)) @@ -1435,7 +1566,8 @@ protected Action process() if (LOG.isDebugEnabled() && _completed) LOG.debug("EOF of {}", this); - return Action.SUCCEEDED; + + return super.process(); } } @@ -1447,15 +1579,16 @@ protected Action process() * be notified as each buffer is written and only once all the input is consumed will the * wrapped {@link Callback#succeeded()} method be called. */ - private class InputStreamWritingCB extends IteratingNestedCallback + private class InputStreamWritingCB extends NestedChannelWriteCB { private final InputStream _in; private final ByteBuffer _buffer; private boolean _eof; + private boolean _closed; InputStreamWritingCB(InputStream in, Callback callback) { - super(callback); + super(callback, true); _in = in; _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), false); } @@ -1469,11 +1602,13 @@ protected Action process() throws Exception { if (LOG.isDebugEnabled()) LOG.debug("EOF of {}", this); - // Handle EOF - _in.close(); - closed(); - _channel.getByteBufferPool().release(_buffer); - return Action.SUCCEEDED; + if (!_closed) + { + _closed = true; + _channel.getByteBufferPool().release(_buffer); + IO.close(_in); + } + return super.process(); } // Read until buffer full or EOF @@ -1498,10 +1633,14 @@ protected Action process() throws Exception @Override public void onCompleteFailure(Throwable x) { - abort(x); - _channel.getByteBufferPool().release(_buffer); - IO.close(_in); - super.onCompleteFailure(x); + try + { + _channel.getByteBufferPool().release(_buffer); + } + finally + { + super.onCompleteFailure(x); + } } } @@ -1514,15 +1653,16 @@ public void onCompleteFailure(Throwable x) * be notified as each buffer is written and only once all the input is consumed will the * wrapped {@link Callback#succeeded()} method be called. */ - private class ReadableByteChannelWritingCB extends IteratingNestedCallback + private class ReadableByteChannelWritingCB extends NestedChannelWriteCB { private final ReadableByteChannel _in; private final ByteBuffer _buffer; private boolean _eof; + private boolean _closed; ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback) { - super(callback); + super(callback, true); _in = in; _buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers()); } @@ -1536,10 +1676,13 @@ protected Action process() throws Exception { if (LOG.isDebugEnabled()) LOG.debug("EOF of {}", this); - _in.close(); - closed(); - _channel.getByteBufferPool().release(_buffer); - return Action.SUCCEEDED; + if (!_closed) + { + _closed = true; + _channel.getByteBufferPool().release(_buffer); + IO.close(_in); + } + return super.process(); } // Read from stream until buffer full or EOF @@ -1560,7 +1703,6 @@ protected Action process() throws Exception @Override public void onCompleteFailure(Throwable x) { - abort(x); _channel.getByteBufferPool().release(_buffer); IO.close(_in); super.onCompleteFailure(x);