diff --git a/CHANGELOG.md b/CHANGELOG.md index 587e1c01562..4d14935fa60 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ * Fix #4365: The Watch retry logic will handle more cases, as well as perform an exceptional close for events that are not properly handled. Informers can directly provide those exceptional outcomes via the SharedIndexInformer.stopped CompletableFuture. * Fix #4396: Provide more error context when @Group/@Version annotations are missing * Fix #4384: The Java generator now supports the generation of specific annotations (min, max, pattern, etc.), as defined by #4348 +* Fix #4408: Allowing informers started via the start() method to retry indefinitely rather than making only a single attempt. * Fix #3864: Change ManagedOpenShiftClient OSGi ConfigurationPolicy to REQUIRE #### Dependency Upgrade diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java index 264b5222dce..826cba4771d 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java @@ -159,14 +159,34 @@ default boolean hasSynced() { SharedIndexInformer initialState(Stream items); SharedIndexInformer itemStore(ItemStore itemStore); - + /** * A non-blocking alternative to run. Starts the shared informer, which will be stopped when {@link #stop()} is called. *
- * Only one start attempt is made - subsequent calls will not re-start the informer. + * Only one start attempt is made - subsequent calls will not re-start the informer. + * The future will be completed normally once the Informer starts watching successfully for the first time. + * If the informer does not start successfully the first time, the future will complete exceptionally. + *

+ * Canceling this future has no effect on the informer. + *

+ * If the informer has already been started, then a completed future will be returned. + *

