Skip to content

Commit

Permalink
fix #4365: adding a test for the termination exception
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins authored and manusa committed Sep 20, 2022
1 parent 5eefe53 commit 80c1085
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 4 deletions.
Expand Up @@ -60,8 +60,20 @@ public Reflector(ListerWatcher<T, L> listerWatcher, SyncableStore<T> store) {
}

public CompletableFuture<Void> start() {
return start(false); // start without reconnecting
}

public CompletableFuture<Void> start(boolean reconnect) {
this.running = true;
return listSyncAndWatch(false);
CompletableFuture<Void> result = listSyncAndWatch(reconnect);
if (!reconnect) {
result.whenComplete((v, t) -> {
if (t != null) {
stopFuture.completeExceptionally(t);
}
});
}
return result;
}

public void stop() {
Expand Down Expand Up @@ -130,7 +142,7 @@ public CompletableFuture<Void> listSyncAndWatch(boolean reconnect) {
return theFuture;
}

private void reconnect() {
protected void reconnect() {
if (!running) {
return;
}
Expand Down
Expand Up @@ -41,7 +41,12 @@ void testStateFlags() {
PodList list = new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build();
Mockito.when(mock.submitList(Mockito.any())).thenReturn(CompletableFuture.completedFuture(list));

Reflector<Pod, PodList> reflector = new Reflector<>(mock, Mockito.mock(SyncableStore.class));
Reflector<Pod, PodList> reflector = new Reflector<Pod, PodList>(mock, Mockito.mock(SyncableStore.class)) {
@Override
protected void reconnect() {
// do nothing
}
};

assertFalse(reflector.isWatching());
assertFalse(reflector.isRunning());
Expand All @@ -51,7 +56,7 @@ void testStateFlags() {
.thenThrow(new KubernetesClientException("error"))
.thenReturn(CompletableFuture.completedFuture(Mockito.mock(Watch.class)));

CompletableFuture<Void> future = reflector.start();
CompletableFuture<Void> future = reflector.start(true);

assertThrows(CompletionException.class, future::join);

Expand Down
Expand Up @@ -26,6 +26,7 @@
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.PodListBuilder;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.Status;
import io.fabric8.kubernetes.api.model.StatusBuilder;
import io.fabric8.kubernetes.api.model.WatchEvent;
Expand All @@ -36,7 +37,9 @@
import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBindingBuilder;
import io.fabric8.kubernetes.client.CustomResourceList;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
import io.fabric8.kubernetes.client.dsl.base.ResourceDefinitionContext;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
Expand Down Expand Up @@ -65,6 +68,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Function;

Expand Down Expand Up @@ -897,6 +901,51 @@ public void onDelete(Pod oldObj, boolean deletedFinalStateUnknown) {
assertFalse(podInformer.isRunning());
}

@Test
void testTerminalException() throws InterruptedException, TimeoutException {
// should be an initial 404
SharedIndexInformer<Pod> informer = client.pods().runnableInformer(0);
try {
informer.run();
} catch (Exception e) {
}
try {
informer.stopped().get(10, TimeUnit.SECONDS);
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof KubernetesClientException);
}

String startResourceVersion = "1000";

// initial list
server.expect().withPath("/api/v1/pods")
.andReturn(200, new PodListBuilder().withNewMetadata().withResourceVersion(startResourceVersion).endMetadata()
.withItems(Collections.emptyList()).build())
.once();

// initial watch - terminates with an exception
server.expect().withPath("/api/v1/pods?resourceVersion=" + startResourceVersion + "&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(WATCH_EVENT_EMIT_TIME)
.andEmit(new WatchEvent(new Service(), "ADDED")) // not a pod
.waitFor(OUTDATED_WATCH_EVENT_EMIT_TIME)
.andEmit(outdatedEvent)
.done().always();

// When
informer = client.pods().inAnyNamespace().runnableInformer(0);
try {
informer.run();
} catch (Exception e) {
}
try {
informer.stopped().get(10, TimeUnit.SECONDS);
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof WatcherException);
}
}

@Test
void testRunAfterStop() {
// Given
Expand Down

0 comments on commit 80c1085

Please sign in to comment.