diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index f681e2dd1404..28e2796ae708 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -31,7 +31,6 @@ import java.nio.charset.CodingErrorAction; import java.util.ResourceBundle; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import javax.servlet.RequestDispatcher; import javax.servlet.ServletOutputStream; import javax.servlet.ServletRequest; @@ -63,10 +62,9 @@ public class HttpOutput extends ServletOutputStream implements Runnable { private static final String LSTRING_FILE = "javax.servlet.LocalStrings"; - private static final Callback BLOCKING_CLOSE_CALLBACK = new Callback() {}; private static ResourceBundle lStrings = ResourceBundle.getBundle(LSTRING_FILE); - /* + /* TODO UPDATE!!! ACTION OPEN ASYNC READY PENDING UNREADY CLOSING CLOSED -------------------------------------------------------------------------------------------------- setWriteListener() READY->owp ise ise ise ise ise ise @@ -157,6 +155,7 @@ default void resetBuffer() throws IllegalStateException private final HttpChannelState _channelState; private final SharedBlockingCallback _writeBlocker; private State _state = State.OPEN; + private boolean _completing = false; private Interceptor _interceptor; private long _written; private long _flushed; @@ -166,7 +165,6 @@ default void resetBuffer() throws IllegalStateException private int _commitSize; private WriteListener _writeListener; private volatile Throwable _onError; - private Callback _completeCallback; private Callback _closeCallback; public HttpOutput(HttpChannel channel) @@ -299,32 +297,31 @@ public void complete(Closeable wrapper, Callback callback) return; } - // otherwise we must remember the callback and call the wrappers close. + // otherwise we must close the wrapper, but all calls to close() will now + // be treated as async anyway. synchronized (_channelState) { - if (_completeCallback != null) - throw new IllegalStateException(); - _completeCallback = callback; + _completing = true; + _closeCallback = Callback.combine(_closeCallback, callback); } try { - if (wrapper != null) - wrapper.close(); + wrapper.close(); } catch (Throwable th) { LOG.ignore(th); } - // Was our close method wasn't actually called, then do a normal async close + // If the wrapper intercepted the close, then initiate directly + boolean closed; synchronized (_channelState) { - if (_completeCallback != null) - callback = null; + closed = _state == State.CLOSED || _state == State.CLOSING; } - if (callback != null) - close(callback); + if (!closed) + close(null); } public void close(Callback callback) @@ -338,21 +335,59 @@ public void close(Callback callback) return; case CLOSING: - // TODO merge the callbacks + // Close already initiated, so just add the callback to those + // executed when it is complete. + _closeCallback = Callback.combine(_closeCallback, callback); return; case ERROR: - callback.failed(_onError); - // TODO State change??? + // TODO is this right? + Callback cb = Callback.combine(_closeCallback, callback); + _closeCallback = null; + cb.failed(_onError); + _state = State.CLOSED; return; + case PENDING: + case UNREADY: + // Let's just add the callback so it get's noticed once write is possible. + _closeCallback = Callback.combine(_closeCallback, callback); + break; + default: _state = State.CLOSING; + _closeCallback = Callback.combine(_closeCallback, callback); + break; } } ByteBuffer content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER; - channelWrite(content, !_channel.getResponse().isIncluding(), callback); + channelWrite(content, !_channel.getResponse().isIncluding(), new Callback() + { + @Override + public void succeeded() + { + callback().succeeded(); + } + + @Override + public void failed(Throwable x) + { + callback().failed(x); + } + + public Callback callback() + { + Callback closeCallback; + synchronized (_channelState) + { + _state = State.CLOSED; + closeCallback = _closeCallback; + _closeCallback = null; + } + return closeCallback == null ? Callback.NOOP : closeCallback; + } + }); } @Override @@ -361,12 +396,9 @@ public void close() throws IOException Callback callback = null; synchronized (_channelState) { - if (_completeCallback != null) - { - // This is a completion close, - callback = _completeCallback; - _completeCallback = null; - } + if (_completing) + // Completion has started so all closes are async + close(null); // Else handle with blocking unless already closed. else if (_state == State.CLOSED) return; @@ -1083,7 +1115,7 @@ public void sendContent(HttpContent httpContent, Callback callback) InputStream in = null; try { - in = httpContent.getInputStream(); + in = httpContent.getInputStream(); } catch (Throwable x) { @@ -1141,20 +1173,24 @@ public void onFlushed(long bytes) throws IOException public void recycle() { - _interceptor = _channel; - HttpConfiguration config = _channel.getHttpConfiguration(); - _bufferSize = config.getOutputBufferSize(); - _commitSize = config.getOutputAggregationSize(); - if (_commitSize > _bufferSize) - _commitSize = _bufferSize; - releaseBuffer(); - _written = 0; - _writeListener = null; - _onError = null; - _firstByteTimeStamp = -1; - _flushed = 0; - _closeCallback = null; - reopen(); + synchronized (_channelState) + { + _state = State.OPEN; + _completing = false; + _interceptor = _channel; + HttpConfiguration config = _channel.getHttpConfiguration(); + _bufferSize = config.getOutputBufferSize(); + _commitSize = config.getOutputAggregationSize(); + if (_commitSize > _bufferSize) + _commitSize = _bufferSize; + releaseBuffer(); + _written = 0; + _writeListener = null; + _onError = null; + _firstByteTimeStamp = -1; + _flushed = 0; + _closeCallback = null; + } } public void resetBuffer() @@ -1224,7 +1260,6 @@ public void run() { error = _onError; _onError = null; - // TODO change state? } } @@ -1251,15 +1286,14 @@ public void run() LOG.debug("onError", error); _writeListener.onError(error); } - catch(Throwable t) + catch (Throwable t) { if (LOG.isDebugEnabled()) LOG.debug(t); } finally { - // TODO is this needed? - IO.close(this); + closed(); } } @@ -1289,6 +1323,7 @@ public InvocationType getInvocationType() @Override protected void onCompleteSuccess() { + boolean close = false; boolean wake = false; synchronized (_channelState) { @@ -1296,10 +1331,14 @@ protected void onCompleteSuccess() { case PENDING: _state = State.ASYNC; + if (_closeCallback != null) + close = true; break; case UNREADY: _state = _last ? State.CLOSED : State.READY; + // TODO should we close first and then call OWP? + close = true; wake = _channel.getState().onWritePossible(); break; @@ -1311,6 +1350,9 @@ protected void onCompleteSuccess() } } + if (close) + HttpOutput.this.close(null); + if (wake) _channel.execute(_channel); // TODO can we call directly? Why execute? } diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java index 8ce876ca377c..102ccad42977 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java @@ -291,6 +291,58 @@ public InvocationType getInvocationType() } } + interface InvocableCallback extends Invocable, Callback + { + } + + static Callback combine(Callback cb1, Callback cb2) + { + if (cb1 == null || cb1 == cb2) + return cb2; + if (cb2 == null) + return cb1; + + return new InvocableCallback() + { + @Override + public void succeeded() + { + try + { + cb1.succeeded(); + } + finally + { + cb2.succeeded(); + } + } + + @Override + public void failed(Throwable x) + { + try + { + cb1.failed(x); + } + catch (Throwable t) + { + if (x != t) + x.addSuppressed(t); + } + finally + { + cb2.failed(x); + } + } + + @Override + public InvocationType getInvocationType() + { + return Invocable.combine(Invocable.getInvocationType(cb1), Invocable.getInvocationType(cb2)); + } + }; + } + /** *

A CompletableFuture that is also a Callback.

*/ diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java index 91e8e3ee13f5..620dccfe738e 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/thread/Invocable.java @@ -74,6 +74,20 @@ static void invokeNonBlocking(Runnable task) } } + static InvocationType combine(InvocationType it1, InvocationType it2) + { + if (it1 != null && it2 != null) + { + if (it1 == it2) + return it1; + if (it1 == InvocationType.EITHER) + return it2; + if (it2 == InvocationType.EITHER) + return it1; + } + return InvocationType.BLOCKING; + } + /** * Get the invocation type of an Object. *