diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java index 627051a48fb..e7a96264a43 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java @@ -60,8 +60,20 @@ public Reflector(ListerWatcher listerWatcher, SyncableStore store) { } public CompletableFuture start() { + return start(false); // start without reconnecting + } + + public CompletableFuture start(boolean reconnect) { this.running = true; - return listSyncAndWatch(false); + CompletableFuture result = listSyncAndWatch(reconnect); + if (!reconnect) { + result.whenComplete((v, t) -> { + if (t != null) { + stopFuture.completeExceptionally(t); + } + }); + } + return result; } public void stop() { @@ -130,7 +142,7 @@ public CompletableFuture listSyncAndWatch(boolean reconnect) { return theFuture; } - private void reconnect() { + protected void reconnect() { if (!running) { return; } diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java index a8a3d2a82f9..cd856a906b6 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java @@ -41,7 +41,12 @@ void testStateFlags() { PodList list = new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build(); Mockito.when(mock.submitList(Mockito.any())).thenReturn(CompletableFuture.completedFuture(list)); - Reflector reflector = new Reflector<>(mock, Mockito.mock(SyncableStore.class)); + Reflector reflector = new Reflector(mock, Mockito.mock(SyncableStore.class)) { + @Override + protected void reconnect() { + // do nothing + } + }; assertFalse(reflector.isWatching()); assertFalse(reflector.isRunning()); @@ -51,7 +56,7 @@ void testStateFlags() { .thenThrow(new KubernetesClientException("error")) .thenReturn(CompletableFuture.completedFuture(Mockito.mock(Watch.class))); - CompletableFuture future = reflector.start(); + CompletableFuture future = reflector.start(true); assertThrows(CompletionException.class, future::join); diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java index ea60bdce641..b5d4213eafb 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java @@ -26,6 +26,7 @@ import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodBuilder; import io.fabric8.kubernetes.api.model.PodListBuilder; +import io.fabric8.kubernetes.api.model.Service; import io.fabric8.kubernetes.api.model.Status; import io.fabric8.kubernetes.api.model.StatusBuilder; import io.fabric8.kubernetes.api.model.WatchEvent; @@ -36,7 +37,9 @@ import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBindingBuilder; import io.fabric8.kubernetes.client.CustomResourceList; import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext; import io.fabric8.kubernetes.client.dsl.base.ResourceDefinitionContext; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; @@ -65,6 +68,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.BiFunction; import java.util.function.Function; @@ -897,6 +901,51 @@ public void onDelete(Pod oldObj, boolean deletedFinalStateUnknown) { assertFalse(podInformer.isRunning()); } + @Test + void testTerminalException() throws InterruptedException, TimeoutException { + // should be an initial 404 + SharedIndexInformer informer = client.pods().runnableInformer(0); + try { + informer.run(); + } catch (Exception e) { + } + try { + informer.stopped().get(10, TimeUnit.SECONDS); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof KubernetesClientException); + } + + String startResourceVersion = "1000"; + + // initial list + server.expect().withPath("/api/v1/pods") + .andReturn(200, new PodListBuilder().withNewMetadata().withResourceVersion(startResourceVersion).endMetadata() + .withItems(Collections.emptyList()).build()) + .once(); + + // initial watch - terminates with an exception + server.expect().withPath("/api/v1/pods?resourceVersion=" + startResourceVersion + "&allowWatchBookmarks=true&watch=true") + .andUpgradeToWebSocket() + .open() + .waitFor(WATCH_EVENT_EMIT_TIME) + .andEmit(new WatchEvent(new Service(), "ADDED")) // not a pod + .waitFor(OUTDATED_WATCH_EVENT_EMIT_TIME) + .andEmit(outdatedEvent) + .done().always(); + + // When + informer = client.pods().inAnyNamespace().runnableInformer(0); + try { + informer.run(); + } catch (Exception e) { + } + try { + informer.stopped().get(10, TimeUnit.SECONDS); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof WatcherException); + } + } + @Test void testRunAfterStop() { // Given