From 313487cfdc15d9537c6bc7e54e3256e79f1d5667 Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Mon, 10 Oct 2022 17:16:10 -0400 Subject: [PATCH] fix #4408: allowing for users to set the exception handling behavior --- CHANGELOG.md | 1 + .../client/informers/ExceptionHandler.java | 49 ++++++++++++ .../client/informers/SharedIndexInformer.java | 30 ++++++-- .../impl/DefaultSharedIndexInformer.java | 25 ++++--- .../impl/SharedInformerFactoryImpl.java | 5 +- .../informers/impl/cache/Reflector.java | 75 ++++++++++--------- .../DefaultSharedIndexInformerResyncTest.java | 13 ++++ .../informers/impl/cache/ReflectorTest.java | 42 ++++++++--- .../mock/DefaultSharedIndexInformerTest.java | 11 ++- 9 files changed, 181 insertions(+), 70 deletions(-) create mode 100644 kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/ExceptionHandler.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 5fba3de354f..528b06a4162 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ * Fix #4365: The Watch retry logic will handle more cases, as well as perform an exceptional close for events that are not properly handled. Informers can directly provide those exceptional outcomes via the SharedIndexInformer.stopped CompletableFuture. * Fix #4396: Provide more error context when @Group/@Version annotations are missing * Fix #4384: The Java generator now supports the generation of specific annotations (min, max, pattern, etc.), as defined by #4348 +* Fix #4408: Allowing informers started via the start() method to have configurable exception / retry handling. 
* Fix #3864: Change ManagedOpenShiftClient OSGi ConfigurationPolicy to REQUIRE #### Dependency Upgrade diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/ExceptionHandler.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/ExceptionHandler.java new file mode 100644 index 00000000000..1bcbdf439f2 --- /dev/null +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/ExceptionHandler.java @@ -0,0 +1,49 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.fabric8.kubernetes.client.informers; + +import com.fasterxml.jackson.core.JsonProcessingException; +import io.fabric8.kubernetes.client.Watcher; +import io.fabric8.kubernetes.client.WatcherException; + +public interface ExceptionHandler { + + /** + * Called to determine if the informer should continue to retry after the given exception. + *

