Skip to content

Commit

Permalink
fix #4369: generalizing the exception handler
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Sep 6, 2022
1 parent 69aa906 commit 6110402
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 40 deletions.
Expand Up @@ -20,13 +20,32 @@

public interface InformerExceptionHandler {

public enum EventType {
/**
* the cause of a non-http gone {@link WatcherException}. The default handling is to log the exception and stop the
* informer.
*/
WATCH_CLOSE,
/**
* an exception that occurs trying to perform the list or watch operation. The default handling is to log the exception.
*/
LIST_OR_WATCH,
/**
* an exception that occurs invoking a {@link ResourceEventHandler} method. The default handling is to log the exception.
*/
HANDLER
}

/**
* Provides a callback when the informer could terminated with a non-recoverable exception.
*
* @param t the {@link Throwable}, which may occur as either the cause of a non-http gone {@link WatcherException} or an
* exception from calling list or watch
* @return true if the informer should retry, false if it should stop
* Override the default handling of exceptions seen while the informer is running.
*/
boolean retry(Throwable t);
void onException(EventType eventType, Throwable t);

/**
* Called after each time the list, sync and watch operations have been successful.
*/
default void onWatching() {

}

}
Expand Up @@ -73,8 +73,6 @@ public class DefaultSharedIndexInformer<T extends HasMetadata, L extends Kuberne

private Stream<T> initialState;

private InformerExceptionHandler exceptionHandler;

