diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java deleted file mode 100644 index 7d828744c02..00000000000 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Copyright (C) 2015 Red Hat, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.fabric8.kubernetes.client.informers; - -public interface InformerExceptionHandler { - - public enum EventType { - /** - * an exception that occurs trying to perform the list or watch operation. The default handling is to log the exception. - */ - LIST_OR_WATCH, - /** - * an exception that occurs invoking a {@link ResourceEventHandler} method. The default handling is to log the exception. - */ - HANDLER - } - - /** - * Determine if the informer should stop given from a non-http gone WatchException cause. - *

- * The default behavior is to terminate as we cannot guarantee if the state is still correct - * - * @param t the non-http gone WatchException cause - * @return true if the informer should stop, false if it should attempt to keep running - */ - default boolean shouldStop(Throwable t) { - return true; - } - - /** - * Override the default handling of exceptions seen while the informer is running. - *

- * If you want to stop the informer as a side-effect of this call, then construct your implementation - * of this class with a reference to the informer then call the stop method. - */ - void onException(EventType eventType, Throwable t); - - /** - * Called after each time the list, sync and watch operations have been successful. - */ - default void onWatching() { - - } - -} diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java index 3250cac2ed9..c9082164e55 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java @@ -166,6 +166,9 @@ default boolean hasSynced() { */ CompletableFuture start(); - default void setExceptionHandler(InformerExceptionHandler handler) { - } + /** + * Return a future that will allow notification of informer stopping. + * + */ + CompletableFuture stopped(); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java index 19866930309..74b5cbfff16 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java @@ -18,7 +18,6 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.client.KubernetesClientException; -import io.fabric8.kubernetes.client.informers.InformerExceptionHandler; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.fabric8.kubernetes.client.informers.cache.Indexer; @@ -291,8 +290,8 @@ public String toString() { } @Override - public void setExceptionHandler(InformerExceptionHandler handler) { - this.reflector.setExceptionHandler(handler); - this.processor.setExceptionHandler(handler); + public CompletableFuture stopped() { + return this.reflector.getStopFuture(); } + } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java index abfd4b33008..8d6e0259726 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java @@ -23,8 +23,6 @@ import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager; -import io.fabric8.kubernetes.client.informers.InformerExceptionHandler; -import io.fabric8.kubernetes.client.informers.InformerExceptionHandler.EventType; import io.fabric8.kubernetes.client.informers.impl.ListerWatcher; import io.fabric8.kubernetes.client.utils.Utils; import io.fabric8.kubernetes.client.utils.internal.ExponentialBackoffIntervalCalculator; @@ -51,7 +49,7 @@ public class Reflector watchFuture; private volatile CompletableFuture reconnectFuture; - private volatile InformerExceptionHandler handler; + private volatile CompletableFuture stopFuture = new CompletableFuture(); private final ExponentialBackoffIntervalCalculator retryIntervalCalculator; public Reflector(ListerWatcher listerWatcher, SyncableStore store) { @@ -68,6 +66,7 @@ public CompletableFuture start() { public void stop() { running = false; + stopFuture.complete(null); Future future = reconnectFuture; if (future != null) { future.cancel(true); @@ -118,10 +117,6 @@ public CompletableFuture listSyncAndWatch() { log.debug("Watch started for {}", Reflector.this); } watching = true; - InformerExceptionHandler theHandler = handler; - if (theHandler != null) { - theHandler.onWatching(); - } } else { stopWatch(w); } @@ -223,23 +218,16 @@ public void onClose(WatcherException exception) { // this close was triggered by an exception, // not the user, it is expected that the watch retry will handle this watchStopped(); - InformerExceptionHandler theHandler = handler; - boolean reconnect = false; if (exception.isHttpGone()) { if (log.isDebugEnabled()) { log.debug("Watch restarting due to http gone for {}", Reflector.this); } - reconnect = true; - } else if (theHandler != null) { - reconnect = !theHandler.shouldStop(exception.getCause()); - } else { - log.warn("Watch closing with exception for {}", Reflector.this, exception); - } - if (reconnect) { // start a whole new list/watch cycle reconnect(); } else { running = false; // shouldn't happen, but it means the watch won't restart + stopFuture.completeExceptionally(exception.getCause()); + log.warn("Watch closing with exception for {}", Reflector.this, exception); } } @@ -253,12 +241,7 @@ private void reconnect() { retryIntervalCalculator.nextReconnectInterval(), TimeUnit.MILLISECONDS); reconnectFuture.whenComplete((v, t) -> { if (t != null) { - InformerExceptionHandler theHandler = handler; - if (theHandler != null) { - theHandler.onException(EventType.LIST_OR_WATCH, t); - } else { - log.warn("listSyncAndWatch failed for {}, will retry", Reflector.this, t); - } + log.warn("listSyncAndWatch failed for {}, will retry", Reflector.this, t); reconnect(); } else { retryIntervalCalculator.resetReconnectAttempts(); @@ -287,7 +270,8 @@ public String toString() { return listerWatcher.getApiEndpointPath(); } - public void setExceptionHandler(InformerExceptionHandler handler) { - this.handler = handler; + public CompletableFuture getStopFuture() { + return stopFuture; } + } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java index d421c19428d..86e334bb93a 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java @@ -15,8 +15,6 @@ */ package io.fabric8.kubernetes.client.informers.impl.cache; -import io.fabric8.kubernetes.client.informers.InformerExceptionHandler; -import io.fabric8.kubernetes.client.informers.InformerExceptionHandler.EventType; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.fabric8.kubernetes.client.utils.internal.SerialExecutor; import org.slf4j.Logger; @@ -53,8 +51,6 @@ public class SharedProcessor { private final SerialExecutor executor; private final String informerDescription; - private volatile InformerExceptionHandler handler; - public SharedProcessor() { this(Runnable::run, "informer"); } @@ -113,13 +109,8 @@ public void distribute(Consumer> operation, boolean isSync) try { operation.accept(listener); } catch (Exception ex) { - InformerExceptionHandler theHandler = handler; - if (theHandler != null) { - theHandler.onException(EventType.HANDLER, ex); - } else { - log.error("{} failed invoking {} event handler: {}", informerDescription, listener.getHandler(), ex.getMessage(), - ex); - } + log.error("{} failed invoking {} event handler: {}", informerDescription, listener.getHandler(), ex.getMessage(), + ex); } } }); @@ -179,9 +170,4 @@ public ProcessorListener addProcessorListener(ResourceEventHandler lock.writeLock().unlock(); } } - - public void setExceptionHandler(InformerExceptionHandler handler) { - this.handler = handler; - } - }