+ * see also {@link #forceStart()} */ CompletableFuture start(); + /** + * A non-blocking alternative to run. Starts the shared informer, which will be stopped when {@link #stop()} is called. + * For situations where it's not known if list or watch operations will complete normally on their first attempt - such as the + * server being unavailable, permissions not yet available, or CRD not yet installed. + *

+ * The future will be completed normally once the Informer starts watching successfully for the first time. + * If the informer stops manually or exceptionally before starting, the future will be completed with an exception. + *

+ * Canceling this future has no effect on the informer. + */ + CompletableFuture forceStart(); + /** * Return a future that will allow notification of informer stopping. *

@@ -174,6 +194,8 @@ default boolean hasSynced() { *

* If an exception occurs that terminates the informer, then it will be exceptionally completed with that exception * - typically a {@link WatcherException} + *

+ * Canceling this future has no effect on the informer. */ CompletableFuture stopped(); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperation.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperation.java index 28ec1394a69..97d44e11132 100755 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperation.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperation.java @@ -868,7 +868,7 @@ public CompletableFuture> informOnCondition(Predicate> condition CompletableFuture> future = new CompletableFuture<>(); // create an informer that supplies the tester with events and empty list handling - SharedIndexInformer informer = this.createInformer(0, Runnable::run); + DefaultSharedIndexInformer informer = this.createInformer(0, Runnable::run); // prevent unnecessary watches and handle closure future.whenComplete((r, t) -> informer.stop()); @@ -905,7 +905,7 @@ public void onUpdate(T oldObj, T newObj) { public void onNothing() { test.accept(informer.getStore().list()); } - }).start().whenComplete((v, t) -> { + }).start(false).whenComplete((v, t) -> { if (t != null) { future.completeExceptionally(t); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java index c1620330165..731f2d1b005 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java @@ -96,7 +96,7 @@ public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher lis * @param handler event handler */ @Override - public SharedIndexInformer addEventHandler(ResourceEventHandler handler) { + public DefaultSharedIndexInformer addEventHandler(ResourceEventHandler handler) { addEventHandlerWithResyncPeriod(handler, defaultEventHandlerResyncPeriod); return this; } @@ -144,6 +144,21 @@ public String lastSyncResourceVersion() { @Override public CompletableFuture start() { + return start(false); + } + + @Override + public CompletableFuture forceStart() { + return start(true); + } + + /** + * Start the informer + * + * @param reconnect if true, keep trying to start. if false, try only once + * @return the started future from the reflector + */ + public CompletableFuture start(boolean reconnect) { if (stopped) { throw new IllegalStateException("Cannot restart a stopped informer"); } @@ -161,19 +176,12 @@ public CompletableFuture start() { scheduleResync(processor::shouldResync); - return reflector.start().whenComplete((v, t) -> { - // stop called while run is called could be ineffective, check for it afterwards - synchronized (this) { - if (stopped && reflector.isRunning()) { - stop(); - } - } - }); + return reflector.start(reconnect); } @Override public SharedIndexInformer run() { - Utils.waitUntilReadyOrFail(start(), -1, TimeUnit.MILLISECONDS); + Utils.waitUntilReadyOrFail(start(false), -1, TimeUnit.MILLISECONDS); return this; } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java index e7a96264a43..206f7821bcc 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java @@ -15,6 +15,7 @@ */ package io.fabric8.kubernetes.client.informers.impl.cache; +import com.fasterxml.jackson.core.JsonProcessingException; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.api.model.ListOptionsBuilder; @@ -48,6 +49,7 @@ public class Reflector watchFuture; private volatile CompletableFuture reconnectFuture; + private final CompletableFuture startFuture = new CompletableFuture<>(); private final CompletableFuture stopFuture = new CompletableFuture<>(); private final ExponentialBackoffIntervalCalculator retryIntervalCalculator; @@ -65,20 +67,14 @@ public CompletableFuture start() { public CompletableFuture start(boolean reconnect) { this.running = true; - CompletableFuture result = listSyncAndWatch(reconnect); - if (!reconnect) { - result.whenComplete((v, t) -> { - if (t != null) { - stopFuture.completeExceptionally(t); - } - }); - } - return result; + listSyncAndWatch(reconnect); + return startFuture; } public void stop() { running = false; stopFuture.complete(null); + startFuture.completeExceptionally(new KubernetesClientException("informer manually stopped before starting")); Future future = reconnectFuture; if (future != null) { future.cancel(true); @@ -129,16 +125,23 @@ public CompletableFuture listSyncAndWatch(boolean reconnect) { } } }); - if (reconnect) { - theFuture.whenComplete((v, t) -> { - if (t != null) { + theFuture.whenComplete((v, t) -> { + if (t != null) { + if (reconnect) { log.warn("listSyncAndWatch failed for {}, will retry", Reflector.this, t); reconnect(); } else { + running = false; + startFuture.completeExceptionally(t); + stopFuture.completeExceptionally(t); + } + } else { + startFuture.complete(null); + if (reconnect) { retryIntervalCalculator.resetReconnectAttempts(); } - }); - } + } + }); return theFuture; } @@ -157,7 +160,20 @@ private CompletableFuture processList(Set nextKeys, String continueVa .submitList( new ListOptionsBuilder().withLimit(listerWatcher.getLimit()).withContinue(continueVal) .build()); - + futureResult.whenComplete((l, t) -> { + while (t instanceof RuntimeException && !(t instanceof ClassCastException)) { + Throwable cause = ((RuntimeException) t).getCause(); + if (cause == t) { + break; + } + t = cause; + } + if (t instanceof ClassCastException || t instanceof JsonProcessingException) { + // we'll interpret this as a hard failure, just like in the watch case + this.stopFuture.completeExceptionally(t); + this.stop(); + } + }); return futureResult.thenCompose(result -> { result.getItems().forEach(i -> { String key = store.getKey(i); diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java index b602219e8c0..a87c0c6a2be 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java @@ -17,11 +17,15 @@ import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodList; +import io.fabric8.kubernetes.api.model.PodListBuilder; +import io.fabric8.kubernetes.client.Watch; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.mockito.Mockito; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -45,6 +49,15 @@ private DefaultSharedIndexInformer createDefaultSharedIndexInforme return defaultSharedIndexInformer; } + @BeforeEach + void beforeEach() { + Mockito.when(listerWatcher.submitWatch(Mockito.any(), Mockito.any())) + .thenReturn(CompletableFuture.completedFuture(Mockito.mock(Watch.class))); + PodList result = new PodListBuilder().withNewMetadata().endMetadata().build(); + Mockito.when(listerWatcher.submitList(Mockito.any())) + .thenReturn(CompletableFuture.completedFuture(result)); + } + @AfterEach void afterEach() { if (defaultSharedIndexInformer != null) { diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java index cd856a906b6..921aa93916d 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java @@ -58,7 +58,8 @@ protected void reconnect() { CompletableFuture future = reflector.start(true); - assertThrows(CompletionException.class, future::join); + // since we're reconnecting, but have overridden reconnect, this won't be done + assertFalse(future.isDone()); // running but watch failed assertFalse(reflector.isWatching()); @@ -69,6 +70,8 @@ protected void reconnect() { assertTrue(reflector.isWatching()); assertTrue(reflector.isRunning()); assertFalse(reflector.getStopFuture().isDone()); + assertTrue(future.isDone()); + assertTrue(!future.isCompletedExceptionally()); reflector.stop(); @@ -77,6 +80,26 @@ protected void reconnect() { assertTrue(reflector.getStopFuture().isDone()); } + @Test + void testNotRunningAfterStartError() { + ListerWatcher mock = Mockito.mock(ListerWatcher.class); + PodList list = new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build(); + Mockito.when(mock.submitList(Mockito.any())).thenReturn(CompletableFuture.completedFuture(list)); + + Reflector reflector = new Reflector(mock, Mockito.mock(SyncableStore.class)); + + // throw an exception, then watch normally + Mockito.when(mock.submitWatch(Mockito.any(), Mockito.any())) + .thenThrow(new KubernetesClientException("error")); + + // single start + CompletableFuture future = reflector.start(false); + + assertThrows(CompletionException.class, future::join); + + assertFalse(reflector.isRunning()); + } + @Test void testNonHttpGone() { ListerWatcher mock = Mockito.mock(ListerWatcher.class); diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java index f786c6204d1..1882041fdbf 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java @@ -69,7 +69,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.function.BiFunction; import java.util.function.Function;