Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add possibility to add an exception handler to Watchers #4365

Merged
merged 10 commits into from Sep 8, 2022
@@ -0,0 +1,32 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.fabric8.kubernetes.client.informers;

import io.fabric8.kubernetes.client.WatcherException;

/**
 * Callback that decides whether an informer keeps going after a failure.
 */
@FunctionalInterface
public interface InformerExceptionHandler {

  /**
   * Provides a callback when the informer would otherwise be terminated by a non-recoverable exception.
   *
   * @param t the {@link Throwable}, which may occur as either the cause of a non-http gone {@link WatcherException} or an
   *        exception from calling list or watch; may be {@code null} if the {@link WatcherException} carries no cause
   * @return true if the informer should retry, false if it should stop
   */
  boolean retry(Throwable t);

}
Expand Up @@ -82,7 +82,8 @@ default SharedIndexInformer<T> removeNamespaceIndex() {
* @param handle the event handler
* @param resyncPeriod the specific resync period
*/
SharedIndexInformer<T> addEventHandlerWithResyncPeriod(ResourceEventHandler<? super T> handle, long resyncPeriod);
SharedIndexInformer<T> addEventHandlerWithResyncPeriod(ResourceEventHandler<? super T> handle,
long resyncPeriod);

/**
* Starts the shared informer, which will be stopped when {@link #stop()} is called.
Expand Down Expand Up @@ -165,4 +166,6 @@ default boolean hasSynced() {
*/
CompletableFuture<Void> start();

/**
 * Registers the {@link InformerExceptionHandler} to consult when this informer fails.
 * <p>
 * This default implementation does nothing, so the handler is ignored unless the implementing class overrides
 * this method.
 *
 * @param handler the handler to register
 */
default void setExceptionHandler(InformerExceptionHandler handler) {
}
}
Expand Up @@ -303,11 +303,14 @@ protected void onMessage(String message) {
logger.error("Unknown message received: {}", message);
}
} catch (ClassCastException e) {
logger.error("Received wrong type of object for watch", e);
final String msg = "Received wrong type of object for watch";
close(new WatcherException(msg, e));
} catch (IllegalArgumentException e) {
logger.error("Invalid event type", e);
final String msg = "Invalid event type";
close(new WatcherException(msg, e));
} catch (Exception e) {
logger.error("Unhandled exception encountered in watcher event handler", e);
final String msg = "Unhandled exception encountered in watcher event handler";
close(new WatcherException(msg, e));
}
}

Expand Down
Expand Up @@ -18,6 +18,7 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.informers.InformerExceptionHandler;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Indexer;
Expand Down Expand Up @@ -72,6 +73,8 @@ public class DefaultSharedIndexInformer<T extends HasMetadata, L extends Kuberne

private Stream<T> initialState;

private InformerExceptionHandler exceptionHandler;

