Skip to content

Commit

Permalink
feat: add possibility to add an exception handler to Watchers (#4365)
Browse files Browse the repository at this point in the history
* feat: add possibility to add an exception handler to Informers

* fix #4369: allowing the handler to determine if the stop or retry

* fix #4369: refining the exception handler and adding back backoff

* fix #4369: narrowing the changes to only a stopped future

* feat: pass raw message to WatcherException, easier JSON error detection

* fix #4369: refining the watch and informer reconnect and adding tests

* cleaning up how the watch is stopped

* just logging resourceeventhandler exceptions

also stopping messages when a watch closes

Co-authored-by: Steve Hawkins <shawkins@redhat.com>
  • Loading branch information
metacosm and shawkins committed Sep 8, 2022
1 parent 28761e0 commit 951a7f5
Show file tree
Hide file tree
Showing 16 changed files with 295 additions and 145 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Expand Up @@ -3,10 +3,12 @@
### 6.2-SNAPSHOT

#### Bugs
* Fix #4369: Informers will retry with a backoff on list/watch failure as they did in 5.12 and prior.
* Fix #4350: SchemaSwap annotation is now repeatable and is applied multiple times if classes are used more than once in the class hierarchy.

#### Improvements
* Fix #4348: Introduce specific annotations for the generators
* Fix #4365: The Watch retry logic will handle more cases, as well as perform an exceptional close for events that are not properly handled. Informers can directly provide those exceptional outcomes via the SharedIndexInformer.stopped CompletableFuture.

#### Dependency Upgrade
* Fix #4243: Update Tekton pipeline model to v0.39.0
Expand Down
Expand Up @@ -16,27 +16,39 @@
package io.fabric8.kubernetes.client;

import java.net.HttpURLConnection;
import java.util.Optional;

public class WatcherException extends Exception {
private final String rawWatchMessage;

public WatcherException(String message, Throwable cause) {
super(message, cause);
this(message, cause, null);
}

public WatcherException(String message) {
super(message);
rawWatchMessage = null;
}

public WatcherException(String message, Throwable cause, String rawWatchMessage) {
super(message, cause);
this.rawWatchMessage = rawWatchMessage;
}

public KubernetesClientException asClientException() {
final Throwable cause = getCause();
return cause instanceof KubernetesClientException ?
(KubernetesClientException) cause : new KubernetesClientException(getMessage(), cause);
return cause instanceof KubernetesClientException ? (KubernetesClientException) cause
: new KubernetesClientException(getMessage(), cause);
}

public boolean isHttpGone() {
final KubernetesClientException cause = asClientException();
return cause.getCode() == HttpURLConnection.HTTP_GONE
|| (cause.getStatus() != null && cause.getStatus().getCode() == HttpURLConnection.HTTP_GONE);
return cause != null && (cause.getCode() == HttpURLConnection.HTTP_GONE
|| (cause.getStatus() != null && cause.getStatus().getCode() == HttpURLConnection.HTTP_GONE));
}

@SuppressWarnings("unused")
public Optional<String> getRawWatchMessage() {
return Optional.ofNullable(rawWatchMessage);
}
}
Expand Up @@ -15,6 +15,7 @@
*/
package io.fabric8.kubernetes.client.informers;

import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.informers.cache.Cache;
import io.fabric8.kubernetes.client.informers.cache.Indexer;
import io.fabric8.kubernetes.client.informers.cache.ItemStore;
Expand Down Expand Up @@ -82,7 +83,8 @@ default SharedIndexInformer<T> removeNamespaceIndex() {
* @param handle the event handler
* @param resyncPeriod the specific resync period
*/
SharedIndexInformer<T> addEventHandlerWithResyncPeriod(ResourceEventHandler<? super T> handle, long resyncPeriod);
SharedIndexInformer<T> addEventHandlerWithResyncPeriod(ResourceEventHandler<? super T> handle,
long resyncPeriod);

/**
* Starts the shared informer, which will be stopped when {@link #stop()} is called.
Expand Down Expand Up @@ -165,4 +167,13 @@ default boolean hasSynced() {
*/
CompletableFuture<Void> start();

/**
* Return a future that will allow notification of informer stopping.
* <p>
* If {@link #stop()} is called, the future will be completed with a null value.
* <p>
* If an exception occurs that terminates the informer, then it will be exceptionally completed with that exception
* - typically a {@link WatcherException}
*/
CompletableFuture<Void> stopped();
}
@@ -0,0 +1,41 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes.client;

import io.fabric8.kubernetes.api.model.Status;
import org.junit.jupiter.api.Test;

import java.net.HttpURLConnection;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

class WatcherExceptionTest {

@Test
void testIsHttpGone() {
WatcherException we = new WatcherException("I've failed");
assertFalse(we.isHttpGone());

we = new WatcherException("I've failed", new ClassCastException());
assertFalse(we.isHttpGone());

we = new WatcherException("I've failed",
new KubernetesClientException("http gone", HttpURLConnection.HTTP_GONE, new Status()));
assertTrue(we.isHttpGone());
}

}
Expand Up @@ -44,7 +44,6 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

Expand All @@ -60,13 +59,12 @@ public abstract class AbstractWatchManager<T extends HasMetadata> implements Wat
final AtomicBoolean forceClosed;
private final int reconnectLimit;
private final ExponentialBackoffIntervalCalculator retryIntervalCalculator;
final AtomicInteger currentReconnectAttempt;
private Future<?> reconnectAttempt;

protected final HttpClient client;
protected BaseOperation<T, ?, ?> baseOperation;
private ListOptions listOptions;
private URL requestUrl;
private final ListOptions listOptions;
private final URL requestUrl;

private final boolean receiveBookmarks;

Expand All @@ -77,7 +75,6 @@ public abstract class AbstractWatchManager<T extends HasMetadata> implements Wat
this.reconnectLimit = reconnectLimit;
this.retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(reconnectInterval, maxIntervalExponent);
this.resourceVersion = new AtomicReference<>(listOptions.getResourceVersion());
this.currentReconnectAttempt = new AtomicInteger(0);
this.forceClosed = new AtomicBoolean();
this.receiveBookmarks = Boolean.TRUE.equals(listOptions.getAllowWatchBookmarks());
// opt into bookmarks by default
Expand Down Expand Up @@ -164,18 +161,16 @@ synchronized void reconnect() {
}

final boolean cannotReconnect() {
return !watcher.reconnecting() && currentReconnectAttempt.get() >= reconnectLimit && reconnectLimit >= 0;
return !watcher.reconnecting() && retryIntervalCalculator.getCurrentReconnectAttempt() >= reconnectLimit
&& reconnectLimit >= 0;
}

final long nextReconnectInterval() {
int exponentOfTwo = currentReconnectAttempt.getAndIncrement();
long ret = retryIntervalCalculator.getInterval(exponentOfTwo);
logger.debug("Current reconnect backoff is {} milliseconds (T{})", ret, exponentOfTwo);
return ret;
return retryIntervalCalculator.nextReconnectInterval();
}

void resetReconnectAttempts() {
currentReconnectAttempt.set(0);
retryIntervalCalculator.resetReconnectAttempts();
}

boolean isForceClosed() {
Expand All @@ -192,7 +187,15 @@ void eventReceived(Watcher.Action action, HasMetadata resource) {
if (resource != null && !baseOperation.getType().isAssignableFrom(resource.getClass())) {
resource = Serialization.jsonMapper().convertValue(resource, baseOperation.getType());
}
watcher.eventReceived(action, (T) resource);
@SuppressWarnings("unchecked")
final T t = (T) resource;
try {
watcher.eventReceived(action, t);
} catch (Exception e) {
// for compatibility, this will just log the exception as was done in previous versions
// a case could be made for this to terminate the watch instead
logger.error("Unhandled exception encountered in watcher event handler", e);
}
}

void updateResourceVersion(final String newResourceVersion) {
Expand Down Expand Up @@ -228,29 +231,26 @@ public void close() {
cancelReconnect();
}

private WatchEvent contextAwareWatchEventDeserializer(String messageSource) {
private WatchEvent contextAwareWatchEventDeserializer(String messageSource)
throws JsonProcessingException {
try {
return Serialization.unmarshal(messageSource, WatchEvent.class);
} catch (Exception ex1) {
try {
JsonNode json = Serialization.jsonMapper().readTree(messageSource);
JsonNode objectJson = null;
if (json instanceof ObjectNode && json.has("object")) {
objectJson = ((ObjectNode) json).remove("object");
}
JsonNode json = Serialization.jsonMapper().readTree(messageSource);
JsonNode objectJson = null;
if (json instanceof ObjectNode && json.has("object")) {
objectJson = ((ObjectNode) json).remove("object");
}

WatchEvent watchEvent = Serialization.jsonMapper().treeToValue(json, WatchEvent.class);
KubernetesResource object = Serialization.jsonMapper().treeToValue(objectJson, baseOperation.getType());
WatchEvent watchEvent = Serialization.jsonMapper().treeToValue(json, WatchEvent.class);
KubernetesResource object = Serialization.jsonMapper().treeToValue(objectJson, baseOperation.getType());

watchEvent.setObject(object);
return watchEvent;
} catch (JsonProcessingException ex2) {
throw new IllegalArgumentException("Failed to deserialize WatchEvent", ex2);
}
watchEvent.setObject(object);
return watchEvent;
}
}

protected WatchEvent readWatchEvent(String messageSource) {
protected WatchEvent readWatchEvent(String messageSource) throws JsonProcessingException {
WatchEvent event = contextAwareWatchEventDeserializer(messageSource);
KubernetesResource object = null;
if (event != null) {
Expand Down Expand Up @@ -278,36 +278,47 @@ protected void onMessage(String message) {
try {
WatchEvent event = readWatchEvent(message);
Object object = event.getObject();
if (object instanceof Status) {
Status status = (Status) object;

onStatus(status);
} else if (object instanceof KubernetesResourceList) {
// Dirty cast - should always be valid though
KubernetesResourceList list = (KubernetesResourceList) object;
updateResourceVersion(list.getMetadata().getResourceVersion());
Action action = Action.valueOf(event.getType());
List<HasMetadata> items = list.getItems();
if (items != null) {
for (HasMetadata item : items) {
eventReceived(action, item);
}
Action action = Action.valueOf(event.getType());
if (action == Action.ERROR) {
if (object instanceof Status) {
Status status = (Status) object;

onStatus(status);
} else {
logger.error("Error received, but object is not a status - will retry");
closeRequest();
}
} else if (object instanceof HasMetadata) {
@SuppressWarnings("unchecked")
T obj = (T) object;
updateResourceVersion(obj.getMetadata().getResourceVersion());
Action action = Action.valueOf(event.getType());
eventReceived(action, obj);
HasMetadata hasMetadata = (HasMetadata) object;
updateResourceVersion(hasMetadata.getMetadata().getResourceVersion());

if (object instanceof KubernetesResourceList) {
// Dirty cast - should always be valid though
@SuppressWarnings({ "rawtypes" })
KubernetesResourceList list = (KubernetesResourceList) hasMetadata;
@SuppressWarnings("unchecked")
List<HasMetadata> items = list.getItems();
if (items != null) {
for (HasMetadata item : items) {
eventReceived(action, item);
}
}
} else {
eventReceived(action, hasMetadata);
}
} else {
logger.error("Unknown message received: {}", message);
final String msg = String.format("Invalid object received: %s", message);
close(new WatcherException(msg, null, message));
}
} catch (ClassCastException e) {
logger.error("Received wrong type of object for watch", e);
} catch (IllegalArgumentException e) {
logger.error("Invalid event type", e);
final String msg = "Received wrong type of object for watch";
close(new WatcherException(msg, e, message));
} catch (JsonProcessingException e) {
final String msg = "Couldn't deserialize watch event: " + message;
close(new WatcherException(msg, e, message));
} catch (Exception e) {
logger.error("Unhandled exception encountered in watcher event handler", e);
final String msg = "Unexpected exception processing watch event";
close(new WatcherException(msg, e, message));
}
}

Expand All @@ -318,8 +329,8 @@ protected boolean onStatus(Status status) {
return true;
}

eventReceived(Action.ERROR, null);
logger.error("Error received: {}", status);
logger.error("Error received: {}, will retry", status);
closeRequest();
return false;
}

Expand Down
Expand Up @@ -60,7 +60,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class OperationSupport {

Expand All @@ -83,7 +82,6 @@ public class OperationSupport {
protected String apiGroupName;
protected String apiGroupVersion;
protected boolean dryRun;
private final ExponentialBackoffIntervalCalculator retryIntervalCalculator;
private final int requestRetryBackoffLimit;

public OperationSupport(Client client) {
Expand All @@ -107,16 +105,11 @@ public OperationSupport(OperationContext ctx) {
this.apiGroupVersion = "v1";
}

final int requestRetryBackoffInterval;
if (ctx.getConfig() != null) {
requestRetryBackoffInterval = ctx.getConfig().getRequestRetryBackoffInterval();
this.requestRetryBackoffLimit = ctx.getConfig().getRequestRetryBackoffLimit();
} else {
requestRetryBackoffInterval = Config.DEFAULT_REQUEST_RETRY_BACKOFFINTERVAL;
this.requestRetryBackoffLimit = Config.DEFAULT_REQUEST_RETRY_BACKOFFLIMIT;
}
this.retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(requestRetryBackoffInterval,
MAX_RETRY_INTERVAL_EXPONENT);
}

public String getAPIGroupName() {
Expand Down Expand Up @@ -574,7 +567,9 @@ protected <T> CompletableFuture<T> handleResponse(HttpClient client, HttpRequest
VersionUsageUtils.log(this.resourceT, this.apiGroupVersion);
HttpRequest request = requestBuilder.build();
CompletableFuture<HttpResponse<byte[]>> futureResponse = new CompletableFuture<>();
retryWithExponentialBackoff(futureResponse, new AtomicInteger(), Utils.getNonNullOrElse(client, httpClient), request);
retryWithExponentialBackoff(futureResponse,
new ExponentialBackoffIntervalCalculator(requestRetryBackoffLimit, MAX_RETRY_INTERVAL_EXPONENT),
Utils.getNonNullOrElse(client, httpClient), request);

return futureResponse.thenApply(response -> {
try {
Expand All @@ -593,13 +588,13 @@ protected <T> CompletableFuture<T> handleResponse(HttpClient client, HttpRequest
}

protected void retryWithExponentialBackoff(CompletableFuture<HttpResponse<byte[]>> result,
AtomicInteger numRetries,
ExponentialBackoffIntervalCalculator retryIntervalCalculator,
HttpClient client, HttpRequest request) {
client.sendAsync(request, byte[].class)
.whenComplete((response, throwable) -> {
int retries = numRetries.getAndIncrement();
int retries = retryIntervalCalculator.getCurrentReconnectAttempt();
if (retries < requestRetryBackoffLimit) {
long retryInterval = retryIntervalCalculator.getInterval(retries);
long retryInterval = retryIntervalCalculator.nextReconnectInterval();
boolean retry = false;
if (response != null && response.code() >= 500) {
LOG.debug("HTTP operation on url: {} should be retried as the response code was {}, retrying after {} millis",
Expand All @@ -612,7 +607,8 @@ protected void retryWithExponentialBackoff(CompletableFuture<HttpResponse<byte[]
}
if (retry) {
Utils.schedule(context.getExecutor(),
() -> retryWithExponentialBackoff(result, numRetries, client, request), retryInterval, TimeUnit.MILLISECONDS);
() -> retryWithExponentialBackoff(result, retryIntervalCalculator, client, request), retryInterval,
TimeUnit.MILLISECONDS);
return;
}
}
Expand Down

0 comments on commit 951a7f5

Please sign in to comment.