From 05c08e16029a5bad9cc148b10a0ad18992a9627a Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Wed, 25 Aug 2021 15:46:37 +0200 Subject: [PATCH] Backport from 10.0.x of the changes using Awaitility. Signed-off-by: Simone Bordet --- jetty-client/pom.xml | 4 + .../client/HttpConnectionLifecycleTest.java | 8 +- .../client/http/HttpSenderOverHTTPTest.java | 5 +- .../http2-http-client-transport/pom.xml | 4 + .../BlockedWritesWithSmallThreadPoolTest.java | 90 ++++++++----------- jetty-util/pom.xml | 4 + .../jetty/util/BlockingArrayQueueTest.java | 36 +------- pom.xml | 6 ++ tests/test-http-client-transport/pom.xml | 4 + .../jetty/http/client/AsyncIOServletTest.java | 19 ++-- .../http/client/HttpClientContinueTest.java | 80 ++++++++++++----- 11 files changed, 135 insertions(+), 125 deletions(-) diff --git a/jetty-client/pom.xml b/jetty-client/pom.xml index 8d5cd655d3f0..73ecc601924b 100644 --- a/jetty-client/pom.xml +++ b/jetty-client/pom.xml @@ -135,5 +135,9 @@ jetty-test-helper test + + org.awaitility + awaitility + diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpConnectionLifecycleTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpConnectionLifecycleTest.java index 1cb5fcf9e7fb..38d139a16104 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpConnectionLifecycleTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpConnectionLifecycleTest.java @@ -44,6 +44,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ArgumentsSource; +import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -430,8 +431,6 @@ public void onComplete(Result result) @ParameterizedTest @ArgumentsSource(ScenarioProvider.class) - @Tag("Slow") - @DisabledIfSystemProperty(named = "env", matches = "ci") // TODO: SLOW, needs review public void testIdleConnectionIsClosedOnRemoteClose(Scenario scenario) throws Exception { start(scenario, new EmptyServerHandler()); @@ -457,10 +456,7 @@ public void testIdleConnectionIsClosedOnRemoteClose(Scenario scenario) throws Ex connector.stop(); // Give the connection some time to process the remote close - TimeUnit.SECONDS.sleep(1); - - assertEquals(0, idleConnections.size()); - assertEquals(0, activeConnections.size()); + await().atMost(5, TimeUnit.SECONDS).until(() -> idleConnections.size() == 0 && activeConnections.size() == 0); } @ParameterizedTest diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpSenderOverHTTPTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpSenderOverHTTPTest.java index 22dc6b0a5f5a..e6ff04bc81f4 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpSenderOverHTTPTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpSenderOverHTTPTest.java @@ -38,8 +38,8 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.condition.DisabledIfSystemProperty; +import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -94,7 +94,6 @@ public void onSuccess(Request request) } @Test - @DisabledIfSystemProperty(named = "env", matches = "ci") // TODO: SLOW, needs review public void testSendNoRequestContentIncompleteFlush() throws Exception { ByteArrayEndPoint endPoint = new ByteArrayEndPoint("", 16); @@ -108,7 +107,7 @@ public void testSendNoRequestContentIncompleteFlush() throws Exception StringBuilder builder = new StringBuilder(endPoint.takeOutputString()); // Wait for the write to complete - TimeUnit.SECONDS.sleep(1); + await().atMost(5, TimeUnit.SECONDS).until(() -> endPoint.toEndPointString().contains(",flush=P,")); String chunk = endPoint.takeOutputString(); while (chunk.length() > 0) diff --git a/jetty-http2/http2-http-client-transport/pom.xml b/jetty-http2/http2-http-client-transport/pom.xml index 1fa6f76fddd0..717bbfdf54b0 100644 --- a/jetty-http2/http2-http-client-transport/pom.xml +++ b/jetty-http2/http2-http-client-transport/pom.xml @@ -118,6 +118,10 @@ ${project.version} test + + org.awaitility + awaitility + diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/BlockedWritesWithSmallThreadPoolTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/BlockedWritesWithSmallThreadPoolTest.java index f935c713e5c8..ab8790a23708 100644 --- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/BlockedWritesWithSmallThreadPoolTest.java +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/BlockedWritesWithSmallThreadPoolTest.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; -import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -56,10 +55,10 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; +import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.junit.jupiter.api.Assertions.fail; public class BlockedWritesWithSmallThreadPoolTest { @@ -143,16 +142,23 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r @Override public void onData(Stream stream, DataFrame frame, Callback callback) { - // Block here to stop reading from the network - // to cause the server to TCP congest. - awaitUntil(0, () -> clientBlockLatch.await(5, TimeUnit.SECONDS)); - callback.succeeded(); - if (frame.isEndStream()) - clientDataLatch.countDown(); + try + { + // Block here to stop reading from the network + // to cause the server to TCP congest. + clientBlockLatch.await(5, TimeUnit.SECONDS); + callback.succeeded(); + if (frame.isEndStream()) + clientDataLatch.countDown(); + } + catch (InterruptedException x) + { + callback.failed(x); + } } }); - awaitUntil(5000, () -> + await().atMost(5, TimeUnit.SECONDS).until(() -> { AbstractEndPoint serverEndPoint = serverEndPointRef.get(); return serverEndPoint != null && serverEndPoint.getWriteFlusher().isPending(); @@ -164,11 +170,11 @@ public void onData(Stream stream, DataFrame frame, Callback callback) if (serverThreads.getAvailableReservedThreads() != 1) { assertFalse(serverThreads.tryExecute(() -> {})); - awaitUntil(5000, () -> serverThreads.getAvailableReservedThreads() == 1); + await().atMost(5, TimeUnit.SECONDS).until(() -> serverThreads.getAvailableReservedThreads() == 1); } // Use the reserved thread for a blocking operation, simulating another blocking write. CountDownLatch serverBlockLatch = new CountDownLatch(1); - assertTrue(serverThreads.tryExecute(() -> awaitUntil(0, () -> serverBlockLatch.await(15, TimeUnit.SECONDS)))); + assertTrue(serverThreads.tryExecute(() -> await().atMost(20, TimeUnit.SECONDS).until(() -> serverBlockLatch.await(15, TimeUnit.SECONDS), b -> true))); assertEquals(0, serverThreads.getReadyThreads()); @@ -194,14 +200,21 @@ public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) @Override public void onData(Stream stream, DataFrame frame, Callback callback) { - // Block here to stop reading from the network - // to cause the client to TCP congest. - awaitUntil(0, () -> serverBlockLatch.await(5, TimeUnit.SECONDS)); - callback.succeeded(); - if (frame.isEndStream()) + try + { + // Block here to stop reading from the network + // to cause the client to TCP congest. + serverBlockLatch.await(5, TimeUnit.SECONDS); + callback.succeeded(); + if (frame.isEndStream()) + { + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + } + } + catch (InterruptedException x) { - MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); - stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + callback.failed(x); } } }; @@ -241,7 +254,7 @@ public void onHeaders(Stream stream, HeadersFrame frame) Stream stream = streamPromise.get(5, TimeUnit.SECONDS); stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(contentLength), true), Callback.NOOP); - awaitUntil(5000, () -> + await().atMost(5, TimeUnit.SECONDS).until(() -> { AbstractEndPoint clientEndPoint = (AbstractEndPoint)((HTTP2Session)session).getEndPoint(); return clientEndPoint.getWriteFlusher().isPending(); @@ -251,17 +264,17 @@ public void onHeaders(Stream stream, HeadersFrame frame) CountDownLatch clientBlockLatch = new CountDownLatch(1); // Make sure the application thread is blocked. - clientThreads.execute(() -> awaitUntil(0, () -> clientBlockLatch.await(15, TimeUnit.SECONDS))); + clientThreads.execute(() -> await().until(() -> clientBlockLatch.await(15, TimeUnit.SECONDS), b -> true)); // Make sure the reserved thread is blocked. if (clientThreads.getAvailableReservedThreads() != 1) { assertFalse(clientThreads.tryExecute(() -> {})); - awaitUntil(5000, () -> clientThreads.getAvailableReservedThreads() == 1); + await().atMost(5, TimeUnit.SECONDS).until(() -> clientThreads.getAvailableReservedThreads() == 1); } // Use the reserved thread for a blocking operation, simulating another blocking write. - assertTrue(clientThreads.tryExecute(() -> awaitUntil(0, () -> clientBlockLatch.await(15, TimeUnit.SECONDS)))); + assertTrue(clientThreads.tryExecute(() -> await().until(() -> clientBlockLatch.await(15, TimeUnit.SECONDS), b -> true))); - awaitUntil(5000, () -> clientThreads.getReadyThreads() == 0); + await().atMost(5, TimeUnit.SECONDS).until(() -> clientThreads.getReadyThreads() == 0); // Unblock the server to read from the network, which should unblock the client. serverBlockLatch.countDown(); @@ -269,35 +282,4 @@ public void onHeaders(Stream stream, HeadersFrame frame) assertTrue(latch.await(10, TimeUnit.SECONDS), client.dump()); clientBlockLatch.countDown(); } - - private void awaitUntil(long millis, Callable test) - { - try - { - if (millis == 0) - { - if (test.call()) - return; - } - else - { - long begin = System.nanoTime(); - while (System.nanoTime() - begin < TimeUnit.MILLISECONDS.toNanos(millis)) - { - if (test.call()) - return; - Thread.sleep(10); - } - } - fail("Await elapsed: " + millis + "ms"); - } - catch (RuntimeException | Error x) - { - throw x; - } - catch (Exception x) - { - throw new RuntimeException(x); - } - } } diff --git a/jetty-util/pom.xml b/jetty-util/pom.xml index 2d6ba32ca686..ee79b3e03b59 100644 --- a/jetty-util/pom.xml +++ b/jetty-util/pom.xml @@ -96,5 +96,9 @@ ${slf4j.version} test + + org.awaitility + awaitility + diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/BlockingArrayQueueTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/BlockingArrayQueueTest.java index 01c36a5e1b6c..4c470cc8ad98 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/BlockingArrayQueueTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/BlockingArrayQueueTest.java @@ -18,13 +18,10 @@ package org.eclipse.jetty.util; -import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.ListIterator; -import java.util.Objects; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ThreadLocalRandom; @@ -35,7 +32,7 @@ import org.hamcrest.Matchers; import org.junit.jupiter.api.Test; -import static org.eclipse.jetty.util.BlockingArrayQueueTest.Await.await; +import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -533,35 +530,4 @@ public void testDrainTo() throws Exception assertThat(queue.size(), Matchers.is(0)); assertThat(queue, Matchers.empty()); } - - static class Await - { - private Duration duration; - - public static Await await() - { - return new Await(); - } - - public Await atMost(long time, TimeUnit unit) - { - duration = Duration.ofMillis(unit.toMillis(time)); - return this; - } - - public void until(Callable condition) throws Exception - { - Objects.requireNonNull(duration); - long start = System.nanoTime(); - - while (true) - { - if (condition.call()) - return; - if (duration.minus(Duration.ofNanos(System.nanoTime() - start)).isNegative()) - throw new AssertionError("Duration expired"); - Thread.sleep(10); - } - } - } } diff --git a/pom.xml b/pom.xml index 9fd385c58f7c..d0bf802a30d8 100644 --- a/pom.xml +++ b/pom.xml @@ -1045,6 +1045,12 @@ junit-jupiter ${junit.version} + + org.awaitility + awaitility + 4.1.0 + test + org.testcontainers diff --git a/tests/test-http-client-transport/pom.xml b/tests/test-http-client-transport/pom.xml index 837a4d4ec809..bc66affa91c9 100644 --- a/tests/test-http-client-transport/pom.xml +++ b/tests/test-http-client-transport/pom.xml @@ -163,6 +163,10 @@ jetty-test-helper test + + org.awaitility + awaitility + diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java index 870b0559175e..a2f9105b930a 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java @@ -75,12 +75,11 @@ import org.eclipse.jetty.util.log.StacklessLogging; import org.hamcrest.Matchers; import org.junit.jupiter.api.Assumptions; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Tag; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ArgumentsSource; import static java.nio.ByteBuffer.wrap; +import static org.awaitility.Awaitility.await; import static org.eclipse.jetty.http.client.Transport.FCGI; import static org.eclipse.jetty.http.client.Transport.H2C; import static org.eclipse.jetty.http.client.Transport.HTTP; @@ -398,8 +397,6 @@ public void onError(Throwable t) @ParameterizedTest @ArgumentsSource(TransportProvider.class) - @Tag("Unstable") - @Disabled public void testAsyncWriteClosed(Transport transport) throws Exception { init(transport); @@ -431,7 +428,19 @@ public void onWritePossible() throws IOException // Wait for the failure to arrive to // the server while we are about to write. - sleep(2000); + try + { + await().atMost(5, TimeUnit.SECONDS).until(() -> + { + out.write(new byte[0]); + // Extract HttpOutput._apiState value from toString. + return !out.toString().split(",")[1].split("=")[1].equals("READY"); + }); + } + catch (Exception e) + { + throw new AssertionError(e); + } out.write(data); } diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientContinueTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientContinueTest.java index 70caa76b23e0..0c74ec244206 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientContinueTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientContinueTest.java @@ -19,6 +19,7 @@ package org.eclipse.jetty.http.client; import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -30,8 +31,10 @@ import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import javax.servlet.ServletException; import javax.servlet.ServletInputStream; +import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -55,6 +58,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ArgumentsSource; +import static org.awaitility.Awaitility.await; import static org.eclipse.jetty.http.client.Transport.FCGI; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -481,10 +485,16 @@ public void onComplete(Result result) @ParameterizedTest @ArgumentsSource(TransportProvider.class) - @Tag("Slow") - @DisabledIfSystemProperty(named = "env", matches = "ci") // TODO: SLOW, needs review public void testExpect100ContinueWithDeferredContentRespond100Continue(Transport transport) throws Exception { + byte[] chunk1 = new byte[]{0, 1, 2, 3}; + byte[] chunk2 = new byte[]{4, 5, 6, 7}; + byte[] data = new byte[chunk1.length + chunk2.length]; + System.arraycopy(chunk1, 0, data, 0, chunk1.length); + System.arraycopy(chunk2, 0, data, chunk1.length, chunk2.length); + + CountDownLatch serverLatch = new CountDownLatch(1); + AtomicReference handlerThread = new AtomicReference<>(); init(transport); scenario.start(new AbstractHandler() { @@ -492,18 +502,22 @@ public void testExpect100ContinueWithDeferredContentRespond100Continue(Transport public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException { baseRequest.setHandled(true); + handlerThread.set(Thread.currentThread()); // Send 100-Continue and echo the content - IO.copy(request.getInputStream(), response.getOutputStream()); + + ServletOutputStream outputStream = response.getOutputStream(); + DataInputStream inputStream = new DataInputStream(request.getInputStream()); + // Block until the 1st chunk is fully received. + byte[] buf1 = new byte[chunk1.length]; + inputStream.readFully(buf1); + outputStream.write(buf1); + + serverLatch.countDown(); + IO.copy(inputStream, outputStream); } }); - final byte[] chunk1 = new byte[]{0, 1, 2, 3}; - final byte[] chunk2 = new byte[]{4, 5, 6, 7}; - final byte[] data = new byte[chunk1.length + chunk2.length]; - System.arraycopy(chunk1, 0, data, 0, chunk1.length); - System.arraycopy(chunk2, 0, data, chunk1.length, chunk2.length); - - final CountDownLatch latch = new CountDownLatch(1); + CountDownLatch requestLatch = new CountDownLatch(1); DeferredContentProvider content = new DeferredContentProvider(); scenario.client.newRequest(scenario.newURI()) .header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString()) @@ -514,20 +528,31 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques public void onComplete(Result result) { assertArrayEquals(data, getContent()); - latch.countDown(); + requestLatch.countDown(); } }); - Thread.sleep(1000); + // Wait for the handler thread to be blocked in the 1st IO. + await().atMost(5, TimeUnit.SECONDS).until(() -> + { + Thread thread = handlerThread.get(); + return thread != null && thread.getState() == Thread.State.WAITING; + }); content.offer(ByteBuffer.wrap(chunk1)); - Thread.sleep(1000); + // Wait for the handler thread to be blocked in the 2nd IO. + assertTrue(serverLatch.await(5, TimeUnit.SECONDS)); + await().atMost(5, TimeUnit.SECONDS).until(() -> + { + Thread thread = handlerThread.get(); + return thread != null && thread.getState() == Thread.State.WAITING; + }); content.offer(ByteBuffer.wrap(chunk2)); content.close(); - assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertTrue(requestLatch.await(5, TimeUnit.SECONDS)); } @ParameterizedTest @@ -581,6 +606,7 @@ public void onComplete(Result result) @ArgumentsSource(TransportProvider.class) public void testExpect100ContinueWithConcurrentDeferredContentRespond100Continue(Transport transport) throws Exception { + AtomicReference handlerThread = new AtomicReference<>(); init(transport); scenario.start(new AbstractHandler() { @@ -588,22 +614,22 @@ public void testExpect100ContinueWithConcurrentDeferredContentRespond100Continue public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException { baseRequest.setHandled(true); + handlerThread.set(Thread.currentThread()); // Send 100-Continue and echo the content IO.copy(request.getInputStream(), response.getOutputStream()); } }); - final byte[] data = new byte[]{0, 1, 2, 3, 4, 5, 6, 7}; - final DeferredContentProvider content = new DeferredContentProvider(); + byte[] chunk1 = new byte[]{0, 1, 2, 3}; + byte[] chunk2 = new byte[]{4, 5, 6, 7}; + byte[] data = new byte[chunk1.length + chunk2.length]; + System.arraycopy(chunk1, 0, data, 0, chunk1.length); + System.arraycopy(chunk2, 0, data, chunk1.length, chunk2.length); - final CountDownLatch latch = new CountDownLatch(1); + CountDownLatch latch = new CountDownLatch(1); + DeferredContentProvider content = new DeferredContentProvider(ByteBuffer.wrap(chunk1)); scenario.client.newRequest(scenario.newURI()) .header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString()) - .onRequestHeaders(request -> - { - content.offer(ByteBuffer.wrap(data)); - content.close(); - }) .content(content) .send(new BufferingResponseListener() { @@ -615,6 +641,16 @@ public void onComplete(Result result) } }); + // Wait for the handler thread to be blocked in IO. + await().atMost(5, TimeUnit.SECONDS).until(() -> + { + Thread thread = handlerThread.get(); + return thread != null && thread.getState() == Thread.State.WAITING; + }); + + content.offer(ByteBuffer.wrap(chunk2)); + content.close(); + assertTrue(latch.await(5, TimeUnit.SECONDS)); }