diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java index 21fe65ad922..8c37819579d 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java @@ -20,13 +20,32 @@ public interface InformerExceptionHandler { + public enum EventType { + /** + * the cause of a non-http gone {@link WatcherException}. The default handling is to log the exception and stop the + * informer. + */ + WATCH_CLOSE, + /** + * an exception that occurs trying to perform the list or watch operation. The default handling is to log the exception. + */ + LIST_OR_WATCH, + /** + * an exception that occurs invoking a {@link ResourceEventHandler} method. The default handling is to log the exception. + */ + HANDLER + } + /** - * Provides a callback when the informer could terminated with a non-recoverable exception. - * - * @param t the {@link Throwable}, which may occur as either the cause of a non-http gone {@link WatcherException} or an - * exception from calling list or watch - * @return true if the informer should retry, false if it should stop + * Override the default handling of exceptions seen while the informer is running. */ - boolean retry(Throwable t); + void onException(EventType eventType, Throwable t); + + /** + * Called after each time the list, sync and watch operations have been successful. + */ + default void onWatching() { + + } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java index 1a27f7ece20..19866930309 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java @@ -73,8 +73,6 @@ public class DefaultSharedIndexInformer initialState; - private InformerExceptionHandler exceptionHandler; - public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher listerWatcher, long resyncPeriod, Executor informerExecutor) { if (resyncPeriod < 0) { @@ -90,11 +88,7 @@ public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher lis this.processor = new SharedProcessor<>(informerExecutor, description); processorStore = new ProcessorStore<>(this.indexer, this.processor); - this.reflector = new Reflector<>(listerWatcher, processorStore, this::getExceptionHandler); - } - - public InformerExceptionHandler getExceptionHandler() { - return exceptionHandler; + this.reflector = new Reflector<>(listerWatcher, processorStore); } /** @@ -298,6 +292,7 @@ public String toString() { @Override public void setExceptionHandler(InformerExceptionHandler handler) { - this.exceptionHandler = handler; + this.reflector.setExceptionHandler(handler); + this.processor.setExceptionHandler(handler); } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java index 20db9b9a7f4..639bf5a0336 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java @@ -23,6 +23,7 @@ import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.informers.InformerExceptionHandler; +import io.fabric8.kubernetes.client.informers.InformerExceptionHandler.EventType; import io.fabric8.kubernetes.client.informers.impl.ListerWatcher; import io.fabric8.kubernetes.client.utils.Utils; import org.slf4j.Logger; @@ -35,7 +36,6 @@ import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; public class Reflector> { @@ -49,16 +49,12 @@ public class Reflector watchFuture; private volatile CompletableFuture reconnectFuture; + private volatile InformerExceptionHandler handler; public Reflector(ListerWatcher listerWatcher, SyncableStore store) { - this(listerWatcher, store, null); - } - - public Reflector(ListerWatcher listerWatcher, SyncableStore store, - Supplier exceptionHandlerSupplier) { this.listerWatcher = listerWatcher; this.store = store; - this.watcher = new ReflectorWatcher(exceptionHandlerSupplier); + this.watcher = new ReflectorWatcher(); } public CompletableFuture start() { @@ -114,7 +110,14 @@ public CompletableFuture listSyncAndWatch() { started.whenComplete((w, t) -> { if (w != null) { if (running) { + if (log.isDebugEnabled()) { + log.debug("Watch started for {}", Reflector.this); + } watching = true; + InformerExceptionHandler theHandler = handler; + if (theHandler != null) { + theHandler.onWatching(); + } } else { stopWatch(w); } @@ -182,12 +185,6 @@ public boolean isWatching() { class ReflectorWatcher implements Watcher { - private final Supplier exceptionHandlerSupplier; - - ReflectorWatcher(Supplier exceptionHandlerSupplier) { - this.exceptionHandlerSupplier = exceptionHandlerSupplier; - } - @Override public void eventReceived(Action action, T resource) { if (action == null) { @@ -222,38 +219,43 @@ public void onClose(WatcherException exception) { // this close was triggered by an exception, // not the user, it is expected that the watch retry will handle this watchStopped(); - InformerExceptionHandler handler = exceptionHandlerSupplier.get(); - boolean reconnect = false; + InformerExceptionHandler theHandler = handler; + boolean reconnect = true; if (exception.isHttpGone()) { if (log.isDebugEnabled()) { log.debug("Watch restarting due to http gone for {}", Reflector.this); } - reconnect = true; - } else if (handler != null) { - reconnect = handler.retry(exception.getCause()); + } else if (theHandler != null) { + theHandler.onException(EventType.WATCH_CLOSE, exception.getCause()); + } else { + reconnect = false; + log.warn("Watch closing with exception for {}", Reflector.this, exception); } if (reconnect) { // start a whole new list/watch cycle reconnect(); } else { running = false; // shouldn't happen, but it means the watch won't restart - if (handler == null) { - log.warn("Watch closing with exception for {}", Reflector.this, exception); - } } } private void reconnect() { + if (!running) { + return; + } // this can be run in the scheduler thread because // any further operations will happen on the io thread reconnectFuture = Utils.schedule(Runnable::run, Reflector.this::listSyncAndWatch, listerWatcher.getWatchReconnectInterval(), TimeUnit.MILLISECONDS); reconnectFuture.whenComplete((v, t) -> { if (t != null) { - InformerExceptionHandler handler = exceptionHandlerSupplier.get(); - if (handler == null || handler.retry(t)) { - reconnect(); + InformerExceptionHandler theHandler = handler; + if (theHandler != null) { + theHandler.onException(EventType.LIST_OR_WATCH, t); + } else { + log.warn("listSyncAndWatch failed for {}, will retry", Reflector.this, t); } + reconnect(); } }); } @@ -278,4 +280,8 @@ ReflectorWatcher getWatcher() { public String toString() { return listerWatcher.getApiEndpointPath(); } + + public void setExceptionHandler(InformerExceptionHandler handler) { + this.handler = handler; + } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java index 86e334bb93a..d421c19428d 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java @@ -15,6 +15,8 @@ */ package io.fabric8.kubernetes.client.informers.impl.cache; +import io.fabric8.kubernetes.client.informers.InformerExceptionHandler; +import io.fabric8.kubernetes.client.informers.InformerExceptionHandler.EventType; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.fabric8.kubernetes.client.utils.internal.SerialExecutor; import org.slf4j.Logger; @@ -51,6 +53,8 @@ public class SharedProcessor { private final SerialExecutor executor; private final String informerDescription; + private volatile InformerExceptionHandler handler; + public SharedProcessor() { this(Runnable::run, "informer"); } @@ -109,8 +113,13 @@ public void distribute(Consumer> operation, boolean isSync) try { operation.accept(listener); } catch (Exception ex) { - log.error("{} failed invoking {} event handler: {}", informerDescription, listener.getHandler(), ex.getMessage(), - ex); + InformerExceptionHandler theHandler = handler; + if (theHandler != null) { + theHandler.onException(EventType.HANDLER, ex); + } else { + log.error("{} failed invoking {} event handler: {}", informerDescription, listener.getHandler(), ex.getMessage(), + ex); + } } } }); @@ -170,4 +179,9 @@ public ProcessorListener addProcessorListener(ResourceEventHandler lock.writeLock().unlock(); } } + + public void setExceptionHandler(InformerExceptionHandler handler) { + this.handler = handler; + } + }