diff --git a/CHANGELOG.md b/CHANGELOG.md index ce05bdfd0c..9e8822820c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,10 +3,12 @@ ### 6.2-SNAPSHOT #### Bugs +* Fix #4369: Informers will retry with a backoff on list/watch failure as they did in 5.12 and prior. * Fix #4350: SchemaSwap annotation is now repeatable and is applied multiple times if classes are used more than once in the class hierarchy. #### Improvements * Fix #4348: Introduce specific annotations for the generators +* Fix #4365: The Watch retry logic will handle more cases, as well as perform an exceptional close for events that are not properly handled. Informers can directly provide those exceptional outcomes via the SharedIndexInformer.stopped CompletableFuture. #### Dependency Upgrade * Fix #4243: Update Tekton pipeline model to v0.39.0 diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/WatcherException.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/WatcherException.java index 2803fc9393..2b119b421f 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/WatcherException.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/WatcherException.java @@ -16,27 +16,39 @@ package io.fabric8.kubernetes.client; import java.net.HttpURLConnection; +import java.util.Optional; public class WatcherException extends Exception { + private final String rawWatchMessage; public WatcherException(String message, Throwable cause) { - super(message, cause); + this(message, cause, null); } public WatcherException(String message) { super(message); + rawWatchMessage = null; + } + + public WatcherException(String message, Throwable cause, String rawWatchMessage) { + super(message, cause); + this.rawWatchMessage = rawWatchMessage; } public KubernetesClientException asClientException() { final Throwable cause = getCause(); - return cause instanceof KubernetesClientException ? - (KubernetesClientException) cause : new KubernetesClientException(getMessage(), cause); + return cause instanceof KubernetesClientException ? (KubernetesClientException) cause + : new KubernetesClientException(getMessage(), cause); } public boolean isHttpGone() { final KubernetesClientException cause = asClientException(); - return cause.getCode() == HttpURLConnection.HTTP_GONE - || (cause.getStatus() != null && cause.getStatus().getCode() == HttpURLConnection.HTTP_GONE); + return cause != null && (cause.getCode() == HttpURLConnection.HTTP_GONE + || (cause.getStatus() != null && cause.getStatus().getCode() == HttpURLConnection.HTTP_GONE)); } + @SuppressWarnings("unused") + public Optional getRawWatchMessage() { + return Optional.ofNullable(rawWatchMessage); + } } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java index d7f5902fcb..264b5222dc 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java @@ -15,6 +15,7 @@ */ package io.fabric8.kubernetes.client.informers; +import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.informers.cache.Cache; import io.fabric8.kubernetes.client.informers.cache.Indexer; import io.fabric8.kubernetes.client.informers.cache.ItemStore; @@ -82,7 +83,8 @@ default SharedIndexInformer removeNamespaceIndex() { * @param handle the event handler * @param resyncPeriod the specific resync period */ - SharedIndexInformer addEventHandlerWithResyncPeriod(ResourceEventHandler handle, long resyncPeriod); + SharedIndexInformer addEventHandlerWithResyncPeriod(ResourceEventHandler handle, + long resyncPeriod); /** * Starts the shared informer, which will be stopped when {@link #stop()} is called. @@ -165,4 +167,13 @@ default boolean hasSynced() { */ CompletableFuture start(); + /** + * Return a future that will allow notification of informer stopping. + *

