From c596971d713fa8e7c47aa9317ec79afa5c5c71d7 Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Fri, 2 Sep 2022 07:58:01 -0400 Subject: [PATCH] fix #4369: allowing the handler to determine if the stop or retry --- .../informers/InformerExceptionHandler.java | 25 +++++++- .../client/informers/SharedIndexInformer.java | 2 +- .../informers/impl/cache/Reflector.java | 62 ++++++++++--------- 3 files changed, 58 insertions(+), 31 deletions(-) diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java index 70d026ac6bb..21fe65ad922 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/InformerExceptionHandler.java @@ -1,9 +1,32 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package io.fabric8.kubernetes.client.informers; import io.fabric8.kubernetes.client.WatcherException; public interface InformerExceptionHandler { - void onWatchNonrecoverable(WatcherException e); + /** + * Provides a callback when the informer could terminate with a non-recoverable exception. 
+ * + * @param t the {@link Throwable}, which may occur as either the cause of a non-http gone {@link WatcherException} or an + * exception from calling list or watch + * @return true if the informer should retry, false if it should stop + */ + boolean retry(Throwable t); } diff --git a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java index 0919e69def4..3250cac2ed9 100644 --- a/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java +++ b/kubernetes-client-api/src/main/java/io/fabric8/kubernetes/client/informers/SharedIndexInformer.java @@ -83,7 +83,7 @@ default SharedIndexInformer removeNamespaceIndex() { * @param resyncPeriod the specific resync period */ SharedIndexInformer addEventHandlerWithResyncPeriod(ResourceEventHandler handle, - long resyncPeriod); + long resyncPeriod); /** * Starts the shared informer, which will be stopped when {@link #stop()} is called. 
diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java index ab6aa5827c4..68e8a8a9d60 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java @@ -48,7 +48,7 @@ public class Reflector watchFuture; - private volatile Future reconnectFuture; + private volatile CompletableFuture reconnectFuture; public Reflector(ListerWatcher listerWatcher, SyncableStore store) { this(listerWatcher, store, null); @@ -187,7 +187,6 @@ class ReflectorWatcher implements Watcher { this.exceptionHandlerSupplier = exceptionHandlerSupplier; } - @Override public void eventReceived(Action action, T resource) { if (action == null) { @@ -221,38 +220,43 @@ public void eventReceived(Action action, T resource) { public void onClose(WatcherException exception) { // this close was triggered by an exception, // not the user, it is expected that the watch retry will handle this - boolean restarted = false; - try { - if (exception.isHttpGone()) { - if (log.isDebugEnabled()) { - log.debug("Watch restarting due to http gone for {}", Reflector.this); - } - listSyncAndWatch().whenComplete((v, t) -> { - if (t != null) { - watchStopped(); - // start a whole new list/watch cycle, can be run in the scheduler thread because - // any further operations will happen on the io thread - reconnectFuture = Utils.schedule(Runnable::run, Reflector.this::listSyncAndWatch, - listerWatcher.getWatchReconnectInterval(), TimeUnit.MILLISECONDS); - } - }); - restarted = true; - } else { - running = false; // shouldn't happen, but it means the watch won't restart - InformerExceptionHandler handler = exceptionHandlerSupplier.get(); - if (handler != null) { - handler.onWatchNonrecoverable(exception); - } else { - log.warn("Watch 
closing with exception for {}", Reflector.this, exception); - } + watchStopped(); + InformerExceptionHandler handler = exceptionHandlerSupplier.get(); + boolean reconnect = false; + if (exception.isHttpGone()) { + if (log.isDebugEnabled()) { + log.debug("Watch restarting due to http gone for {}", Reflector.this); } - } finally { - if (!restarted) { - watchStopped(); // report the watch as stopped after a problem + reconnect = true; + } else if (handler != null) { + reconnect = handler.retry(exception.getCause()); + } + if (reconnect) { + // start a whole new list/watch cycle + reconnect(); + } else { + running = false; // shouldn't happen, but it means the watch won't restart + if (handler == null) { + log.warn("Watch closing with exception for {}", Reflector.this, exception); } } } + private void reconnect() { + // this can be run in the scheduler thread because + // any further operations will happen on the io thread + reconnectFuture = Utils.schedule(Runnable::run, Reflector.this::listSyncAndWatch, + listerWatcher.getWatchReconnectInterval(), TimeUnit.MILLISECONDS); + reconnectFuture.whenComplete((v, t) -> { + if (t != null) { + InformerExceptionHandler handler = exceptionHandlerSupplier.get(); + if (handler == null || handler.retry(t)) { + reconnect(); + } + } + }); + } + @Override public void onClose() { watchStopped();