diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 12ee2e0c5725..48b4ed8b4db0 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -297,12 +297,6 @@ public void close(Callback callback) callback.succeeded(); return; - case CLOSING: - // Close already initiated, so just add the callback to those - // executed when it is complete. - _closeCallback = Callback.combine(_closeCallback, callback); - return; - case ERROR: // TODO is this right? Callback cb = Callback.combine(_closeCallback, callback); @@ -311,16 +305,16 @@ public void close(Callback callback) _state = State.CLOSED; return; - case PENDING: - case UNREADY: + case CLOSING: // Close already initiated, so just add callback + case PENDING: // Add the callback and close when write is complete. + case UNREADY: // Add the callback and close when write is complete. // Let's just add the callback so it get's noticed once write is possible. _closeCallback = Callback.combine(_closeCallback, callback); - break; + return; default: _state = State.CLOSING; _closeCallback = Callback.combine(_closeCallback, callback); - break; } } diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java index 4f8e3e8fc63f..e9687451275c 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java @@ -28,6 +28,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Exchanger; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -170,19 +171,19 @@ public void onCompleted() public static Stream tests() { List tests = new ArrayList<>(); - tests.add(new Object[]{new HelloWorldHandler(), 200, "Hello world"}); - tests.add(new Object[]{new SendErrorHandler(499, "Test async sendError"), 499, "Test async sendError"}); - tests.add(new Object[]{new AsyncReadyCompleteHandler(), 200, __data}); - tests.add(new Object[]{new AsyncWriteCompleteHandler(false, false), 200, __data}); - tests.add(new Object[]{new AsyncWriteCompleteHandler(false, true), 200, __data}); - tests.add(new Object[]{new AsyncWriteCompleteHandler(true, false), 200, __data}); - tests.add(new Object[]{new AsyncWriteCompleteHandler(true, true), 200, __data}); + tests.add(new Object[]{new HelloWorldHandler(), false, 200, "Hello world"}); + tests.add(new Object[]{new SendErrorHandler(499, "Test async sendError"), false, 499, "Test async sendError"}); + tests.add(new Object[]{new AsyncReadyCompleteHandler(), false, 200, __data}); + tests.add(new Object[]{new AsyncWriteCompleteHandler(false, false), false, 200, __data}); + tests.add(new Object[]{new AsyncWriteCompleteHandler(false, true), true, 200, __data}); + tests.add(new Object[]{new AsyncWriteCompleteHandler(true, false), false, 200, __data}); + tests.add(new Object[]{new AsyncWriteCompleteHandler(true, true), true, 200, __data}); return tests.stream().map(Arguments::of); } @ParameterizedTest @MethodSource("tests") - public void testAsyncCompletion(Handler handler, int status, String message) throws Exception + public void testAsyncCompletion(Handler handler, boolean blocked, int status, String message) throws Exception { configureServer(handler); @@ -210,7 +211,7 @@ public void testAsyncCompletion(Handler handler, int status, String message) thr // wait for threads to return to base level long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(10); - while (_threadPool.getBusyThreads() != base) + while (_threadPool.getBusyThreads() != base + (blocked ? 1 : 0)) { if (System.nanoTime() > end) throw new TimeoutException(); @@ -220,6 +221,14 @@ public void testAsyncCompletion(Handler handler, int status, String message) thr // We are now asynchronously waiting! assertThat(__complete.get(), is(false)); + // Do we need to wait for an unready state? + if (handler instanceof AsyncWriteCompleteHandler) + { + AsyncWriteCompleteHandler awch = (AsyncWriteCompleteHandler)handler; + if (awch._unReady) + assertThat(awch._unReadySeen.await(5, TimeUnit.SECONDS),is(true)); + } + // proceed with the completion delay.proceed(); @@ -276,6 +285,8 @@ private static class AsyncWriteCompleteHandler extends AbstractHandler { final boolean _unReady; final boolean _close; + final CountDownLatch _unReadySeen = new CountDownLatch(1); + boolean _written; AsyncWriteCompleteHandler(boolean unReady, boolean close) { @@ -295,13 +306,23 @@ public void onWritePossible() throws IOException { if (out.isReady()) { - response.setContentType("text/plain"); - response.setContentLength(bytes.length); - out.write(bytes); - if (_unReady) - assertThat(out.isReady(),Matchers.is(false)); + if (!_written) + { + _written = true; + response.setContentType("text/plain"); + response.setContentLength(bytes.length); + out.write(bytes); + } + if (_unReady && _unReadySeen.getCount() == 1) + { + assertThat(out.isReady(), Matchers.is(false)); + _unReadySeen.countDown(); + return; + } if (_close) + { out.close(); + } context.complete(); } } @@ -313,5 +334,11 @@ public void onError(Throwable t) } }); } + + @Override + public String toString() + { + return String.format("%s@%x{ur=%b,c=%b}", this.getClass().getSimpleName(), hashCode(), _unReady, _close); + } } }