From db5ba0e26b3ef6d33c4c30b8af48443ff5675cf2 Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Wed, 7 Sep 2022 07:14:09 -0400 Subject: [PATCH] fix #4369: refining the watch and informer reconnect and adding tests --- CHANGELOG.md | 3 +- .../kubernetes/client/WatcherException.java | 8 +- .../client/informers/SharedIndexInformer.java | 7 +- .../client/WatcherExceptionTest.java | 41 +++++++++++ .../dsl/internal/AbstractWatchManager.java | 30 +++++--- .../informers/impl/cache/Reflector.java | 73 +++++++++---------- .../informers/impl/cache/ReflectorTest.java | 4 +- .../mock/DefaultSharedIndexInformerTest.java | 9 ++- .../kubernetes/client/mock/WatchTest.java | 2 +- 9 files changed, 119 insertions(+), 58 deletions(-) create mode 100644 kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/WatcherExceptionTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index f7b01fde5f..3915e9aa06 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,7 @@ * Fix #4256: crd-generator-apt pom.xml includes transitive dependencies * Fix #4294: crd-generator respects JsonIgnore annotations on enum properties * Fix #4320: corrected leader transitions field on leader election leases - +* Fix #4369: Informers will retry with a backoff on list/watch failure as they did in 5.12 and prior. #### Improvements * Fix #887: added KubernetesClient.visitResources to search and perform other operations across all resources. @@ -21,6 +21,7 @@ * Fix #4287: added WorkloadGroup for Istio v1alpha3 extension generator * Fix #4318: implemented LeaderElection releaseOnCancel * Fix #3960: adding a KubernetesMockServer.expectCustomResource helper method and additional mock crd support +* Fix #4365: The Watch retry logic will handle more cases, as well as perform an exceptional close for events that are not properly handled. Informers can directly provide those exceptional outcomes via the SharedIndexInformer.stopped CompletableFuture. #### Dependency Upgrade * Bump Knative model to v0.34.0 diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/WatcherException.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/WatcherException.java index b921849e81..2b119b421f 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/WatcherException.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/WatcherException.java @@ -37,14 +37,14 @@ public WatcherException(String message, Throwable cause, String rawWatchMessage) public KubernetesClientException asClientException() { final Throwable cause = getCause(); - return cause instanceof KubernetesClientException ? - (KubernetesClientException) cause : new KubernetesClientException(getMessage(), cause); + return cause instanceof KubernetesClientException ? (KubernetesClientException) cause + : new KubernetesClientException(getMessage(), cause); } public boolean isHttpGone() { final KubernetesClientException cause = asClientException(); - return cause.getCode() == HttpURLConnection.HTTP_GONE - || (cause.getStatus() != null && cause.getStatus().getCode() == HttpURLConnection.HTTP_GONE); + return cause != null && (cause.getCode() == HttpURLConnection.HTTP_GONE + || (cause.getStatus() != null && cause.getStatus().getCode() == HttpURLConnection.HTTP_GONE)); } @SuppressWarnings("unused") diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java index c9082164e5..264b5222dc 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java @@ -15,6 +15,7 @@ */ package io.fabric8.kubernetes.client.informers; +import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.informers.cache.Cache; import io.fabric8.kubernetes.client.informers.cache.Indexer; import io.fabric8.kubernetes.client.informers.cache.ItemStore; @@ -168,7 +169,11 @@ default boolean hasSynced() { /** * Return a future that will allow notification of informer stopping. - * + *

