Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #4408: allowing for users to set the exception handling behavior #4488

Merged
merged 3 commits into from
Oct 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
* Fix #3733: The authentication command from the .kube/config won't be discarded if no arguments are specified
* Fix #4312: fix timestamp can't be deserialized for IstioCondition
* Fix #4369: Informers will retry with a backoff on list/watch failure as they did in 5.12 and prior.
* Fix #4426: [java-generator] Encode an `AnyType` instead of an Object if `x-kubernetes-preserve-unknown-fields` is present and the type is null.
* Fix #4350: SchemaSwap annotation is now repeatable and is applied multiple times if classes are used more than once in the class hierarchy.
* Fix #3733: The authentication command from the .kube/config won't be discarded if no arguments are specified
* Fix #4441: corrected patch base handling for the patch methods available from a Resource - resource(item).patch() will be evaluated as resource(latest).patch(item). Also undeprecated patch(item), which is consistent with leaving patch(context, item) undeprecated as well. For consistency with the other operations (such as edit), patch(item) will use the context item as the base when available, or the server side item when not. This means that patch(item) is only the same as resource(item).patch() when the patch(item) is called when the context item is missing or is the same as the latest.
* Fix #4442: TokenRefreshInterceptor doesn't overwrite existing OAuth token with empty string
* Fix #4350: SchemaSwap annotation is now repeatable and is applied multiple times if classes are used more than once in the class hierarchy.
Expand All @@ -24,6 +25,7 @@
* Fix #4365: The Watch retry logic will handle more cases, as well as perform an exceptional close for events that are not properly handled. Informers can directly provide those exceptional outcomes via the SharedIndexInformer.stopped CompletableFuture.
* Fix #4396: Provide more error context when @Group/@Version annotations are missing
* Fix #4384: The Java generator now supports the generation of specific annotations (min, max, pattern, etc.), as defined by #4348
* Fix #4408: Allowing informers started via the start() method to have configurable exception / retry handling.
* Fix #3864: Change ManagedOpenShiftClient OSGi ConfigurationPolicy to REQUIRE

#### Dependency Upgrade
Expand All @@ -44,6 +46,7 @@
* Fix #3924: Extension Mock modules have been removed
* Fix #4384: javax.validation.* annotations are no longer added by the Java generator.
* Fix #3906: removed BaseKubernetesList, use KubernetesList instead
* Fix #4408: deprecated SharedInformerFactory.addSharedInformerEventListener, instead use the SharedIndexInformer.stopped method. Also the signature of SharedIndexInformer.start was changed to a CompletionStage rather than a CompletableFuture.

### 5.12.4 (2022-09-30)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.fabric8.kubernetes.client.informers;

import com.fasterxml.jackson.core.JacksonException;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;