public DefaultSharedIndexInformer(Class<T> apiTypeClass, ListerWatcher<T, L> listerWatcher, long resyncPeriod,
Executor informerExecutor) {
if (resyncPeriod < 0) {
Expand All @@ -87,7 +90,11 @@ public DefaultSharedIndexInformer(Class<T> apiTypeClass, ListerWatcher<T, L> lis
this.processor = new SharedProcessor<>(informerExecutor, description);

processorStore = new ProcessorStore<>(this.indexer, this.processor);
this.reflector = new Reflector<>(listerWatcher, processorStore);
this.reflector = new Reflector<>(listerWatcher, processorStore, this::getExceptionHandler);
}

/**
 * Returns the exception handler currently held by this informer.
 * <p>
 * The constructor hands this method to the reflector as a supplier, so the field is read each time the supplier
 * is invoked rather than captured once.
 *
 * @return the current handler, or {@code null} if the field has not been assigned
 */
public InformerExceptionHandler getExceptionHandler() {
  return this.exceptionHandler;
}

/**
Expand Down Expand Up @@ -289,4 +296,8 @@ public String toString() {
return this.description;
}

/**
 * Stores the handler in this informer; {@link #getExceptionHandler()} returns it afterwards.
 *
 * @param handler the handler to store
 */
// NOTE(review): the backing field is a plain (non-volatile) field — confirm that visibility to the
// thread that invokes the reflector's supplier is guaranteed elsewhere.
@Override
public void setExceptionHandler(InformerExceptionHandler handler) {
this.exceptionHandler = handler;
}
}
Expand Up @@ -22,6 +22,7 @@
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.informers.InformerExceptionHandler;
import io.fabric8.kubernetes.client.informers.impl.ListerWatcher;
import io.fabric8.kubernetes.client.utils.Utils;
import org.slf4j.Logger;
Expand All @@ -34,6 +35,7 @@
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class Reflector<T extends HasMetadata, L extends KubernetesResourceList<T>> {

Expand All @@ -46,12 +48,17 @@ public class Reflector<T extends HasMetadata, L extends KubernetesResourceList<T
private volatile boolean running;
private volatile boolean watching;
private volatile CompletableFuture<Watch> watchFuture;
private volatile Future<?> reconnectFuture;
private volatile CompletableFuture<?> reconnectFuture;

/**
 * Creates a reflector that has no {@link InformerExceptionHandler}.
 *
 * @param listerWatcher performs the list and watch calls
 * @param store receives the listed and watched state
 */
public Reflector(ListerWatcher<T, L> listerWatcher, SyncableStore<T> store) {
  // Pass a supplier that yields null instead of a null supplier: the watcher calls
  // exceptionHandlerSupplier.get() unconditionally when a watch closes with an exception,
  // so a null supplier would fail there with a NullPointerException.
  this(listerWatcher, store, () -> null);
}

public Reflector(ListerWatcher<T, L> listerWatcher, SyncableStore<T> store,
Supplier<InformerExceptionHandler> exceptionHandlerSupplier) {
this.listerWatcher = listerWatcher;
this.store = store;
this.watcher = new ReflectorWatcher();
this.watcher = new ReflectorWatcher(exceptionHandlerSupplier);
}

public CompletableFuture<Void> start() {
Expand Down Expand Up @@ -119,7 +126,9 @@ public CompletableFuture<Void> listSyncAndWatch() {

private CompletableFuture<L> processList(Set<String> nextKeys, String continueVal) {
CompletableFuture<L> futureResult = listerWatcher
.submitList(new ListOptionsBuilder().withLimit(listerWatcher.getLimit()).withContinue(continueVal).build());
.submitList(
new ListOptionsBuilder().withLimit(listerWatcher.getLimit()).withContinue(continueVal)
.build());

return futureResult.thenCompose(result -> {
result.getItems().forEach(i -> {
Expand Down Expand Up @@ -147,9 +156,11 @@ private synchronized CompletableFuture<Watch> startWatcher(final String latestRe
}
log.debug("Starting watcher for {} at v{}", this, latestResourceVersion);
// there's no need to stop the old watch, that will happen automatically when this call completes
watchFuture = listerWatcher.submitWatch(new ListOptionsBuilder().withResourceVersion(latestResourceVersion)
.withTimeoutSeconds(null)
.build(), watcher);
watchFuture = listerWatcher.submitWatch(
new ListOptionsBuilder().withResourceVersion(latestResourceVersion)
.withTimeoutSeconds(null)
.build(),
watcher);
return watchFuture;
}

Expand All @@ -171,6 +182,12 @@ public boolean isWatching() {

class ReflectorWatcher implements Watcher<T> {

private final Supplier<InformerExceptionHandler> exceptionHandlerSupplier;

/**
 * @param exceptionHandlerSupplier queried for the current handler whenever a failure is processed;
 *        {@code null} is accepted and treated as "no handler"
 */
ReflectorWatcher(Supplier<InformerExceptionHandler> exceptionHandlerSupplier) {
  // onClose and reconnect call get() on this supplier without a null check, so replace a
  // missing supplier with one that always yields null rather than failing later with a
  // NullPointerException in the middle of failure handling
  this.exceptionHandlerSupplier = exceptionHandlerSupplier != null ? exceptionHandlerSupplier : () -> null;
}

@Override
public void eventReceived(Action action, T resource) {
if (action == null) {
Expand All @@ -180,7 +197,8 @@ public void eventReceived(Action action, T resource) {
throw new KubernetesClientException("Unrecognized resource for " + Reflector.this);
}
if (log.isDebugEnabled()) {
log.debug("Event received {} {} resourceVersion v{} for {}", action.name(), resource.getKind(),
log.debug("Event received {} {} resourceVersion v{} for {}", action.name(),
resource.getKind(),
resource.getMetadata().getResourceVersion(), Reflector.this);
}
switch (action) {
Expand All @@ -203,33 +221,43 @@ public void eventReceived(Action action, T resource) {
public void onClose(WatcherException exception) {
// this close was triggered by an exception,
// not the user, it is expected that the watch retry will handle this
boolean restarted = false;
try {
if (exception.isHttpGone()) {
if (log.isDebugEnabled()) {
log.debug("Watch restarting due to http gone for {}", Reflector.this);
}
listSyncAndWatch().whenComplete((v, t) -> {
if (t != null) {
watchStopped();
// start a whole new list/watch cycle, can be run in the scheduler thread because
// any further operations will happen on the io thread
reconnectFuture = Utils.schedule(Runnable::run, Reflector.this::listSyncAndWatch,
listerWatcher.getWatchReconnectInterval(), TimeUnit.MILLISECONDS);
}
});
restarted = true;
} else {
log.warn("Watch closing with exception for {}", Reflector.this, exception);
running = false; // shouldn't happen, but it means the watch won't restart
watchStopped();
InformerExceptionHandler handler = exceptionHandlerSupplier.get();
boolean reconnect = false;
if (exception.isHttpGone()) {
if (log.isDebugEnabled()) {
log.debug("Watch restarting due to http gone for {}", Reflector.this);
}
} finally {
if (!restarted) {
watchStopped(); // report the watch as stopped after a problem
reconnect = true;
} else if (handler != null) {
reconnect = handler.retry(exception.getCause());
}
if (reconnect) {
// start a whole new list/watch cycle
reconnect();
} else {
running = false; // shouldn't happen, but it means the watch won't restart
if (handler == null) {
log.warn("Watch closing with exception for {}", Reflector.this, exception);
}
}
}

/**
 * Schedules a whole new list/watch cycle after the watch reconnect interval. If that cycle
 * completes exceptionally, the exception handler (when present) decides whether another
 * attempt is scheduled; without a handler the attempt is always repeated.
 */
private void reconnect() {
  // this can be run in the scheduler thread because
  // any further operations will happen on the io thread
  final CompletableFuture<?> scheduled = Utils.schedule(Runnable::run, Reflector.this::listSyncAndWatch,
      listerWatcher.getWatchReconnectInterval(), TimeUnit.MILLISECONDS);
  reconnectFuture = scheduled;
  // attach the callback to the future created by this call; re-reading the volatile field
  // could observe a future assigned by a concurrent reconnect
  scheduled.whenComplete((v, t) -> {
    if (t == null) {
      return; // the list/watch cycle was established, nothing to retry
    }
    if (!running) {
      // running is cleared when the watch must not restart, so do not schedule another attempt
      return;
    }
    // tolerate a missing supplier the same way as a missing handler
    final InformerExceptionHandler handler = exceptionHandlerSupplier == null ? null
        : exceptionHandlerSupplier.get();
    if (handler == null || handler.retry(t)) {
      reconnect();
    }
  });
}

@Override
public void onClose() {
watchStopped();
Expand All @@ -240,7 +268,6 @@ public void onClose() {
public boolean reconnecting() {
// Always reports true.
// NOTE(review): presumably tells the watch machinery that this watcher re-establishes the watch
// itself (see onClose/reconnect) — confirm against the Watcher#reconnecting contract.
return true;
}

}

ReflectorWatcher getWatcher() {
Expand All @@ -251,5 +278,4 @@ ReflectorWatcher getWatcher() {
public String toString() {
// The API endpoint path is what appears wherever this reflector is formatted into a log message.
return listerWatcher.getApiEndpointPath();
}

}