+ * If {@link #stop()} is called, the future will be completed with a null value. + *

+ * If an exception occurs that terminates the informer, then it will be exceptionally completed with that exception + * - typically a {@link WatcherException} */ CompletableFuture stopped(); } diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/WatcherExceptionTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/WatcherExceptionTest.java new file mode 100644 index 0000000000..3d500c3a88 --- /dev/null +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/WatcherExceptionTest.java @@ -0,0 +1,41 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client; + +import io.fabric8.kubernetes.api.model.Status; +import org.junit.jupiter.api.Test; + +import java.net.HttpURLConnection; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class WatcherExceptionTest { + + @Test + void testIsHttpGone() { + WatcherException we = new WatcherException("I've failed"); + assertFalse(we.isHttpGone()); + + we = new WatcherException("I've failed", new ClassCastException()); + assertFalse(we.isHttpGone()); + + we = new WatcherException("I've failed", + new KubernetesClientException("http gone", HttpURLConnection.HTTP_GONE, new Status())); + assertTrue(we.isHttpGone()); + } + +} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java index 1b343790b9..e3c9a5454f 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java @@ -226,7 +226,7 @@ public void close() { } private WatchEvent contextAwareWatchEventDeserializer(String messageSource) - throws JsonProcessingException { + throws JsonProcessingException { try { return Serialization.unmarshal(messageSource, WatchEvent.class); } catch (Exception ex1) { @@ -272,18 +272,23 @@ protected void onMessage(String message) { try { WatchEvent event = readWatchEvent(message); Object object = event.getObject(); - if (object instanceof Status) { - Status status = (Status) object; + Action action = Action.valueOf(event.getType()); + if (action == Action.ERROR) { + if (object instanceof Status) { + Status status = (Status) object; - onStatus(status); + onStatus(status); + } else { + logger.error("Error received, but object is not a status - will retry"); + closeRequest(); + } } else if (object instanceof HasMetadata) { HasMetadata hasMetadata = (HasMetadata) object; updateResourceVersion(hasMetadata.getMetadata().getResourceVersion()); - Action action = Action.valueOf(event.getType()); - if(object instanceof KubernetesResourceList) { + if (object instanceof KubernetesResourceList) { // Dirty cast - should always be valid though - @SuppressWarnings({"rawtypes"}) + @SuppressWarnings({ "rawtypes" }) KubernetesResourceList list = (KubernetesResourceList) hasMetadata; @SuppressWarnings("unchecked") List items = list.getItems(); @@ -296,17 +301,18 @@ protected void onMessage(String message) { eventReceived(action, hasMetadata); } } else { - logger.error("Unknown message received: {}", message); + final String msg = String.format("Invalid object received: %s", message); + close(new WatcherException(msg, null, message)); } } catch (ClassCastException e) { final String msg = "Received wrong type of object for watch"; - close(new WatcherException(msg, e)); + close(new WatcherException(msg, e, message)); } catch (JsonProcessingException e) { final String msg = "Couldn't deserialize watch event: " + message; close(new WatcherException(msg, e, message)); } catch (Exception e) { final String msg = "Unhandled exception encountered in watcher event handler"; - close(new WatcherException(msg, e)); + close(new WatcherException(msg, e, message)); } } @@ -317,8 +323,8 @@ protected boolean onStatus(Status status) { return true; } - eventReceived(Action.ERROR, null); - logger.error("Error received: {}", status); + logger.error("Error received: {}, will retry", status); + closeRequest(); return false; } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java index 7945f674d3..ef336ae6db 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java @@ -62,7 +62,7 @@ public Reflector(ListerWatcher listerWatcher, SyncableStore store) { public CompletableFuture start() { this.running = true; - return listSyncAndWatch(); + return listSyncAndWatch(false); } public void stop() { @@ -98,33 +98,50 @@ private synchronized void stopWatcher() { * * @return a future that completes when the list and watch are established */ - public CompletableFuture listSyncAndWatch() { + public CompletableFuture listSyncAndWatch(boolean reconnect) { if (!running) { return CompletableFuture.completedFuture(null); } Set nextKeys = new ConcurrentSkipListSet<>(); - return processList(nextKeys, null).thenAccept(result -> { + CompletableFuture theFuture = processList(nextKeys, null).thenCompose(result -> { store.retainAll(nextKeys); final String latestResourceVersion = result.getMetadata().getResourceVersion(); lastSyncResourceVersion = latestResourceVersion; log.debug("Listing items ({}) for {} at v{}", nextKeys.size(), this, latestResourceVersion); - CompletableFuture started = startWatcher(latestResourceVersion); - if (started != null) { - // outside of the lock - started.whenComplete((w, t) -> { - if (w != null) { - if (running) { - if (log.isDebugEnabled()) { - log.debug("Watch started for {}", Reflector.this); - } - watching = true; - } else { - stopWatch(w); - } + return startWatcher(latestResourceVersion); + }).thenAccept(w -> { + if (w != null) { + if (running) { + if (log.isDebugEnabled()) { + log.debug("Watch started for {}", Reflector.this); } - }); + watching = true; + } else { + stopWatch(w); + } } }); + if (reconnect) { + theFuture.whenComplete((v, t) -> { + if (t != null) { + log.warn("listSyncAndWatch failed for {}, will retry", Reflector.this, t); + reconnect(); + } else { + retryIntervalCalculator.resetReconnectAttempts(); + } + }); + } + return theFuture; + } + + private void reconnect() { + if (!running) { + return; + } + // this can be run in the scheduler thread because + // any further operations will happen on the io thread + reconnectFuture = Utils.schedule(Runnable::run, () -> listSyncAndWatch(true), + retryIntervalCalculator.nextReconnectInterval(), TimeUnit.MILLISECONDS); } private CompletableFuture processList(Set nextKeys, String continueVal) { @@ -155,7 +172,7 @@ private void stopWatch(Watch w) { private synchronized CompletableFuture startWatcher(final String latestResourceVersion) { if (!running) { - return null; + return CompletableFuture.completedFuture(null); } log.debug("Starting watcher for {} at v{}", this, latestResourceVersion); // there's no need to stop the old watch, that will happen automatically when this call completes @@ -227,29 +244,11 @@ public void onClose(WatcherException exception) { reconnect(); } else { running = false; // shouldn't happen, but it means the watch won't restart - stopFuture.completeExceptionally(exception.getCause()); + stopFuture.completeExceptionally(exception); log.warn("Watch closing with exception for {}", Reflector.this, exception); } } - private void reconnect() { - if (!running) { - return; - } - // this can be run in the scheduler thread because - // any further operations will happen on the io thread - reconnectFuture = Utils.schedule(Runnable::run, Reflector.this::listSyncAndWatch, - retryIntervalCalculator.nextReconnectInterval(), TimeUnit.MILLISECONDS); - reconnectFuture.whenComplete((v, t) -> { - if (t != null) { - log.warn("listSyncAndWatch failed for {}, will retry", Reflector.this, t); - reconnect(); - } else { - retryIntervalCalculator.resetReconnectAttempts(); - } - }); - } - @Override public void onClose() { watchStopped(); diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java index 2d39dd13c2..a8a3d2a82f 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java @@ -59,15 +59,17 @@ void testStateFlags() { assertFalse(reflector.isWatching()); assertTrue(reflector.isRunning()); - reflector.listSyncAndWatch().join(); + reflector.listSyncAndWatch(false).join(); assertTrue(reflector.isWatching()); assertTrue(reflector.isRunning()); + assertFalse(reflector.getStopFuture().isDone()); reflector.stop(); assertFalse(reflector.isWatching()); assertFalse(reflector.isRunning()); + assertTrue(reflector.getStopFuture().isDone()); } @Test diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java index a97e69e528..ea60bdce64 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java @@ -36,6 +36,7 @@ import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBindingBuilder; import io.fabric8.kubernetes.client.CustomResourceList; import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext; import io.fabric8.kubernetes.client.dsl.base.ResourceDefinitionContext; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; @@ -83,7 +84,8 @@ class DefaultSharedIndexInformerTest { .withMessage( "410: The event in requested index is outdated and cleared (the requested history has been cleared [3/1]) [2]") .build(); - static final WatchEvent outdatedEvent = new WatchEventBuilder().withStatusObject(outdatedStatus).build(); + static final WatchEvent outdatedEvent = new WatchEventBuilder().withType(Watcher.Action.ERROR.name()) + .withStatusObject(outdatedStatus).build(); static final Long WATCH_EVENT_EMIT_TIME = 1L; static final Long OUTDATED_WATCH_EVENT_EMIT_TIME = 1L; static final long RESYNC_PERIOD = 5L; @@ -844,6 +846,11 @@ void testReconnectAfterOnCloseException() throws InterruptedException { .andEmit(outdatedEvent) .done().always(); + // re-list errors + server.expect().withPath("/api/v1/pods") + .andReturn(HttpURLConnection.HTTP_FORBIDDEN, new Status()) + .times(2); + // re-list server.expect().withPath("/api/v1/pods") .andReturn(200, diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/WatchTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/WatchTest.java index 1d33082ced..ef69c0c108 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/WatchTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/WatchTest.java @@ -315,7 +315,7 @@ public void onClose(WatcherException cause) { } private static WatchEvent outdatedEvent() { - return new WatchEventBuilder().withStatusObject( + return new WatchEventBuilder().withType(Watcher.Action.ERROR.name()).withStatusObject( new StatusBuilder().withCode(HttpURLConnection.HTTP_GONE) .withMessage( "410: The event in requested index is outdated and cleared (the requested history has been cleared [3/1]) [2]")