Skip to content

Commit

Permalink
fix #4369: allowing the handler to determine if the stop or retry
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Sep 2, 2022
1 parent 90cf0db commit 69aa906
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 40 deletions.
@@ -1,9 +1,32 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.fabric8.kubernetes.client.informers;

import io.fabric8.kubernetes.client.WatcherException;

public interface InformerExceptionHandler {

void onWatchNonrecoverable(WatcherException e);
/**
* Provides a callback when the informer could terminated with a non-recoverable exception.
*
* @param t the {@link Throwable}, which may occur as either the cause of a non-http gone {@link WatcherException} or an
* exception from calling list or watch
* @return true if the informer should retry, false if it should stop
*/
boolean retry(Throwable t);

}
Expand Up @@ -83,7 +83,7 @@ default SharedIndexInformer<T> removeNamespaceIndex() {
* @param resyncPeriod the specific resync period
*/
SharedIndexInformer<T> addEventHandlerWithResyncPeriod(ResourceEventHandler<? super T> handle,
long resyncPeriod);
long resyncPeriod);

/**
* Starts the shared informer, which will be stopped when {@link #stop()} is called.
Expand Down
Expand Up @@ -48,14 +48,14 @@ public class Reflector<T extends HasMetadata, L extends KubernetesResourceList<T
private volatile boolean running;
private volatile boolean watching;
private volatile CompletableFuture<Watch> watchFuture;
private volatile Future<?> reconnectFuture;
private volatile CompletableFuture<?> reconnectFuture;

public Reflector(ListerWatcher<T, L> listerWatcher, SyncableStore<T> store) {
this(listerWatcher, store, null);
}

public Reflector(ListerWatcher<T, L> listerWatcher, SyncableStore<T> store,
Supplier<InformerExceptionHandler> exceptionHandlerSupplier) {
Supplier<InformerExceptionHandler> exceptionHandlerSupplier) {
this.listerWatcher = listerWatcher;
this.store = store;
this.watcher = new ReflectorWatcher(exceptionHandlerSupplier);
Expand Down Expand Up @@ -126,9 +126,9 @@ public CompletableFuture<Void> listSyncAndWatch() {

private CompletableFuture<L> processList(Set<String> nextKeys, String continueVal) {
CompletableFuture<L> futureResult = listerWatcher
.submitList(
new ListOptionsBuilder().withLimit(listerWatcher.getLimit()).withContinue(continueVal)
.build());
.submitList(
new ListOptionsBuilder().withLimit(listerWatcher.getLimit()).withContinue(continueVal)
.build());

return futureResult.thenCompose(result -> {
result.getItems().forEach(i -> {
Expand Down Expand Up @@ -157,9 +157,10 @@ private synchronized CompletableFuture<Watch> startWatcher(final String latestRe
log.debug("Starting watcher for {} at v{}", this, latestResourceVersion);
// there's no need to stop the old watch, that will happen automatically when this call completes
watchFuture = listerWatcher.submitWatch(
new ListOptionsBuilder().withResourceVersion(latestResourceVersion)
.withTimeoutSeconds(null)
.build(), watcher);
new ListOptionsBuilder().withResourceVersion(latestResourceVersion)
.withTimeoutSeconds(null)
.build(),
watcher);
return watchFuture;
}

Expand Down Expand Up @@ -187,7 +188,6 @@ class ReflectorWatcher implements Watcher<T> {
this.exceptionHandlerSupplier = exceptionHandlerSupplier;
}


@Override
public void eventReceived(Action action, T resource) {
if (action == null) {
Expand All @@ -198,8 +198,8 @@ public void eventReceived(Action action, T resource) {
}
if (log.isDebugEnabled()) {
log.debug("Event received {} {} resourceVersion v{} for {}", action.name(),
resource.getKind(),
resource.getMetadata().getResourceVersion(), Reflector.this);
resource.getKind(),
resource.getMetadata().getResourceVersion(), Reflector.this);
}
switch (action) {
case ERROR:
Expand All @@ -221,38 +221,43 @@ public void eventReceived(Action action, T resource) {
public void onClose(WatcherException exception) {
// this close was triggered by an exception,
// not the user, it is expected that the watch retry will handle this
boolean restarted = false;
try {
if (exception.isHttpGone()) {
if (log.isDebugEnabled()) {
log.debug("Watch restarting due to http gone for {}", Reflector.this);
}
listSyncAndWatch().whenComplete((v, t) -> {
if (t != null) {
watchStopped();
// start a whole new list/watch cycle, can be run in the scheduler thread because
// any further operations will happen on the io thread
reconnectFuture = Utils.schedule(Runnable::run, Reflector.this::listSyncAndWatch,
listerWatcher.getWatchReconnectInterval(), TimeUnit.MILLISECONDS);
}
});
restarted = true;
} else {
running = false; // shouldn't happen, but it means the watch won't restart
InformerExceptionHandler handler = exceptionHandlerSupplier.get();
if (handler != null) {
handler.onWatchNonrecoverable(exception);
} else {
log.warn("Watch closing with exception for {}", Reflector.this, exception);
}
watchStopped();
InformerExceptionHandler handler = exceptionHandlerSupplier.get();
boolean reconnect = false;
if (exception.isHttpGone()) {
if (log.isDebugEnabled()) {
log.debug("Watch restarting due to http gone for {}", Reflector.this);
}
} finally {
if (!restarted) {
watchStopped(); // report the watch as stopped after a problem
reconnect = true;
} else if (handler != null) {
reconnect = handler.retry(exception.getCause());
}
if (reconnect) {
// start a whole new list/watch cycle
reconnect();
} else {
running = false; // shouldn't happen, but it means the watch won't restart
if (handler == null) {
log.warn("Watch closing with exception for {}", Reflector.this, exception);
}
}
}

private void reconnect() {
// this can be run in the scheduler thread because
// any further operations will happen on the io thread
reconnectFuture = Utils.schedule(Runnable::run, Reflector.this::listSyncAndWatch,
listerWatcher.getWatchReconnectInterval(), TimeUnit.MILLISECONDS);
reconnectFuture.whenComplete((v, t) -> {
if (t != null) {
InformerExceptionHandler handler = exceptionHandlerSupplier.get();
if (handler == null || handler.retry(t)) {
reconnect();
}
}
});
}

@Override
public void onClose() {
watchStopped();
Expand Down

0 comments on commit 69aa906

Please sign in to comment.