Skip to content

Commit

Permalink
feat: add possibility to add an exception handler to Informers
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Aug 31, 2022
1 parent 22f0dcb commit 90cf0db
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 14 deletions.
@@ -0,0 +1,9 @@
package io.fabric8.kubernetes.client.informers;

import io.fabric8.kubernetes.client.WatcherException;

public interface InformerExceptionHandler {

void onWatchNonrecoverable(WatcherException e);

}
Expand Up @@ -82,7 +82,8 @@ default SharedIndexInformer<T> removeNamespaceIndex() {
* @param handle the event handler
* @param resyncPeriod the specific resync period
*/
SharedIndexInformer<T> addEventHandlerWithResyncPeriod(ResourceEventHandler<? super T> handle, long resyncPeriod);
SharedIndexInformer<T> addEventHandlerWithResyncPeriod(ResourceEventHandler<? super T> handle,
long resyncPeriod);

/**
* Starts the shared informer, which will be stopped when {@link #stop()} is called.
Expand Down Expand Up @@ -165,4 +166,6 @@ default boolean hasSynced() {
*/
CompletableFuture<Void> start();

default void setExceptionHandler(InformerExceptionHandler handler) {
}
}
Expand Up @@ -303,11 +303,14 @@ protected void onMessage(String message) {
logger.error("Unknown message received: {}", message);
}
} catch (ClassCastException e) {
logger.error("Received wrong type of object for watch", e);
final String msg = "Received wrong type of object for watch";
close(new WatcherException(msg, e));
} catch (IllegalArgumentException e) {
logger.error("Invalid event type", e);
final String msg = "Invalid event type";
close(new WatcherException(msg, e));
} catch (Exception e) {
logger.error("Unhandled exception encountered in watcher event handler", e);
final String msg = "Unhandled exception encountered in watcher event handler";
close(new WatcherException(msg, e));
}
}

Expand Down
Expand Up @@ -18,6 +18,7 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.informers.InformerExceptionHandler;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Indexer;
Expand Down Expand Up @@ -72,6 +73,8 @@ public class DefaultSharedIndexInformer<T extends HasMetadata, L extends Kuberne

private Stream<T> initialState;

private InformerExceptionHandler exceptionHandler;

