Skip to content

Commit

Permalink
fix fabric8io#4408: allowing for users to set the exception handling …
Browse files Browse the repository at this point in the history
…behavior
  • Loading branch information
shawkins committed Oct 10, 2022
1 parent 8e89540 commit 313487c
Show file tree
Hide file tree
Showing 9 changed files with 181 additions and 70 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -17,6 +17,7 @@
* Fix #4365: The Watch retry logic will handle more cases, as well as perform an exceptional close for events that are not properly handled. Informers can directly provide those exceptional outcomes via the SharedIndexInformer.stopped CompletableFuture.
* Fix #4396: Provide more error context when @Group/@Version annotations are missing
* Fix #4384: The Java generator now supports the generation of specific annotations (min, max, pattern, etc.), as defined by #4348
* Fix #4408: Allowing informers started via the start() method to have configurable exception / retry handling.
* Fix #3864: Change ManagedOpenShiftClient OSGi ConfigurationPolicy to REQUIRE

#### Dependency Upgrade
Expand Down
@@ -0,0 +1,49 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.fabric8.kubernetes.client.informers;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;

/**
 * Strategy that decides whether an informer should keep retrying after a failure.
 * <p>
 * The interface declares a single abstract method, so it can be supplied as a lambda,
 * for example {@code (isStarted, t) -> true} to retry on every exception.
 */
@FunctionalInterface
public interface ExceptionHandler {

  /**
   * Called to determine if the informer should continue to retry after the given exception.
   * <p>
   * See also {@link #isDeserializationException(Throwable)} that can help determine if the
   * problem is a mismatch with the target client class.
   *
   * @param isStarted true if the informer had already successfully started
   * @param t the {@link WatcherException} from a
   *        {@link Watcher#onClose(io.fabric8.kubernetes.client.WatcherException)}
   *        or throwable from a list/watch call.
   * @return true if the informer should continue to retry
   */
  boolean retryAfterException(boolean isStarted, Throwable t);