+ * See also {@link #isDeserializationException(Throwable)} that can help determine if the + * problem is a mismatch with the target client class. + * + * @param isStarted true if the informer had already successfully started + * @param t the {@link WatcherException} from a + * {@link Watcher#onClose(io.fabric8.kubernetes.client.WatcherException)} + * or throwable from a list/watch call. + * @return true if the informer should continue to retry + */ + boolean retryAfterException(boolean isStarted, Throwable t); + + public static boolean isDeserializationException(Throwable t) { + while (t instanceof RuntimeException && !(t instanceof ClassCastException)) { + Throwable cause = ((RuntimeException) t).getCause(); + if (cause == t) { + break; + } + t = cause; + } + return t instanceof ClassCastException || t instanceof JsonProcessingException; + } +} diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java index 264b5222dce..9b5a071fba5 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java @@ -23,7 +23,7 @@ import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; import java.util.function.Function; import java.util.stream.Stream; @@ -125,6 +125,8 @@ default boolean hasSynced() { /** * Return true if the informer is running + *

+ * See also {@link #stopped()} */ boolean isRunning(); @@ -161,19 +163,33 @@ default boolean hasSynced() { SharedIndexInformer itemStore(ItemStore itemStore); /** - * A non-blocking alternative to run. Starts the shared informer, which will be stopped when {@link #stop()} is called. + * A non-blocking alternative to run. Starts the shared informer, which will normally be stopped when {@link #stop()} is + * called. *
- * Only one start attempt is made - subsequent calls will not re-start the informer. + * The stage will be completed normally once the Informer starts watching successfully for the first time. + *

+ * By default the informer will make only a single start attempt. Use {@link #exceptionHandler(ExceptionHandler)} to + * modify this behavior. + */ + CompletionStage start(); + + /** + * Sets the {@link ExceptionHandler} for this informer. For example, {@code exceptionHandler((b, t) -> true)} will + * keep retrying no matter what the exception is. + *

+ * May only be called prior to the informer starting. + * + * @param handler the handler that decides whether the informer should continue to retry after an exception */ - CompletableFuture start(); + SharedIndexInformer exceptionHandler(ExceptionHandler handler); /** - * Return a future that will allow notification of informer stopping. + * Return a {@link CompletionStage} that will allow notification of the informer stopping. *

- * If {@link #stop()} is called, the future will be completed with a null value. + * If {@link #stop()} is called, the CompletionStage will complete normally. *

* If an exception occurs that terminates the informer, then it will be exceptionally completed with that exception * - typically a {@link WatcherException} */ - CompletableFuture stopped(); + CompletionStage stopped(); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java index c1620330165..c4bfb651da1 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java @@ -18,6 +18,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.client.KubernetesClientException; +import io.fabric8.kubernetes.client.informers.ExceptionHandler; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.fabric8.kubernetes.client.informers.cache.Indexer; @@ -96,7 +97,7 @@ public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher lis * @param handler event handler */ @Override - public SharedIndexInformer addEventHandler(ResourceEventHandler handler) { + public DefaultSharedIndexInformer addEventHandler(ResourceEventHandler handler) { addEventHandlerWithResyncPeriod(handler, defaultEventHandlerResyncPeriod); return this; } @@ -149,7 +150,7 @@ public CompletableFuture start() { } synchronized (this) { if (!started.compareAndSet(false, true)) { - return CompletableFuture.completedFuture(null); + return reflector.getStartFuture(); } if (initialState != null) { @@ -161,14 +162,7 @@ public CompletableFuture start() { scheduleResync(processor::shouldResync); - return reflector.start().whenComplete((v, t) -> { - // stop called while run is called could be 
ineffective, check for it afterwards - synchronized (this) { - if (stopped && reflector.isRunning()) { - stop(); - } - } - }); + return reflector.start(); } @Override @@ -221,7 +215,7 @@ private long determineResyncPeriod(long desired, long check) { @Override public boolean isRunning() { - return !stopped && started.get() && reflector.isRunning(); + return !stopped && started.get() && !reflector.isStopped(); } @Override @@ -294,4 +288,13 @@ public CompletableFuture stopped() { return this.reflector.getStopFuture(); } + @Override + public synchronized DefaultSharedIndexInformer exceptionHandler(ExceptionHandler handler) { + if (started.get()) { + throw new KubernetesClientException("Informer cannot be running when handler is set"); + } + this.reflector.setExceptionHandler(handler); + return this; + } + } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/SharedInformerFactoryImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/SharedInformerFactoryImpl.java index 5bc1f112978..c9036a9e9a4 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/SharedInformerFactoryImpl.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/SharedInformerFactoryImpl.java @@ -32,6 +32,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Future; @@ -106,11 +107,11 @@ public synchronized SharedIndexInformer getExistingSharedIndexInformer(Cl @Override public synchronized Future startAllRegisteredInformers() { - List> startInformerTasks = new ArrayList<>(); + List> startInformerTasks = new ArrayList<>(); if (!informers.isEmpty()) { for (SharedIndexInformer informer : informers) { - CompletableFuture future = informer.start(); + CompletionStage future = informer.start(); 
startInformerTasks.add(future); future.whenComplete((v, t) -> { if (t != null) { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java index e7a96264a43..361c9d0a64f 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java @@ -23,6 +23,7 @@ import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager; +import io.fabric8.kubernetes.client.informers.ExceptionHandler; import io.fabric8.kubernetes.client.informers.impl.ListerWatcher; import io.fabric8.kubernetes.client.utils.Utils; import io.fabric8.kubernetes.client.utils.internal.ExponentialBackoffIntervalCalculator; @@ -44,12 +45,14 @@ public class Reflector listerWatcher; private final SyncableStore store; private final ReflectorWatcher watcher; - private volatile boolean running; private volatile boolean watching; private volatile CompletableFuture watchFuture; private volatile CompletableFuture reconnectFuture; + private final CompletableFuture startFuture = new CompletableFuture<>(); private final CompletableFuture stopFuture = new CompletableFuture<>(); private final ExponentialBackoffIntervalCalculator retryIntervalCalculator; + //default behavior - retry if started and it's not a watcherexception + private volatile ExceptionHandler handler = (b, t) -> b && !(t instanceof WatcherException); public Reflector(ListerWatcher listerWatcher, SyncableStore store) { this.listerWatcher = listerWatcher; @@ -60,25 +63,17 @@ public Reflector(ListerWatcher listerWatcher, SyncableStore store) { } public CompletableFuture start() { - return start(false); // start without reconnecting + listSyncAndWatch(); + 
return startFuture; } - public CompletableFuture start(boolean reconnect) { - this.running = true; - CompletableFuture result = listSyncAndWatch(reconnect); - if (!reconnect) { - result.whenComplete((v, t) -> { - if (t != null) { - stopFuture.completeExceptionally(t); - } - }); - } - return result; + public CompletableFuture getStartFuture() { + return startFuture; } public void stop() { - running = false; stopFuture.complete(null); + startFuture.completeExceptionally(new KubernetesClientException("informer manually stopped before starting")); Future future = reconnectFuture; if (future != null) { future.cancel(true); @@ -106,8 +101,8 @@ private synchronized void stopWatcher() { * * @return a future that completes when the list and watch are established */ - public CompletableFuture listSyncAndWatch(boolean reconnect) { - if (!running) { + public CompletableFuture listSyncAndWatch() { + if (isStopped()) { return CompletableFuture.completedFuture(null); } Set nextKeys = new ConcurrentSkipListSet<>(); @@ -119,7 +114,7 @@ public CompletableFuture listSyncAndWatch(boolean reconnect) { return startWatcher(latestResourceVersion); }).thenAccept(w -> { if (w != null) { - if (running) { + if (!isStopped()) { if (log.isDebugEnabled()) { log.debug("Watch started for {}", Reflector.this); } @@ -129,26 +124,34 @@ public CompletableFuture listSyncAndWatch(boolean reconnect) { } } }); - if (reconnect) { - theFuture.whenComplete((v, t) -> { - if (t != null) { - log.warn("listSyncAndWatch failed for {}, will retry", Reflector.this, t); - reconnect(); - } else { - retryIntervalCalculator.resetReconnectAttempts(); - } - }); - } + theFuture.whenComplete((v, t) -> { + if (t != null) { + onException("listSyncAndWatch", t); + } else { + startFuture.complete(null); + retryIntervalCalculator.resetReconnectAttempts(); + } + }); return theFuture; } + private void onException(String operation, Throwable t) { + if (handler.retryAfterException(startFuture.isDone() && 
!startFuture.isCompletedExceptionally(), t)) { + log.warn("{} failed for {}, will retry", operation, Reflector.this, t); + reconnect(); + } else { + startFuture.completeExceptionally(t); + stopFuture.completeExceptionally(t); + } + } + protected void reconnect() { - if (!running) { + if (isStopped()) { return; } // this can be run in the scheduler thread because // any further operations will happen on the io thread - reconnectFuture = Utils.schedule(Runnable::run, () -> listSyncAndWatch(true), + reconnectFuture = Utils.schedule(Runnable::run, () -> listSyncAndWatch(), retryIntervalCalculator.nextReconnectInterval(), TimeUnit.MILLISECONDS); } @@ -179,7 +182,7 @@ private void stopWatch(Watch w) { } private synchronized CompletableFuture startWatcher(final String latestResourceVersion) { - if (!running) { + if (isStopped()) { return CompletableFuture.completedFuture(null); } log.debug("Starting watcher for {} at v{}", this, latestResourceVersion); @@ -200,8 +203,8 @@ public String getLastSyncResourceVersion() { return lastSyncResourceVersion; } - public boolean isRunning() { - return running; + public boolean isStopped() { + return stopFuture.isDone(); } public boolean isWatching() { @@ -251,9 +254,7 @@ public void onClose(WatcherException exception) { // start a whole new list/watch cycle reconnect(); } else { - running = false; // shouldn't happen, but it means the watch won't restart - stopFuture.completeExceptionally(exception); - log.warn("Watch closing with exception for {}", Reflector.this, exception); + onException("watch", exception); } } @@ -282,4 +283,8 @@ public CompletableFuture getStopFuture() { return stopFuture; } + public void setExceptionHandler(ExceptionHandler handler) { + this.handler = handler; + } + } diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java 
b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java index b602219e8c0..a87c0c6a2be 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java @@ -17,11 +17,15 @@ import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.PodList; +import io.fabric8.kubernetes.api.model.PodListBuilder; +import io.fabric8.kubernetes.client.Watch; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.mockito.Mockito; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -45,6 +49,15 @@ private DefaultSharedIndexInformer createDefaultSharedIndexInforme return defaultSharedIndexInformer; } + @BeforeEach + void beforeEach() { + Mockito.when(listerWatcher.submitWatch(Mockito.any(), Mockito.any())) + .thenReturn(CompletableFuture.completedFuture(Mockito.mock(Watch.class))); + PodList result = new PodListBuilder().withNewMetadata().endMetadata().build(); + Mockito.when(listerWatcher.submitList(Mockito.any())) + .thenReturn(CompletableFuture.completedFuture(result)); + } + @AfterEach void afterEach() { if (defaultSharedIndexInformer != null) { diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java index cd856a906b6..b445f4f7fc3 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java @@ -47,36 
+47,60 @@ protected void reconnect() { // do nothing } }; + reflector.setExceptionHandler((b, t) -> true); assertFalse(reflector.isWatching()); - assertFalse(reflector.isRunning()); + assertFalse(reflector.isStopped()); // throw an exception, then watch normally Mockito.when(mock.submitWatch(Mockito.any(), Mockito.any())) .thenThrow(new KubernetesClientException("error")) .thenReturn(CompletableFuture.completedFuture(Mockito.mock(Watch.class))); - CompletableFuture future = reflector.start(true); + CompletableFuture future = reflector.start(); - assertThrows(CompletionException.class, future::join); + // since we're reconnecting, but have overridden reconnect, this won't be done + assertFalse(future.isDone()); // running but watch failed assertFalse(reflector.isWatching()); - assertTrue(reflector.isRunning()); + assertFalse(reflector.isStopped()); - reflector.listSyncAndWatch(false).join(); + reflector.listSyncAndWatch().join(); assertTrue(reflector.isWatching()); - assertTrue(reflector.isRunning()); + assertFalse(reflector.isStopped()); assertFalse(reflector.getStopFuture().isDone()); + assertTrue(future.isDone()); + assertTrue(!future.isCompletedExceptionally()); reflector.stop(); assertFalse(reflector.isWatching()); - assertFalse(reflector.isRunning()); + assertTrue(reflector.isStopped()); assertTrue(reflector.getStopFuture().isDone()); } + @Test + void testNotRunningAfterStartError() { + ListerWatcher mock = Mockito.mock(ListerWatcher.class); + PodList list = new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build(); + Mockito.when(mock.submitList(Mockito.any())).thenReturn(CompletableFuture.completedFuture(list)); + + Reflector reflector = new Reflector(mock, Mockito.mock(SyncableStore.class)); + + // throw an exception, then watch normally + Mockito.when(mock.submitWatch(Mockito.any(), Mockito.any())) + .thenThrow(new KubernetesClientException("error")); + + // single start + CompletableFuture future = reflector.start(); + + 
assertThrows(CompletionException.class, future::join); + + assertTrue(reflector.isStopped()); + } + @Test void testNonHttpGone() { ListerWatcher mock = Mockito.mock(ListerWatcher.class); @@ -91,12 +115,12 @@ void testNonHttpGone() { reflector.start(); assertTrue(reflector.isWatching()); - assertTrue(reflector.isRunning()); + assertFalse(reflector.isStopped()); reflector.getWatcher().onClose(new WatcherException(null)); assertFalse(reflector.isWatching()); - assertFalse(reflector.isRunning()); + assertTrue(reflector.isStopped()); } } diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java index f786c6204d1..a4e3b252e26 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java @@ -64,12 +64,11 @@ import java.net.HttpURLConnection; import java.util.Collections; -import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.function.BiFunction; import java.util.function.Function; @@ -906,9 +905,9 @@ public void onDelete(Pod oldObj, boolean deletedFinalStateUnknown) { void terminalExceptionWhenNoResourcesFoundIsPropagatedToStopped() { try (SharedIndexInformer informer = client.pods().runnableInformer(0)) { final KubernetesClientException runException = assertThrows(KubernetesClientException.class, informer::run); - final CompletableFuture stopped = informer.stopped(); + final CompletionStage stopped = informer.stopped(); final ExecutionException result = assertThrows(ExecutionException.class, - 
() -> stopped.get(10, TimeUnit.SECONDS)); + () -> stopped.toCompletableFuture().get(10, TimeUnit.SECONDS)); assertThat(result) .cause() .isInstanceOf(KubernetesClientException.class) @@ -934,9 +933,9 @@ void terminalExceptionWhenWatchFailsIsPropagatedToStopped() { .done().always(); try (SharedIndexInformer informer = client.pods().inAnyNamespace().runnableInformer(0)) { informer.run(); - final CompletableFuture stopped = informer.stopped(); + final CompletionStage stopped = informer.stopped(); final ExecutionException result = assertThrows(ExecutionException.class, - () -> stopped.get(10, TimeUnit.SECONDS)); + () -> stopped.toCompletableFuture().get(10, TimeUnit.SECONDS)); assertThat(result) .cause() .isInstanceOf(WatcherException.class)