+ * If {@link #stop()} is called, the future will be completed with a null value. + *

+ * If an exception occurs that terminates the informer, then it will be exceptionally completed with that exception + * - typically a {@link WatcherException} + */ + CompletableFuture stopped(); } diff --git a/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/WatcherExceptionTest.java b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/WatcherExceptionTest.java new file mode 100644 index 0000000000..3d500c3a88 --- /dev/null +++ b/kubernetes-client-api/src/test/java/io/fabric8/kubernetes/client/WatcherExceptionTest.java @@ -0,0 +1,41 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client; + +import io.fabric8.kubernetes.api.model.Status; +import org.junit.jupiter.api.Test; + +import java.net.HttpURLConnection; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class WatcherExceptionTest { + + @Test + void testIsHttpGone() { + WatcherException we = new WatcherException("I've failed"); + assertFalse(we.isHttpGone()); + + we = new WatcherException("I've failed", new ClassCastException()); + assertFalse(we.isHttpGone()); + + we = new WatcherException("I've failed", + new KubernetesClientException("http gone", HttpURLConnection.HTTP_GONE, new Status())); + assertTrue(we.isHttpGone()); + } + +} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java index 17caf47d51..0500300f42 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java @@ -44,7 +44,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -60,13 +59,12 @@ public abstract class AbstractWatchManager implements Wat final AtomicBoolean forceClosed; private final int reconnectLimit; private final ExponentialBackoffIntervalCalculator retryIntervalCalculator; - final AtomicInteger currentReconnectAttempt; private Future reconnectAttempt; protected final HttpClient client; protected BaseOperation baseOperation; - private ListOptions listOptions; - private URL requestUrl; + private final ListOptions listOptions; + private final URL requestUrl; private final boolean receiveBookmarks; @@ -77,7 +75,6 @@ public abstract class AbstractWatchManager implements Wat this.reconnectLimit = reconnectLimit; this.retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(reconnectInterval, maxIntervalExponent); this.resourceVersion = new AtomicReference<>(listOptions.getResourceVersion()); - this.currentReconnectAttempt = new AtomicInteger(0); this.forceClosed = new AtomicBoolean(); this.receiveBookmarks = Boolean.TRUE.equals(listOptions.getAllowWatchBookmarks()); // opt into bookmarks by default @@ -164,18 +161,16 @@ synchronized void reconnect() { } final boolean cannotReconnect() { - return !watcher.reconnecting() && currentReconnectAttempt.get() >= reconnectLimit && reconnectLimit >= 0; + return !watcher.reconnecting() && retryIntervalCalculator.getCurrentReconnectAttempt() >= reconnectLimit + && reconnectLimit >= 0; } final long nextReconnectInterval() { - int exponentOfTwo = currentReconnectAttempt.getAndIncrement(); - long ret = retryIntervalCalculator.getInterval(exponentOfTwo); - logger.debug("Current reconnect backoff is {} milliseconds (T{})", ret, exponentOfTwo); - return ret; + return retryIntervalCalculator.nextReconnectInterval(); } void resetReconnectAttempts() { - currentReconnectAttempt.set(0); + retryIntervalCalculator.resetReconnectAttempts(); } boolean isForceClosed() { @@ -192,7 +187,15 @@ void eventReceived(Watcher.Action action, HasMetadata resource) { if (resource != null && !baseOperation.getType().isAssignableFrom(resource.getClass())) { resource = Serialization.jsonMapper().convertValue(resource, baseOperation.getType()); } - watcher.eventReceived(action, (T) resource); + @SuppressWarnings("unchecked") + final T t = (T) resource; + try { + watcher.eventReceived(action, t); + } catch (Exception e) { + // for compatibility, this will just log the exception as was done in previous versions + // a case could be made for this to terminate the watch instead + logger.error("Unhandled exception encountered in watcher event handler", e); + } } void updateResourceVersion(final String newResourceVersion) { @@ -228,29 +231,26 @@ public void close() { cancelReconnect(); } - private WatchEvent contextAwareWatchEventDeserializer(String messageSource) { + private WatchEvent contextAwareWatchEventDeserializer(String messageSource) + throws JsonProcessingException { try { return Serialization.unmarshal(messageSource, WatchEvent.class); } catch (Exception ex1) { - try { - JsonNode json = Serialization.jsonMapper().readTree(messageSource); - JsonNode objectJson = null; - if (json instanceof ObjectNode && json.has("object")) { - objectJson = ((ObjectNode) json).remove("object"); - } + JsonNode json = Serialization.jsonMapper().readTree(messageSource); + JsonNode objectJson = null; + if (json instanceof ObjectNode && json.has("object")) { + objectJson = ((ObjectNode) json).remove("object"); + } - WatchEvent watchEvent = Serialization.jsonMapper().treeToValue(json, WatchEvent.class); - KubernetesResource object = Serialization.jsonMapper().treeToValue(objectJson, baseOperation.getType()); + WatchEvent watchEvent = Serialization.jsonMapper().treeToValue(json, WatchEvent.class); + KubernetesResource object = Serialization.jsonMapper().treeToValue(objectJson, baseOperation.getType()); - watchEvent.setObject(object); - return watchEvent; - } catch (JsonProcessingException ex2) { - throw new IllegalArgumentException("Failed to deserialize WatchEvent", ex2); - } + watchEvent.setObject(object); + return watchEvent; } } - protected WatchEvent readWatchEvent(String messageSource) { + protected WatchEvent readWatchEvent(String messageSource) throws JsonProcessingException { WatchEvent event = contextAwareWatchEventDeserializer(messageSource); KubernetesResource object = null; if (event != null) { @@ -278,36 +278,47 @@ protected void onMessage(String message) { try { WatchEvent event = readWatchEvent(message); Object object = event.getObject(); - if (object instanceof Status) { - Status status = (Status) object; - - onStatus(status); - } else if (object instanceof KubernetesResourceList) { - // Dirty cast - should always be valid though - KubernetesResourceList list = (KubernetesResourceList) object; - updateResourceVersion(list.getMetadata().getResourceVersion()); - Action action = Action.valueOf(event.getType()); - List items = list.getItems(); - if (items != null) { - for (HasMetadata item : items) { - eventReceived(action, item); - } + Action action = Action.valueOf(event.getType()); + if (action == Action.ERROR) { + if (object instanceof Status) { + Status status = (Status) object; + + onStatus(status); + } else { + logger.error("Error received, but object is not a status - will retry"); + closeRequest(); } } else if (object instanceof HasMetadata) { - @SuppressWarnings("unchecked") - T obj = (T) object; - updateResourceVersion(obj.getMetadata().getResourceVersion()); - Action action = Action.valueOf(event.getType()); - eventReceived(action, obj); + HasMetadata hasMetadata = (HasMetadata) object; + updateResourceVersion(hasMetadata.getMetadata().getResourceVersion()); + + if (object instanceof KubernetesResourceList) { + // Dirty cast - should always be valid though + @SuppressWarnings({ "rawtypes" }) + KubernetesResourceList list = (KubernetesResourceList) hasMetadata; + @SuppressWarnings("unchecked") + List items = list.getItems(); + if (items != null) { + for (HasMetadata item : items) { + eventReceived(action, item); + } + } + } else { + eventReceived(action, hasMetadata); + } } else { - logger.error("Unknown message received: {}", message); + final String msg = String.format("Invalid object received: %s", message); + close(new WatcherException(msg, null, message)); } } catch (ClassCastException e) { - logger.error("Received wrong type of object for watch", e); - } catch (IllegalArgumentException e) { - logger.error("Invalid event type", e); + final String msg = "Received wrong type of object for watch"; + close(new WatcherException(msg, e, message)); + } catch (JsonProcessingException e) { + final String msg = "Couldn't deserialize watch event: " + message; + close(new WatcherException(msg, e, message)); } catch (Exception e) { - logger.error("Unhandled exception encountered in watcher event handler", e); + final String msg = "Unexpected exception processing watch event"; + close(new WatcherException(msg, e, message)); } } @@ -318,8 +329,8 @@ protected boolean onStatus(Status status) { return true; } - eventReceived(Action.ERROR, null); - logger.error("Error received: {}", status); + logger.error("Error received: {}, will retry", status); + closeRequest(); return false; } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/OperationSupport.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/OperationSupport.java index 3a2f322363..7c68ff7633 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/OperationSupport.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/OperationSupport.java @@ -60,7 +60,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; public class OperationSupport { @@ -83,7 +82,6 @@ public class OperationSupport { protected String apiGroupName; protected String apiGroupVersion; protected boolean dryRun; - private final ExponentialBackoffIntervalCalculator retryIntervalCalculator; private final int requestRetryBackoffLimit; public OperationSupport(Client client) { @@ -107,16 +105,11 @@ public OperationSupport(OperationContext ctx) { this.apiGroupVersion = "v1"; } - final int requestRetryBackoffInterval; if (ctx.getConfig() != null) { - requestRetryBackoffInterval = ctx.getConfig().getRequestRetryBackoffInterval(); this.requestRetryBackoffLimit = ctx.getConfig().getRequestRetryBackoffLimit(); } else { - requestRetryBackoffInterval = Config.DEFAULT_REQUEST_RETRY_BACKOFFINTERVAL; this.requestRetryBackoffLimit = Config.DEFAULT_REQUEST_RETRY_BACKOFFLIMIT; } - this.retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(requestRetryBackoffInterval, - MAX_RETRY_INTERVAL_EXPONENT); } public String getAPIGroupName() { @@ -574,7 +567,9 @@ protected CompletableFuture handleResponse(HttpClient client, HttpRequest VersionUsageUtils.log(this.resourceT, this.apiGroupVersion); HttpRequest request = requestBuilder.build(); CompletableFuture> futureResponse = new CompletableFuture<>(); - retryWithExponentialBackoff(futureResponse, new AtomicInteger(), Utils.getNonNullOrElse(client, httpClient), request); + retryWithExponentialBackoff(futureResponse, + new ExponentialBackoffIntervalCalculator(requestRetryBackoffLimit, MAX_RETRY_INTERVAL_EXPONENT), + Utils.getNonNullOrElse(client, httpClient), request); return futureResponse.thenApply(response -> { try { @@ -593,13 +588,13 @@ protected CompletableFuture handleResponse(HttpClient client, HttpRequest } protected void retryWithExponentialBackoff(CompletableFuture> result, - AtomicInteger numRetries, + ExponentialBackoffIntervalCalculator retryIntervalCalculator, HttpClient client, HttpRequest request) { client.sendAsync(request, byte[].class) .whenComplete((response, throwable) -> { - int retries = numRetries.getAndIncrement(); + int retries = retryIntervalCalculator.getCurrentReconnectAttempt(); if (retries < requestRetryBackoffLimit) { - long retryInterval = retryIntervalCalculator.getInterval(retries); + long retryInterval = retryIntervalCalculator.nextReconnectInterval(); boolean retry = false; if (response != null && response.code() >= 500) { LOG.debug("HTTP operation on url: {} should be retried as the response code was {}, retrying after {} millis", @@ -612,7 +607,8 @@ protected void retryWithExponentialBackoff(CompletableFuture retryWithExponentialBackoff(result, numRetries, client, request), retryInterval, TimeUnit.MILLISECONDS); + () -> retryWithExponentialBackoff(result, retryIntervalCalculator, client, request), retryInterval, + TimeUnit.MILLISECONDS); return; } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java index 33c43ee456..3404f802be 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java @@ -33,6 +33,7 @@ import java.net.URI; import java.net.URL; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -45,10 +46,12 @@ public class WatchConnectionManager> extends AbstractWatchManager { + public static final int BACKOFF_MAX_EXPONENT = 5; + private static final Logger logger = LoggerFactory.getLogger(WatchConnectionManager.class); protected WatcherWebSocketListener listener; - private CompletableFuture websocketFuture; + private volatile CompletableFuture websocketFuture; private WebSocket websocket; private volatile boolean ready; @@ -78,20 +81,21 @@ public WatchConnectionManager(final HttpClient client, final BaseOperation watcher, final int reconnectInterval, final int reconnectLimit, long websocketTimeout) throws MalformedURLException { // Default max 32x slowdown from base interval - this(client, baseOperation, listOptions, watcher, reconnectInterval, reconnectLimit, websocketTimeout, 5); + this(client, baseOperation, listOptions, watcher, reconnectInterval, reconnectLimit, websocketTimeout, + BACKOFF_MAX_EXPONENT); } @Override protected synchronized void closeRequest() { closeWebSocket(websocket); - if (this.websocketFuture != null) { - this.websocketFuture.whenComplete((w, t) -> { + Optional.ofNullable(this.websocketFuture).ifPresent(theFuture -> { + this.websocketFuture = null; + theFuture.whenComplete((w, t) -> { if (w != null) { closeWebSocket(w); } }); - websocketFuture = null; - } + }); } synchronized WatcherWebSocketListener getListener() { @@ -102,6 +106,14 @@ public CompletableFuture getWebsocketFuture() { return websocketFuture; } + @Override + protected void onMessage(String message) { + // for consistency we only want to process the message when we're open + if (this.websocketFuture != null) { + super.onMessage(message); + } + } + @Override protected void start(URL url, Map headers) { this.listener = new WatcherWebSocketListener<>(this); diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java index eaf94fc2a5..3bfd38c0ef 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java @@ -29,12 +29,14 @@ import java.net.MalformedURLException; import java.net.URL; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; public class WatchHTTPManager> extends AbstractWatchManager { private static final Logger logger = LoggerFactory.getLogger(WatchHTTPManager.class); private CompletableFuture> call; + private volatile AsyncBody body; public WatchHTTPManager(final HttpClient client, final BaseOperation baseOperation, @@ -74,7 +76,7 @@ protected synchronized void start(URL url, Map headers) { scheduleReconnect(); } if (response != null) { - AsyncBody body = response.body(); + body = response.body(); if (!response.isSuccessful()) { body.cancel(); if (onStatus(OperationSupport.createStatus(response.code(), response.message()))) { @@ -101,9 +103,9 @@ protected synchronized void start(URL url, Map headers) { @Override protected synchronized void closeRequest() { - if (call != null) { - call.cancel(true); - call = null; - } + Optional.ofNullable(call).ifPresent(theFuture -> { + theFuture.cancel(true); + }); + Optional.ofNullable(body).ifPresent(AsyncBody::cancel); } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java index 51d99d170b..29ebee897b 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java @@ -57,7 +57,6 @@ public void onMessage(WebSocket webSocket, String text) { } finally { webSocket.request(); } - webSocket.request(); } @Override diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java index 5178879ebd..c162033016 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java @@ -164,7 +164,7 @@ public CompletableFuture start() { return reflector.start().whenComplete((v, t) -> { // stop called while run is called could be ineffective, check for it afterwards synchronized (this) { - if (stopped) { + if (stopped && reflector.isRunning()) { stop(); } } @@ -289,4 +289,9 @@ public String toString() { return this.description; } + @Override + public CompletableFuture stopped() { + return this.reflector.getStopFuture(); + } + } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java index 52d9126f42..627051a48f 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java @@ -22,15 +22,16 @@ import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; +import io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager; import io.fabric8.kubernetes.client.informers.impl.ListerWatcher; import io.fabric8.kubernetes.client.utils.Utils; +import io.fabric8.kubernetes.client.utils.internal.ExponentialBackoffIntervalCalculator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Optional; import java.util.Set; -import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -46,21 +47,26 @@ public class Reflector watchFuture; - private volatile Future reconnectFuture; + private volatile CompletableFuture reconnectFuture; + private final CompletableFuture stopFuture = new CompletableFuture<>(); + private final ExponentialBackoffIntervalCalculator retryIntervalCalculator; public Reflector(ListerWatcher listerWatcher, SyncableStore store) { this.listerWatcher = listerWatcher; this.store = store; this.watcher = new ReflectorWatcher(); + this.retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(listerWatcher.getWatchReconnectInterval(), + WatchConnectionManager.BACKOFF_MAX_EXPONENT); } public CompletableFuture start() { this.running = true; - return listSyncAndWatch(); + return listSyncAndWatch(false); } public void stop() { running = false; + stopFuture.complete(null); Future future = reconnectFuture; if (future != null) { future.cancel(true); @@ -69,18 +75,15 @@ public void stop() { } private synchronized void stopWatcher() { - if (watchFuture != null) { - watchFuture.cancel(true); - try { - Watch w = watchFuture.getNow(null); + Optional.ofNullable(watchFuture).ifPresent(theFuture -> { + watchFuture = null; + theFuture.cancel(true); + theFuture.whenComplete((w, t) -> { if (w != null) { stopWatch(w); } - } catch (CompletionException | CancellationException e) { - // do nothing - } - watchFuture = null; - } + }); + }); } /** @@ -91,35 +94,57 @@ private synchronized void stopWatcher() { * * @return a future that completes when the list and watch are established */ - public CompletableFuture listSyncAndWatch() { + public CompletableFuture listSyncAndWatch(boolean reconnect) { if (!running) { return CompletableFuture.completedFuture(null); } Set nextKeys = new ConcurrentSkipListSet<>(); - return processList(nextKeys, null).thenAccept(result -> { + CompletableFuture theFuture = processList(nextKeys, null).thenCompose(result -> { store.retainAll(nextKeys); final String latestResourceVersion = result.getMetadata().getResourceVersion(); lastSyncResourceVersion = latestResourceVersion; log.debug("Listing items ({}) for {} at v{}", nextKeys.size(), this, latestResourceVersion); - CompletableFuture started = startWatcher(latestResourceVersion); - if (started != null) { - // outside of the lock - started.whenComplete((w, t) -> { - if (w != null) { - if (running) { - watching = true; - } else { - stopWatch(w); - } + return startWatcher(latestResourceVersion); + }).thenAccept(w -> { + if (w != null) { + if (running) { + if (log.isDebugEnabled()) { + log.debug("Watch started for {}", Reflector.this); } - }); + watching = true; + } else { + stopWatch(w); + } } }); + if (reconnect) { + theFuture.whenComplete((v, t) -> { + if (t != null) { + log.warn("listSyncAndWatch failed for {}, will retry", Reflector.this, t); + reconnect(); + } else { + retryIntervalCalculator.resetReconnectAttempts(); + } + }); + } + return theFuture; + } + + private void reconnect() { + if (!running) { + return; + } + // this can be run in the scheduler thread because + // any further operations will happen on the io thread + reconnectFuture = Utils.schedule(Runnable::run, () -> listSyncAndWatch(true), + retryIntervalCalculator.nextReconnectInterval(), TimeUnit.MILLISECONDS); } private CompletableFuture processList(Set nextKeys, String continueVal) { CompletableFuture futureResult = listerWatcher - .submitList(new ListOptionsBuilder().withLimit(listerWatcher.getLimit()).withContinue(continueVal).build()); + .submitList( + new ListOptionsBuilder().withLimit(listerWatcher.getLimit()).withContinue(continueVal) + .build()); return futureResult.thenCompose(result -> { result.getItems().forEach(i -> { @@ -143,13 +168,15 @@ private void stopWatch(Watch w) { private synchronized CompletableFuture startWatcher(final String latestResourceVersion) { if (!running) { - return null; + return CompletableFuture.completedFuture(null); } log.debug("Starting watcher for {} at v{}", this, latestResourceVersion); // there's no need to stop the old watch, that will happen automatically when this call completes - watchFuture = listerWatcher.submitWatch(new ListOptionsBuilder().withResourceVersion(latestResourceVersion) - .withTimeoutSeconds(null) - .build(), watcher); + watchFuture = listerWatcher.submitWatch( + new ListOptionsBuilder().withResourceVersion(latestResourceVersion) + .withTimeoutSeconds(null) + .build(), + watcher); return watchFuture; } @@ -180,7 +207,8 @@ public void eventReceived(Action action, T resource) { throw new KubernetesClientException("Unrecognized resource for " + Reflector.this); } if (log.isDebugEnabled()) { - log.debug("Event received {} {} resourceVersion v{} for {}", action.name(), resource.getKind(), + log.debug("Event received {} {} resourceVersion v{} for {}", action.name(), + resource.getKind(), resource.getMetadata().getResourceVersion(), Reflector.this); } switch (action) { @@ -203,30 +231,17 @@ public void eventReceived(Action action, T resource) { public void onClose(WatcherException exception) { // this close was triggered by an exception, // not the user, it is expected that the watch retry will handle this - boolean restarted = false; - try { - if (exception.isHttpGone()) { - if (log.isDebugEnabled()) { - log.debug("Watch restarting due to http gone for {}", Reflector.this); - } - listSyncAndWatch().whenComplete((v, t) -> { - if (t != null) { - watchStopped(); - // start a whole new list/watch cycle, can be run in the scheduler thread because - // any further operations will happen on the io thread - reconnectFuture = Utils.schedule(Runnable::run, Reflector.this::listSyncAndWatch, - listerWatcher.getWatchReconnectInterval(), TimeUnit.MILLISECONDS); - } - }); - restarted = true; - } else { - log.warn("Watch closing with exception for {}", Reflector.this, exception); - running = false; // shouldn't happen, but it means the watch won't restart - } - } finally { - if (!restarted) { - watchStopped(); // report the watch as stopped after a problem + watchStopped(); + if (exception.isHttpGone()) { + if (log.isDebugEnabled()) { + log.debug("Watch restarting due to http gone for {}", Reflector.this); } + // start a whole new list/watch cycle + reconnect(); + } else { + running = false; // shouldn't happen, but it means the watch won't restart + stopFuture.completeExceptionally(exception); + log.warn("Watch closing with exception for {}", Reflector.this, exception); } } @@ -240,7 +255,6 @@ public void onClose() { public boolean reconnecting() { return true; } - } ReflectorWatcher getWatcher() { @@ -252,4 +266,8 @@ public String toString() { return listerWatcher.getApiEndpointPath(); } + public CompletableFuture getStopFuture() { + return stopFuture; + } + } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/ExponentialBackoffIntervalCalculator.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/ExponentialBackoffIntervalCalculator.java index 269e2528e2..4dc34a5a27 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/ExponentialBackoffIntervalCalculator.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/internal/ExponentialBackoffIntervalCalculator.java @@ -15,10 +15,18 @@ */ package io.fabric8.kubernetes.client.utils.internal; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicInteger; + public class ExponentialBackoffIntervalCalculator { + private static final Logger logger = LoggerFactory.getLogger(ExponentialBackoffIntervalCalculator.class); + private final int initialInterval; private final int maxRetryIntervalExponent; + final AtomicInteger currentReconnectAttempt = new AtomicInteger(); public ExponentialBackoffIntervalCalculator(int initialInterval, int maxRetryIntervalExponent) { this.initialInterval = initialInterval; @@ -30,7 +38,22 @@ public long getInterval(int retryIndex) { if (exponentOfTwo > maxRetryIntervalExponent) { exponentOfTwo = maxRetryIntervalExponent; } - return (long)initialInterval * (1 << exponentOfTwo); + return (long) initialInterval * (1 << exponentOfTwo); + } + + public void resetReconnectAttempts() { + currentReconnectAttempt.set(0); + } + + public final long nextReconnectInterval() { + int exponentOfTwo = currentReconnectAttempt.getAndIncrement(); + long ret = getInterval(exponentOfTwo); + logger.debug("Current reconnect backoff is {} milliseconds (T{})", ret, exponentOfTwo); + return ret; + } + + public int getCurrentReconnectAttempt() { + return currentReconnectAttempt.get(); } } diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java index 2d39dd13c2..a8a3d2a82f 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java @@ -59,15 +59,17 @@ void testStateFlags() { assertFalse(reflector.isWatching()); assertTrue(reflector.isRunning()); - reflector.listSyncAndWatch().join(); + reflector.listSyncAndWatch(false).join(); assertTrue(reflector.isWatching()); assertTrue(reflector.isRunning()); + assertFalse(reflector.getStopFuture().isDone()); reflector.stop(); assertFalse(reflector.isWatching()); assertFalse(reflector.isRunning()); + assertTrue(reflector.getStopFuture().isDone()); } @Test diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java index a97e69e528..ea60bdce64 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java @@ -36,6 +36,7 @@ import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBindingBuilder; import io.fabric8.kubernetes.client.CustomResourceList; import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext; import io.fabric8.kubernetes.client.dsl.base.ResourceDefinitionContext; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; @@ -83,7 +84,8 @@ class DefaultSharedIndexInformerTest { .withMessage( "410: The event in requested index is outdated and cleared (the requested history has been cleared [3/1]) [2]") .build(); - static final WatchEvent outdatedEvent = new WatchEventBuilder().withStatusObject(outdatedStatus).build(); + static final WatchEvent outdatedEvent = new WatchEventBuilder().withType(Watcher.Action.ERROR.name()) + .withStatusObject(outdatedStatus).build(); static final Long WATCH_EVENT_EMIT_TIME = 1L; static final Long OUTDATED_WATCH_EVENT_EMIT_TIME = 1L; static final long RESYNC_PERIOD = 5L; @@ -844,6 +846,11 @@ void testReconnectAfterOnCloseException() throws InterruptedException { .andEmit(outdatedEvent) .done().always(); + // re-list errors + server.expect().withPath("/api/v1/pods") + .andReturn(HttpURLConnection.HTTP_FORBIDDEN, new Status()) + .times(2); + // re-list server.expect().withPath("/api/v1/pods") .andReturn(200, diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceTest.java index 25d12c7661..e17bcacedf 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceTest.java @@ -444,7 +444,7 @@ void testWaitUntilCondition() throws InterruptedException { } @Test - void tesErrorEventDuringWaitReturnFromAPIIfMatch() throws InterruptedException { + void testErrorEventDuringWaitReturnFromAPIIfMatch() throws InterruptedException { Pod pod1 = new PodBuilder().withNewMetadata() .withName("pod1") .withResourceVersion("1") @@ -470,6 +470,15 @@ void tesErrorEventDuringWaitReturnFromAPIIfMatch() throws InterruptedException { .open() .waitFor(500) .andEmit(new WatchEvent(status, "ERROR")) + .done() + .once(); + + server.expect() + .get() + .withPath( + "/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true") + .andUpgradeToWebSocket() + .open() .waitFor(500) .andEmit(new WatchEvent(ready, "MODIFIED")) .done() diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/WatchTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/WatchTest.java index 1d33082ced..ef69c0c108 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/WatchTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/WatchTest.java @@ -315,7 +315,7 @@ public void onClose(WatcherException cause) { } private static WatchEvent outdatedEvent() { - return new WatchEventBuilder().withStatusObject( + return new WatchEventBuilder().withType(Watcher.Action.ERROR.name()).withStatusObject( new StatusBuilder().withCode(HttpURLConnection.HTTP_GONE) .withMessage( "410: The event in requested index is outdated and cleared (the requested history has been cleared [3/1]) [2]")