Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #4376 Async Content Complete #4377

Merged
merged 5 commits into from
Dec 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1195,10 +1195,7 @@ public void succeeded()
if (_length > 0)
_combinedListener.onResponseContent(_request, _content);
if (_complete && _state.completeResponse())
{
_response.getHttpOutput().closed();
_combinedListener.onResponseEnd(_request);
}
super.succeeded();
}

Expand All @@ -1222,7 +1219,6 @@ public void succeeded()
@Override
public void failed(Throwable th)
{
_response.getHttpOutput().closed();
abort(x);
super.failed(x);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -968,6 +968,9 @@ protected void completed()
}
}

// release any aggregate buffer from a closing flush
_channel.getResponse().getHttpOutput().closed();

if (event != null)
{
cancelTimeout(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,8 @@ private void write(ByteBuffer content, boolean complete) throws IOException
{
write(content, complete, blocker);
blocker.block();
if (complete)
closed();
}
catch (Exception failure)
{
Expand Down Expand Up @@ -403,21 +405,19 @@ public void closed()
State state = _state.get();
switch (state)
{
case CLOSING:
{
if (!_state.compareAndSet(state, State.CLOSED))
break;
releaseBuffer();
return;
}
case CLOSED:
{
return;
}
case UNREADY:
{
if (_state.compareAndSet(state, State.ERROR))
_writeListener.onError(_onError == null ? new EofException("Async closed") : _onError);
{
if (_onError == null)
_onError = new EofException("Async closed");
releaseBuffer();
return;
}
break;
}
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.eclipse.jetty.http.HttpCompliance;
import org.eclipse.jetty.http.HttpTester;
Expand All @@ -41,6 +47,7 @@
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.ManagedSelector;
import org.eclipse.jetty.io.SocketChannelEndPoint;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.thread.Scheduler;
import org.hamcrest.Matchers;
Expand Down Expand Up @@ -89,15 +96,14 @@ public void proceed()
_delay.get(10, TimeUnit.SECONDS);
getCallback().succeeded();
}
catch(Throwable th)
catch (Throwable th)
{
th.printStackTrace();
getCallback().failed(th);
}
}
}


@BeforeEach
public void init() throws Exception
{
Expand Down Expand Up @@ -153,7 +159,7 @@ public ExtendedHttpConnection(HttpConfiguration config, Connector connector, End
@Override
public void onCompleted()
{
COMPLETE.compareAndSet(false,true);
COMPLETE.compareAndSet(false, true);
super.onCompleted();
}
}
Expand All @@ -163,7 +169,8 @@ public static Stream<Arguments> tests()
{
List<Object[]> tests = new ArrayList<>();
tests.add(new Object[]{new HelloWorldHandler(), 200, "Hello world"});
tests.add(new Object[]{new SendErrorHandler(499,"Test async sendError"), 499, "Test async sendError"});
tests.add(new Object[]{new SendErrorHandler(499, "Test async sendError"), 499, "Test async sendError"});
tests.add(new Object[]{new AsyncReadyCompleteHandler(), 200, AsyncReadyCompleteHandler.data});
return tests.stream().map(Arguments::of);
}

Expand Down Expand Up @@ -197,7 +204,7 @@ public void testAsyncCompletion(Handler handler, int status, String message) thr

// wait for threads to return to base level
long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
while(_threadPool.getBusyThreads() != base)
while (_threadPool.getBusyThreads() != base)
{
if (System.nanoTime() > end)
throw new TimeoutException();
Expand All @@ -210,12 +217,54 @@ public void testAsyncCompletion(Handler handler, int status, String message) thr
// proceed with the completion
delay.proceed();

while(!COMPLETE.get())
while (!COMPLETE.get())
{
if (System.nanoTime() > end)
throw new TimeoutException();
Thread.sleep(10);
}
}
}

private static class AsyncReadyCompleteHandler extends AbstractHandler
{
static String data = "Now is the time for all good men to come to the aid of the party";

@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
AsyncContext context = request.startAsync();
ServletOutputStream out = response.getOutputStream();
out.setWriteListener(new WriteListener()
{
byte[] bytes = data.getBytes(StandardCharsets.ISO_8859_1);

@Override
public void onWritePossible() throws IOException
{
while (out.isReady())
{
if (bytes != null)
{
response.setContentType("text/plain");
response.setContentLength(bytes.length);
out.write(bytes);
bytes = null;
}
else
{
context.complete();
return;
}
}
}

@Override
public void onError(Throwable t)
{
t.printStackTrace();
}
});
}
}
}