Skip to content

Commit

Permalink
Backport from 10.0.x of the changes using Awaitility.
Browse files Browse the repository at this point in the history
Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Aug 25, 2021
1 parent 1b79fce commit 05c08e1
Show file tree
Hide file tree
Showing 11 changed files with 135 additions and 125 deletions.
4 changes: 4 additions & 0 deletions jetty-client/pom.xml
Expand Up @@ -135,5 +135,9 @@
<artifactId>jetty-test-helper</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
</dependency>
</dependencies>
</project>
Expand Up @@ -44,6 +44,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;

import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -430,8 +431,6 @@ public void onComplete(Result result)

@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
@Tag("Slow")
@DisabledIfSystemProperty(named = "env", matches = "ci") // TODO: SLOW, needs review
public void testIdleConnectionIsClosedOnRemoteClose(Scenario scenario) throws Exception
{
start(scenario, new EmptyServerHandler());
Expand All @@ -457,10 +456,7 @@ public void testIdleConnectionIsClosedOnRemoteClose(Scenario scenario) throws Ex
connector.stop();

// Give the connection some time to process the remote close
TimeUnit.SECONDS.sleep(1);

assertEquals(0, idleConnections.size());
assertEquals(0, activeConnections.size());
await().atMost(5, TimeUnit.SECONDS).until(() -> idleConnections.size() == 0 && activeConnections.size() == 0);
}

@ParameterizedTest
Expand Down
Expand Up @@ -38,8 +38,8 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledIfSystemProperty;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down Expand Up @@ -94,7 +94,6 @@ public void onSuccess(Request request)
}

@Test
@DisabledIfSystemProperty(named = "env", matches = "ci") // TODO: SLOW, needs review
public void testSendNoRequestContentIncompleteFlush() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint("", 16);
Expand All @@ -108,7 +107,7 @@ public void testSendNoRequestContentIncompleteFlush() throws Exception
StringBuilder builder = new StringBuilder(endPoint.takeOutputString());

// Wait for the write to complete
TimeUnit.SECONDS.sleep(1);
await().atMost(5, TimeUnit.SECONDS).until(() -> endPoint.toEndPointString().contains(",flush=P,"));

String chunk = endPoint.takeOutputString();
while (chunk.length() > 0)
Expand Down
4 changes: 4 additions & 0 deletions jetty-http2/http2-http-client-transport/pom.xml
Expand Up @@ -118,6 +118,10 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
</dependency>
</dependencies>

</project>
Expand Up @@ -21,7 +21,6 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -56,10 +55,10 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

public class BlockedWritesWithSmallThreadPoolTest
{
Expand Down Expand Up @@ -143,16 +142,23 @@ protected void service(String target, Request jettyRequest, HttpServletRequest r
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
// Block here to stop reading from the network
// to cause the server to TCP congest.
awaitUntil(0, () -> clientBlockLatch.await(5, TimeUnit.SECONDS));
callback.succeeded();
if (frame.isEndStream())
clientDataLatch.countDown();
try
{
// Block here to stop reading from the network
// to cause the server to TCP congest.
clientBlockLatch.await(5, TimeUnit.SECONDS);
callback.succeeded();
if (frame.isEndStream())
clientDataLatch.countDown();
}
catch (InterruptedException x)
{
callback.failed(x);
}
}
});

awaitUntil(5000, () ->
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
AbstractEndPoint serverEndPoint = serverEndPointRef.get();
return serverEndPoint != null && serverEndPoint.getWriteFlusher().isPending();
Expand All @@ -164,11 +170,11 @@ public void onData(Stream stream, DataFrame frame, Callback callback)
if (serverThreads.getAvailableReservedThreads() != 1)
{
assertFalse(serverThreads.tryExecute(() -> {}));
awaitUntil(5000, () -> serverThreads.getAvailableReservedThreads() == 1);
await().atMost(5, TimeUnit.SECONDS).until(() -> serverThreads.getAvailableReservedThreads() == 1);
}
// Use the reserved thread for a blocking operation, simulating another blocking write.
CountDownLatch serverBlockLatch = new CountDownLatch(1);
assertTrue(serverThreads.tryExecute(() -> awaitUntil(0, () -> serverBlockLatch.await(15, TimeUnit.SECONDS))));
assertTrue(serverThreads.tryExecute(() -> await().atMost(20, TimeUnit.SECONDS).until(() -> serverBlockLatch.await(15, TimeUnit.SECONDS), b -> true)));

assertEquals(0, serverThreads.getReadyThreads());

Expand All @@ -194,14 +200,21 @@ public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
@Override
public void onData(Stream stream, DataFrame frame, Callback callback)
{
// Block here to stop reading from the network
// to cause the client to TCP congest.
awaitUntil(0, () -> serverBlockLatch.await(5, TimeUnit.SECONDS));
callback.succeeded();
if (frame.isEndStream())
try
{
// Block here to stop reading from the network
// to cause the client to TCP congest.
serverBlockLatch.await(5, TimeUnit.SECONDS);
callback.succeeded();
if (frame.isEndStream())
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
}
}
catch (InterruptedException x)
{
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
callback.failed(x);
}
}
};
Expand Down Expand Up @@ -241,7 +254,7 @@ public void onHeaders(Stream stream, HeadersFrame frame)
Stream stream = streamPromise.get(5, TimeUnit.SECONDS);
stream.data(new DataFrame(stream.getId(), ByteBuffer.allocate(contentLength), true), Callback.NOOP);

