diff --git a/jetty-client/pom.xml b/jetty-client/pom.xml
index 8d5cd655d3f0..73ecc601924b 100644
--- a/jetty-client/pom.xml
+++ b/jetty-client/pom.xml
@@ -135,5 +135,9 @@
jetty-test-helper
test
+
+ org.awaitility
+ awaitility
+
diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpConnectionLifecycleTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpConnectionLifecycleTest.java
index 1cb5fcf9e7fb..38d139a16104 100644
--- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpConnectionLifecycleTest.java
+++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpConnectionLifecycleTest.java
@@ -44,6 +44,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
+import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -430,8 +431,6 @@ public void onComplete(Result result)
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
- @Tag("Slow")
- @DisabledIfSystemProperty(named = "env", matches = "ci") // TODO: SLOW, needs review
public void testIdleConnectionIsClosedOnRemoteClose(Scenario scenario) throws Exception
{
start(scenario, new EmptyServerHandler());
@@ -457,10 +456,7 @@ public void testIdleConnectionIsClosedOnRemoteClose(Scenario scenario) throws Ex
connector.stop();
// Give the connection some time to process the remote close
- TimeUnit.SECONDS.sleep(1);
-
- assertEquals(0, idleConnections.size());
- assertEquals(0, activeConnections.size());
+ await().atMost(5, TimeUnit.SECONDS).until(() -> idleConnections.size() == 0 && activeConnections.size() == 0);
}
@ParameterizedTest
diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpSenderOverHTTPTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpSenderOverHTTPTest.java
index 22dc6b0a5f5a..e6ff04bc81f4 100644
--- a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpSenderOverHTTPTest.java
+++ b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpSenderOverHTTPTest.java
@@ -38,8 +38,8 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
+import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -94,7 +94,6 @@ public void onSuccess(Request request)
}
@Test
- @DisabledIfSystemProperty(named = "env", matches = "ci") // TODO: SLOW, needs review
public void testSendNoRequestContentIncompleteFlush() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint("", 16);
@@ -108,7 +107,7 @@ public void testSendNoRequestContentIncompleteFlush() throws Exception
StringBuilder builder = new StringBuilder(endPoint.takeOutputString());
// Wait for the write to complete
- TimeUnit.SECONDS.sleep(1);
+ await().atMost(5, TimeUnit.SECONDS).until(() -> endPoint.toEndPointString().contains(",flush=P,"));
String chunk = endPoint.takeOutputString();
while (chunk.length() > 0)
diff --git a/jetty-http2/http2-http-client-transport/pom.xml b/jetty-http2/http2-http-client-transport/pom.xml
index 1fa6f76fddd0..717bbfdf54b0 100644
--- a/jetty-http2/http2-http-client-transport/pom.xml
+++ b/jetty-http2/http2-http-client-transport/pom.xml
@@ -118,6 +118,10 @@
${project.version}
test
+
+ org.awaitility
+ awaitility
+
diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/BlockedWritesWithSmallThreadPoolTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/BlockedWritesWithSmallThreadPoolTest.java
index f935c713e5c8..ab8790a23708 100644
--- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/BlockedWritesWithSmallThreadPoolTest.java
+++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/BlockedWritesWithSmallThreadPoolTest.java
@@ -21,7 +21,6 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
-import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -56,10 +55,10 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
+import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
public class BlockedWritesWithSmallThreadPoolTest
{
@@ -143,16 +142,23 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
- // Block here to stop reading from the network
- // to cause the server to TCP congest.
- awaitUntil(0, () -> clientBlockLatch.await(5, TimeUnit.SECONDS));
- callback.succeeded();
- if (frame.isEndStream())
- clientDataLatch.countDown();
+ try
+ {
+ // Block here to stop reading from the network
+ // to cause the server to TCP congest.
+ clientBlockLatch.await(5, TimeUnit.SECONDS);
+ callback.succeeded();
+ if (frame.isEndStream())
+ clientDataLatch.countDown();
+ }
+ catch (InterruptedException x)
+ {
+ callback.failed(x);
+ }
}
});
- awaitUntil(5000, () ->
+ await().atMost(5, TimeUnit.SECONDS).until(() ->
{
AbstractEndPoint serverEndPoint = serverEndPointRef.get();
return serverEndPoint != null && serverEndPoint.getWriteFlusher().isPending();
@@ -164,11 +170,11 @@ public void onData(Stream stream, DataFrame frame, Callback callback)
if (serverThreads.getAvailableReservedThreads() != 1)
{
assertFalse(serverThreads.tryExecute(() -> {}));
- awaitUntil(5000, () -> serverThreads.getAvailableReservedThreads() == 1);
+ await().atMost(5, TimeUnit.SECONDS).until(() -> serverThreads.getAvailableReservedThreads() == 1);
}
// Use the reserved thread for a blocking operation, simulating another blocking write.
CountDownLatch serverBlockLatch = new CountDownLatch(1);
- assertTrue(serverThreads.tryExecute(() -> awaitUntil(0, () -> serverBlockLatch.await(15, TimeUnit.SECONDS))));
+ assertTrue(serverThreads.tryExecute(() -> await().atMost(20, TimeUnit.SECONDS).until(() -> serverBlockLatch.await(15, TimeUnit.SECONDS), b -> true)));
assertEquals(0, serverThreads.getReadyThreads());
@@ -194,14 +200,21 @@ public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
- // Block here to stop reading from the network
- // to cause the client to TCP congest.
- awaitUntil(0, () -> serverBlockLatch.await(5, TimeUnit.SECONDS));
- callback.succeeded();
- if (frame.isEndStream())
+ try
+ {
+ // Block here to stop reading from the network
+ // to cause the client to TCP congest.
+ serverBlockLatch.await(5, TimeUnit.SECONDS);
+ callback.succeeded();
+ if (frame.isEndStream())
+ {
+ MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
+ stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
+ }
+ }
+ catch (InterruptedException x)
{
- MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
- stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
+ callback.failed(x);
}
}
};
@@ -241,7 +254,7 @@ public void onHeaders(Stream stream, HeadersFrame frame)
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(contentLength), true), Callback.NOOP);
- awaitUntil(5000, () ->
+ await().atMost(5, TimeUnit.SECONDS).until(() ->
{
AbstractEndPoint clientEndPoint = (AbstractEndPoint)((HTTP2Session)session).getEndPoint();
return clientEndPoint.getWriteFlusher().isPending();
@@ -251,17 +264,17 @@ public void onHeaders(Stream stream, HeadersFrame frame)
CountDownLatch clientBlockLatch = new CountDownLatch(1);
// Make sure the application thread is blocked.
- clientThreads.execute(() -> awaitUntil(0, () -> clientBlockLatch.await(15, TimeUnit.SECONDS)));
+ clientThreads.execute(() -> await().until(() -> clientBlockLatch.await(15, TimeUnit.SECONDS), b -> true));
// Make sure the reserved thread is blocked.
if (clientThreads.getAvailableReservedThreads() != 1)
{
assertFalse(clientThreads.tryExecute(() -> {}));
- awaitUntil(5000, () -> clientThreads.getAvailableReservedThreads() == 1);
+ await().atMost(5, TimeUnit.SECONDS).until(() -> clientThreads.getAvailableReservedThreads() == 1);
}
// Use the reserved thread for a blocking operation, simulating another blocking write.
- assertTrue(clientThreads.tryExecute(() -> awaitUntil(0, () -> clientBlockLatch.await(15, TimeUnit.SECONDS))));
+ assertTrue(clientThreads.tryExecute(() -> await().until(() -> clientBlockLatch.await(15, TimeUnit.SECONDS), b -> true)));
- awaitUntil(5000, () -> clientThreads.getReadyThreads() == 0);
+ await().atMost(5, TimeUnit.SECONDS).until(() -> clientThreads.getReadyThreads() == 0);
// Unblock the server to read from the network, which should unblock the client.
serverBlockLatch.countDown();
@@ -269,35 +282,4 @@ public void onHeaders(Stream stream, HeadersFrame frame)
assertTrue(latch.await(10, TimeUnit.SECONDS), client.dump());
clientBlockLatch.countDown();
}
-
- private void awaitUntil(long millis, Callable test)
- {
- try
- {
- if (millis == 0)
- {
- if (test.call())
- return;
- }
- else
- {
- long begin = System.nanoTime();
- while (System.nanoTime() - begin < TimeUnit.MILLISECONDS.toNanos(millis))
- {
- if (test.call())
- return;
- Thread.sleep(10);
- }
- }
- fail("Await elapsed: " + millis + "ms");
- }
- catch (RuntimeException | Error x)
- {
- throw x;
- }
- catch (Exception x)
- {
- throw new RuntimeException(x);
- }
- }
}
diff --git a/jetty-util/pom.xml b/jetty-util/pom.xml
index 2d6ba32ca686..ee79b3e03b59 100644
--- a/jetty-util/pom.xml
+++ b/jetty-util/pom.xml
@@ -96,5 +96,9 @@
${slf4j.version}
test
+
+ org.awaitility
+ awaitility
+
diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/BlockingArrayQueueTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/BlockingArrayQueueTest.java
index 01c36a5e1b6c..4c470cc8ad98 100644
--- a/jetty-util/src/test/java/org/eclipse/jetty/util/BlockingArrayQueueTest.java
+++ b/jetty-util/src/test/java/org/eclipse/jetty/util/BlockingArrayQueueTest.java
@@ -18,13 +18,10 @@
package org.eclipse.jetty.util;
-import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
-import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
@@ -35,7 +32,7 @@
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;
-import static org.eclipse.jetty.util.BlockingArrayQueueTest.Await.await;
+import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -533,35 +530,4 @@ public void testDrainTo() throws Exception
assertThat(queue.size(), Matchers.is(0));
assertThat(queue, Matchers.empty());
}
-
- static class Await
- {
- private Duration duration;
-
- public static Await await()
- {
- return new Await();
- }
-
- public Await atMost(long time, TimeUnit unit)
- {
- duration = Duration.ofMillis(unit.toMillis(time));
- return this;
- }
-
- public void until(Callable condition) throws Exception
- {
- Objects.requireNonNull(duration);
- long start = System.nanoTime();
-
- while (true)
- {
- if (condition.call())
- return;
- if (duration.minus(Duration.ofNanos(System.nanoTime() - start)).isNegative())
- throw new AssertionError("Duration expired");
- Thread.sleep(10);
- }
- }
- }
}
diff --git a/pom.xml b/pom.xml
index 9fd385c58f7c..d0bf802a30d8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1045,6 +1045,12 @@
junit-jupiter
${junit.version}
+
+ org.awaitility
+ awaitility
+ 4.1.0
+ test
+
org.testcontainers
diff --git a/tests/test-http-client-transport/pom.xml b/tests/test-http-client-transport/pom.xml
index 837a4d4ec809..bc66affa91c9 100644
--- a/tests/test-http-client-transport/pom.xml
+++ b/tests/test-http-client-transport/pom.xml
@@ -163,6 +163,10 @@
jetty-test-helper
test
+
+ org.awaitility
+ awaitility
+
diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java
index 870b0559175e..a2f9105b930a 100644
--- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java
+++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/AsyncIOServletTest.java
@@ -75,12 +75,11 @@
import org.eclipse.jetty.util.log.StacklessLogging;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assumptions;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import static java.nio.ByteBuffer.wrap;
+import static org.awaitility.Awaitility.await;
import static org.eclipse.jetty.http.client.Transport.FCGI;
import static org.eclipse.jetty.http.client.Transport.H2C;
import static org.eclipse.jetty.http.client.Transport.HTTP;
@@ -398,8 +397,6 @@ public void onError(Throwable t)
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
- @Tag("Unstable")
- @Disabled
public void testAsyncWriteClosed(Transport transport) throws Exception
{
init(transport);
@@ -431,7 +428,19 @@ public void onWritePossible() throws IOException
// Wait for the failure to arrive to
// the server while we are about to write.
- sleep(2000);
+ try
+ {
+ await().atMost(5, TimeUnit.SECONDS).until(() ->
+ {
+ out.write(new byte[0]);
+ // Extract HttpOutput._apiState value from toString.
+ return !out.toString().split(",")[1].split("=")[1].equals("READY");
+ });
+ }
+ catch (Exception e)
+ {
+ throw new AssertionError(e);
+ }
out.write(data);
}
diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientContinueTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientContinueTest.java
index 70caa76b23e0..0c74ec244206 100644
--- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientContinueTest.java
+++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/HttpClientContinueTest.java
@@ -19,6 +19,7 @@
package org.eclipse.jetty.http.client;
import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -30,8 +31,10 @@
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
+import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -55,6 +58,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
+import static org.awaitility.Awaitility.await;
import static org.eclipse.jetty.http.client.Transport.FCGI;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -481,10 +485,16 @@ public void onComplete(Result result)
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
- @Tag("Slow")
- @DisabledIfSystemProperty(named = "env", matches = "ci") // TODO: SLOW, needs review
public void testExpect100ContinueWithDeferredContentRespond100Continue(Transport transport) throws Exception
{
+ byte[] chunk1 = new byte[]{0, 1, 2, 3};
+ byte[] chunk2 = new byte[]{4, 5, 6, 7};
+ byte[] data = new byte[chunk1.length + chunk2.length];
+ System.arraycopy(chunk1, 0, data, 0, chunk1.length);
+ System.arraycopy(chunk2, 0, data, chunk1.length, chunk2.length);
+
+ CountDownLatch serverLatch = new CountDownLatch(1);
+ AtomicReference handlerThread = new AtomicReference<>();
init(transport);
scenario.start(new AbstractHandler()
{
@@ -492,18 +502,22 @@ public void testExpect100ContinueWithDeferredContentRespond100Continue(Transport
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
+ handlerThread.set(Thread.currentThread());
// Send 100-Continue and echo the content
- IO.copy(request.getInputStream(), response.getOutputStream());
+
+ ServletOutputStream outputStream = response.getOutputStream();
+ DataInputStream inputStream = new DataInputStream(request.getInputStream());
+ // Block until the 1st chunk is fully received.
+ byte[] buf1 = new byte[chunk1.length];
+ inputStream.readFully(buf1);
+ outputStream.write(buf1);
+
+ serverLatch.countDown();
+ IO.copy(inputStream, outputStream);
}
});
- final byte[] chunk1 = new byte[]{0, 1, 2, 3};
- final byte[] chunk2 = new byte[]{4, 5, 6, 7};
- final byte[] data = new byte[chunk1.length + chunk2.length];
- System.arraycopy(chunk1, 0, data, 0, chunk1.length);
- System.arraycopy(chunk2, 0, data, chunk1.length, chunk2.length);
-
- final CountDownLatch latch = new CountDownLatch(1);
+ CountDownLatch requestLatch = new CountDownLatch(1);
DeferredContentProvider content = new DeferredContentProvider();
scenario.client.newRequest(scenario.newURI())
.header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString())
@@ -514,20 +528,31 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
public void onComplete(Result result)
{
assertArrayEquals(data, getContent());
- latch.countDown();
+ requestLatch.countDown();
}
});
- Thread.sleep(1000);
+ // Wait for the handler thread to be blocked in the 1st IO.
+ await().atMost(5, TimeUnit.SECONDS).until(() ->
+ {
+ Thread thread = handlerThread.get();
+ return thread != null && thread.getState() == Thread.State.WAITING;
+ });
content.offer(ByteBuffer.wrap(chunk1));
- Thread.sleep(1000);
+ // Wait for the handler thread to be blocked in the 2nd IO.
+ assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
+ await().atMost(5, TimeUnit.SECONDS).until(() ->
+ {
+ Thread thread = handlerThread.get();
+ return thread != null && thread.getState() == Thread.State.WAITING;
+ });
content.offer(ByteBuffer.wrap(chunk2));
content.close();
- assertTrue(latch.await(5, TimeUnit.SECONDS));
+ assertTrue(requestLatch.await(5, TimeUnit.SECONDS));
}
@ParameterizedTest
@@ -581,6 +606,7 @@ public void onComplete(Result result)
@ArgumentsSource(TransportProvider.class)
public void testExpect100ContinueWithConcurrentDeferredContentRespond100Continue(Transport transport) throws Exception
{
+ AtomicReference handlerThread = new AtomicReference<>();
init(transport);
scenario.start(new AbstractHandler()
{
@@ -588,22 +614,22 @@ public void testExpect100ContinueWithConcurrentDeferredContentRespond100Continue
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException
{
baseRequest.setHandled(true);
+ handlerThread.set(Thread.currentThread());
// Send 100-Continue and echo the content
IO.copy(request.getInputStream(), response.getOutputStream());
}
});
- final byte[] data = new byte[]{0, 1, 2, 3, 4, 5, 6, 7};
- final DeferredContentProvider content = new DeferredContentProvider();
+ byte[] chunk1 = new byte[]{0, 1, 2, 3};
+ byte[] chunk2 = new byte[]{4, 5, 6, 7};
+ byte[] data = new byte[chunk1.length + chunk2.length];
+ System.arraycopy(chunk1, 0, data, 0, chunk1.length);
+ System.arraycopy(chunk2, 0, data, chunk1.length, chunk2.length);
- final CountDownLatch latch = new CountDownLatch(1);
+ CountDownLatch latch = new CountDownLatch(1);
+ DeferredContentProvider content = new DeferredContentProvider(ByteBuffer.wrap(chunk1));
scenario.client.newRequest(scenario.newURI())
.header(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString())
- .onRequestHeaders(request ->
- {
- content.offer(ByteBuffer.wrap(data));
- content.close();
- })
.content(content)
.send(new BufferingResponseListener()
{
@@ -615,6 +641,16 @@ public void onComplete(Result result)
}
});
+ // Wait for the handler thread to be blocked in IO.
+ await().atMost(5, TimeUnit.SECONDS).until(() ->
+ {
+ Thread thread = handlerThread.get();
+ return thread != null && thread.getState() == Thread.State.WAITING;
+ });
+
+ content.offer(ByteBuffer.wrap(chunk2));
+ content.close();
+
assertTrue(latch.await(5, TimeUnit.SECONDS));
}