diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java index 016a488d48f0..d4c95aec9f4b 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java @@ -18,7 +18,9 @@ package org.eclipse.jetty.server; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.nio.ByteBuffer; @@ -27,6 +29,7 @@ import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Exchanger; @@ -49,10 +52,12 @@ import org.eclipse.jetty.io.ManagedSelector; import org.eclipse.jetty.io.SocketChannelEndPoint; import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.util.BlockingArrayQueue; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.thread.Scheduler; import org.hamcrest.Matchers; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -67,10 +72,13 @@ */ public class AsyncCompletionTest extends HttpServerTestFixture { - private static final Exchanger X = new Exchanger<>(); - private static final AtomicBoolean __complete = new AtomicBoolean(); - private static String __data = "Now is the time for all good men to come to the aid of the party"; - + private static final int POLL = 10; // milliseconds + private static final int WAIT = 10; // seconds + private static final String SMALL = "Now is the time for all good men to come to the aid of the party. "; + private static final String LARGE = SMALL + SMALL + SMALL + SMALL + SMALL; + private static final int BUFFER_SIZE = SMALL.length() * 3 / 2; + private static final BlockingQueue __queue = new BlockingArrayQueue<>(); + private static final AtomicBoolean __transportComplete = new AtomicBoolean(); private static class PendingCallback extends Callback.Nested { @@ -97,7 +105,7 @@ public void proceed() { try { - _pending.get(10, TimeUnit.SECONDS); + _pending.get(WAIT, TimeUnit.SECONDS); getCallback().succeeded(); } catch (Throwable th) @@ -111,13 +119,15 @@ public void proceed() @BeforeEach public void init() throws Exception { - __complete.set(false); + __transportComplete.set(false); startServer(new ServerConnector(_server, new HttpConnectionFactory() { @Override public Connection newConnection(Connector connector, EndPoint endPoint) { + getHttpConfiguration().setOutputBufferSize(BUFFER_SIZE); + getHttpConfiguration().setOutputAggregationSize(BUFFER_SIZE); return configure(new ExtendedHttpConnection(getHttpConfiguration(), connector, endPoint), connector, endPoint); } }) @@ -142,14 +152,7 @@ public void write(Callback callback, ByteBuffer... buffers) throws IllegalStateE { PendingCallback delay = new PendingCallback(callback); super.write(delay, buffers); - try - { - X.exchange(delay); - } - catch (InterruptedException e) - { - throw new RuntimeException(e); - } + __queue.offer(delay); } } @@ -163,38 +166,44 @@ public ExtendedHttpConnection(HttpConfiguration config, Connector connector, End @Override public void onCompleted() { - __complete.compareAndSet(false,true); + __transportComplete.compareAndSet(false, true); super.onCompleted(); } } - // Tests from here use these parameters - public static Stream tests() + enum WriteStyle + {ARRAY, BUFFER, BYTE, BYTE_THEN_ARRAY, PRINT} + + ; + + public static Stream asyncIOWriteTests() { List tests = new ArrayList<>(); - tests.add(new Object[]{new HelloWorldHandler(), false, 200, "Hello world"}); - tests.add(new Object[]{new SendErrorHandler(499, "Test async sendError"), false, 499, "Test async sendError"}); - tests.add(new Object[]{new AsyncReadyCompleteHandler(), false, 200, __data}); - tests.add(new Object[]{new AsyncWriteCompleteHandler(false, false), false, 200, __data}); - tests.add(new Object[]{new AsyncWriteCompleteHandler(false, true), false, 200, __data}); - tests.add(new Object[]{new AsyncWriteCompleteHandler(true, false), false, 200, __data}); - tests.add(new Object[]{new AsyncWriteCompleteHandler(true, true), false, 200, __data}); - tests.add(new Object[]{new BlockingWriteCompleteHandler(false, false, false), true, 200, __data}); - tests.add(new Object[]{new BlockingWriteCompleteHandler(false, false, true), true, 200, __data}); - tests.add(new Object[]{new BlockingWriteCompleteHandler(false, true, false), true, 200, __data}); - tests.add(new Object[]{new BlockingWriteCompleteHandler(false, true, true), true, 200, __data}); - tests.add(new Object[]{new BlockingWriteCompleteHandler(true, false, false), true, 200, __data}); - tests.add(new Object[]{new BlockingWriteCompleteHandler(true, false, true), true, 200, __data}); - tests.add(new Object[]{new BlockingWriteCompleteHandler(true, true, false), true, 200, __data}); - tests.add(new Object[]{new BlockingWriteCompleteHandler(true, true, true), true, 200, __data}); - tests.add(new Object[]{new SendContentHandler(false), false, 200, __data}); - tests.add(new Object[]{new SendContentHandler(true), true, 200, __data}); + for (WriteStyle w : WriteStyle.values()) + { + for (boolean contentLength : new Boolean[]{true, false}) + { + for (boolean isReady : new Boolean[]{true, false}) + { + for (boolean flush : new Boolean[]{true, false}) + { + for (boolean close : new Boolean[]{true, false}) + { + for (String data : new String[]{SMALL, LARGE}) + { + tests.add(new Object[]{new AsyncIOWriteHandler(w, contentLength, isReady, flush, close, data)}); + } + } + } + } + } + } return tests.stream().map(Arguments::of); } @ParameterizedTest - @MethodSource("tests") - public void testAsyncCompletion(Handler handler, boolean blocked, int status, String message) throws Exception + @MethodSource("asyncIOWriteTests") + public void testAsyncIOWrite(AsyncIOWriteHandler handler) throws Exception { configureServer(handler); @@ -202,95 +211,242 @@ public void testAsyncCompletion(Handler handler, boolean blocked, int status, St try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort())) { OutputStream os = client.getOutputStream(); + InputStream in = client.getInputStream(); // write the request os.write("GET / HTTP/1.0\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1)); os.flush(); - // The write should happen but the callback is delayed - HttpTester.Response response = HttpTester.parseResponse(client.getInputStream()); - assertThat(response, Matchers.notNullValue()); - assertThat(response.getStatus(), is(status)); - String content = response.getContent(); - assertThat(content, containsString(message)); + // wait for OWP to execute (proves we do not block in write APIs) + boolean completeCalled = handler.waitForOWPExit(); + + while (true) + { + // wait for threads to return to base level (proves we are really async) + long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(WAIT); + while (_threadPool.getBusyThreads() != base) + { + if (System.nanoTime() > end) + throw new TimeoutException(); + Thread.sleep(POLL); + } + + if (completeCalled) + break; + + // We are now asynchronously waiting! + assertThat(__transportComplete.get(), is(false)); + + // If we are not complete, we must be waiting for one or more writes to complete + while (true) + { + PendingCallback delay = __queue.poll(POLL, TimeUnit.MILLISECONDS); + if (delay != null) + { + delay.proceed(); + continue; + } + // No delay callback found, have we finished OWP again? + Boolean c = handler.pollForOWPExit(); - // Check that a thread is held busy in write - assertThat(_threadPool.getBusyThreads(), Matchers.greaterThan(base)); // TODO why is this the case for async? + if (c == null) + // No we haven't, so look for another delay callback + continue; - // Getting the Delayed callback will free the thread - PendingCallback delay = X.exchange(null, 10, TimeUnit.SECONDS); + // We have a OWP result, so let's handle it. + completeCalled = c; + break; + } + } - // wait for threads to return to base level - long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(10); - while (_threadPool.getBusyThreads() != base + (blocked ? 1 : 0)) + // Wait for full completion + long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(WAIT); + while (!__transportComplete.get()) { if (System.nanoTime() > end) throw new TimeoutException(); - Thread.sleep(10); + + // proceed with any delayCBs needed for completion + PendingCallback delay = __queue.poll(POLL, TimeUnit.MILLISECONDS); + if (delay != null) + delay.proceed(); } - // We are now asynchronously waiting! - assertThat(__complete.get(), is(false)); + // Check we got a response! + HttpTester.Response response = HttpTester.parseResponse(in); + assertThat(response, Matchers.notNullValue()); + assertThat(response.getStatus(), is(200)); + String content = response.getContent(); + assertThat(content, containsString(handler.getExpectedMessage())); + } + } - // Do we need to wait for an unready state? - if (handler instanceof AsyncWriteCompleteHandler) + private static class AsyncIOWriteHandler extends AbstractHandler + { + final WriteStyle _write; + final boolean _contentLength; + final boolean _isReady; + final boolean _flush; + final boolean _close; + final String _data; + final Exchanger _ready = new Exchanger<>(); + int _toWrite; + boolean _flushed; + boolean _closed; + + AsyncIOWriteHandler(WriteStyle write, boolean contentLength, boolean isReady, boolean flush, boolean close, String data) + { + _write = write; + _contentLength = contentLength; + _isReady = isReady; + _flush = flush; + _close = close; + _data = data; + _toWrite = data.length(); + } + + public String getExpectedMessage() + { + return SMALL; + } + + boolean waitForOWPExit() + { + try { - AsyncWriteCompleteHandler awch = (AsyncWriteCompleteHandler)handler; - if (awch._unReady) - assertThat(awch._unReadySeen.await(5, TimeUnit.SECONDS),is(true)); + return _ready.exchange(null); } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + } - // proceed with the completion - delay.proceed(); - - while(!__complete.get()) + Boolean pollForOWPExit() + { + try { - if (System.nanoTime() > end) - throw new TimeoutException(); - try - { - X.exchange(null, 10, TimeUnit.MILLISECONDS).proceed(); - } - catch (TimeoutException e) - {} + return _ready.exchange(null, POLL, TimeUnit.MILLISECONDS); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + catch (TimeoutException e) + { + return null; } } - } - private static class AsyncReadyCompleteHandler extends AbstractHandler - { @Override public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { - // start async - // register WriteListener - // if ready write bytes - // if ready complete baseRequest.setHandled(true); AsyncContext context = request.startAsync(); ServletOutputStream out = response.getOutputStream(); + response.setContentType("text/plain"); + byte[] bytes = _data.getBytes(StandardCharsets.ISO_8859_1); + if (_contentLength) + response.setContentLength(bytes.length); + out.setWriteListener(new WriteListener() { - byte[] bytes = __data.getBytes(StandardCharsets.ISO_8859_1); - @Override public void onWritePossible() throws IOException { - while (out.isReady()) + try { - if (bytes != null) - { - response.setContentType("text/plain"); - response.setContentLength(bytes.length); - out.write(bytes); - bytes = null; - } - else + if (out.isReady()) { + if (_toWrite > 0) + { + switch (_write) + { + case ARRAY: + _toWrite = 0; + out.write(bytes, 0, bytes.length); + break; + + case BUFFER: + _toWrite = 0; + ((HttpOutput)out).write(BufferUtil.toBuffer(bytes)); + break; + + case BYTE: + for (int i = bytes.length - _toWrite; i < bytes.length; i++) + { + _toWrite--; + out.write(bytes[i]); + boolean ready = out.isReady(); + if (!ready) + { + _ready.exchange(Boolean.FALSE); + return; + } + } + break; + + case BYTE_THEN_ARRAY: + _toWrite = 0; + out.write(bytes[0]); // This should always aggregate + assertThat(out.isReady(), is(true)); + out.write(bytes, 1, bytes.length - 1); + break; + + case PRINT: + _toWrite = 0; + out.print(_data); + break; + } + } + + if (_flush && !_flushed) + { + boolean ready = out.isReady(); + if (!ready) + { + _ready.exchange(Boolean.FALSE); + return; + } + _flushed = true; + out.flush(); + } + + if (_close && !_closed) + { + if (_isReady) + { + boolean ready = out.isReady(); + if (!ready) + { + _ready.exchange(Boolean.FALSE); + return; + } + } + _closed = true; + out.close(); + } + + if (_isReady) + { + boolean ready = out.isReady(); + if (!ready) + { + _ready.exchange(Boolean.FALSE); + return; + } + } context.complete(); - return; + _ready.exchange(Boolean.TRUE); } } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + finally + { + } } @Override @@ -300,190 +456,312 @@ public void onError(Throwable t) } }); } - } - - private static class AsyncWriteCompleteHandler extends AbstractHandler - { - final boolean _unReady; - final boolean _close; - final CountDownLatch _unReadySeen = new CountDownLatch(1); - boolean _written; - AsyncWriteCompleteHandler(boolean unReady, boolean close) + @Override + public String toString() { - _unReady = unReady; - _close = close; + return String.format("AWCH{w=%s,cl=%b,ir=%b,f=%b,c=%b,d=%d}", _write, _contentLength, _isReady, _flush, _close, _data.length()); } + } - @Override - public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + public static Stream blockingWriteTests() + { + List tests = new ArrayList<>(); + for (WriteStyle w : WriteStyle.values()) { - // start async - // register WriteListener - // if ready - // if not written write bytes - // if _unReady check that isReady() returns false and return - // if _close then call close without checking isReady() - // context.complete() without checking is ready - baseRequest.setHandled(true); - AsyncContext context = request.startAsync(); - ServletOutputStream out = response.getOutputStream(); - out.setWriteListener(new WriteListener() { - byte[] bytes = __data.getBytes(StandardCharsets.ISO_8859_1); - @Override - public void onWritePossible() throws IOException + for (boolean contentLength : new Boolean[]{true, false}) + { + for (boolean flush : new Boolean[]{true, false}) { - if (out.isReady()) + for (boolean close : new Boolean[]{true, false}) { - if (!_written) + for (String data : new String[]{SMALL, LARGE}) { - _written = true; - response.setContentType("text/plain"); - response.setContentLength(bytes.length); - out.write(bytes); + tests.add(new Object[]{new BlockingWriteHandler(w, contentLength, flush, close, data)}); } - if (_unReady && _unReadySeen.getCount() == 1) - { - assertThat(out.isReady(), Matchers.is(false)); - _unReadySeen.countDown(); - return; - } - if (_close) - { - out.close(); - } - context.complete(); } } + } + } + return tests.stream().map(Arguments::of); + } - @Override - public void onError(Throwable t) + @ParameterizedTest + @MethodSource("blockingWriteTests") + public void testBlockingWrite(BlockingWriteHandler handler) throws Exception + { + configureServer(handler); + + try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort())) + { + OutputStream os = client.getOutputStream(); + InputStream in = client.getInputStream(); + + // write the request + os.write("GET / HTTP/1.0\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1)); + os.flush(); + + handler.wait4handle(); + + // Wait for full completion + long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(WAIT); + while (!__transportComplete.get()) + { + if (System.nanoTime() > end) + throw new TimeoutException(); + + // proceed with any delayCBs needed for completion + try { - t.printStackTrace(); + PendingCallback delay = __queue.poll(POLL, TimeUnit.MILLISECONDS); + if (delay != null) + delay.proceed(); } - }); - } + catch (Exception e) + { + // ignored + } + } - @Override - public String toString() - { - return String.format("AWCH@%x{ur=%b,c=%b}", hashCode(), _unReady, _close); + // Check we got a response! + HttpTester.Response response = HttpTester.parseResponse(in); + assertThat(response, Matchers.notNullValue()); + assertThat(response.getStatus(), is(200)); + String content = response.getContent(); + assertThat(content, containsString(handler.getExpectedMessage())); } } - private static class BlockingWriteCompleteHandler extends AbstractHandler + private static class BlockingWriteHandler extends AbstractHandler { + final WriteStyle _write; final boolean _contentLength; + final boolean _flush; final boolean _close; - final boolean _dispatchComplete; + final String _data; + final CountDownLatch _wait = new CountDownLatch(1); - BlockingWriteCompleteHandler(boolean contentLength, boolean close, boolean dispatchComplete) + BlockingWriteHandler(WriteStyle write, boolean contentLength, boolean flush, boolean close, String data) { + _write = write; _contentLength = contentLength; + _flush = flush; _close = close; - _dispatchComplete = dispatchComplete; + _data = data; + } + + public String getExpectedMessage() + { + return SMALL; + } + + public void wait4handle() + { + try + { + Assertions.assertTrue(_wait.await(WAIT, TimeUnit.SECONDS)); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } } @Override public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { - // Start async - // Do a blocking write in another thread - // call complete while the write is still blocking baseRequest.setHandled(true); AsyncContext context = request.startAsync(); ServletOutputStream out = response.getOutputStream(); - CountDownLatch writing = new CountDownLatch(1); - Runnable write = () -> + context.start(() -> { try { - byte[] bytes = __data.getBytes(StandardCharsets.ISO_8859_1); + _wait.countDown(); + response.setContentType("text/plain"); + byte[] bytes = _data.getBytes(StandardCharsets.ISO_8859_1); if (_contentLength) response.setContentLength(bytes.length); - writing.countDown(); - out.write(bytes); + switch (_write) + { + case ARRAY: + out.write(bytes, 0, bytes.length); + break; + + case BUFFER: + ((HttpOutput)out).write(BufferUtil.toBuffer(bytes)); + break; + + case BYTE: + for (byte b : bytes) + { + out.write(b); + } + break; + + case BYTE_THEN_ARRAY: + out.write(bytes[0]); // This should always aggregate + out.write(bytes, 1, bytes.length - 1); + break; + + case PRINT: + out.print(_data); + break; + } + + if (_flush) + out.flush(); if (_close) out.close(); + + context.complete(); } - catch(Exception e) + catch (Exception e) { - e.printStackTrace(); + throw new RuntimeException(e); } - }; + }); + } + + @Override + public String toString() + { + return String.format("BWCH{w=%s,cl=%b,f=%b,c=%b,d=%d}", _write, _contentLength, _flush, _close, _data.length()); + } + } + + public static Stream sendContentTests() + { + List tests = new ArrayList<>(); + for (ContentStyle style : ContentStyle.values()) + { + for (String data : new String[]{SMALL, LARGE}) + { + tests.add(new Object[]{new SendContentHandler(style, data)}); + } + } + return tests.stream().map(Arguments::of); + } + + @ParameterizedTest + @MethodSource("sendContentTests") + public void testSendContent(SendContentHandler handler) throws Exception + { + configureServer(handler); - Runnable complete = () -> + int base = _threadPool.getBusyThreads(); + try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort())) + { + OutputStream os = client.getOutputStream(); + InputStream in = client.getInputStream(); + + // write the request + os.write("GET / HTTP/1.0\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1)); + os.flush(); + + handler.wait4handle(); + + long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(WAIT); + while (_threadPool.getBusyThreads() != base) + { + if (System.nanoTime() > end) + throw new TimeoutException(); + Thread.sleep(POLL); + } + + // Wait for full completion + end = System.nanoTime() + TimeUnit.SECONDS.toNanos(WAIT); + while (!__transportComplete.get()) { + if (System.nanoTime() > end) + throw new TimeoutException(); + + // proceed with any delayCBs needed for completion try { - writing.await(5, TimeUnit.SECONDS); - Thread.sleep(200); - context.complete(); + PendingCallback delay = __queue.poll(POLL, TimeUnit.MILLISECONDS); + if (delay != null) + delay.proceed(); } - catch(Exception e) + catch (Exception e) { - e.printStackTrace(); + // ignored } - }; - - if (_dispatchComplete) - { - context.start(complete); - write.run(); } - else - { - context.start(write); - complete.run(); - } - } - @Override - public String toString() - { - return String.format("BWCH@%x{cl=%b,c=%b,dc=%b}", hashCode(), _contentLength, _close, _dispatchComplete); + // Check we got a response! + HttpTester.Response response = HttpTester.parseResponse(in); + assertThat(response, Matchers.notNullValue()); + assertThat(response.getStatus(), is(200)); + String content = response.getContent(); + assertThat(content, containsString(handler.getExpectedMessage())); } } + enum ContentStyle + {BUFFER, STREAM} // TODO more types needed here + private static class SendContentHandler extends AbstractHandler { - final boolean _blocking; + final ContentStyle _style; + final String _data; + final CountDownLatch _wait = new CountDownLatch(1); + + SendContentHandler(ContentStyle style, String data) + { + _style = style; + _data = data; + } + + public String getExpectedMessage() + { + return SMALL; + } - private SendContentHandler(boolean blocking) + public void wait4handle() { - _blocking = blocking; + try + { + Assertions.assertTrue(_wait.await(WAIT, TimeUnit.SECONDS)); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } } @Override public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { - // Start async - // Do a blocking write in another thread - // call complete while the write is still blocking baseRequest.setHandled(true); AsyncContext context = request.startAsync(); - ServletOutputStream out = response.getOutputStream(); + HttpOutput out = (HttpOutput)response.getOutputStream(); + response.setContentType("text/plain"); + byte[] bytes = _data.getBytes(StandardCharsets.ISO_8859_1); - if (_blocking) + switch (_style) { - ((HttpOutput)out).sendContent(BufferUtil.toBuffer(__data)); - context.complete(); - } - else - { - ((HttpOutput)out).sendContent(BufferUtil.toBuffer(__data), Callback.from(context::complete)); + case BUFFER: + out.sendContent(BufferUtil.toBuffer(bytes), Callback.from(context::complete)); + break; + + case STREAM: + out.sendContent(new ByteArrayInputStream(bytes), Callback.from(context::complete)); + break; } + + _wait.countDown(); } @Override public String toString() { - return String.format("SCH@%x{b=%b}", hashCode(), _blocking); + return String.format("SCCH{w=%s,d=%d}", _style, _data.length()); } } }