awaitUntil(5000, () ->
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
AbstractEndPoint clientEndPoint = (AbstractEndPoint)((HTTP2Session)session).getEndPoint();
return clientEndPoint.getWriteFlusher().isPending();
Expand All @@ -251,53 +264,22 @@ public void onHeaders(Stream stream, HeadersFrame frame)

CountDownLatch clientBlockLatch = new CountDownLatch(1);
// Make sure the application thread is blocked.
clientThreads.execute(() -> awaitUntil(0, () -> clientBlockLatch.await(15, TimeUnit.SECONDS)));
clientThreads.execute(() -> await().until(() -> clientBlockLatch.await(15, TimeUnit.SECONDS), b -> true));
// Make sure the reserved thread is blocked.
if (clientThreads.getAvailableReservedThreads() != 1)
{
assertFalse(clientThreads.tryExecute(() -> {}));
awaitUntil(5000, () -> clientThreads.getAvailableReservedThreads() == 1);
await().atMost(5, TimeUnit.SECONDS).until(() -> clientThreads.getAvailableReservedThreads() == 1);
}
// Use the reserved thread for a blocking operation, simulating another blocking write.
assertTrue(clientThreads.tryExecute(() -> awaitUntil(0, () -> clientBlockLatch.await(15, TimeUnit.SECONDS))));
assertTrue(clientThreads.tryExecute(() -> await().until(() -> clientBlockLatch.await(15, TimeUnit.SECONDS), b -> true)));

awaitUntil(5000, () -> clientThreads.getReadyThreads() == 0);
await().atMost(5, TimeUnit.SECONDS).until(() -> clientThreads.getReadyThreads() == 0);

// Unblock the server to read from the network, which should unblock the client.
serverBlockLatch.countDown();

assertTrue(latch.await(10, TimeUnit.SECONDS), client.dump());
clientBlockLatch.countDown();
}

private void awaitUntil(long millis, Callable<Boolean> test)
{
try
{
if (millis == 0)
{
if (test.call())
return;
}
else
{
long begin = System.nanoTime();
while (System.nanoTime() - begin < TimeUnit.MILLISECONDS.toNanos(millis))
{
if (test.call())
return;
Thread.sleep(10);
}
}
fail("Await elapsed: " + millis + "ms");
}
catch (RuntimeException | Error x)
{
throw x;
}
catch (Exception x)
{
throw new RuntimeException(x);
}
}
}
4 changes: 4 additions & 0 deletions jetty-util/pom.xml
Expand Up @@ -96,5 +96,9 @@
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
</dependency>
</dependencies>
</project>
Expand Up @@ -18,13 +18,10 @@

package org.eclipse.jetty.util;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
Expand All @@ -35,7 +32,7 @@
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Test;

import static org.eclipse.jetty.util.BlockingArrayQueueTest.Await.await;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -533,35 +530,4 @@ public void testDrainTo() throws Exception
assertThat(queue.size(), Matchers.is(0));
assertThat(queue, Matchers.empty());
}

static class Await
{
private Duration duration;

public static Await await()
{
return new Await();
}

public Await atMost(long time, TimeUnit unit)
{
duration = Duration.ofMillis(unit.toMillis(time));
return this;
}

public void until(Callable<Boolean> condition) throws Exception
{
Objects.requireNonNull(duration);
long start = System.nanoTime();

while (true)
{
if (condition.call())
return;
if (duration.minus(Duration.ofNanos(System.nanoTime() - start)).isNegative())
throw new AssertionError("Duration expired");
Thread.sleep(10);
}
}
}
}
6 changes: 6 additions & 0 deletions pom.xml
Expand Up @@ -1045,6 +1045,12 @@
<artifactId>junit-jupiter</artifactId>
<version>${junit.version}</version>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.1.0</version>
<scope>test</scope>
</dependency>
<!-- Test Container Deps -->
<dependency>
<groupId>org.testcontainers</groupId>
Expand Down
4 changes: 4 additions & 0 deletions tests/test-http-client-transport/pom.xml
Expand Up @@ -163,6 +163,10 @@
<artifactId>jetty-test-helper</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
</dependency>
</dependencies>

</project>
Expand Up @@ -75,12 +75,11 @@
import org.eclipse.jetty.util.log.StacklessLogging;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;

import static java.nio.ByteBuffer.wrap;
import static org.awaitility.Awaitility.await;
import static org.eclipse.jetty.http.client.Transport.FCGI;
import static org.eclipse.jetty.http.client.Transport.H2C;
import static org.eclipse.jetty.http.client.Transport.HTTP;
Expand Down Expand Up @@ -398,8 +397,6 @@ public void onError(Throwable t)

@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
@Tag("Unstable")
@Disabled
public void testAsyncWriteClosed(Transport transport) throws Exception
{
init(transport);
Expand Down Expand Up @@ -431,7 +428,19 @@ public void onWritePossible() throws IOException

// Wait for the failure to arrive to
// the server while we are about to write.
sleep(2000);
try
{
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
out.write(new byte[0]);
// Extract HttpOutput._apiState value from toString.
return !out.toString().split(",")[1].split("=")[1].equals("READY");
});
}
catch (Exception e)
{
throw new AssertionError(e);
}

out.write(data);
}
Expand Down

0 comments on commit 05c08e1

Please sign in to comment.