From be5dba7432f8e646aa55bcdaa524462876955c74 Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Wed, 14 Sep 2022 09:06:51 -0400 Subject: [PATCH] fix #4365: adding a test for the termination exception --- .../informers/impl/cache/Reflector.java | 10 +++- .../mock/DefaultSharedIndexInformerTest.java | 49 +++++++++++++++++++ 2 files changed, 58 insertions(+), 1 deletion(-) diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java index 627051a48fb..c4c11efd3b8 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java @@ -61,7 +61,15 @@ public Reflector(ListerWatcher listerWatcher, SyncableStore store) { public CompletableFuture start() { this.running = true; - return listSyncAndWatch(false); + CompletableFuture result = listSyncAndWatch(false); + result.whenComplete((v, t) -> { + if (t != null) { + stopFuture.completeExceptionally(t); + } else { + stopFuture.complete(null); + } + }); + return result; } public void stop() { diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java index ea60bdce641..b5d4213eafb 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java @@ -26,6 +26,7 @@ import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodBuilder; import io.fabric8.kubernetes.api.model.PodListBuilder; +import io.fabric8.kubernetes.api.model.Service; import io.fabric8.kubernetes.api.model.Status; import io.fabric8.kubernetes.api.model.StatusBuilder; import io.fabric8.kubernetes.api.model.WatchEvent; @@ -36,7 +37,9 @@ import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBindingBuilder; import io.fabric8.kubernetes.client.CustomResourceList; import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext; import io.fabric8.kubernetes.client.dsl.base.ResourceDefinitionContext; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; @@ -65,6 +68,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.BiFunction; import java.util.function.Function; @@ -897,6 +901,51 @@ public void onDelete(Pod oldObj, boolean deletedFinalStateUnknown) { assertFalse(podInformer.isRunning()); } + @Test + void testTerminalException() throws InterruptedException, TimeoutException { + // should be an initial 404 + SharedIndexInformer informer = client.pods().runnableInformer(0); + try { + informer.run(); + } catch (Exception e) { + } + try { + informer.stopped().get(10, TimeUnit.SECONDS); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof KubernetesClientException); + } + + String startResourceVersion = "1000"; + + // initial list + server.expect().withPath("/api/v1/pods") + .andReturn(200, new PodListBuilder().withNewMetadata().withResourceVersion(startResourceVersion).endMetadata() + .withItems(Collections.emptyList()).build()) + .once(); + + // initial watch - terminates with an exception + server.expect().withPath("/api/v1/pods?resourceVersion=" + startResourceVersion + "&allowWatchBookmarks=true&watch=true") + .andUpgradeToWebSocket() + .open() + .waitFor(WATCH_EVENT_EMIT_TIME) + .andEmit(new WatchEvent(new Service(), "ADDED")) // not a pod + .waitFor(OUTDATED_WATCH_EVENT_EMIT_TIME) + .andEmit(outdatedEvent) + .done().always(); + + // When + informer = client.pods().inAnyNamespace().runnableInformer(0); + try { + informer.run(); + } catch (Exception e) { + } + try { + informer.stopped().get(10, TimeUnit.SECONDS); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof WatcherException); + } + } + @Test void testRunAfterStop() { // Given