Skip to content

Commit

Permalink
Jetty 9.4.x 4331 async close complete3 (#4409)
Browse files Browse the repository at this point in the history
* Issue #4376 Async Content Complete

Added test harness to reproduce unready completing write.
Fixed test by not closing output prior to becoming READY

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Async Write Complete

Test harness to reproduce unready when closing/completing.

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Async Write Complete

test both PENDING and UNREADY

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Async Write Complete

test cleanups

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Async Close Complete

Cleanups of write

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* WIP

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

Work in progress

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

Added async close to HttpWriter and ResponseWriter
Always use async close, with blocker if necessary.

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

Working async close complete!

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

invert test as we can now call complete when not ready!

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

fixed transition to ERROR state

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

async close after onError

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

minor cleanups

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

Fix for proxy tests

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

Fix write loop to handle clear of p=0,l=0 rather than p=l

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

Removed old close on all content mechanism
Cleanups and some more TODOs

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

a reworking of HttpOutput to separate out API state.

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

Soft close for Dispatcher
release buffer in onWriteComplete

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

Set _onError in onWriteComplete
NOOP callback instead of null

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

failure closes HttpOutput

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

Moved closedCallback handling to onWriteComplete

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

Additional test of complete during blocking write.

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

reimplemented blocking close to sometimes be async

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

ascii "art"

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

Code cleanup.  Use a CLOSE state rather than non null closedCallback to be clearer that it is a state.
Renamed close(Callback) to complete(Callback)
Renamed and simplified closed() to completed()

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

Do not dispatch
Better ascii art
improved close impl to be similar to complete

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

More test cases

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

retain execute behaviour in 9.4. review in 10.

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Improved javadoc and ascii art

* Improved CLOSING

Switch to CLOSING state as soon as last write is done, even if several non last channelWrites will be done.   This allows a subsequent call to close to know that nothing needs to be written and can avoid some EOF exceptions. Now onWriteComplete acts only on the passed in last parameter.

Added test for sendContent

* WIP

Aggregate within lock
pipeline test debug

* Avoid creating ignored exception when Idle or Failed.

* Try a parse without fill to avoid unconsumed input debug

* fixed pipeline size

* release buffer before callback

* turn off debug

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

Better javadoc
refactored onWriteComplete logic to be simpler
fixed bug with flush of last written byte

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

Completely reworked test harness for better coverage.

Signed-off-by: Greg Wilkins <gregw@webtide.com>

* Issue #4331 Close Complete

Reworked order of ifs to match logic above in onWriteComplete

Signed-off-by: Greg Wilkins <gregw@webtide.com>
  • Loading branch information
gregw committed Dec 19, 2019
1 parent fcc18b0 commit c5acf96
Show file tree
Hide file tree
Showing 18 changed files with 1,600 additions and 784 deletions.
Expand Up @@ -41,7 +41,6 @@
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.server.HttpOutput;
import org.eclipse.jetty.util.Callback;
Expand Down Expand Up @@ -243,8 +242,8 @@ public void onWritePossible() throws IOException
// The write is too large and will stall.
output.write(ByteBuffer.wrap(new byte[2 * clientWindow]));

// We cannot call complete() now before checking for isReady().
// This will abort the response and the client will receive a reset.
// We can now call complete() now before checking for isReady().
// This will asynchronously complete when the write is finished.
asyncContext.complete();
}

Expand Down Expand Up @@ -275,7 +274,7 @@ public Map<Integer, Integer> onPreface(Session session)
session.newStream(frame, promise, new Stream.Listener.Adapter()
{
@Override
public void onReset(Stream stream, ResetFrame frame)
public void onClosed(Stream stream)
{
latch.countDown();
}
Expand Down
10 changes: 9 additions & 1 deletion jetty-io/src/main/java/org/eclipse/jetty/io/WriteFlusher.java
Expand Up @@ -518,7 +518,15 @@ public boolean onFail(Throwable cause)

public void onClose()
{
onFail(new ClosedChannelException());
switch (_state.get().getType())
{
case IDLE:
case FAILED:
return;

default:
onFail(new ClosedChannelException());
}
}

boolean isFailed()
Expand Down
Expand Up @@ -146,7 +146,11 @@ public void start(final Runnable task)
@Override
public void run()
{
state().getAsyncContextEvent().getContext().getContextHandler().handle(channel.getRequest(), task);
ContextHandler.Context context = state().getAsyncContextEvent().getContext();
if (context == null)
task.run();
else
context.getContextHandler().handle(channel.getRequest(), task);
}
});
}
Expand Down
Expand Up @@ -220,7 +220,7 @@ protected void forward(ServletRequest request, ServletResponse response, Dispatc
_contextHandler.handle(_pathInContext, baseRequest, (HttpServletRequest)request, (HttpServletResponse)response);

if (!baseRequest.getHttpChannelState().isAsync())
commitResponse(response, baseRequest);
baseRequest.getResponse().softClose();
}
}
finally
Expand All @@ -242,57 +242,6 @@ public String toString()
return String.format("Dispatcher@0x%x{%s,%s}", hashCode(), _named, _uri);
}

