Skip to content

Commit

Permalink
feat: pass raw message to WatcherException, easier JSON error detection
Browse files Browse the repository at this point in the history
  • Loading branch information
metacosm committed Sep 7, 2022
1 parent 424ef82 commit 2c8b5fa
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,23 @@
package io.fabric8.kubernetes.client;

import java.net.HttpURLConnection;
import java.util.Optional;

public class WatcherException extends Exception {
private final String rawWatchMessage;

public WatcherException(String message, Throwable cause) {
super(message, cause);
this(message, cause, null);
}

public WatcherException(String message) {
super(message);
rawWatchMessage = null;
}

public WatcherException(String message, Throwable cause, String rawWatchMessage) {
super(message, cause);
this.rawWatchMessage = rawWatchMessage;
}

public KubernetesClientException asClientException() {
Expand All @@ -39,4 +47,8 @@ public boolean isHttpGone() {
|| (cause.getStatus() != null && cause.getStatus().getCode() == HttpURLConnection.HTTP_GONE);
}

@SuppressWarnings("unused")
public Optional<String> getRawWatchMessage() {
return Optional.ofNullable(rawWatchMessage);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ public abstract class AbstractWatchManager<T extends HasMetadata> implements Wat

protected final HttpClient client;
protected BaseOperation<T, ?, ?> baseOperation;
private ListOptions listOptions;
private URL requestUrl;
private final ListOptions listOptions;
private final URL requestUrl;

private final boolean receiveBookmarks;

Expand Down Expand Up @@ -187,7 +187,9 @@ void eventReceived(Watcher.Action action, HasMetadata resource) {
if (resource != null && !baseOperation.getType().isAssignableFrom(resource.getClass())) {
resource = Serialization.jsonMapper().convertValue(resource, baseOperation.getType());
}
watcher.eventReceived(action, (T) resource);
@SuppressWarnings("unchecked")
final T t = (T) resource;
watcher.eventReceived(action, t);
}

void updateResourceVersion(final String newResourceVersion) {
Expand Down Expand Up @@ -223,29 +225,26 @@ public void close() {
cancelReconnect();
}

private WatchEvent contextAwareWatchEventDeserializer(String messageSource) {
private WatchEvent contextAwareWatchEventDeserializer(String messageSource)
throws JsonProcessingException {
try {
return Serialization.unmarshal(messageSource, WatchEvent.class);
} catch (Exception ex1) {
try {
JsonNode json = Serialization.jsonMapper().readTree(messageSource);
JsonNode objectJson = null;
if (json instanceof ObjectNode && json.has("object")) {
objectJson = ((ObjectNode) json).remove("object");
}
JsonNode json = Serialization.jsonMapper().readTree(messageSource);
JsonNode objectJson = null;
if (json instanceof ObjectNode && json.has("object")) {
objectJson = ((ObjectNode) json).remove("object");
}

WatchEvent watchEvent = Serialization.jsonMapper().treeToValue(json, WatchEvent.class);
KubernetesResource object = Serialization.jsonMapper().treeToValue(objectJson, baseOperation.getType());
WatchEvent watchEvent = Serialization.jsonMapper().treeToValue(json, WatchEvent.class);
KubernetesResource object = Serialization.jsonMapper().treeToValue(objectJson, baseOperation.getType());

watchEvent.setObject(object);
return watchEvent;
} catch (JsonProcessingException ex2) {
throw new IllegalArgumentException("Failed to deserialize WatchEvent", ex2);
}
watchEvent.setObject(object);
return watchEvent;
}
}

protected WatchEvent readWatchEvent(String messageSource) {
protected WatchEvent readWatchEvent(String messageSource) throws JsonProcessingException {
WatchEvent event = contextAwareWatchEventDeserializer(messageSource);
KubernetesResource object = null;
if (event != null) {
Expand Down Expand Up @@ -277,32 +276,34 @@ protected void onMessage(String message) {
Status status = (Status) object;

onStatus(status);
} else if (object instanceof KubernetesResourceList) {
// Dirty cast - should always be valid though
KubernetesResourceList list = (KubernetesResourceList) object;
updateResourceVersion(list.getMetadata().getResourceVersion());
} else if (object instanceof HasMetadata) {
HasMetadata hasMetadata = (HasMetadata) object;
updateResourceVersion(hasMetadata.getMetadata().getResourceVersion());
Action action = Action.valueOf(event.getType());
List<HasMetadata> items = list.getItems();
if (items != null) {
for (HasMetadata item : items) {
eventReceived(action, item);

if(object instanceof KubernetesResourceList) {
// Dirty cast - should always be valid though
@SuppressWarnings({"rawtypes"})
KubernetesResourceList list = (KubernetesResourceList) hasMetadata;
@SuppressWarnings("unchecked")
List<HasMetadata> items = list.getItems();
if (items != null) {
for (HasMetadata item : items) {
eventReceived(action, item);
}
}
} else {
eventReceived(action, hasMetadata);
}
} else if (object instanceof HasMetadata) {
@SuppressWarnings("unchecked")
T obj = (T) object;
updateResourceVersion(obj.getMetadata().getResourceVersion());
Action action = Action.valueOf(event.getType());
eventReceived(action, obj);
} else {
logger.error("Unknown message received: {}", message);
}
} catch (ClassCastException e) {
final String msg = "Received wrong type of object for watch";
close(new WatcherException(msg, e));
} catch (IllegalArgumentException e) {
final String msg = "Invalid event type";
close(new WatcherException(msg, e));
} catch (JsonProcessingException e) {
final String msg = "Couldn't deserialize watch event: " + message;
close(new WatcherException(msg, e, message));
} catch (Exception e) {
final String msg = "Unhandled exception encountered in watcher event handler";
close(new WatcherException(msg, e));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class Reflector<T extends HasMetadata, L extends KubernetesResourceList<T
private volatile boolean watching;
private volatile CompletableFuture<Watch> watchFuture;
private volatile CompletableFuture<?> reconnectFuture;
private volatile CompletableFuture<Void> stopFuture = new CompletableFuture<Void>();
private final CompletableFuture<Void> stopFuture = new CompletableFuture<>();
private final ExponentialBackoffIntervalCalculator retryIntervalCalculator;

public Reflector(ListerWatcher<T, L> listerWatcher, SyncableStore<T> store) {
Expand Down

0 comments on commit 2c8b5fa

Please sign in to comment.