Skip to content

Commit

Permalink
fix #4369: allowing the handler to determine whether to stop or retry
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Sep 2, 2022
1 parent 90cf0db commit c596971
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 31 deletions.
@@ -1,9 +1,32 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.fabric8.kubernetes.client.informers;

import io.fabric8.kubernetes.client.WatcherException;

/**
* Callback consulted by the informer machinery when a watch closes with an exception or a list/watch attempt
* fails, so that the application can observe the problem and decide whether the informer keeps going.
*/
public interface InformerExceptionHandler {

/**
* Invoked with the {@link WatcherException} when a watch closes with an exception that is not http gone.
* <p>
* NOTE(review): {@link #retry(Throwable)} is used for the same situation — confirm whether this method is still
* intended to be part of the interface.
*
* @param e the exception the watch closed with
*/
void onWatchNonrecoverable(WatcherException e);
/**
* Provides a callback when the informer could be terminated by a non-recoverable exception.
*
* @param t the {@link Throwable}, which may occur as either the cause of a non-http gone {@link WatcherException} or an
* exception from calling list or watch
* @return true if the informer should retry, false if it should stop
*/
boolean retry(Throwable t);

}
Expand Up @@ -83,7 +83,7 @@ default SharedIndexInformer<T> removeNamespaceIndex() {
* @param resyncPeriod the specific resync period
*/
SharedIndexInformer<T> addEventHandlerWithResyncPeriod(ResourceEventHandler<? super T> handle,
long resyncPeriod);
long resyncPeriod);

/**
* Starts the shared informer, which will be stopped when {@link #stop()} is called.
Expand Down
Expand Up @@ -48,7 +48,7 @@ public class Reflector<T extends HasMetadata, L extends KubernetesResourceList<T
private volatile boolean running;
private volatile boolean watching;
private volatile CompletableFuture<Watch> watchFuture;
private volatile Future<?> reconnectFuture;
private volatile CompletableFuture<?> reconnectFuture;

public Reflector(ListerWatcher<T, L> listerWatcher, SyncableStore<T> store) {
this(listerWatcher, store, null);
Expand Down Expand Up @@ -187,7 +187,6 @@ class ReflectorWatcher implements Watcher<T> {
this.exceptionHandlerSupplier = exceptionHandlerSupplier;
}


@Override
public void eventReceived(Action action, T resource) {
if (action == null) {
Expand Down Expand Up @@ -221,38 +220,43 @@ public void eventReceived(Action action, T resource) {
public void onClose(WatcherException exception) {
// this close was triggered by an exception,
// not the user, it is expected that the watch retry will handle this
boolean restarted = false;
try {
if (exception.isHttpGone()) {
if (log.isDebugEnabled()) {
log.debug("Watch restarting due to http gone for {}", Reflector.this);
}
listSyncAndWatch().whenComplete((v, t) -> {
if (t != null) {
watchStopped();
// start a whole new list/watch cycle, can be run in the scheduler thread because
// any further operations will happen on the io thread
reconnectFuture = Utils.schedule(Runnable::run, Reflector.this::listSyncAndWatch,
listerWatcher.getWatchReconnectInterval(), TimeUnit.MILLISECONDS);
}
});
restarted = true;
} else {
running = false; // shouldn't happen, but it means the watch won't restart
InformerExceptionHandler handler = exceptionHandlerSupplier.get();
if (handler != null) {
handler.onWatchNonrecoverable(exception);
} else {
log.warn("Watch closing with exception for {}", Reflector.this, exception);
}
watchStopped();
InformerExceptionHandler handler = exceptionHandlerSupplier.get();
boolean reconnect = false;
if (exception.isHttpGone()) {
if (log.isDebugEnabled()) {
log.debug("Watch restarting due to http gone for {}", Reflector.this);
}
} finally {
if (!restarted) {
watchStopped(); // report the watch as stopped after a problem
reconnect = true;
} else if (handler != null) {
reconnect = handler.retry(exception.getCause());
}
if (reconnect) {
// start a whole new list/watch cycle
reconnect();
} else {
running = false; // shouldn't happen, but it means the watch won't restart
if (handler == null) {
log.warn("Watch closing with exception for {}", Reflector.this, exception);
}
}
}

/**
 * Schedules a new list/watch cycle after the lister/watcher's reconnect interval and records the
 * pending attempt in {@code reconnectFuture}.
 * <p>
 * If the attempt completes exceptionally while the reflector is still running, another attempt is
 * scheduled when no {@link InformerExceptionHandler} is supplied, or when the supplied handler's
 * {@code retry} returns true for the failure. A successful attempt, or a failure observed after
 * {@code running} became false, ends the cycle.
 */
private void reconnect() {
  // this can be run in the scheduler thread because
  // any further operations will happen on the io thread
  CompletableFuture<?> scheduled = Utils.schedule(Runnable::run, Reflector.this::listSyncAndWatch,
      listerWatcher.getWatchReconnectInterval(), TimeUnit.MILLISECONDS);
  // attach the callback to the local reference: the volatile field may be reassigned by another
  // thread between the write and a subsequent read
  reconnectFuture = scheduled;
  scheduled.whenComplete((v, t) -> {
    if (t == null) {
      return; // the list/watch cycle was re-established
    }
    if (!running) {
      // a failure reported once the reflector is no longer running must not restart it
      // NOTE(review): presumably this includes cancellation of the pending future on stop — confirm
      return;
    }
    InformerExceptionHandler handler = exceptionHandlerSupplier.get();
    if (handler == null || handler.retry(t)) {
      reconnect();
    }
  });
}

@Override
public void onClose() {
watchStopped();
Expand Down

0 comments on commit c596971

Please sign in to comment.