public interface ExceptionHandler {

/**
* Called to determine if the informer should continue to retry after the given exception.
* <p>
* See also {@link #isDeserializationException(Throwable)} that can help determine if the
* problem is a mismatch with the target client class.
*
* @param isStarted true if the informer had already successfully started
* @param t the non-http gone {@link WatcherException} from a
* {@link Watcher#onClose(io.fabric8.kubernetes.client.WatcherException)}
* or throwable from a list/watch call.
* @return true if the informer should continue to retry
*/
boolean retryAfterException(boolean isStarted, Throwable t);

/**
* Check to see if the exception of it's cause is coming from what is likely a
* deserialization issue - either an exception from jackson or a class cast
*
* @param t
* @return
*/
public static boolean isDeserializationException(Throwable t) {
while (!(t instanceof ClassCastException || t instanceof JacksonException)) {
Throwable cause = t.getCause();
if (cause == t || cause == null) {
return false;
}
t = cause;
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Stream;
Expand Down Expand Up @@ -125,6 +125,8 @@ default boolean hasSynced() {

/**
* Return true if the informer is running
* <p>
* See also {@link #stopped()}
*/
boolean isRunning();

Expand Down Expand Up @@ -161,19 +163,33 @@ default boolean hasSynced() {
SharedIndexInformer<T> itemStore(ItemStore<T> itemStore);

/**
* A non-blocking alternative to run. Starts the shared informer, which will be stopped when {@link #stop()} is called.
* A non-blocking alternative to run. Starts the shared informer, which will normally be stopped when {@link #stop()} is
* called.
* <br>
* Only one start attempt is made - subsequent calls will not re-start the informer.
* The stage will be completed normally once the Informer starts watching successfully for the first time.
* <p>
* By default the informer will attempt only a single start attempt. Use {@link #exceptionHandler(ExceptionHandler)} to
* modify this behavior.
*/
CompletionStage<Void> start();

/**
* Sets the {@link ExceptionHandler} for this informer. For example, exceptionHandler((b, t) -&gt; true)), will
* keep retrying no matter what the exception is.
* <p>
* May only be called prior to the informer starting
*
* @param handler
*/
CompletableFuture<Void> start();
SharedIndexInformer<T> exceptionHandler(ExceptionHandler handler);

/**
* Return a future that will allow notification of informer stopping.
* Return a {@link CompletionStage} that will allow notification of the informer stopping.
* <p>
* If {@link #stop()} is called, the future will be completed with a null value.
* If {@link #stop()} is called, the CompletionStage will complete normally.
* <p>
* If an exception occurs that terminates the informer, then it will be exceptionally completed with that exception
* - typically a {@link WatcherException}
*/
CompletableFuture<Void> stopped();
CompletionStage<Void> stopped();
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ <T extends HasMetadata> SharedIndexInformer<T> sharedIndexInformerFor(Class<T> a
*/
void stopAllRegisteredInformers();

/**
* @deprecated use {@link SharedIndexInformer#stopped()} method to get notified when an informer stops.
*/
@Deprecated
void addSharedInformerEventListener(SharedInformerEventListener event);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes.client.informers;

import com.fasterxml.jackson.databind.JsonMappingException;
import io.fabric8.kubernetes.client.WatcherException;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

class ExceptionHandlerTest {

@Test
void testDeserializationException() {
assertFalse(ExceptionHandler.isDeserializationException(new NullPointerException()));
assertTrue(
ExceptionHandler.isDeserializationException(new WatcherException("x", new JsonMappingException(() -> {
}, ""))));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.informers.ExceptionHandler;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Indexer;
Expand Down Expand Up @@ -96,7 +97,7 @@ public DefaultSharedIndexInformer(Class<T> apiTypeClass, ListerWatcher<T, L> lis
* @param handler event handler
*/
@Override
public SharedIndexInformer<T> addEventHandler(ResourceEventHandler<? super T> handler) {
public DefaultSharedIndexInformer<T, L> addEventHandler(ResourceEventHandler<? super T> handler) {
addEventHandlerWithResyncPeriod(handler, defaultEventHandlerResyncPeriod);
return this;
}
Expand Down Expand Up @@ -149,7 +150,7 @@ public CompletableFuture<Void> start() {
}
synchronized (this) {
if (!started.compareAndSet(false, true)) {
return CompletableFuture.completedFuture(null);
return reflector.getStartFuture();
}

if (initialState != null) {
Expand All @@ -161,14 +162,7 @@ public CompletableFuture<Void> start() {

scheduleResync(processor::shouldResync);

return reflector.start().whenComplete((v, t) -> {
// stop called while run is called could be ineffective, check for it afterwards
synchronized (this) {
if (stopped && reflector.isRunning()) {
stop();
}
}
});
return reflector.start();
}

@Override
Expand Down Expand Up @@ -221,7 +215,7 @@ private long determineResyncPeriod(long desired, long check) {

@Override
public boolean isRunning() {
return !stopped && started.get() && reflector.isRunning();
return !stopped && started.get() && !reflector.isStopped();
}

@Override
Expand Down Expand Up @@ -294,4 +288,13 @@ public CompletableFuture<Void> stopped() {
return this.reflector.getStopFuture();
}

@Override
public synchronized DefaultSharedIndexInformer<T, L> exceptionHandler(ExceptionHandler handler) {
if (started.get()) {
throw new KubernetesClientException("Informer cannot be running when handler is set");
}
this.reflector.setExceptionHandler(handler);
return this;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Future;

Expand All @@ -52,7 +53,7 @@ public class SharedInformerFactoryImpl implements SharedInformerFactory {
private String name;
private String namespace;

private KubernetesClient client;
private final KubernetesClient client;

public SharedInformerFactoryImpl(KubernetesClient client) {
this.client = client;
Expand Down Expand Up @@ -110,12 +111,12 @@ public synchronized Future<Void> startAllRegisteredInformers() {

if (!informers.isEmpty()) {
for (SharedIndexInformer<?> informer : informers) {
CompletableFuture<Void> future = informer.start();
CompletableFuture<Void> future = informer.start().toCompletableFuture();
startInformerTasks.add(future);
future.whenComplete((v, t) -> {
if (t != null) {
if (this.eventListeners.isEmpty()) {
log.warn("Failed to start informer {}", informer.toString(), t);
log.warn("Failed to start informer {}", informer, t);
} else {
this.eventListeners
.forEach(listener -> listener.onException(informer, KubernetesClientException.launderThrowable(t)));
Expand Down