Skip to content

Commit

Permalink
speculative changes for integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Dec 5, 2022
1 parent 9b248dd commit 5535de4
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,8 @@ public OkHttpClientImpl(OkHttpClient client, OkHttpClientBuilderImpl builder) {

@Override
public void close() {
LOG.debug("Shutting down dispatcher " + this.httpClient.dispatcher(), new Exception());
LOG.info("Shutting down dispatcher " + this.httpClient.dispatcher() + " "
+ new Exception().fillInStackTrace().getStackTrace()[2]);
ConnectionPool connectionPool = httpClient.connectionPool();
Dispatcher dispatcher = httpClient.dispatcher();
ExecutorService executorService = httpClient.dispatcher() != null ? httpClient.dispatcher().executorService() : null;
Expand All @@ -260,6 +261,7 @@ public void close() {

private CompletableFuture<HttpResponse<AsyncBody>> sendAsync(HttpRequest request,
Function<BufferedSource, AsyncBody> handler) {
Exception exception = new Exception();
CompletableFuture<HttpResponse<AsyncBody>> future = new CompletableFuture<>();
Call call = httpClient.newCall(requestBuilder((StandardHttpRequest) request).build());
try {
Expand All @@ -278,13 +280,13 @@ public void onResponse(Call call, Response response) throws IOException {
public void onFailure(Call call, IOException e) {
Throwable t = e;
if (e instanceof InterruptedIOException && e.getCause() instanceof RejectedExecutionException) {
t = wrapRejected((RejectedExecutionException) e.getCause());
t = wrapRejected((RejectedExecutionException) e.getCause(), exception);
}
future.completeExceptionally(t);
}
});
} catch (RejectedExecutionException e) {
throw wrapRejected(e);
throw wrapRejected(e, exception);
}
future.whenComplete((r, t) -> {
if (future.isCancelled()) {
Expand All @@ -294,7 +296,8 @@ public void onFailure(Call call, IOException e) {
return future;
}

private KubernetesClientException wrapRejected(RejectedExecutionException e) {
private KubernetesClientException wrapRejected(RejectedExecutionException e, Exception callStack) {
LOG.info("Attempting to run against an already shutdown dispatcher " + this.httpClient.dispatcher(), callStack);
return new KubernetesClientException("The okhttp client executor has been shutdown. "
+ "More than likely this is because the KubernetesClient.close method (see debug logging) has been called "
+ "- please ensure that is intentional. Dispatcher: " + this.httpClient.dispatcher(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,14 @@ public void onOpen(okhttp3.WebSocket webSocket, Response response) {
@Override
public void onMessage(okhttp3.WebSocket webSocket, ByteString bytes) {
awaitMoreRequest();
System.out.println(this + " " + bytes);
listener.onMessage(new OkHttpWebSocketImpl(webSocket, this::request), bytes.asByteBuffer());
}

@Override
public void onMessage(okhttp3.WebSocket webSocket, String text) {
awaitMoreRequest();
System.out.println(this + " " + text);
listener.onMessage(new OkHttpWebSocketImpl(webSocket, this::request), text);
}

Expand All @@ -123,6 +125,9 @@ public void onMessage(okhttp3.WebSocket webSocket, String text) {
private void awaitMoreRequest() {
lock.lock();
try {
if (!more) {
System.out.println(this + " Waiting for more to be requested");
}
while (!more) {
// arbitrary timeout to make it clearer that messages are not being processed
// - likely due to streams returned off of websocket not being read
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import io.fabric8.kubernetes.client.Client;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.KubernetesClientTimeoutException;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.utils.Serialization;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
Expand All @@ -35,24 +37,28 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class KubernetesNamespacedTestExtension implements BeforeAllCallback, BeforeEachCallback, AfterAllCallback {

private static final ExtensionContext.Namespace EXT_NAMESPACE = ExtensionContext.Namespace
.create(KubernetesNamespacedTestExtension.class);

@Override
public void beforeAll(ExtensionContext context) throws Exception {
final KubernetesClient client = new KubernetesClientBuilder().build();
getStore(context).put(Namespace.class, initNamespace(client));
getStore(context).put(KubernetesClient.class,
client.adapt(NamespacedKubernetesClient.class).inNamespace(getNamespace(context).getMetadata().getName()));
for (Field field : extractFields(context, KubernetesClient.class, f -> Modifier.isStatic(f.getModifiers()))) {
setFieldValue(field, null, getClient(context).adapt((Class<Client>) field.getType()));
}
for (Field field : extractFields(context, Namespace.class, f -> Modifier.isStatic(f.getModifiers()))) {
setFieldValue(field, null, getNamespace(context));
try {
final KubernetesClient client = new KubernetesClientBuilder().build();
getStore(context).put(Namespace.class, initNamespace(client));
getStore(context).put(KubernetesClient.class,
client.adapt(NamespacedKubernetesClient.class).inNamespace(getNamespace(context).getMetadata().getName()));
for (Field field : extractFields(context, KubernetesClient.class, f -> Modifier.isStatic(f.getModifiers()))) {
setFieldValue(field, null, getClient(context).adapt((Class<Client>) field.getType()));
}
for (Field field : extractFields(context, Namespace.class, f -> Modifier.isStatic(f.getModifiers()))) {
setFieldValue(field, null, getNamespace(context));
}
} catch (Exception e) {
System.out.println("Exception in beforeAll " + context.getDisplayName());
e.printStackTrace();
throw e;
}
}

Expand Down Expand Up @@ -82,7 +88,9 @@ static KubernetesClient getClient(ExtensionContext context) {
}

private static ExtensionContext.Store getStore(ExtensionContext context) {
return context.getRoot().getStore(EXT_NAMESPACE);
ExtensionContext.Namespace namespace = ExtensionContext.Namespace.create(KubernetesNamespacedTestExtension.class,
context.getRequiredTestClass());
return context.getRoot().getStore(namespace);
}

/**
Expand All @@ -106,16 +114,36 @@ private static Namespace initNamespace(KubernetesClient client) {
final int major = Integer.parseInt(client.getKubernetesVersion().getMajor().replaceAll("\\D+", ""));
final int minor = Integer.parseInt(client.getKubernetesVersion().getMinor().replaceAll("\\D+", ""));
if (major < 1 || (major == 1 && minor < 24)) {
final List<ObjectReference> secrets = client.serviceAccounts()
.inNamespace(namespace.getMetadata().getName())
.withName("default")
.waitUntilCondition(sa -> sa != null && sa.getSecrets() != null
&& sa.getSecrets().stream().anyMatch(s -> s.getName().matches("default-token.+")),
5, TimeUnit.SECONDS)
.getSecrets();
for (ObjectReference secret : secrets) {
client.secrets().inNamespace(namespace.getMetadata().getName()).withName(secret.getName())
.waitUntilCondition(Objects::nonNull, 5, TimeUnit.SECONDS);
List<ObjectReference> secrets = null;
try {
secrets = client.serviceAccounts()
.inNamespace(namespace.getMetadata().getName())
.withName("default")
.waitUntilCondition(sa -> {
System.out.println("Checking condition " + Serialization.asYaml(sa));
return sa != null && sa.getSecrets() != null
&& sa.getSecrets().stream().anyMatch(s -> s.getName().matches("default-token.+"));
},
5, TimeUnit.SECONDS)
.getSecrets();
} catch (KubernetesClientTimeoutException e) {
System.out.println(Serialization.asYaml(client.serviceAccounts()
.inNamespace(namespace.getMetadata().getName())
.withName("default").get()));
throw e;
}
if (secrets != null) {
try {
for (ObjectReference secret : secrets) {
client.secrets().inNamespace(namespace.getMetadata().getName()).withName(secret.getName())
.waitUntilCondition(Objects::nonNull, 5, TimeUnit.SECONDS);
}
} catch (KubernetesClientTimeoutException e) {
String secretNames = client.secrets().inNamespace(namespace.getMetadata().getName()).list().getItems().stream()
.map(s -> s.getMetadata().getName()).collect(Collectors.joining(", "));
System.out.println(secretNames);
throw e;
}
}
}
return namespace;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public void beforeAll(ExtensionContext context) {
final LoadKubernetesManifests annotation = context.getRequiredTestClass()
.getAnnotation(LoadKubernetesManifests.class);
final KubernetesClient kc = getClient(context);
System.out.println(context.getDisplayName() + " load beforeall " + kc);
for (String resource : annotation.value()) {
kc.load(context.getRequiredTestClass().getResourceAsStream(resource)).create();
}
Expand All @@ -41,6 +42,7 @@ public void afterAll(ExtensionContext context) {
.getAnnotation(LoadKubernetesManifests.class);
if (annotation.deleteAfterTest()) {
final KubernetesClient kc = getClient(context);
System.out.println(context.getDisplayName() + " load afterall " + kc);
for (String resource : annotation.value()) {
kc.load(context.getRequiredTestClass().getResourceAsStream(resource))
.withGracePeriod(annotation.deleteGracePeriod()).delete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ protected void onMessage(String message) {
// for consistency we only want to process the message when we're open
if (this.websocketFuture != null) {
super.onMessage(message);
System.out.println("procesed " + message);
} else {
System.out.println("Not procesing " + message);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.fabric8.kubernetes.api.model.apps.DeploymentSpecBuilder;
import io.fabric8.kubernetes.api.model.apps.ReplicaSet;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientTimeoutException;
import io.fabric8.kubernetes.client.dsl.Resource;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -141,7 +142,7 @@ void testDeleteExistingWithOrphanDeletion() throws Exception {

// Recreate deployment
resource.withGracePeriod(0L).delete();
resource.waitUntilCondition(Objects::isNull, 30, TimeUnit.SECONDS);
checkDeleted(resource);
client.apps().replicaSets().withLabel("run", deploymentName).informOnCondition(Collection::isEmpty)
.get(60, TimeUnit.SECONDS);
resource.create();
Expand Down Expand Up @@ -175,7 +176,7 @@ void testDeleteExistingWithoutOrphanDeletion() throws Exception {
.withPropagationPolicy(DeletionPropagation.ORPHAN)
.withGracePeriod(0L)
.delete();
resource.waitUntilCondition(Objects::isNull, 30, TimeUnit.SECONDS);
checkDeleted(resource);
resource.create();

// check that uid matches original, meaning the orphan was not deleted
Expand All @@ -191,6 +192,27 @@ void testDeleteExistingWithoutOrphanDeletion() throws Exception {
.get(60, TimeUnit.SECONDS);
}

private void checkDeleted(Resource<Deployment> resource) throws AssertionError {
int retries = 10;
for (int i = 0; i < retries; i++) {
try {
resource.waitUntilCondition(Objects::isNull, 6, TimeUnit.SECONDS);
} catch (KubernetesClientTimeoutException e) {
if (i == retries - 1) {
throw e;
}
Deployment deleted = resource.get();
if (deleted == null) {
throw new AssertionError("There was likely a missed deletion event");
}
if (deleted.getMetadata().getDeletionTimestamp() == null) {
throw new AssertionError("Deployment is not marked for deletion");
}
System.out.println("Deployment marked for deletion, but the deletion has not completed");
}
}
}

@Test
void testDeletionWithOrphanDeletion() throws Exception {
final String deploymentName = "delete-with-orphan-deletion";
Expand Down

0 comments on commit 5535de4

Please sign in to comment.