public DefaultSharedIndexInformer(Class<T> apiTypeClass, ListerWatcher<T, L> listerWatcher, long resyncPeriod,
Executor informerExecutor) {
if (resyncPeriod < 0) {
Expand All @@ -90,11 +88,7 @@ public DefaultSharedIndexInformer(Class<T> apiTypeClass, ListerWatcher<T, L> lis
this.processor = new SharedProcessor<>(informerExecutor, description);

processorStore = new ProcessorStore<>(this.indexer, this.processor);
this.reflector = new Reflector<>(listerWatcher, processorStore, this::getExceptionHandler);
}

public InformerExceptionHandler getExceptionHandler() {
return exceptionHandler;
this.reflector = new Reflector<>(listerWatcher, processorStore);
}

/**
Expand Down Expand Up @@ -298,6 +292,7 @@ public String toString() {

@Override
public void setExceptionHandler(InformerExceptionHandler handler) {
this.exceptionHandler = handler;
this.reflector.setExceptionHandler(handler);
this.processor.setExceptionHandler(handler);
}
}
Expand Up @@ -23,6 +23,7 @@
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.informers.InformerExceptionHandler;
import io.fabric8.kubernetes.client.informers.InformerExceptionHandler.EventType;
import io.fabric8.kubernetes.client.informers.impl.ListerWatcher;
import io.fabric8.kubernetes.client.utils.Utils;
import org.slf4j.Logger;
Expand All @@ -35,7 +36,6 @@
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class Reflector<T extends HasMetadata, L extends KubernetesResourceList<T>> {

Expand All @@ -49,16 +49,12 @@ public class Reflector<T extends HasMetadata, L extends KubernetesResourceList<T
private volatile boolean watching;
private volatile CompletableFuture<Watch> watchFuture;
private volatile CompletableFuture<?> reconnectFuture;
private volatile InformerExceptionHandler handler;

public Reflector(ListerWatcher<T, L> listerWatcher, SyncableStore<T> store) {
this(listerWatcher, store, null);
}

public Reflector(ListerWatcher<T, L> listerWatcher, SyncableStore<T> store,
Supplier<InformerExceptionHandler> exceptionHandlerSupplier) {
this.listerWatcher = listerWatcher;
this.store = store;
this.watcher = new ReflectorWatcher(exceptionHandlerSupplier);
this.watcher = new ReflectorWatcher();
}

public CompletableFuture<Void> start() {
Expand Down Expand Up @@ -114,7 +110,14 @@ public CompletableFuture<Void> listSyncAndWatch() {
started.whenComplete((w, t) -> {
if (w != null) {
if (running) {
if (log.isDebugEnabled()) {
log.debug("Watch started for {}", Reflector.this);
}
watching = true;
InformerExceptionHandler theHandler = handler;
if (theHandler != null) {
theHandler.onWatching();
}
} else {
stopWatch(w);
}
Expand Down Expand Up @@ -182,12 +185,6 @@ public boolean isWatching() {

class ReflectorWatcher implements Watcher<T> {

private final Supplier<InformerExceptionHandler> exceptionHandlerSupplier;

ReflectorWatcher(Supplier<InformerExceptionHandler> exceptionHandlerSupplier) {
this.exceptionHandlerSupplier = exceptionHandlerSupplier;
}

@Override
public void eventReceived(Action action, T resource) {
if (action == null) {
Expand Down Expand Up @@ -222,38 +219,43 @@ public void onClose(WatcherException exception) {
// this close was triggered by an exception,
// not the user, it is expected that the watch retry will handle this
watchStopped();
InformerExceptionHandler handler = exceptionHandlerSupplier.get();
boolean reconnect = false;
InformerExceptionHandler theHandler = handler;
boolean reconnect = true;
if (exception.isHttpGone()) {
if (log.isDebugEnabled()) {
log.debug("Watch restarting due to http gone for {}", Reflector.this);
}
reconnect = true;
} else if (handler != null) {
reconnect = handler.retry(exception.getCause());
} else if (theHandler != null) {
theHandler.onException(EventType.WATCH_CLOSE, exception.getCause());
} else {
reconnect = false;
log.warn("Watch closing with exception for {}", Reflector.this, exception);
}
if (reconnect) {
// start a whole new list/watch cycle
reconnect();
} else {
running = false; // shouldn't happen, but it means the watch won't restart
if (handler == null) {
log.warn("Watch closing with exception for {}", Reflector.this, exception);
}
}
}

private void reconnect() {
if (!running) {
return;
}
// this can be run in the scheduler thread because
// any further operations will happen on the io thread
reconnectFuture = Utils.schedule(Runnable::run, Reflector.this::listSyncAndWatch,
listerWatcher.getWatchReconnectInterval(), TimeUnit.MILLISECONDS);
reconnectFuture.whenComplete((v, t) -> {
if (t != null) {
InformerExceptionHandler handler = exceptionHandlerSupplier.get();
if (handler == null || handler.retry(t)) {
reconnect();
InformerExceptionHandler theHandler = handler;
if (theHandler != null) {
theHandler.onException(EventType.LIST_OR_WATCH, t);
} else {
log.warn("listSyncAndWatch failed for {}, will retry", Reflector.this, t);
}
reconnect();
}
});
}
Expand All @@ -278,4 +280,8 @@ ReflectorWatcher getWatcher() {
public String toString() {
return listerWatcher.getApiEndpointPath();
}

public void setExceptionHandler(InformerExceptionHandler handler) {
this.handler = handler;
}
}
Expand Up @@ -15,6 +15,8 @@
*/
package io.fabric8.kubernetes.client.informers.impl.cache;

import io.fabric8.kubernetes.client.informers.InformerExceptionHandler;
import io.fabric8.kubernetes.client.informers.InformerExceptionHandler.EventType;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.utils.internal.SerialExecutor;
import org.slf4j.Logger;
Expand Down Expand Up @@ -51,6 +53,8 @@ public class SharedProcessor<T> {
private final SerialExecutor executor;
private final String informerDescription;

private volatile InformerExceptionHandler handler;

public SharedProcessor() {
this(Runnable::run, "informer");
}
Expand Down Expand Up @@ -109,8 +113,13 @@ public void distribute(Consumer<ProcessorListener<T>> operation, boolean isSync)
try {
operation.accept(listener);
} catch (Exception ex) {
log.error("{} failed invoking {} event handler: {}", informerDescription, listener.getHandler(), ex.getMessage(),
ex);
InformerExceptionHandler theHandler = handler;
if (theHandler != null) {
theHandler.onException(EventType.HANDLER, ex);
} else {
log.error("{} failed invoking {} event handler: {}", informerDescription, listener.getHandler(), ex.getMessage(),
ex);
}
}
}
});
Expand Down Expand Up @@ -170,4 +179,9 @@ public ProcessorListener<T> addProcessorListener(ResourceEventHandler<? super T>
lock.writeLock().unlock();
}
}

public void setExceptionHandler(InformerExceptionHandler handler) {
this.handler = handler;
}

}

0 comments on commit 6110402

Please sign in to comment.