From e7eb0121a4fcc210ba69155a68db7086b98cd230 Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Wed, 7 Sep 2022 21:29:30 -0400 Subject: [PATCH] just logging resourceeventhandler exceptions also stopping messages when a watch closes --- .../dsl/internal/AbstractWatchManager.java | 10 ++++++++-- .../dsl/internal/WatchConnectionManager.java | 19 ++++++++++++++----- .../client/dsl/internal/WatchHTTPManager.java | 13 +++++++++---- .../internal/WatcherWebSocketListener.java | 1 - .../kubernetes/client/mock/ResourceTest.java | 11 ++++++++++- 5 files changed, 41 insertions(+), 13 deletions(-) diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java index e3c9a5454f5..0500300f42b 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java @@ -189,7 +189,13 @@ void eventReceived(Watcher.Action action, HasMetadata resource) { } @SuppressWarnings("unchecked") final T t = (T) resource; - watcher.eventReceived(action, t); + try { + watcher.eventReceived(action, t); + } catch (Exception e) { + // for compatibility, this will just log the exception as was done in previous versions + // a case could be made for this to terminate the watch instead + logger.error("Unhandled exception encountered in watcher event handler", e); + } } void updateResourceVersion(final String newResourceVersion) { @@ -311,7 +317,7 @@ protected void onMessage(String message) { final String msg = "Couldn't deserialize watch event: " + message; close(new WatcherException(msg, e, message)); } catch (Exception e) { - final String msg = "Unhandled exception encountered in watcher event handler"; + final String msg = "Unexpected exception processing watch event"; close(new WatcherException(msg, e, message)); } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java index 1f1af781053..3404f802be3 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java @@ -33,6 +33,7 @@ import java.net.URI; import java.net.URL; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -50,7 +51,7 @@ public class WatchConnectionManager listener; - private CompletableFuture websocketFuture; + private volatile CompletableFuture websocketFuture; private WebSocket websocket; private volatile boolean ready; @@ -87,14 +88,14 @@ public WatchConnectionManager(final HttpClient client, final BaseOperation { + Optional.ofNullable(this.websocketFuture).ifPresent(theFuture -> { + this.websocketFuture = null; + theFuture.whenComplete((w, t) -> { if (w != null) { closeWebSocket(w); } }); - websocketFuture = null; - } + }); } synchronized WatcherWebSocketListener getListener() { @@ -105,6 +106,14 @@ public CompletableFuture getWebsocketFuture() { return websocketFuture; } + @Override + protected void onMessage(String message) { + // for consistency we only want to process the message when we're open + if (this.websocketFuture != null) { + super.onMessage(message); + } + } + @Override protected void start(URL url, Map headers) { this.listener = new WatcherWebSocketListener<>(this); diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java index eaf94fc2a51..1dabc8afcad 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java @@ -29,12 +29,14 @@ import java.net.MalformedURLException; import java.net.URL; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; public class WatchHTTPManager> extends AbstractWatchManager { private static final Logger logger = LoggerFactory.getLogger(WatchHTTPManager.class); private CompletableFuture> call; + private volatile AsyncBody body; public WatchHTTPManager(final HttpClient client, final BaseOperation baseOperation, @@ -74,7 +76,7 @@ protected synchronized void start(URL url, Map headers) { scheduleReconnect(); } if (response != null) { - AsyncBody body = response.body(); + body = response.body(); if (!response.isSuccessful()) { body.cancel(); if (onStatus(OperationSupport.createStatus(response.code(), response.message()))) { @@ -101,9 +103,12 @@ protected synchronized void start(URL url, Map headers) { @Override protected synchronized void closeRequest() { - if (call != null) { - call.cancel(true); + Optional.ofNullable(call).ifPresent(theFuture -> { call = null; - } + theFuture.cancel(true); + }); + Optional.ofNullable(body).ifPresent(theBody -> { + theBody.cancel(); + }); } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java index 51d99d170b4..29ebee897b4 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java @@ -57,7 +57,6 @@ public void onMessage(WebSocket webSocket, String text) { } finally { webSocket.request(); } - webSocket.request(); } @Override diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceTest.java index 25d12c76613..e17bcacedf1 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceTest.java @@ -444,7 +444,7 @@ void testWaitUntilCondition() throws InterruptedException { } @Test - void tesErrorEventDuringWaitReturnFromAPIIfMatch() throws InterruptedException { + void testErrorEventDuringWaitReturnFromAPIIfMatch() throws InterruptedException { Pod pod1 = new PodBuilder().withNewMetadata() .withName("pod1") .withResourceVersion("1") @@ -470,6 +470,15 @@ void tesErrorEventDuringWaitReturnFromAPIIfMatch() throws InterruptedException { .open() .waitFor(500) .andEmit(new WatchEvent(status, "ERROR")) + .done() + .once(); + + server.expect() + .get() + .withPath( + "/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true") + .andUpgradeToWebSocket() + .open() .waitFor(500) .andEmit(new WatchEvent(ready, "MODIFIED")) .done()