diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java
index 21fe65ad922..7d828744c02 100644
--- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java
+++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java
@@ -16,17 +16,44 @@
package io.fabric8.kubernetes.client.informers;
-import io.fabric8.kubernetes.client.WatcherException;
-
public interface InformerExceptionHandler {
+ public enum EventType {
+ /**
+ * an exception that occurs trying to perform the list or watch operation. The default handling is to log the exception.
+ */
+ LIST_OR_WATCH,
+ /**
+ * an exception that occurs invoking a {@link ResourceEventHandler} method. The default handling is to log the exception.
+ */
+ HANDLER
+ }
+
/**
- * Provides a callback when the informer could terminated with a non-recoverable exception.
+ * Determine if the informer should stop given from a non-http gone WatchException cause.
+ *
+ * The default behavior is to terminate as we cannot guarantee if the state is still correct
*
- * @param t the {@link Throwable}, which may occur as either the cause of a non-http gone {@link WatcherException} or an
- * exception from calling list or watch
- * @return true if the informer should retry, false if it should stop
+ * @param t the non-http gone WatchException cause
+ * @return true if the informer should stop, false if it should attempt to keep running
+ */
+ default boolean shouldStop(Throwable t) {
+ return true;
+ }
+
+ /**
+ * Override the default handling of exceptions seen while the informer is running.
+ *
+ * If you want to stop the informer as a side-effect of this call, then construct your implementation
+ * of this class with a reference to the informer then call the stop method.
*/
- boolean retry(Throwable t);
+ void onException(EventType eventType, Throwable t);
+
+ /**
+ * Called after each time the list, sync and watch operations have been successful.
+ */
+ default void onWatching() {
+
+ }
}
diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java
index ca68b92a34a..8c6ae32a6bd 100644
--- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java
+++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java
@@ -44,7 +44,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
@@ -60,7 +59,6 @@ public abstract class AbstractWatchManager implements Wat
final AtomicBoolean forceClosed;
private final int reconnectLimit;
private final ExponentialBackoffIntervalCalculator retryIntervalCalculator;
- final AtomicInteger currentReconnectAttempt;
private Future> reconnectAttempt;
protected final HttpClient client;
@@ -77,7 +75,6 @@ public abstract class AbstractWatchManager implements Wat
this.reconnectLimit = reconnectLimit;
this.retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(reconnectInterval, maxIntervalExponent);
this.resourceVersion = new AtomicReference<>(listOptions.getResourceVersion());
- this.currentReconnectAttempt = new AtomicInteger(0);
this.forceClosed = new AtomicBoolean();
this.receiveBookmarks = Boolean.TRUE.equals(listOptions.getAllowWatchBookmarks());
// opt into bookmarks by default
@@ -164,18 +161,15 @@ synchronized void reconnect() {
}
final boolean cannotReconnect() {
- return !watcher.reconnecting() && currentReconnectAttempt.get() >= reconnectLimit && reconnectLimit >= 0;
+ return !watcher.reconnecting() && retryIntervalCalculator.getCurrentReconnectAttempt() >= reconnectLimit && reconnectLimit >= 0;
}
final long nextReconnectInterval() {
- int exponentOfTwo = currentReconnectAttempt.getAndIncrement();
- long ret = retryIntervalCalculator.getInterval(exponentOfTwo);
- logger.debug("Current reconnect backoff is {} milliseconds (T{})", ret, exponentOfTwo);
- return ret;
+ return retryIntervalCalculator.nextReconnectInterval();
}
void resetReconnectAttempts() {
- currentReconnectAttempt.set(0);
+ retryIntervalCalculator.resetReconnectAttempts();
}
boolean isForceClosed() {
diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/OperationSupport.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/OperationSupport.java
index 3a2f322363a..f86e74544f8 100644
--- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/OperationSupport.java
+++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/OperationSupport.java
@@ -60,7 +60,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
public class OperationSupport {
@@ -83,7 +82,6 @@ public class OperationSupport {
protected String apiGroupName;
protected String apiGroupVersion;
protected boolean dryRun;
- private final ExponentialBackoffIntervalCalculator retryIntervalCalculator;
private final int requestRetryBackoffLimit;
public OperationSupport(Client client) {
@@ -107,16 +105,11 @@ public OperationSupport(OperationContext ctx) {
this.apiGroupVersion = "v1";
}
- final int requestRetryBackoffInterval;
if (ctx.getConfig() != null) {
- requestRetryBackoffInterval = ctx.getConfig().getRequestRetryBackoffInterval();
this.requestRetryBackoffLimit = ctx.getConfig().getRequestRetryBackoffLimit();
} else {
- requestRetryBackoffInterval = Config.DEFAULT_REQUEST_RETRY_BACKOFFINTERVAL;
this.requestRetryBackoffLimit = Config.DEFAULT_REQUEST_RETRY_BACKOFFLIMIT;
}
- this.retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(requestRetryBackoffInterval,
- MAX_RETRY_INTERVAL_EXPONENT);
}
public String getAPIGroupName() {
@@ -574,7 +567,7 @@ protected CompletableFuture handleResponse(HttpClient client, HttpRequest
VersionUsageUtils.log(this.resourceT, this.apiGroupVersion);
HttpRequest request = requestBuilder.build();
CompletableFuture> futureResponse = new CompletableFuture<>();
- retryWithExponentialBackoff(futureResponse, new AtomicInteger(), Utils.getNonNullOrElse(client, httpClient), request);
+ retryWithExponentialBackoff(futureResponse, new ExponentialBackoffIntervalCalculator(requestRetryBackoffLimit, MAX_RETRY_INTERVAL_EXPONENT), Utils.getNonNullOrElse(client, httpClient), request);
return futureResponse.thenApply(response -> {
try {
@@ -593,13 +586,13 @@ protected CompletableFuture handleResponse(HttpClient client, HttpRequest
}
protected void retryWithExponentialBackoff(CompletableFuture> result,
- AtomicInteger numRetries,
+ ExponentialBackoffIntervalCalculator retryIntervalCalculator,
HttpClient client, HttpRequest request) {
client.sendAsync(request, byte[].class)
.whenComplete((response, throwable) -> {
- int retries = numRetries.getAndIncrement();
+ int retries = retryIntervalCalculator.getCurrentReconnectAttempt();
if (retries < requestRetryBackoffLimit) {
- long retryInterval = retryIntervalCalculator.getInterval(retries);
+ long retryInterval = retryIntervalCalculator.nextReconnectInterval();
boolean retry = false;
if (response != null && response.code() >= 500) {
LOG.debug("HTTP operation on url: {} should be retried as the response code was {}, retrying after {} millis",
@@ -612,7 +605,7 @@ protected void retryWithExponentialBackoff(CompletableFuture retryWithExponentialBackoff(result, numRetries, client, request), retryInterval, TimeUnit.MILLISECONDS);
+ () -> retryWithExponentialBackoff(result, retryIntervalCalculator, client, request), retryInterval, TimeUnit.MILLISECONDS);
return;
}
}
diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java
index 33c43ee4568..0d9fcf23fae 100644
--- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java
+++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java
@@ -45,6 +45,8 @@
public class WatchConnectionManager>
extends AbstractWatchManager {
+ public static final int BACKOFF_MAX_EXPONENT = 5;
+
private static final Logger logger = LoggerFactory.getLogger(WatchConnectionManager.class);
protected WatcherWebSocketListener listener;
@@ -78,7 +80,7 @@ public WatchConnectionManager(final HttpClient client, final BaseOperation watcher, final int reconnectInterval, final int reconnectLimit,
long websocketTimeout) throws MalformedURLException {
// Default max 32x slowdown from base interval
- this(client, baseOperation, listOptions, watcher, reconnectInterval, reconnectLimit, websocketTimeout, 5);
+ this(client, baseOperation, listOptions, watcher, reconnectInterval, reconnectLimit, websocketTimeout, BACKOFF_MAX_EXPONENT);
}
@Override
diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java
index 1a27f7ece20..19866930309 100644
--- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java
+++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java
@@ -73,8 +73,6 @@ public class DefaultSharedIndexInformer initialState;
- private InformerExceptionHandler exceptionHandler;
-
public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher listerWatcher, long resyncPeriod,
Executor informerExecutor) {
if (resyncPeriod < 0) {
@@ -90,11 +88,7 @@ public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher lis
this.processor = new SharedProcessor<>(informerExecutor, description);
processorStore = new ProcessorStore<>(this.indexer, this.processor);
- this.reflector = new Reflector<>(listerWatcher, processorStore, this::getExceptionHandler);
- }
-
- public InformerExceptionHandler getExceptionHandler() {
- return exceptionHandler;
+ this.reflector = new Reflector<>(listerWatcher, processorStore);
}
/**
@@ -298,6 +292,7 @@ public String toString() {
@Override
public void setExceptionHandler(InformerExceptionHandler handler) {
- this.exceptionHandler = handler;
+ this.reflector.setExceptionHandler(handler);
+ this.processor.setExceptionHandler(handler);
}
}
diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java
index 20db9b9a7f4..abfd4b33008 100644
--- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java
+++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java
@@ -22,9 +22,12 @@
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
+import io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager;
import io.fabric8.kubernetes.client.informers.InformerExceptionHandler;
+import io.fabric8.kubernetes.client.informers.InformerExceptionHandler.EventType;
import io.fabric8.kubernetes.client.informers.impl.ListerWatcher;
import io.fabric8.kubernetes.client.utils.Utils;
+import io.fabric8.kubernetes.client.utils.internal.ExponentialBackoffIntervalCalculator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,7 +38,6 @@
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
public class Reflector> {
@@ -49,16 +51,14 @@ public class Reflector watchFuture;
private volatile CompletableFuture> reconnectFuture;
+ private volatile InformerExceptionHandler handler;
+ private final ExponentialBackoffIntervalCalculator retryIntervalCalculator;
public Reflector(ListerWatcher listerWatcher, SyncableStore store) {
- this(listerWatcher, store, null);
- }
-
- public Reflector(ListerWatcher listerWatcher, SyncableStore store,
- Supplier exceptionHandlerSupplier) {
this.listerWatcher = listerWatcher;
this.store = store;
- this.watcher = new ReflectorWatcher(exceptionHandlerSupplier);
+ this.watcher = new ReflectorWatcher();
+ this.retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(listerWatcher.getWatchReconnectInterval(), WatchConnectionManager.BACKOFF_MAX_EXPONENT);
}
public CompletableFuture start() {
@@ -114,7 +114,14 @@ public CompletableFuture listSyncAndWatch() {
started.whenComplete((w, t) -> {
if (w != null) {
if (running) {
+ if (log.isDebugEnabled()) {
+ log.debug("Watch started for {}", Reflector.this);
+ }
watching = true;
+ InformerExceptionHandler theHandler = handler;
+ if (theHandler != null) {
+ theHandler.onWatching();
+ }
} else {
stopWatch(w);
}
@@ -182,12 +189,6 @@ public boolean isWatching() {
class ReflectorWatcher implements Watcher {
- private final Supplier exceptionHandlerSupplier;
-
- ReflectorWatcher(Supplier exceptionHandlerSupplier) {
- this.exceptionHandlerSupplier = exceptionHandlerSupplier;
- }
-
@Override
public void eventReceived(Action action, T resource) {
if (action == null) {
@@ -222,38 +223,45 @@ public void onClose(WatcherException exception) {
// this close was triggered by an exception,
// not the user, it is expected that the watch retry will handle this
watchStopped();
- InformerExceptionHandler handler = exceptionHandlerSupplier.get();
+ InformerExceptionHandler theHandler = handler;
boolean reconnect = false;
if (exception.isHttpGone()) {
if (log.isDebugEnabled()) {
log.debug("Watch restarting due to http gone for {}", Reflector.this);
}
reconnect = true;
- } else if (handler != null) {
- reconnect = handler.retry(exception.getCause());
+ } else if (theHandler != null) {
+ reconnect = !theHandler.shouldStop(exception.getCause());
+ } else {
+ log.warn("Watch closing with exception for {}", Reflector.this, exception);
}
if (reconnect) {
// start a whole new list/watch cycle
reconnect();
} else {
running = false; // shouldn't happen, but it means the watch won't restart
- if (handler == null) {
- log.warn("Watch closing with exception for {}", Reflector.this, exception);
- }
}
}
private void reconnect() {
+ if (!running) {
+ return;
+ }
// this can be run in the scheduler thread because
// any further operations will happen on the io thread
reconnectFuture = Utils.schedule(Runnable::run, Reflector.this::listSyncAndWatch,
- listerWatcher.getWatchReconnectInterval(), TimeUnit.MILLISECONDS);
+ retryIntervalCalculator.nextReconnectInterval(), TimeUnit.MILLISECONDS);
reconnectFuture.whenComplete((v, t) -> {
if (t != null) {
- InformerExceptionHandler handler = exceptionHandlerSupplier.get();
- if (handler == null || handler.retry(t)) {
- reconnect();
+ InformerExceptionHandler theHandler = handler;
+ if (theHandler != null) {
+ theHandler.onException(EventType.LIST_OR_WATCH, t);
+ } else {
+ log.warn("listSyncAndWatch failed for {}, will retry", Reflector.this, t);
}
+ reconnect();
+ } else {
+ retryIntervalCalculator.resetReconnectAttempts();
}
});
}
@@ -278,4 +286,8 @@ ReflectorWatcher getWatcher() {
public String toString() {
return listerWatcher.getApiEndpointPath();
}
+
+ public void setExceptionHandler(InformerExceptionHandler handler) {
+ this.handler = handler;
+ }
}
diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java
index 86e334bb93a..d421c19428d 100644
--- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java
+++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/SharedProcessor.java
@@ -15,6 +15,8 @@
*/
package io.fabric8.kubernetes.client.informers.impl.cache;
+import io.fabric8.kubernetes.client.informers.InformerExceptionHandler;
+import io.fabric8.kubernetes.client.informers.InformerExceptionHandler.EventType;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.utils.internal.SerialExecutor;
import org.slf4j.Logger;
@@ -51,6 +53,8 @@ public class SharedProcessor {
private final SerialExecutor executor;
private final String informerDescription;
+ private volatile InformerExceptionHandler handler;
+
public SharedProcessor() {
this(Runnable::run, "informer");
}
@@ -109,8 +113,13 @@ public void distribute(Consumer> operation, boolean isSync)
try {
operation.accept(listener);
} catch (Exception ex) {
- log.error("{} failed invoking {} event handler: {}", informerDescription, listener.getHandler(), ex.getMessage(),
- ex);
+ InformerExceptionHandler theHandler = handler;
+ if (theHandler != null) {
+ theHandler.onException(EventType.HANDLER, ex);
+ } else {
+ log.error("{} failed invoking {} event handler: {}", informerDescription, listener.getHandler(), ex.getMessage(),
+ ex);
+ }
}
}
});
@@ -170,4 +179,9 @@ public ProcessorListener addProcessorListener(ResourceEventHandler super T>
lock.writeLock().unlock();
}
}
+
+ public void setExceptionHandler(InformerExceptionHandler handler) {
+ this.handler = handler;
+ }
+
}
diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/ExponentialBackoffIntervalCalculator.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/ExponentialBackoffIntervalCalculator.java
index 269e2528e22..3e9696c5c5a 100644
--- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/ExponentialBackoffIntervalCalculator.java
+++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/ExponentialBackoffIntervalCalculator.java
@@ -15,10 +15,18 @@
*/
package io.fabric8.kubernetes.client.utils.internal;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
public class ExponentialBackoffIntervalCalculator {
+ private static final Logger logger = LoggerFactory.getLogger(ExponentialBackoffIntervalCalculator.class);
+
private final int initialInterval;
private final int maxRetryIntervalExponent;
+ final AtomicInteger currentReconnectAttempt = new AtomicInteger();
public ExponentialBackoffIntervalCalculator(int initialInterval, int maxRetryIntervalExponent) {
this.initialInterval = initialInterval;
@@ -33,4 +41,19 @@ public long getInterval(int retryIndex) {
return (long)initialInterval * (1 << exponentOfTwo);
}
+ public void resetReconnectAttempts() {
+ currentReconnectAttempt.set(0);
+ }
+
+ public final long nextReconnectInterval() {
+ int exponentOfTwo = currentReconnectAttempt.getAndIncrement();
+ long ret = getInterval(exponentOfTwo);
+ logger.debug("Current reconnect backoff is {} milliseconds (T{})", ret, exponentOfTwo);
+ return ret;
+ }
+
+ public int getCurrentReconnectAttempt() {
+ return currentReconnectAttempt.get();
+ }
+
}