Skip to content

Commit

Permalink
fix fabric8io#4408 allowing for runnable informers to retry starting
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Oct 4, 2022
1 parent 3eb233f commit 9f7b4a9
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 31 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -15,6 +15,7 @@
* Fix #4365: The Watch retry logic will handle more cases, as well as perform an exceptional close for events that are not properly handled. Informers can directly provide those exceptional outcomes via the SharedIndexInformer.stopped CompletableFuture.
* Fix #4396: Provide more error context when @Group/@Version annotations are missing
* Fix #4384: The Java generator now supports the generation of specific annotations (min, max, pattern, etc.), as defined by #4348
* Fix #4408: Allowing informers started via the start() method to retry indefinitely rather than making only a single attempt.
* Fix #3864: Change ManagedOpenShiftClient OSGi ConfigurationPolicy to REQUIRE

#### Dependency Upgrade
Expand Down
Expand Up @@ -159,21 +159,43 @@ default boolean hasSynced() {
SharedIndexInformer<T> initialState(Stream<T> items);

SharedIndexInformer<T> itemStore(ItemStore<T> itemStore);

/**
* A non-blocking alternative to run. Starts the shared informer, which will be stopped when {@link #stop()} is called.
* <br>
* Only one start attempt is made - subsequent calls will not re-start the informer.
* Only one start attempt is made - subsequent calls will not re-start the informer.
* The future will be completed normally once the Informer starts watching successfully for the first time.
* If the informer does not start successfully the first time, the future will complete exceptionally.
* <p>
* Canceling this future has no effect on the informer.
* <p>
* If the informer has already been started, then a completed future will be returned.
* <p>
* see also {@link #forceStart()}
*/
CompletableFuture<Void> start();

/**
* A non-blocking alternative to run. Starts the shared informer, which will be stopped when {@link #stop()} is called.
* For situations where it's not known if list or watch operations will complete normally on their first attempt - such as the
* server being unavailable, permissions not yet available, or CRD not yet installed.
* <p>
* The future will be completed normally once the Informer starts watching successfully for the first time.
* If the informer stops manually or exceptionally before starting, the future will be completed with an exception.
* <p>
* Canceling this future has no effect on the informer.
*/
CompletableFuture<Void> forceStart();

/**
* Return a future that will allow notification of informer stopping.
* <p>
* If {@link #stop()} is called, the future will be completed with a null value.
* <p>
* If an exception occurs that terminates the informer, then it will be exceptionally completed with that exception
* - typically a {@link WatcherException}
* <p>
* Canceling this future has no effect on the informer.
*/
CompletableFuture<Void> stopped();
}
Expand Up @@ -868,7 +868,7 @@ public CompletableFuture<List<T>> informOnCondition(Predicate<List<T>> condition
CompletableFuture<List<T>> future = new CompletableFuture<>();

// create an informer that supplies the tester with events and empty list handling
SharedIndexInformer<T> informer = this.createInformer(0, Runnable::run);
DefaultSharedIndexInformer<T, L> informer = this.createInformer(0, Runnable::run);

// prevent unnecessary watches and handle closure
future.whenComplete((r, t) -> informer.stop());
Expand Down Expand Up @@ -905,7 +905,7 @@ public void onUpdate(T oldObj, T newObj) {
public void onNothing() {
test.accept(informer.getStore().list());
}
}).start().whenComplete((v, t) -> {
}).start(false).whenComplete((v, t) -> {
if (t != null) {
future.completeExceptionally(t);
}
Expand Down
Expand Up @@ -96,7 +96,7 @@ public DefaultSharedIndexInformer(Class<T> apiTypeClass, ListerWatcher<T, L> lis
* @param handler event handler
*/
@Override
public SharedIndexInformer<T> addEventHandler(ResourceEventHandler<? super T> handler) {
public DefaultSharedIndexInformer<T, L> addEventHandler(ResourceEventHandler<? super T> handler) {
addEventHandlerWithResyncPeriod(handler, defaultEventHandlerResyncPeriod);
return this;
}
Expand Down Expand Up @@ -144,6 +144,21 @@ public String lastSyncResourceVersion() {

@Override
public CompletableFuture<Void> start() {
return start(false);
}

@Override
public CompletableFuture<Void> forceStart() {
return start(true);
}

/**
* Start the informer
*
* @param reconnect if true, keep trying to start. if false, try only once
* @return the started future from the reflector
*/
public CompletableFuture<Void> start(boolean reconnect) {
if (stopped) {
throw new IllegalStateException("Cannot restart a stopped informer");
}
Expand All @@ -161,19 +176,12 @@ public CompletableFuture<Void> start() {

scheduleResync(processor::shouldResync);

return reflector.start().whenComplete((v, t) -> {
// stop called while run is called could be ineffective, check for it afterwards
synchronized (this) {
if (stopped && reflector.isRunning()) {
stop();
}
}
});
return reflector.start(reconnect);
}

@Override
public SharedIndexInformer<T> run() {
Utils.waitUntilReadyOrFail(start(), -1, TimeUnit.MILLISECONDS);
Utils.waitUntilReadyOrFail(start(false), -1, TimeUnit.MILLISECONDS);
return this;
}

Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package io.fabric8.kubernetes.client.informers.impl.cache;

import com.fasterxml.jackson.core.JsonProcessingException;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.api.model.ListOptionsBuilder;
Expand Down Expand Up @@ -48,6 +49,7 @@ public class Reflector<T extends HasMetadata, L extends KubernetesResourceList<T
private volatile boolean watching;
private volatile CompletableFuture<Watch> watchFuture;
private volatile CompletableFuture<?> reconnectFuture;
private final CompletableFuture<Void> startFuture = new CompletableFuture<>();
private final CompletableFuture<Void> stopFuture = new CompletableFuture<>();
private final ExponentialBackoffIntervalCalculator retryIntervalCalculator;

Expand All @@ -65,20 +67,14 @@ public CompletableFuture<Void> start() {

public CompletableFuture<Void> start(boolean reconnect) {
this.running = true;
CompletableFuture<Void> result = listSyncAndWatch(reconnect);
if (!reconnect) {
result.whenComplete((v, t) -> {
if (t != null) {
stopFuture.completeExceptionally(t);
}
});
}
return result;
listSyncAndWatch(reconnect);
return startFuture;
}

public void stop() {
running = false;
stopFuture.complete(null);
startFuture.completeExceptionally(new KubernetesClientException("informer manually stopped before starting"));
Future<?> future = reconnectFuture;
if (future != null) {
future.cancel(true);
Expand Down Expand Up @@ -129,16 +125,23 @@ public CompletableFuture<Void> listSyncAndWatch(boolean reconnect) {
}
}
});
if (reconnect) {
theFuture.whenComplete((v, t) -> {
if (t != null) {
theFuture.whenComplete((v, t) -> {
if (t != null) {
if (reconnect) {
log.warn("listSyncAndWatch failed for {}, will retry", Reflector.this, t);
reconnect();
} else {
running = false;
startFuture.completeExceptionally(t);
stopFuture.completeExceptionally(t);
}
} else {
startFuture.complete(null);
if (reconnect) {
retryIntervalCalculator.resetReconnectAttempts();
}
});
}
}
});
return theFuture;
}

Expand All @@ -157,7 +160,20 @@ private CompletableFuture<L> processList(Set<String> nextKeys, String continueVa
.submitList(
new ListOptionsBuilder().withLimit(listerWatcher.getLimit()).withContinue(continueVal)
.build());

futureResult.whenComplete((l, t) -> {
while (t instanceof RuntimeException && !(t instanceof ClassCastException)) {
Throwable cause = ((RuntimeException) t).getCause();
if (cause == t) {
break;
}
t = cause;
}
if (t instanceof ClassCastException || t instanceof JsonProcessingException) {
// we'll interpret this as a hard failure, just like in the watch case
this.stopFuture.completeExceptionally(t);
this.stop();
}
});
return futureResult.thenCompose(result -> {
result.getItems().forEach(i -> {
String key = store.getKey(i);
Expand Down
Expand Up @@ -17,11 +17,15 @@

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.PodListBuilder;
import io.fabric8.kubernetes.client.Watch;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

Expand All @@ -45,6 +49,15 @@ private DefaultSharedIndexInformer<Pod, PodList> createDefaultSharedIndexInforme
return defaultSharedIndexInformer;
}

@BeforeEach
void beforeEach() {
Mockito.when(listerWatcher.submitWatch(Mockito.any(), Mockito.any()))
.thenReturn(CompletableFuture.completedFuture(Mockito.mock(Watch.class)));
PodList result = new PodListBuilder().withNewMetadata().endMetadata().build();
Mockito.when(listerWatcher.submitList(Mockito.any()))
.thenReturn(CompletableFuture.completedFuture(result));
}

@AfterEach
void afterEach() {
if (defaultSharedIndexInformer != null) {
Expand Down
Expand Up @@ -58,7 +58,8 @@ protected void reconnect() {

CompletableFuture<Void> future = reflector.start(true);

assertThrows(CompletionException.class, future::join);
// since we're reconnecting, but have overridden reconnect, this won't be done
assertFalse(future.isDone());

// running but watch failed
assertFalse(reflector.isWatching());
Expand All @@ -69,6 +70,8 @@ protected void reconnect() {
assertTrue(reflector.isWatching());
assertTrue(reflector.isRunning());
assertFalse(reflector.getStopFuture().isDone());
assertTrue(future.isDone());
assertTrue(!future.isCompletedExceptionally());

reflector.stop();

Expand All @@ -77,6 +80,26 @@ protected void reconnect() {
assertTrue(reflector.getStopFuture().isDone());
}

@Test
void testNotRunningAfterStartError() {
ListerWatcher<Pod, PodList> mock = Mockito.mock(ListerWatcher.class);
PodList list = new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build();
Mockito.when(mock.submitList(Mockito.any())).thenReturn(CompletableFuture.completedFuture(list));

Reflector<Pod, PodList> reflector = new Reflector<Pod, PodList>(mock, Mockito.mock(SyncableStore.class));

// throw an exception, then watch normally
Mockito.when(mock.submitWatch(Mockito.any(), Mockito.any()))
.thenThrow(new KubernetesClientException("error"));

// single start
CompletableFuture<Void> future = reflector.start(false);

assertThrows(CompletionException.class, future::join);

assertFalse(reflector.isRunning());
}

@Test
void testNonHttpGone() {
ListerWatcher<Pod, PodList> mock = Mockito.mock(ListerWatcher.class);
Expand Down
Expand Up @@ -69,7 +69,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.function.Function;

Expand Down

0 comments on commit 9f7b4a9

Please sign in to comment.