Skip to content

Commit

Permalink
just logging resourceeventhandler exceptions
Browse files Browse the repository at this point in the history
also stopping messages when a watch closes
  • Loading branch information
shawkins committed Sep 8, 2022
1 parent 64905e1 commit e7eb012
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 13 deletions.
Expand Up @@ -189,7 +189,13 @@ void eventReceived(Watcher.Action action, HasMetadata resource) {
}
@SuppressWarnings("unchecked")
final T t = (T) resource;
watcher.eventReceived(action, t);
try {
watcher.eventReceived(action, t);
} catch (Exception e) {
// for compatibility, this will just log the exception as was done in previous versions
// a case could be made for this to terminate the watch instead
logger.error("Unhandled exception encountered in watcher event handler", e);
}
}

void updateResourceVersion(final String newResourceVersion) {
Expand Down Expand Up @@ -311,7 +317,7 @@ protected void onMessage(String message) {
final String msg = "Couldn't deserialize watch event: " + message;
close(new WatcherException(msg, e, message));
} catch (Exception e) {
final String msg = "Unhandled exception encountered in watcher event handler";
final String msg = "Unexpected exception processing watch event";
close(new WatcherException(msg, e, message));
}
}
Expand Down
Expand Up @@ -33,6 +33,7 @@
import java.net.URI;
import java.net.URL;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

Expand All @@ -50,7 +51,7 @@ public class WatchConnectionManager<T extends HasMetadata, L extends KubernetesR
private static final Logger logger = LoggerFactory.getLogger(WatchConnectionManager.class);

protected WatcherWebSocketListener<T> listener;
private CompletableFuture<WebSocket> websocketFuture;
private volatile CompletableFuture<WebSocket> websocketFuture;
private WebSocket websocket;

private volatile boolean ready;
Expand Down Expand Up @@ -87,14 +88,14 @@ public WatchConnectionManager(final HttpClient client, final BaseOperation<T, L,
@Override
protected synchronized void closeRequest() {
closeWebSocket(websocket);
if (this.websocketFuture != null) {
this.websocketFuture.whenComplete((w, t) -> {
Optional.ofNullable(this.websocketFuture).ifPresent(theFuture -> {
this.websocketFuture = null;
theFuture.whenComplete((w, t) -> {
if (w != null) {
closeWebSocket(w);
}
});
websocketFuture = null;
}
});
}

synchronized WatcherWebSocketListener<T> getListener() {
Expand All @@ -105,6 +106,14 @@ public CompletableFuture<WebSocket> getWebsocketFuture() {
return websocketFuture;
}

@Override
protected void onMessage(String message) {
// for consistency we only want to process the message when we're open
if (this.websocketFuture != null) {
super.onMessage(message);
}
}

@Override
protected void start(URL url, Map<String, String> headers) {
this.listener = new WatcherWebSocketListener<>(this);
Expand Down
Expand Up @@ -29,12 +29,14 @@
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class WatchHTTPManager<T extends HasMetadata, L extends KubernetesResourceList<T>> extends AbstractWatchManager<T> {
private static final Logger logger = LoggerFactory.getLogger(WatchHTTPManager.class);
private CompletableFuture<HttpResponse<AsyncBody>> call;
private volatile AsyncBody body;

public WatchHTTPManager(final HttpClient client,
final BaseOperation<T, L, ?> baseOperation,
Expand Down Expand Up @@ -74,7 +76,7 @@ protected synchronized void start(URL url, Map<String, String> headers) {
scheduleReconnect();
}
if (response != null) {
AsyncBody body = response.body();
body = response.body();
if (!response.isSuccessful()) {
body.cancel();
if (onStatus(OperationSupport.createStatus(response.code(), response.message()))) {
Expand All @@ -101,9 +103,12 @@ protected synchronized void start(URL url, Map<String, String> headers) {

@Override
protected synchronized void closeRequest() {
if (call != null) {
call.cancel(true);
Optional.ofNullable(call).ifPresent(theFuture -> {
call = null;
}
theFuture.cancel(true);
});
Optional.ofNullable(body).ifPresent(theBody -> {
theBody.cancel();
});
}
}
Expand Up @@ -57,7 +57,6 @@ public void onMessage(WebSocket webSocket, String text) {
} finally {
webSocket.request();
}
webSocket.request();
}

@Override
Expand Down
Expand Up @@ -444,7 +444,7 @@ void testWaitUntilCondition() throws InterruptedException {
}

@Test
void tesErrorEventDuringWaitReturnFromAPIIfMatch() throws InterruptedException {
void testErrorEventDuringWaitReturnFromAPIIfMatch() throws InterruptedException {
Pod pod1 = new PodBuilder().withNewMetadata()
.withName("pod1")
.withResourceVersion("1")
Expand All @@ -470,6 +470,15 @@ void tesErrorEventDuringWaitReturnFromAPIIfMatch() throws InterruptedException {
.open()
.waitFor(500)
.andEmit(new WatchEvent(status, "ERROR"))
.done()
.once();

server.expect()
.get()
.withPath(
"/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true")
.andUpgradeToWebSocket()
.open()
.waitFor(500)
.andEmit(new WatchEvent(ready, "MODIFIED"))
.done()
Expand Down

0 comments on commit e7eb012

Please sign in to comment.