public DefaultSharedIndexInformer(Class<T> apiTypeClass, ListerWatcher<T, L> listerWatcher, long resyncPeriod,
Executor informerExecutor) {
if (resyncPeriod < 0) {
Expand All @@ -87,7 +90,11 @@ public DefaultSharedIndexInformer(Class<T> apiTypeClass, ListerWatcher<T, L> lis
this.processor = new SharedProcessor<>(informerExecutor, description);

processorStore = new ProcessorStore<>(this.indexer, this.processor);
this.reflector = new Reflector<>(listerWatcher, processorStore);
this.reflector = new Reflector<>(listerWatcher, processorStore, this::getExceptionHandler);
}

public InformerExceptionHandler getExceptionHandler() {
return exceptionHandler;
}

/**
Expand Down Expand Up @@ -289,4 +296,8 @@ public String toString() {
return this.description;
}

@Override
public void setExceptionHandler(InformerExceptionHandler handler) {
this.exceptionHandler = handler;
}
}
Expand Up @@ -22,6 +22,7 @@
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.informers.InformerExceptionHandler;
import io.fabric8.kubernetes.client.informers.impl.ListerWatcher;
import io.fabric8.kubernetes.client.utils.Utils;
import org.slf4j.Logger;
Expand All @@ -34,6 +35,7 @@
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class Reflector<T extends HasMetadata, L extends KubernetesResourceList<T>> {

Expand All @@ -49,9 +51,14 @@ public class Reflector<T extends HasMetadata, L extends KubernetesResourceList<T
private volatile Future<?> reconnectFuture;

public Reflector(ListerWatcher<T, L> listerWatcher, SyncableStore<T> store) {
this(listerWatcher, store, null);
}

public Reflector(ListerWatcher<T, L> listerWatcher, SyncableStore<T> store,
Supplier<InformerExceptionHandler> exceptionHandlerSupplier) {
this.listerWatcher = listerWatcher;
this.store = store;
this.watcher = new ReflectorWatcher();
this.watcher = new ReflectorWatcher(exceptionHandlerSupplier);
}

public CompletableFuture<Void> start() {
Expand Down Expand Up @@ -119,7 +126,9 @@ public CompletableFuture<Void> listSyncAndWatch() {

private CompletableFuture<L> processList(Set<String> nextKeys, String continueVal) {
CompletableFuture<L> futureResult = listerWatcher
.submitList(new ListOptionsBuilder().withLimit(listerWatcher.getLimit()).withContinue(continueVal).build());
.submitList(
new ListOptionsBuilder().withLimit(listerWatcher.getLimit()).withContinue(continueVal)
.build());

return futureResult.thenCompose(result -> {
result.getItems().forEach(i -> {
Expand Down Expand Up @@ -147,7 +156,8 @@ private synchronized CompletableFuture<Watch> startWatcher(final String latestRe
}
log.debug("Starting watcher for {} at v{}", this, latestResourceVersion);
// there's no need to stop the old watch, that will happen automatically when this call completes
watchFuture = listerWatcher.submitWatch(new ListOptionsBuilder().withResourceVersion(latestResourceVersion)
watchFuture = listerWatcher.submitWatch(
new ListOptionsBuilder().withResourceVersion(latestResourceVersion)
.withTimeoutSeconds(null)
.build(), watcher);
return watchFuture;
Expand All @@ -171,6 +181,13 @@ public boolean isWatching() {

class ReflectorWatcher implements Watcher<T> {

private final Supplier<InformerExceptionHandler> exceptionHandlerSupplier;

ReflectorWatcher(Supplier<InformerExceptionHandler> exceptionHandlerSupplier) {
this.exceptionHandlerSupplier = exceptionHandlerSupplier;
}


@Override
public void eventReceived(Action action, T resource) {
if (action == null) {
Expand All @@ -180,8 +197,9 @@ public void eventReceived(Action action, T resource) {
throw new KubernetesClientException("Unrecognized resource for " + Reflector.this);
}
if (log.isDebugEnabled()) {
log.debug("Event received {} {} resourceVersion v{} for {}", action.name(), resource.getKind(),
resource.getMetadata().getResourceVersion(), Reflector.this);
log.debug("Event received {} {} resourceVersion v{} for {}", action.name(),
resource.getKind(),
resource.getMetadata().getResourceVersion(), Reflector.this);
}
switch (action) {
case ERROR:
Expand Down Expand Up @@ -215,13 +233,18 @@ public void onClose(WatcherException exception) {
// start a whole new list/watch cycle, can be run in the scheduler thread because
// any further operations will happen on the io thread
reconnectFuture = Utils.schedule(Runnable::run, Reflector.this::listSyncAndWatch,
listerWatcher.getWatchReconnectInterval(), TimeUnit.MILLISECONDS);
listerWatcher.getWatchReconnectInterval(), TimeUnit.MILLISECONDS);
}
});
restarted = true;
} else {
log.warn("Watch closing with exception for {}", Reflector.this, exception);
running = false; // shouldn't happen, but it means the watch won't restart
InformerExceptionHandler handler = exceptionHandlerSupplier.get();
if (handler != null) {
handler.onWatchNonrecoverable(exception);
} else {
log.warn("Watch closing with exception for {}", Reflector.this, exception);
}
}
} finally {
if (!restarted) {
Expand All @@ -240,7 +263,6 @@ public void onClose() {
public boolean reconnecting() {
return true;
}

}

ReflectorWatcher getWatcher() {
Expand All @@ -251,5 +273,4 @@ ReflectorWatcher getWatcher() {
public String toString() {
return listerWatcher.getApiEndpointPath();
}

}

0 comments on commit 90cf0db

Please sign in to comment.