fix #4369: refining the exception handler and adding back backoff
shawkins committed Sep 6, 2022
1 parent 69aa906 commit 401b2e3
Showing 8 changed files with 122 additions and 62 deletions.
InformerExceptionHandler.java
@@ -16,17 +16,44 @@

package io.fabric8.kubernetes.client.informers;

import io.fabric8.kubernetes.client.WatcherException;

public interface InformerExceptionHandler {

public enum EventType {
/**
* an exception that occurs trying to perform the list or watch operation. The default handling is to log the exception.
*/
LIST_OR_WATCH,
/**
* an exception that occurs invoking a {@link ResourceEventHandler} method. The default handling is to log the exception.
*/
HANDLER
}

/**
* Provides a callback when the informer could terminated with a non-recoverable exception.
* Determine whether the informer should stop, given the cause of a non-http-gone {@link WatcherException}.
* <p>
* The default behavior is to terminate, as we cannot guarantee that the state is still correct
*
* @param t the {@link Throwable}, which may occur as either the cause of a non-http gone {@link WatcherException} or an
* exception from calling list or watch
* @return true if the informer should retry, false if it should stop
* @param t the non-http gone WatchException cause
* @return true if the informer should stop, false if it should attempt to keep running
*/
default boolean shouldStop(Throwable t) {
return true;
}

/**
* Override the default handling of exceptions seen while the informer is running.
* <p>
* If you want to stop the informer as a side-effect of this call, construct your implementation
* of this interface with a reference to the informer and call its stop method.
*/
boolean retry(Throwable t);
void onException(EventType eventType, Throwable t);

/**
* Called after each successful completion of the list, sync, and watch operations.
*/
default void onWatching() {

}

}
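
For reference, the new interface can be implemented as follows. This is a minimal sketch; the class name, logging, and stop policy are illustrative rather than part of this commit:

import io.fabric8.kubernetes.client.informers.InformerExceptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingInformerExceptionHandler implements InformerExceptionHandler {

  private static final Logger log = LoggerFactory.getLogger(LoggingInformerExceptionHandler.class);

  @Override
  public boolean shouldStop(Throwable t) {
    // keep the informer running across transient I/O problems; stop otherwise
    return !(t instanceof java.io.IOException);
  }

  @Override
  public void onException(EventType eventType, Throwable t) {
    // EventType (LIST_OR_WATCH or HANDLER) is inherited from the implemented interface
    log.warn("informer {} failure", eventType, t);
  }

  @Override
  public void onWatching() {
    log.debug("list, sync and watch succeeded; watch re-established");
  }
}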
AbstractWatchManager.java
@@ -44,7 +44,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

@@ -60,7 +59,6 @@ public abstract class AbstractWatchManager<T extends HasMetadata> implements Wat
final AtomicBoolean forceClosed;
private final int reconnectLimit;
private final ExponentialBackoffIntervalCalculator retryIntervalCalculator;
final AtomicInteger currentReconnectAttempt;
private Future<?> reconnectAttempt;

protected final HttpClient client;
@@ -77,7 +75,6 @@ public abstract class AbstractWatchManager<T extends HasMetadata> implements Wat
this.reconnectLimit = reconnectLimit;
this.retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(reconnectInterval, maxIntervalExponent);
this.resourceVersion = new AtomicReference<>(listOptions.getResourceVersion());
this.currentReconnectAttempt = new AtomicInteger(0);
this.forceClosed = new AtomicBoolean();
this.receiveBookmarks = Boolean.TRUE.equals(listOptions.getAllowWatchBookmarks());
// opt into bookmarks by default
@@ -164,18 +161,15 @@ synchronized void reconnect() {
}

final boolean cannotReconnect() {
return !watcher.reconnecting() && currentReconnectAttempt.get() >= reconnectLimit && reconnectLimit >= 0;
return !watcher.reconnecting() && retryIntervalCalculator.getCurrentReconnectAttempt() >= reconnectLimit && reconnectLimit >= 0;
}

final long nextReconnectInterval() {
int exponentOfTwo = currentReconnectAttempt.getAndIncrement();
long ret = retryIntervalCalculator.getInterval(exponentOfTwo);
logger.debug("Current reconnect backoff is {} milliseconds (T{})", ret, exponentOfTwo);
return ret;
return retryIntervalCalculator.nextReconnectInterval();
}

void resetReconnectAttempts() {
currentReconnectAttempt.set(0);
retryIntervalCalculator.resetReconnectAttempts();
}

boolean isForceClosed() {
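
The AbstractWatchManager changes move the attempt counter from a standalone AtomicInteger into the calculator itself. ExponentialBackoffIntervalCalculator's source is not part of this diff, so the following is only a sketch of behavior consistent with how it is called here (interval doubling per attempt, capped at the max exponent); field and method bodies are assumptions:

import java.util.concurrent.atomic.AtomicInteger;

public class ExponentialBackoffSketch {
  private final int initialInterval;
  private final int maxIntervalExponent;
  private final AtomicInteger currentReconnectAttempt = new AtomicInteger();

  public ExponentialBackoffSketch(int initialInterval, int maxIntervalExponent) {
    this.initialInterval = initialInterval;
    this.maxIntervalExponent = maxIntervalExponent;
  }

  public long getInterval(int exponent) {
    // double the base interval per attempt, capped at 2^maxIntervalExponent
    return initialInterval * (1L << Math.min(exponent, maxIntervalExponent));
  }

  public long nextReconnectInterval() {
    return getInterval(currentReconnectAttempt.getAndIncrement());
  }

  public int getCurrentReconnectAttempt() {
    return currentReconnectAttempt.get();
  }

  public void resetReconnectAttempts() {
    currentReconnectAttempt.set(0);
  }
}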
OperationSupport.java
@@ -60,7 +60,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class OperationSupport {

@@ -83,7 +82,6 @@ public class OperationSupport {
protected String apiGroupName;
protected String apiGroupVersion;
protected boolean dryRun;
private final ExponentialBackoffIntervalCalculator retryIntervalCalculator;
private final int requestRetryBackoffLimit;

public OperationSupport(Client client) {
@@ -107,16 +105,11 @@ public OperationSupport(OperationContext ctx) {
this.apiGroupVersion = "v1";
}

final int requestRetryBackoffInterval;
if (ctx.getConfig() != null) {
requestRetryBackoffInterval = ctx.getConfig().getRequestRetryBackoffInterval();
this.requestRetryBackoffLimit = ctx.getConfig().getRequestRetryBackoffLimit();
} else {
requestRetryBackoffInterval = Config.DEFAULT_REQUEST_RETRY_BACKOFFINTERVAL;
this.requestRetryBackoffLimit = Config.DEFAULT_REQUEST_RETRY_BACKOFFLIMIT;
}
this.retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(requestRetryBackoffInterval,
MAX_RETRY_INTERVAL_EXPONENT);
}

public String getAPIGroupName() {
@@ -574,7 +567,7 @@ protected <T> CompletableFuture<T> handleResponse(HttpClient client, HttpRequest
VersionUsageUtils.log(this.resourceT, this.apiGroupVersion);
HttpRequest request = requestBuilder.build();
CompletableFuture<HttpResponse<byte[]>> futureResponse = new CompletableFuture<>();
retryWithExponentialBackoff(futureResponse, new AtomicInteger(), Utils.getNonNullOrElse(client, httpClient), request);
retryWithExponentialBackoff(futureResponse, new ExponentialBackoffIntervalCalculator(requestRetryBackoffLimit, MAX_RETRY_INTERVAL_EXPONENT), Utils.getNonNullOrElse(client, httpClient), request);

return futureResponse.thenApply(response -> {
try {
@@ -593,13 +586,13 @@ }
}

protected void retryWithExponentialBackoff(CompletableFuture<HttpResponse<byte[]>> result,
AtomicInteger numRetries,
ExponentialBackoffIntervalCalculator retryIntervalCalculator,
HttpClient client, HttpRequest request) {
client.sendAsync(request, byte[].class)
.whenComplete((response, throwable) -> {
int retries = numRetries.getAndIncrement();
int retries = retryIntervalCalculator.getCurrentReconnectAttempt();
if (retries < requestRetryBackoffLimit) {
long retryInterval = retryIntervalCalculator.getInterval(retries);
long retryInterval = retryIntervalCalculator.nextReconnectInterval();
boolean retry = false;
if (response != null && response.code() >= 500) {
LOG.debug("HTTP operation on url: {} should be retried as the response code was {}, retrying after {} millis",
@@ -612,7 +605,7 @@ protected void retryWithExponentialBackoff(CompletableFuture<HttpResponse<byte[]
}
if (retry) {
Utils.schedule(context.getExecutor(),
() -> retryWithExponentialBackoff(result, numRetries, client, request), retryInterval, TimeUnit.MILLISECONDS);
() -> retryWithExponentialBackoff(result, retryIntervalCalculator, client, request), retryInterval, TimeUnit.MILLISECONDS);
return;
}
}
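
Note the design change in handleResponse: the shared AtomicInteger and the instance-level retryIntervalCalculator field are replaced by a fresh ExponentialBackoffIntervalCalculator per request, so concurrent requests no longer share attempt counts or backoff state. The retry decision itself distills to the sketch below; the throwable branch is elided in this diff, so treating IOException as retryable is an assumption:

import io.fabric8.kubernetes.client.http.HttpResponse;
import java.io.IOException;

final class RetryDecisionSketch {
  // retry server errors and (assumed) connection failures until the
  // per-request calculator has used up requestRetryBackoffLimit attempts
  static boolean shouldRetry(HttpResponse<byte[]> response, Throwable throwable,
      int retriesSoFar, int requestRetryBackoffLimit) {
    if (retriesSoFar >= requestRetryBackoffLimit) {
      return false;
    }
    return (response != null && response.code() >= 500)
        || throwable instanceof IOException;
  }
}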
WatchConnectionManager.java
@@ -45,6 +45,8 @@
public class WatchConnectionManager<T extends HasMetadata, L extends KubernetesResourceList<T>>
extends AbstractWatchManager<T> {

public static final int BACKOFF_MAX_EXPONENT = 5;

private static final Logger logger = LoggerFactory.getLogger(WatchConnectionManager.class);

protected WatcherWebSocketListener<T> listener;
@@ -78,7 +80,7 @@ public WatchConnectionManager(final HttpClient client, final BaseOperation<T, L,
final ListOptions listOptions, final Watcher<T> watcher, final int reconnectInterval, final int reconnectLimit,
long websocketTimeout) throws MalformedURLException {
// Default max 32x slowdown from base interval
this(client, baseOperation, listOptions, watcher, reconnectInterval, reconnectLimit, websocketTimeout, 5);
this(client, baseOperation, listOptions, watcher, reconnectInterval, reconnectLimit, websocketTimeout, BACKOFF_MAX_EXPONENT);
}

@Override
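
With BACKOFF_MAX_EXPONENT at 5, the wait doubles on each reconnect attempt and is capped at 2^5 = 32 times the base interval; a 1000 ms reconnectInterval, for example, yields waits of 1, 2, 4, 8, 16, and then at most 32 seconds. That is the "max 32x slowdown" the constructor comment refers to.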
DefaultSharedIndexInformer.java
@@ -73,8 +73,6 @@ public class DefaultSharedIndexInformer<T extends HasMetadata, L extends Kuberne

private Stream<T> initialState;

private InformerExceptionHandler exceptionHandler;

public DefaultSharedIndexInformer(Class<T> apiTypeClass, ListerWatcher<T, L> listerWatcher, long resyncPeriod,
Executor informerExecutor) {
if (resyncPeriod < 0) {
@@ -90,11 +88,7 @@ public DefaultSharedIndexInformer(Class<T> apiTypeClass, ListerWatcher<T, L> lis
this.processor = new SharedProcessor<>(informerExecutor, description);

processorStore = new ProcessorStore<>(this.indexer, this.processor);
this.reflector = new Reflector<>(listerWatcher, processorStore, this::getExceptionHandler);
}

public InformerExceptionHandler getExceptionHandler() {
return exceptionHandler;
this.reflector = new Reflector<>(listerWatcher, processorStore);
}

/**
@@ -298,6 +292,7 @@

@Override
public void setExceptionHandler(InformerExceptionHandler handler) {
this.exceptionHandler = handler;
this.reflector.setExceptionHandler(handler);
this.processor.setExceptionHandler(handler);
}
}
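
A caller can combine setExceptionHandler with the stop-via-reference pattern described in the new InformerExceptionHandler javadoc. The sketch below is illustrative only; the Pod informer setup, runnableInformer usage, and stop-on-list-failure policy are not part of this commit:

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.informers.InformerExceptionHandler.EventType;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;

class StopOnFailureExample {
  static void watchPods(KubernetesClient client) {
    SharedIndexInformer<Pod> informer = client.pods()
        .inNamespace("default")
        .runnableInformer(0); // created but not started, so the handler can be set first
    // InformerExceptionHandler has a single abstract method, so a lambda suffices
    informer.setExceptionHandler((eventType, t) -> {
      if (eventType == EventType.LIST_OR_WATCH) {
        informer.stop(); // stop as a side-effect, via the captured informer reference
      }
    });
    informer.start();
  }
}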
Reflector.java
@@ -22,9 +22,12 @@
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager;
import io.fabric8.kubernetes.client.informers.InformerExceptionHandler;
import io.fabric8.kubernetes.client.informers.InformerExceptionHandler.EventType;
import io.fabric8.kubernetes.client.informers.impl.ListerWatcher;
import io.fabric8.kubernetes.client.utils.Utils;
import io.fabric8.kubernetes.client.utils.internal.ExponentialBackoffIntervalCalculator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -35,7 +38,6 @@
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class Reflector<T extends HasMetadata, L extends KubernetesResourceList<T>> {

@@ -49,16 +51,14 @@ public class Reflector<T extends HasMetadata, L extends KubernetesResourceList<T
private volatile boolean watching;
private volatile CompletableFuture<Watch> watchFuture;
private volatile CompletableFuture<?> reconnectFuture;
private volatile InformerExceptionHandler handler;
private final ExponentialBackoffIntervalCalculator retryIntervalCalculator;

public Reflector(ListerWatcher<T, L> listerWatcher, SyncableStore<T> store) {
this(listerWatcher, store, null);
}

public Reflector(ListerWatcher<T, L> listerWatcher, SyncableStore<T> store,
Supplier<InformerExceptionHandler> exceptionHandlerSupplier) {
this.listerWatcher = listerWatcher;
this.store = store;
this.watcher = new ReflectorWatcher(exceptionHandlerSupplier);
this.watcher = new ReflectorWatcher();
this.retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(listerWatcher.getWatchReconnectInterval(), WatchConnectionManager.BACKOFF_MAX_EXPONENT);
}

public CompletableFuture<Void> start() {
@@ -114,7 +114,14 @@ public CompletableFuture<Void> listSyncAndWatch() {
started.whenComplete((w, t) -> {
if (w != null) {
if (running) {
if (log.isDebugEnabled()) {
log.debug("Watch started for {}", Reflector.this);
}
watching = true;
InformerExceptionHandler theHandler = handler;
if (theHandler != null) {
theHandler.onWatching();
}
} else {
stopWatch(w);
}
@@ -182,12 +189,6 @@ public boolean isWatching() {

class ReflectorWatcher implements Watcher<T> {

private final Supplier<InformerExceptionHandler> exceptionHandlerSupplier;

ReflectorWatcher(Supplier<InformerExceptionHandler> exceptionHandlerSupplier) {
this.exceptionHandlerSupplier = exceptionHandlerSupplier;
}

@Override
public void eventReceived(Action action, T resource) {
if (action == null) {
Expand Down Expand Up @@ -222,38 +223,45 @@ public void onClose(WatcherException exception) {
// this close was triggered by an exception,
// not the user, it is expected that the watch retry will handle this
watchStopped();
InformerExceptionHandler handler = exceptionHandlerSupplier.get();
InformerExceptionHandler theHandler = handler;
boolean reconnect = false;
if (exception.isHttpGone()) {
if (log.isDebugEnabled()) {
log.debug("Watch restarting due to http gone for {}", Reflector.this);
}
reconnect = true;
} else if (handler != null) {
reconnect = handler.retry(exception.getCause());
} else if (theHandler != null) {
reconnect = !theHandler.shouldStop(exception.getCause());
} else {
log.warn("Watch closing with exception for {}", Reflector.this, exception);
}
if (reconnect) {
// start a whole new list/watch cycle
reconnect();
} else {
running = false; // shouldn't happen, but it means the watch won't restart
if (handler == null) {
log.warn("Watch closing with exception for {}", Reflector.this, exception);
}
}
}

private void reconnect() {
if (!running) {
return;
}
// this can be run in the scheduler thread because
// any further operations will happen on the io thread
reconnectFuture = Utils.schedule(Runnable::run, Reflector.this::listSyncAndWatch,
listerWatcher.getWatchReconnectInterval(), TimeUnit.MILLISECONDS);
retryIntervalCalculator.nextReconnectInterval(), TimeUnit.MILLISECONDS);
reconnectFuture.whenComplete((v, t) -> {
if (t != null) {
InformerExceptionHandler handler = exceptionHandlerSupplier.get();
if (handler == null || handler.retry(t)) {
reconnect();
InformerExceptionHandler theHandler = handler;
if (theHandler != null) {
theHandler.onException(EventType.LIST_OR_WATCH, t);
} else {
log.warn("listSyncAndWatch failed for {}, will retry", Reflector.this, t);
}
reconnect();
} else {
retryIntervalCalculator.resetReconnectAttempts();
}
});
}
@@ -278,4 +286,8 @@ ReflectorWatcher getWatcher() {
public String toString() {
return listerWatcher.getApiEndpointPath();
}

public void setExceptionHandler(InformerExceptionHandler handler) {
this.handler = handler;
}
}
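
This Reflector change is the "adding back backoff" half of the commit title: reconnect() previously rescheduled listSyncAndWatch after the fixed listerWatcher.getWatchReconnectInterval(), whereas it now waits retryIntervalCalculator.nextReconnectInterval(), which grows with each consecutive failure (bounded by WatchConnectionManager.BACKOFF_MAX_EXPONENT) and is reset once listSyncAndWatch completes successfully.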
SharedProcessor.java
@@ -15,6 +15,8 @@
*/
package io.fabric8.kubernetes.client.informers.impl.cache;

import io.fabric8.kubernetes.client.informers.InformerExceptionHandler;
import io.fabric8.kubernetes.client.informers.InformerExceptionHandler.EventType;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.utils.internal.SerialExecutor;
import org.slf4j.Logger;
@@ -51,6 +53,8 @@ public class SharedProcessor<T> {
private final SerialExecutor executor;
private final String informerDescription;

private volatile InformerExceptionHandler handler;

public SharedProcessor() {
this(Runnable::run, "informer");
}
@@ -109,8 +113,13 @@ public void distribute(Consumer<ProcessorListener<T>> operation, boolean isSync)
try {
operation.accept(listener);
} catch (Exception ex) {
log.error("{} failed invoking {} event handler: {}", informerDescription, listener.getHandler(), ex.getMessage(),
ex);
InformerExceptionHandler theHandler = handler;
if (theHandler != null) {
theHandler.onException(EventType.HANDLER, ex);
} else {
log.error("{} failed invoking {} event handler: {}", informerDescription, listener.getHandler(), ex.getMessage(),
ex);
}
}
}
});
@@ -170,4 +179,9 @@ public ProcessorListener<T> addProcessorListener(ResourceEventHandler<? super T>
lock.writeLock().unlock();
}
}

public void setExceptionHandler(InformerExceptionHandler handler) {
this.handler = handler;
}

}
