Skip to content

Commit

Permalink
fix #4408: allowing for users to set the exception handling behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins authored and manusa committed Oct 19, 2022
1 parent 00cf3fd commit 5f6a11a
Show file tree
Hide file tree
Showing 10 changed files with 247 additions and 73 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -24,6 +24,7 @@
* Fix #4365: The Watch retry logic will handle more cases, as well as perform an exceptional close for events that are not properly handled. Informers can directly provide those exceptional outcomes via the SharedIndexInformer.stopped CompletableFuture.
* Fix #4396: Provide more error context when @Group/@Version annotations are missing
* Fix #4384: The Java generator now supports the generation of specific annotations (min, max, pattern, etc.), as defined by #4348
* Fix #4408: Allowing informers started via the start() method to have configurable exception / retry handling.
* Fix #3864: Change ManagedOpenShiftClient OSGi ConfigurationPolicy to REQUIRE
* Fix #4470: Added timestamps support for deployment logs and other resources.

Expand Down
@@ -0,0 +1,56 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.fabric8.kubernetes.client.informers;

import com.fasterxml.jackson.core.JacksonException;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;

public interface ExceptionHandler {

/**
* Called to determine if the informer should continue to retry after the given exception.
* <p>
* See also {@link #isDeserializationException(Throwable)} that can help determine if the
* problem is a mismatch with the target client class.
*
* @param isStarted true if the informer had already successfully started
* @param t the non-http gone {@link WatcherException} from a
* {@link Watcher#onClose(io.fabric8.kubernetes.client.WatcherException)}
* or throwable from a list/watch call.
* @return true if the informer should continue to retry
*/
boolean retryAfterException(boolean isStarted, Throwable t);

/**
* Check to see if the exception of it's cause is coming from what is likely a
* deserialization issue - either an exception from jackson or a class cast
*
* @param t
* @return
*/
public static boolean isDeserializationException(Throwable t) {
while (!(t instanceof ClassCastException || t instanceof JacksonException)) {
Throwable cause = t.getCause();
if (cause == t || cause == null) {
return false;
}
t = cause;
}
return true;
}
}
Expand Up @@ -23,7 +23,7 @@

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Stream;
Expand Down Expand Up @@ -125,6 +125,8 @@ default boolean hasSynced() {

/**
* Return true if the informer is running
* <p>
* See also {@link #stopped()}
*/
boolean isRunning();

Expand Down Expand Up @@ -161,19 +163,33 @@ default boolean hasSynced() {
SharedIndexInformer<T> itemStore(ItemStore<T> itemStore);

/**
* A non-blocking alternative to run. Starts the shared informer, which will be stopped when {@link #stop()} is called.
* A non-blocking alternative to run. Starts the shared informer, which will normally be stopped when {@link #stop()} is
* called.
* <br>
* Only one start attempt is made - subsequent calls will not re-start the informer.
* The stage will be completed normally once the Informer starts watching successfully for the first time.
* <p>
* By default the informer will attempt only a single start attempt. Use {@link #exceptionHandler(ExceptionHandler)} to
* modify this behavior.
*/
CompletionStage<Void> start();

/**
* Sets the {@link ExceptionHandler} for this informer. For example, exceptionHandler((b, t) -&gt; true)), will
* keep retying no matter what the exception is.
* <p>
* May only be called prior to the informer starting
*
* @param handler
*/
CompletableFuture<Void> start();
SharedIndexInformer<T> exceptionHandler(ExceptionHandler handler);

/**
* Return a future that will allow notification of informer stopping.
* Return a {@link CompletionStage} that will allow notification of the informer stopping.
* <p>
* If {@link #stop()} is called, the future will be completed with a null value.
* If {@link #stop()} is called, the CompletionStage will complete normally.
* <p>
* If an exception occurs that terminates the informer, then it will be exceptionally completed with that exception
* - typically a {@link WatcherException}
*/
CompletableFuture<Void> stopped();
CompletionStage<Void> stopped();
}
@@ -0,0 +1,35 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes.client.informers;

import com.fasterxml.jackson.databind.JsonMappingException;
import io.fabric8.kubernetes.client.WatcherException;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

class ExceptionHandlerTest {

@Test
void testDeserializationException() {
assertFalse(ExceptionHandler.isDeserializationException(new NullPointerException()));
assertTrue(
ExceptionHandler.isDeserializationException(new WatcherException("x", new JsonMappingException(() -> {
}, ""))));
}

}
Expand Up @@ -18,6 +18,7 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.informers.ExceptionHandler;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Indexer;
Expand Down Expand Up @@ -96,7 +97,7 @@ public DefaultSharedIndexInformer(Class<T> apiTypeClass, ListerWatcher<T, L> lis
* @param handler event handler
*/
@Override
public SharedIndexInformer<T> addEventHandler(ResourceEventHandler<? super T> handler) {
public DefaultSharedIndexInformer<T, L> addEventHandler(ResourceEventHandler<? super T> handler) {
addEventHandlerWithResyncPeriod(handler, defaultEventHandlerResyncPeriod);
return this;
}
Expand Down Expand Up @@ -149,7 +150,7 @@ public CompletableFuture<Void> start() {
}
synchronized (this) {
if (!started.compareAndSet(false, true)) {
return CompletableFuture.completedFuture(null);
return reflector.getStartFuture();
}

if (initialState != null) {
Expand All @@ -161,14 +162,7 @@ public CompletableFuture<Void> start() {

scheduleResync(processor::shouldResync);

return reflector.start().whenComplete((v, t) -> {
// stop called while run is called could be ineffective, check for it afterwards
synchronized (this) {
if (stopped && reflector.isRunning()) {
stop();
}
}
});
return reflector.start();
}

@Override
Expand Down Expand Up @@ -221,7 +215,7 @@ private long determineResyncPeriod(long desired, long check) {

@Override
public boolean isRunning() {
return !stopped && started.get() && reflector.isRunning();
return !stopped && started.get() && !reflector.isStopped();
}

@Override
Expand Down Expand Up @@ -294,4 +288,13 @@ public CompletableFuture<Void> stopped() {
return this.reflector.getStopFuture();
}

@Override
public synchronized DefaultSharedIndexInformer<T, L> exceptionHandler(ExceptionHandler handler) {
if (started.get()) {
throw new KubernetesClientException("Informer cannot be running when handler is set");
}
this.reflector.setExceptionHandler(handler);
return this;
}

}
Expand Up @@ -32,6 +32,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;

Expand Down Expand Up @@ -106,11 +107,11 @@ public synchronized <T> SharedIndexInformer<T> getExistingSharedIndexInformer(Cl

@Override
public synchronized Future<Void> startAllRegisteredInformers() {
List<CompletableFuture<Void>> startInformerTasks = new ArrayList<>();
List<CompletionStage<Void>> startInformerTasks = new ArrayList<>();

if (!informers.isEmpty()) {
for (SharedIndexInformer<?> informer : informers) {
CompletableFuture<Void> future = informer.start();
CompletionStage<Void> future = informer.start();
startInformerTasks.add(future);
future.whenComplete((v, t) -> {
if (t != null) {
Expand Down

0 comments on commit 5f6a11a

Please sign in to comment.