diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java index 70d026ac6b..21fe65ad92 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java @@ -1,9 +1,32 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package io.fabric8.kubernetes.client.informers; import io.fabric8.kubernetes.client.WatcherException; public interface InformerExceptionHandler { - void onWatchNonrecoverable(WatcherException e); + /** + * Provides a callback when the informer could terminated with a non-recoverable exception. + * + * @param t the {@link Throwable}, which may occur as either the cause of a non-http gone {@link WatcherException} or an + * exception from calling list or watch + * @return true if the informer should retry, false if it should stop + */ + boolean retry(Throwable t); } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java index 0919e69def..3250cac2ed 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java @@ -83,7 +83,7 @@ default SharedIndexInformer removeNamespaceIndex() { * @param resyncPeriod the specific resync period */ SharedIndexInformer addEventHandlerWithResyncPeriod(ResourceEventHandler handle, - long resyncPeriod); + long resyncPeriod); /** * Starts the shared informer, which will be stopped when {@link #stop()} is called. diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java index ab6aa5827c..20db9b9a7f 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java @@ -48,14 +48,14 @@ public class Reflector watchFuture; - private volatile Future reconnectFuture; + private volatile CompletableFuture reconnectFuture; public Reflector(ListerWatcher listerWatcher, SyncableStore store) { this(listerWatcher, store, null); } public Reflector(ListerWatcher listerWatcher, SyncableStore store, - Supplier exceptionHandlerSupplier) { + Supplier exceptionHandlerSupplier) { this.listerWatcher = listerWatcher; this.store = store; this.watcher = new ReflectorWatcher(exceptionHandlerSupplier); @@ -126,9 +126,9 @@ public CompletableFuture listSyncAndWatch() { private CompletableFuture processList(Set nextKeys, String continueVal) { CompletableFuture futureResult = listerWatcher - .submitList( - new ListOptionsBuilder().withLimit(listerWatcher.getLimit()).withContinue(continueVal) - .build()); + .submitList( + new ListOptionsBuilder().withLimit(listerWatcher.getLimit()).withContinue(continueVal) + .build()); return futureResult.thenCompose(result -> { result.getItems().forEach(i -> { @@ -157,9 +157,10 @@ private synchronized CompletableFuture startWatcher(final String latestRe log.debug("Starting watcher for {} at v{}", this, latestResourceVersion); // there's no need to stop the old watch, that will happen automatically when this call completes watchFuture = listerWatcher.submitWatch( - new ListOptionsBuilder().withResourceVersion(latestResourceVersion) - .withTimeoutSeconds(null) - .build(), watcher); + new ListOptionsBuilder().withResourceVersion(latestResourceVersion) + .withTimeoutSeconds(null) + .build(), + watcher); return watchFuture; } @@ -187,7 +188,6 @@ class ReflectorWatcher implements Watcher { this.exceptionHandlerSupplier = exceptionHandlerSupplier; } - @Override public void eventReceived(Action action, T resource) { if (action == null) { @@ -198,8 +198,8 @@ public void eventReceived(Action action, T resource) { } if (log.isDebugEnabled()) { log.debug("Event received {} {} resourceVersion v{} for {}", action.name(), - resource.getKind(), - resource.getMetadata().getResourceVersion(), Reflector.this); + resource.getKind(), + resource.getMetadata().getResourceVersion(), Reflector.this); } switch (action) { case ERROR: @@ -221,38 +221,43 @@ public void eventReceived(Action action, T resource) { public void onClose(WatcherException exception) { // this close was triggered by an exception, // not the user, it is expected that the watch retry will handle this - boolean restarted = false; - try { - if (exception.isHttpGone()) { - if (log.isDebugEnabled()) { - log.debug("Watch restarting due to http gone for {}", Reflector.this); - } - listSyncAndWatch().whenComplete((v, t) -> { - if (t != null) { - watchStopped(); - // start a whole new list/watch cycle, can be run in the scheduler thread because - // any further operations will happen on the io thread - reconnectFuture = Utils.schedule(Runnable::run, Reflector.this::listSyncAndWatch, - listerWatcher.getWatchReconnectInterval(), TimeUnit.MILLISECONDS); - } - }); - restarted = true; - } else { - running = false; // shouldn't happen, but it means the watch won't restart - InformerExceptionHandler handler = exceptionHandlerSupplier.get(); - if (handler != null) { - handler.onWatchNonrecoverable(exception); - } else { - log.warn("Watch closing with exception for {}", Reflector.this, exception); - } + watchStopped(); + InformerExceptionHandler handler = exceptionHandlerSupplier.get(); + boolean reconnect = false; + if (exception.isHttpGone()) { + if (log.isDebugEnabled()) { + log.debug("Watch restarting due to http gone for {}", Reflector.this); } - } finally { - if (!restarted) { - watchStopped(); // report the watch as stopped after a problem + reconnect = true; + } else if (handler != null) { + reconnect = handler.retry(exception.getCause()); + } + if (reconnect) { + // start a whole new list/watch cycle + reconnect(); + } else { + running = false; // shouldn't happen, but it means the watch won't restart + if (handler == null) { + log.warn("Watch closing with exception for {}", Reflector.this, exception); } } } + private void reconnect() { + // this can be run in the scheduler thread because + // any further operations will happen on the io thread + reconnectFuture = Utils.schedule(Runnable::run, Reflector.this::listSyncAndWatch, + listerWatcher.getWatchReconnectInterval(), TimeUnit.MILLISECONDS); + reconnectFuture.whenComplete((v, t) -> { + if (t != null) { + InformerExceptionHandler handler = exceptionHandlerSupplier.get(); + if (handler == null || handler.retry(t)) { + reconnect(); + } + } + }); + } + @Override public void onClose() { watchStopped();