Skip to content

Commit

Permalink
fix #4369: narrowing the changes to only a stopped future
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Sep 6, 2022
1 parent 401b2e3 commit 1b76eae
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 105 deletions.

This file was deleted.

Expand Up @@ -166,6 +166,9 @@ default boolean hasSynced() {
*/
CompletableFuture<Void> start();

default void setExceptionHandler(InformerExceptionHandler handler) {
}
/**
* Return a future that will allow notification of informer stopping.
*
*/
CompletableFuture<Void> stopped();
}
Expand Up @@ -18,7 +18,6 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.informers.InformerExceptionHandler;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Indexer;
Expand Down Expand Up @@ -291,8 +290,8 @@ public String toString() {
}

@Override
public void setExceptionHandler(InformerExceptionHandler handler) {
this.reflector.setExceptionHandler(handler);
this.processor.setExceptionHandler(handler);
public CompletableFuture<Void> stopped() {
return this.reflector.getStopFuture();
}

}
Expand Up @@ -23,8 +23,6 @@
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.dsl.internal.WatchConnectionManager;
import io.fabric8.kubernetes.client.informers.InformerExceptionHandler;
import io.fabric8.kubernetes.client.informers.InformerExceptionHandler.EventType;
import io.fabric8.kubernetes.client.informers.impl.ListerWatcher;
import io.fabric8.kubernetes.client.utils.Utils;
import io.fabric8.kubernetes.client.utils.internal.ExponentialBackoffIntervalCalculator;
Expand All @@ -51,7 +49,7 @@ public class Reflector<T extends HasMetadata, L extends KubernetesResourceList<T
private volatile boolean watching;
private volatile CompletableFuture<Watch> watchFuture;
private volatile CompletableFuture<?> reconnectFuture;
private volatile InformerExceptionHandler handler;
private volatile CompletableFuture<Void> stopFuture = new CompletableFuture<Void>();
private final ExponentialBackoffIntervalCalculator retryIntervalCalculator;

public Reflector(ListerWatcher<T, L> listerWatcher, SyncableStore<T> store) {
Expand All @@ -68,6 +66,7 @@ public CompletableFuture<Void> start() {

public void stop() {
running = false;
stopFuture.complete(null);
Future<?> future = reconnectFuture;
if (future != null) {
future.cancel(true);
Expand Down Expand Up @@ -118,10 +117,6 @@ public CompletableFuture<Void> listSyncAndWatch() {
log.debug("Watch started for {}", Reflector.this);
}
watching = true;
InformerExceptionHandler theHandler = handler;
if (theHandler != null) {
theHandler.onWatching();
}
} else {
stopWatch(w);
}
Expand Down Expand Up @@ -223,23 +218,16 @@ public void onClose(WatcherException exception) {
// this close was triggered by an exception,
// not the user, it is expected that the watch retry will handle this
watchStopped();
InformerExceptionHandler theHandler = handler;
boolean reconnect = false;
if (exception.isHttpGone()) {
if (log.isDebugEnabled()) {
log.debug("Watch restarting due to http gone for {}", Reflector.this);
}
reconnect = true;
} else if (theHandler != null) {
reconnect = !theHandler.shouldStop(exception.getCause());
} else {
log.warn("Watch closing with exception for {}", Reflector.this, exception);
}
if (reconnect) {
// start a whole new list/watch cycle
reconnect();
} else {
running = false; // shouldn't happen, but it means the watch won't restart
stopFuture.completeExceptionally(exception.getCause());
log.warn("Watch closing with exception for {}", Reflector.this, exception);
}
}

Expand All @@ -253,12 +241,7 @@ private void reconnect() {
retryIntervalCalculator.nextReconnectInterval(), TimeUnit.MILLISECONDS);
reconnectFuture.whenComplete((v, t) -> {
if (t != null) {
InformerExceptionHandler theHandler = handler;
if (theHandler != null) {
theHandler.onException(EventType.LIST_OR_WATCH, t);
} else {
log.warn("listSyncAndWatch failed for {}, will retry", Reflector.this, t);
}
log.warn("listSyncAndWatch failed for {}, will retry", Reflector.this, t);
reconnect();
} else {
retryIntervalCalculator.resetReconnectAttempts();
Expand Down Expand Up @@ -287,7 +270,8 @@ public String toString() {
return listerWatcher.getApiEndpointPath();
}

public void setExceptionHandler(InformerExceptionHandler handler) {
this.handler = handler;
public CompletableFuture<Void> getStopFuture() {
return stopFuture;
}

}
Expand Up @@ -15,8 +15,6 @@
*/
package io.fabric8.kubernetes.client.informers.impl.cache;

import io.fabric8.kubernetes.client.informers.InformerExceptionHandler;
import io.fabric8.kubernetes.client.informers.InformerExceptionHandler.EventType;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.utils.internal.SerialExecutor;
import org.slf4j.Logger;
Expand Down Expand Up @@ -53,8 +51,6 @@ public class SharedProcessor<T> {
private final SerialExecutor executor;
private final String informerDescription;

private volatile InformerExceptionHandler handler;

public SharedProcessor() {
this(Runnable::run, "informer");
}
Expand Down Expand Up @@ -113,13 +109,8 @@ public void distribute(Consumer<ProcessorListener<T>> operation, boolean isSync)
try {
operation.accept(listener);
} catch (Exception ex) {
InformerExceptionHandler theHandler = handler;
if (theHandler != null) {
theHandler.onException(EventType.HANDLER, ex);
} else {
log.error("{} failed invoking {} event handler: {}", informerDescription, listener.getHandler(), ex.getMessage(),
ex);
}
log.error("{} failed invoking {} event handler: {}", informerDescription, listener.getHandler(), ex.getMessage(),
ex);
}
}
});
Expand Down Expand Up @@ -179,9 +170,4 @@ public ProcessorListener<T> addProcessorListener(ResourceEventHandler<? super T>
lock.writeLock().unlock();
}
}

public void setExceptionHandler(InformerExceptionHandler handler) {
this.handler = handler;
}

}

0 comments on commit 1b76eae

Please sign in to comment.