@SuppressWarnings("Duplicates")
private void commitResponse(ServletResponse response, Request baseRequest) throws IOException, ServletException
{
if (baseRequest.getResponse().isWriting())
{
try
{
// Try closing Writer first (based on knowledge in Response obj)
response.getWriter().close();
}
catch (IllegalStateException ex)
{
try
{
// Try closing OutputStream as alternate route
// This path is possible due to badly behaving Response wrappers
response.getOutputStream().close();
}
catch (IllegalStateException ex2)
{
ServletException servletException = new ServletException("Unable to commit the response", ex2);
servletException.addSuppressed(ex);
throw servletException;
}
}
}
else
{
try
{
// Try closing OutputStream first (based on knowledge in Response obj)
response.getOutputStream().close();
}
catch (IllegalStateException ex)
{
try
{
// Try closing Writer as alternate route
// This path is possible due to badly behaving Response wrappers
response.getWriter().close();
}
catch (IllegalStateException ex2)
{
ServletException servletException = new ServletException("Unable to commit the response", ex2);
servletException.addSuppressed(ex);
throw servletException;
}
}
}
}

private class ForwardAttributes implements Attributes
{
final Attributes _attr;
Expand Down
Expand Up @@ -47,16 +47,11 @@ public EncodingHttpWriter(HttpOutput out, String encoding)
public void write(char[] s, int offset, int length) throws IOException
{
HttpOutput out = _out;
if (length == 0 && out.isAllContentWritten())
{
out.close();
return;
}

while (length > 0)
{
_bytes.reset();
int chars = length > MAX_OUTPUT_CHARS ? MAX_OUTPUT_CHARS : length;
int chars = Math.min(length, MAX_OUTPUT_CHARS);

_converter.write(s, offset, chars);
_converter.flush();
Expand Down
Expand Up @@ -509,7 +509,7 @@ public boolean handle()
// TODO that is done.

// Set a close callback on the HttpOutput to make it an async callback
_response.closeOutput(Callback.from(_state::completed));
_response.completeOutput(Callback.from(_state::completed));

break;
}
Expand Down Expand Up @@ -1212,7 +1212,7 @@ public void failed(final Throwable x)
@Override
public void succeeded()
{
_response.getHttpOutput().closed();
_response.getHttpOutput().completed();
super.failed(x);
}

Expand Down
Expand Up @@ -889,6 +889,9 @@ public void sendError(int code, String message)
if (LOG.isDebugEnabled())
LOG.debug("sendError {}", toStringLocked());

if (_outputState != OutputState.OPEN)
throw new IllegalStateException(_outputState.toString());

switch (_state)
{
case HANDLING:
Expand All @@ -902,7 +905,7 @@ public void sendError(int code, String message)
throw new IllegalStateException("Response is " + _outputState);

response.setStatus(code);
response.closedBySendError();
response.softClose();

request.setAttribute(ErrorHandler.ERROR_CONTEXT, request.getErrorContext());
request.setAttribute(ERROR_REQUEST_URI, request.getRequestURI());
Expand Down Expand Up @@ -970,7 +973,7 @@ protected void completed()
}

// release any aggregate buffer from a closing flush
_channel.getResponse().getHttpOutput().closed();
_channel.getResponse().getHttpOutput().completed();

if (event != null)
{
Expand Down
Expand Up @@ -407,20 +407,25 @@ public void onCompleted()
}
else if (_parser.inContentState() && _generator.isPersistent())
{
// If we are async, then we have problems to complete neatly
if (_input.isAsync())
// Try to progress without filling.
parseRequestBuffer();
if (_parser.inContentState())
{
if (LOG.isDebugEnabled())
LOG.debug("{}unconsumed input {}", _parser.isChunking() ? "Possible " : "", this);
_channel.abort(new IOException("unconsumed input"));
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("{}unconsumed input {}", _parser.isChunking() ? "Possible " : "", this);
// Complete reading the request
if (!_input.consumeAll())
// If we are async, then we have problems to complete neatly
if (_input.isAsync())
{
if (LOG.isDebugEnabled())
LOG.debug("{}unconsumed input while async {}", _parser.isChunking() ? "Possible " : "", this);
_channel.abort(new IOException("unconsumed input"));
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("{}unconsumed input {}", _parser.isChunking() ? "Possible " : "", this);
// Complete reading the request
if (!_input.consumeAll())
_channel.abort(new IOException("unconsumed input"));
}
}
}

Expand Down

0 comments on commit c5acf96

Please sign in to comment.