  /**
   * Checks whether the given throwable is, or wraps, a deserialization problem.
   * <p>
   * The cause chain is followed only through {@link RuntimeException} wrappers. The walk stops at the
   * first {@link ClassCastException}, at the first throwable that is not a {@link RuntimeException},
   * or when there is no further cause.
   *
   * @param t the throwable to inspect; {@code null} yields {@code false}
   * @return true if the throwable the walk ends on is a {@link ClassCastException} or a
   *         {@link JsonProcessingException}
   */
  static boolean isDeserializationException(Throwable t) {
    while (t instanceof RuntimeException && !(t instanceof ClassCastException)) {
      // getCause() is declared on Throwable, so no cast is needed
      Throwable cause = t.getCause();
      if (cause == t) {
        // defensive: a throwable reporting itself as its own cause would never terminate the walk
        break;
      }
      t = cause;
    }
    return t instanceof ClassCastException || t instanceof JsonProcessingException;
  }
}
Expand Up @@ -23,7 +23,7 @@

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Stream;
Expand Down Expand Up @@ -125,6 +125,8 @@ default boolean hasSynced() {

/**
* Return true if the informer is running
* <p>
* See also {@link #stopped()}
*/
boolean isRunning();

Expand Down Expand Up @@ -161,19 +163,33 @@ default boolean hasSynced() {
SharedIndexInformer<T> itemStore(ItemStore<T> itemStore);

/**
* A non-blocking alternative to run. Starts the shared informer, which will be stopped when {@link #stop()} is called.
* A non-blocking alternative to run. Starts the shared informer, which will normally be stopped when {@link #stop()} is
* called.
* <br>
* Only one start attempt is made - subsequent calls will not re-start the informer.
* The stage will be completed normally once the Informer starts watching successfully for the first time.
* <p>
* By default the informer will make only a single start attempt. Use {@link #exceptionHandler(ExceptionHandler)} to
* modify this behavior.
*/
CompletionStage<Void> start();

/**
* Sets the {@link ExceptionHandler} for this informer. For example, exceptionHandler((b, t) -&gt; true) will
* keep retrying no matter what the exception is.
* <p>
* May only be called prior to the informer starting
*
* @param handler
*/
CompletableFuture<Void> start();
SharedIndexInformer<T> exceptionHandler(ExceptionHandler handler);

/**
* Return a future that will allow notification of informer stopping.
* Return a {@link CompletionStage} that will allow notification of the informer stopping.
* <p>
* If {@link #stop()} is called, the future will be completed with a null value.
* If {@link #stop()} is called, the CompletionStage will complete normally.
* <p>
* If an exception occurs that terminates the informer, then it will be exceptionally completed with that exception
* - typically a {@link WatcherException}
*/
CompletableFuture<Void> stopped();
CompletionStage<Void> stopped();
}
Expand Up @@ -18,6 +18,7 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.informers.ExceptionHandler;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Indexer;
Expand Down Expand Up @@ -96,7 +97,7 @@ public DefaultSharedIndexInformer(Class<T> apiTypeClass, ListerWatcher<T, L> lis
* @param handler event handler
*/
@Override
public SharedIndexInformer<T> addEventHandler(ResourceEventHandler<? super T> handler) {
public DefaultSharedIndexInformer<T, L> addEventHandler(ResourceEventHandler<? super T> handler) {
addEventHandlerWithResyncPeriod(handler, defaultEventHandlerResyncPeriod);
return this;
}
Expand Down Expand Up @@ -149,7 +150,7 @@ public CompletableFuture<Void> start() {
}
synchronized (this) {
if (!started.compareAndSet(false, true)) {
return CompletableFuture.completedFuture(null);
return reflector.getStartFuture();
}

if (initialState != null) {
Expand All @@ -161,14 +162,7 @@ public CompletableFuture<Void> start() {

scheduleResync(processor::shouldResync);

return reflector.start().whenComplete((v, t) -> {
// stop called while run is called could be ineffective, check for it afterwards
synchronized (this) {
if (stopped && reflector.isRunning()) {
stop();
}
}
});
return reflector.start();
}

@Override
Expand Down Expand Up @@ -221,7 +215,7 @@ private long determineResyncPeriod(long desired, long check) {

@Override
public boolean isRunning() {
return !stopped && started.get() && reflector.isRunning();
return !stopped && started.get() && !reflector.isStopped();
}

@Override
Expand Down Expand Up @@ -294,4 +288,13 @@ public CompletableFuture<Void> stopped() {
return this.reflector.getStopFuture();
}

/**
 * Installs the {@link ExceptionHandler} on the underlying reflector.
 * <p>
 * The method is synchronized, so the started check and the hand-off to the reflector
 * happen while holding this informer's monitor.
 *
 * @param handler the handler passed on to the reflector
 * @return this informer, to allow call chaining
 * @throws KubernetesClientException if the informer has already been started
 */
@Override
public synchronized DefaultSharedIndexInformer<T, L> exceptionHandler(ExceptionHandler handler) {
  final boolean alreadyStarted = started.get();
  if (!alreadyStarted) {
    this.reflector.setExceptionHandler(handler);
    return this;
  }
  throw new KubernetesClientException("Informer cannot be running when handler is set");
}

}
Expand Up @@ -32,6 +32,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;

Expand Down Expand Up @@ -106,11 +107,11 @@ public synchronized <T> SharedIndexInformer<T> getExistingSharedIndexInformer(Cl

@Override
public synchronized Future<Void> startAllRegisteredInformers() {
List<CompletableFuture<Void>> startInformerTasks = new ArrayList<>();
List<CompletionStage<Void>> startInformerTasks = new ArrayList<>();

if (!informers.isEmpty()) {
for (SharedIndexInformer<?> informer : informers) {
CompletableFuture<Void> future = informer.start();
CompletionStage<Void> future = informer.start();
startInformerTasks.add(future);
future.whenComplete((v, t) -> {
if (t != null) {
Expand Down
Expand Up @@ -23,6 +23,7 @@
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager;
import io.fabric8.kubernetes.client.informers.ExceptionHandler;
import io.fabric8.kubernetes.client.informers.impl.ListerWatcher;
import io.fabric8.kubernetes.client.utils.Utils;
import io.fabric8.kubernetes.client.utils.internal.ExponentialBackoffIntervalCalculator;
Expand All @@ -44,12 +45,14 @@ public class Reflector<T extends HasMetadata, L extends KubernetesResourceList<T
private final ListerWatcher<T, L> listerWatcher;
private final SyncableStore<T> store;
private final ReflectorWatcher watcher;
private volatile boolean running;
private volatile boolean watching;
private volatile CompletableFuture<Watch> watchFuture;
private volatile CompletableFuture<?> reconnectFuture;
private final CompletableFuture<Void> startFuture = new CompletableFuture<>();
private final CompletableFuture<Void> stopFuture = new CompletableFuture<>();
private final ExponentialBackoffIntervalCalculator retryIntervalCalculator;
//default behavior - retry if started and it's not a watcherexception
private volatile ExceptionHandler handler = (b, t) -> b && !(t instanceof WatcherException);

public Reflector(ListerWatcher<T, L> listerWatcher, SyncableStore<T> store) {
this.listerWatcher = listerWatcher;
Expand All @@ -60,25 +63,17 @@ public Reflector(ListerWatcher<T, L> listerWatcher, SyncableStore<T> store) {
}

public CompletableFuture<Void> start() {
return start(false); // start without reconnecting
listSyncAndWatch();
return startFuture;
}

public CompletableFuture<Void> start(boolean reconnect) {
this.running = true;
CompletableFuture<Void> result = listSyncAndWatch(reconnect);
if (!reconnect) {
result.whenComplete((v, t) -> {
if (t != null) {
stopFuture.completeExceptionally(t);
}
});
}
return result;
public CompletableFuture<Void> getStartFuture() {
return startFuture;
}

public void stop() {
running = false;
stopFuture.complete(null);
startFuture.completeExceptionally(new KubernetesClientException("informer manually stopped before starting"));
Future<?> future = reconnectFuture;
if (future != null) {
future.cancel(true);
Expand Down Expand Up @@ -106,8 +101,8 @@ private synchronized void stopWatcher() {
*
* @return a future that completes when the list and watch are established
*/
public CompletableFuture<Void> listSyncAndWatch(boolean reconnect) {
if (!running) {
public CompletableFuture<Void> listSyncAndWatch() {
if (isStopped()) {
return CompletableFuture.completedFuture(null);
}
Set<String> nextKeys = new ConcurrentSkipListSet<>();
Expand All @@ -119,7 +114,7 @@ public CompletableFuture<Void> listSyncAndWatch(boolean reconnect) {
return startWatcher(latestResourceVersion);
}).thenAccept(w -> {
if (w != null) {
if (running) {
if (!isStopped()) {
if (log.isDebugEnabled()) {
log.debug("Watch started for {}", Reflector.this);
}
Expand All @@ -129,26 +124,34 @@ public CompletableFuture<Void> listSyncAndWatch(boolean reconnect) {
}
}
});
if (reconnect) {
theFuture.whenComplete((v, t) -> {
if (t != null) {
log.warn("listSyncAndWatch failed for {}, will retry", Reflector.this, t);
reconnect();
} else {
retryIntervalCalculator.resetReconnectAttempts();
}
});
}
theFuture.whenComplete((v, t) -> {
if (t != null) {
onException("listSyncAndWatch", t);
} else {
startFuture.complete(null);
retryIntervalCalculator.resetReconnectAttempts();
}
});
return theFuture;
}

/**
 * Consults the exception handler after a failed operation and either schedules a
 * reconnect or completes both the start and stop futures exceptionally.
 *
 * @param operation short name of the failed operation, used only in the log message
 * @param t the failure that was observed
 */
private void onException(String operation, Throwable t) {
  // "started" here means the start future has completed and did not complete exceptionally
  final boolean started = startFuture.isDone() && !startFuture.isCompletedExceptionally();
  final boolean retry = handler.retryAfterException(started, t);
  if (!retry) {
    // no retry wanted: surface the failure through both futures
    startFuture.completeExceptionally(t);
    stopFuture.completeExceptionally(t);
    return;
  }
  log.warn("{} failed for {}, will retry", operation, Reflector.this, t);
  reconnect();
}

protected void reconnect() {
if (!running) {
if (isStopped()) {
return;
}
// this can be run in the scheduler thread because
// any further operations will happen on the io thread
reconnectFuture = Utils.schedule(Runnable::run, () -> listSyncAndWatch(true),
reconnectFuture = Utils.schedule(Runnable::run, () -> listSyncAndWatch(),
retryIntervalCalculator.nextReconnectInterval(), TimeUnit.MILLISECONDS);
}

Expand Down Expand Up @@ -179,7 +182,7 @@ private void stopWatch(Watch w) {
}

private synchronized CompletableFuture<Watch> startWatcher(final String latestResourceVersion) {
if (!running) {
if (isStopped()) {
return CompletableFuture.completedFuture(null);
}
log.debug("Starting watcher for {} at v{}", this, latestResourceVersion);
Expand All @@ -200,8 +203,8 @@ public String getLastSyncResourceVersion() {
return lastSyncResourceVersion;
}

public boolean isRunning() {
return running;
public boolean isStopped() {
return stopFuture.isDone();
}

public boolean isWatching() {
Expand Down Expand Up @@ -251,9 +254,7 @@ public void onClose(WatcherException exception) {
// start a whole new list/watch cycle
reconnect();
} else {
running = false; // shouldn't happen, but it means the watch won't restart
stopFuture.completeExceptionally(exception);
log.warn("Watch closing with exception for {}", Reflector.this, exception);
onException("watch", exception);
}
}

Expand Down Expand Up @@ -282,4 +283,8 @@ public CompletableFuture<Void> getStopFuture() {
return stopFuture;
}

/**
 * Sets the handler that is consulted to decide whether to retry after an exception.
 *
 * @param handler the handler to use, must not be {@code null}
 * @throws NullPointerException if {@code handler} is {@code null}
 */
public void setExceptionHandler(ExceptionHandler handler) {
  // fail fast: a null handler would otherwise only show up later as a
  // NullPointerException when the handler is invoked after a failure
  this.handler = java.util.Objects.requireNonNull(handler, "handler");
}

}
Expand Up @@ -17,11 +17,15 @@

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.PodListBuilder;
import io.fabric8.kubernetes.client.Watch;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

Expand All @@ -45,6 +49,15 @@ private DefaultSharedIndexInformer<Pod, PodList> createDefaultSharedIndexInforme
return defaultSharedIndexInformer;
}

/**
 * Stubs the lister/watcher mock so that list and watch calls return
 * already-completed futures.
 */
@BeforeEach
void beforeEach() {
  // list calls return a PodList built with only (empty) list metadata
  PodList listResult = new PodListBuilder().withNewMetadata().endMetadata().build();
  Mockito.when(listerWatcher.submitList(Mockito.any()))
      .thenReturn(CompletableFuture.completedFuture(listResult));

  // watch calls return a mocked Watch
  Watch mockWatch = Mockito.mock(Watch.class);
  Mockito.when(listerWatcher.submitWatch(Mockito.any(), Mockito.any()))
      .thenReturn(CompletableFuture.completedFuture(mockWatch));
}

@AfterEach
void afterEach() {
if (defaultSharedIndexInformer != null) {
Expand Down

0 comments on commit 313487c

Please sign in to comment.