Skip to content

Commit

Permalink
fix #4369: refining the watch and informer reconnect and adding tests
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Sep 7, 2022
1 parent 2c8b5fa commit db5ba0e
Show file tree
Hide file tree
Showing 9 changed files with 119 additions and 58 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Expand Up @@ -9,7 +9,7 @@
* Fix #4256: crd-generator-apt pom.xml includes transitive dependencies
* Fix #4294: crd-generator respects JsonIgnore annotations on enum properties
* Fix #4320: corrected leader transitions field on leader election leases

* Fix #4369: Informers once again retry with a backoff on list/watch failure, as they did in 5.12 and earlier.

#### Improvements
* Fix #887: added KubernetesClient.visitResources to search and perform other operations across all resources.
Expand All @@ -21,6 +21,7 @@
* Fix #4287: added WorkloadGroup for Istio v1alpha3 extension generator
* Fix #4318: implemented LeaderElection releaseOnCancel
* Fix #3960: adding a KubernetesMockServer.expectCustomResource helper method and additional mock crd support
* Fix #4365: The Watch retry logic now handles more cases and performs an exceptional close for events that are not properly handled. Informers expose those exceptional outcomes via the SharedIndexInformer.stopped CompletableFuture.

#### Dependency Upgrade
* Bump Knative model to v0.34.0
Expand Down
Expand Up @@ -37,14 +37,14 @@ public WatcherException(String message, Throwable cause, String rawWatchMessage)

public KubernetesClientException asClientException() {
final Throwable cause = getCause();
return cause instanceof KubernetesClientException ?
(KubernetesClientException) cause : new KubernetesClientException(getMessage(), cause);
return cause instanceof KubernetesClientException ? (KubernetesClientException) cause
: new KubernetesClientException(getMessage(), cause);
}

public boolean isHttpGone() {
final KubernetesClientException cause = asClientException();
return cause.getCode() == HttpURLConnection.HTTP_GONE
|| (cause.getStatus() != null && cause.getStatus().getCode() == HttpURLConnection.HTTP_GONE);
return cause != null && (cause.getCode() == HttpURLConnection.HTTP_GONE
|| (cause.getStatus() != null && cause.getStatus().getCode() == HttpURLConnection.HTTP_GONE));
}

@SuppressWarnings("unused")
Expand Down
Expand Up @@ -15,6 +15,7 @@
*/
package io.fabric8.kubernetes.client.informers;

import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.informers.cache.Cache;
import io.fabric8.kubernetes.client.informers.cache.Indexer;
import io.fabric8.kubernetes.client.informers.cache.ItemStore;
Expand Down Expand Up @@ -168,7 +169,11 @@ default boolean hasSynced() {

/**
* Return a future that will allow notification of informer stopping.
*
* <p>
* If {@link #stop()} is called, the future will be completed with a null value.
* <p>
* If an exception occurs that terminates the informer, then it will be exceptionally completed with that exception
* - typically a {@link WatcherException}
*/
CompletableFuture<Void> stopped();
}
@@ -0,0 +1,41 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes.client;

import io.fabric8.kubernetes.api.model.Status;
import org.junit.jupiter.api.Test;

import java.net.HttpURLConnection;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
 * Unit test for {@link WatcherException#isHttpGone()}.
 */
class WatcherExceptionTest {

  @Test
  void testIsHttpGone() {
    // an exception created without any cause
    final WatcherException withoutCause = new WatcherException("I've failed");
    assertFalse(withoutCause.isHttpGone());

    // an exception whose cause is not a KubernetesClientException
    final WatcherException withUnrelatedCause = new WatcherException("I've failed", new ClassCastException());
    assertFalse(withUnrelatedCause.isHttpGone());

    // an exception whose cause is a KubernetesClientException created with the HTTP_GONE code
    final KubernetesClientException goneCause = new KubernetesClientException("http gone",
        HttpURLConnection.HTTP_GONE, new Status());
    final WatcherException withGoneCause = new WatcherException("I've failed", goneCause);
    assertTrue(withGoneCause.isHttpGone());
  }

}
Expand Up @@ -226,7 +226,7 @@ public void close() {
}

private WatchEvent contextAwareWatchEventDeserializer(String messageSource)
throws JsonProcessingException {
throws JsonProcessingException {
try {
return Serialization.unmarshal(messageSource, WatchEvent.class);
} catch (Exception ex1) {
Expand Down Expand Up @@ -272,18 +272,23 @@ protected void onMessage(String message) {
try {
WatchEvent event = readWatchEvent(message);
Object object = event.getObject();
if (object instanceof Status) {
Status status = (Status) object;
Action action = Action.valueOf(event.getType());
if (action == Action.ERROR) {
if (object instanceof Status) {
Status status = (Status) object;

onStatus(status);
onStatus(status);
} else {
logger.error("Error received, but object is not a status - will retry");
closeRequest();
}
} else if (object instanceof HasMetadata) {
HasMetadata hasMetadata = (HasMetadata) object;
updateResourceVersion(hasMetadata.getMetadata().getResourceVersion());
Action action = Action.valueOf(event.getType());

if(object instanceof KubernetesResourceList) {
if (object instanceof KubernetesResourceList) {
// Dirty cast - should always be valid though
@SuppressWarnings({"rawtypes"})
@SuppressWarnings({ "rawtypes" })
KubernetesResourceList list = (KubernetesResourceList) hasMetadata;
@SuppressWarnings("unchecked")
List<HasMetadata> items = list.getItems();
Expand All @@ -296,17 +301,18 @@ protected void onMessage(String message) {
eventReceived(action, hasMetadata);
}
} else {
logger.error("Unknown message received: {}", message);
final String msg = String.format("Invalid object received: %s", message);
close(new WatcherException(msg, null, message));
}
} catch (ClassCastException e) {
final String msg = "Received wrong type of object for watch";
close(new WatcherException(msg, e));
close(new WatcherException(msg, e, message));
} catch (JsonProcessingException e) {
final String msg = "Couldn't deserialize watch event: " + message;
close(new WatcherException(msg, e, message));
} catch (Exception e) {
final String msg = "Unhandled exception encountered in watcher event handler";
close(new WatcherException(msg, e));
close(new WatcherException(msg, e, message));
}
}

Expand All @@ -317,8 +323,8 @@ protected boolean onStatus(Status status) {
return true;
}

eventReceived(Action.ERROR, null);
logger.error("Error received: {}", status);
logger.error("Error received: {}, will retry", status);
closeRequest();
return false;
}

Expand Down
Expand Up @@ -62,7 +62,7 @@ public Reflector(ListerWatcher<T, L> listerWatcher, SyncableStore<T> store) {

public CompletableFuture<Void> start() {
this.running = true;
return listSyncAndWatch();
return listSyncAndWatch(false);
}

public void stop() {
Expand Down Expand Up @@ -98,33 +98,50 @@ private synchronized void stopWatcher() {
*
* @return a future that completes when the list and watch are established
*/
public CompletableFuture<Void> listSyncAndWatch() {
public CompletableFuture<Void> listSyncAndWatch(boolean reconnect) {
if (!running) {
return CompletableFuture.completedFuture(null);
}
Set<String> nextKeys = new ConcurrentSkipListSet<>();
return processList(nextKeys, null).thenAccept(result -> {
CompletableFuture<Void> theFuture = processList(nextKeys, null).thenCompose(result -> {
store.retainAll(nextKeys);
final String latestResourceVersion = result.getMetadata().getResourceVersion();
lastSyncResourceVersion = latestResourceVersion;
log.debug("Listing items ({}) for {} at v{}", nextKeys.size(), this, latestResourceVersion);
CompletableFuture<Watch> started = startWatcher(latestResourceVersion);
if (started != null) {
// outside of the lock
started.whenComplete((w, t) -> {
if (w != null) {
if (running) {
if (log.isDebugEnabled()) {
log.debug("Watch started for {}", Reflector.this);
}
watching = true;
} else {
stopWatch(w);
}
return startWatcher(latestResourceVersion);
}).thenAccept(w -> {
if (w != null) {
if (running) {
if (log.isDebugEnabled()) {
log.debug("Watch started for {}", Reflector.this);
}
});
watching = true;
} else {
stopWatch(w);
}
}
});
if (reconnect) {
theFuture.whenComplete((v, t) -> {
if (t != null) {
log.warn("listSyncAndWatch failed for {}, will retry", Reflector.this, t);
reconnect();
} else {
retryIntervalCalculator.resetReconnectAttempts();
}
});
}
return theFuture;
}

/**
 * Schedules another {@code listSyncAndWatch(true)} attempt after the next interval supplied by
 * the retry interval calculator. Does nothing when the reflector is not running.
 */
private void reconnect() {
  if (running) {
    // executing directly on the scheduler thread is fine here because
    // any further operations will happen on the io thread
    reconnectFuture = Utils.schedule(
        Runnable::run,
        () -> listSyncAndWatch(true),
        retryIntervalCalculator.nextReconnectInterval(),
        TimeUnit.MILLISECONDS);
  }
}

private CompletableFuture<L> processList(Set<String> nextKeys, String continueVal) {
Expand Down Expand Up @@ -155,7 +172,7 @@ private void stopWatch(Watch w) {

private synchronized CompletableFuture<Watch> startWatcher(final String latestResourceVersion) {
if (!running) {
return null;
return CompletableFuture.completedFuture(null);
}
log.debug("Starting watcher for {} at v{}", this, latestResourceVersion);
// there's no need to stop the old watch, that will happen automatically when this call completes
Expand Down Expand Up @@ -227,29 +244,11 @@ public void onClose(WatcherException exception) {
reconnect();
} else {
running = false; // shouldn't happen, but it means the watch won't restart
stopFuture.completeExceptionally(exception.getCause());
stopFuture.completeExceptionally(exception);
log.warn("Watch closing with exception for {}", Reflector.this, exception);
}
}

private void reconnect() {
if (!running) {
return;
}
// this can be run in the scheduler thread because
// any further operations will happen on the io thread
reconnectFuture = Utils.schedule(Runnable::run, Reflector.this::listSyncAndWatch,
retryIntervalCalculator.nextReconnectInterval(), TimeUnit.MILLISECONDS);
reconnectFuture.whenComplete((v, t) -> {
if (t != null) {
log.warn("listSyncAndWatch failed for {}, will retry", Reflector.this, t);
reconnect();
} else {
retryIntervalCalculator.resetReconnectAttempts();
}
});
}

@Override
public void onClose() {
watchStopped();
Expand Down
Expand Up @@ -59,15 +59,17 @@ void testStateFlags() {
assertFalse(reflector.isWatching());
assertTrue(reflector.isRunning());

reflector.listSyncAndWatch().join();
reflector.listSyncAndWatch(false).join();

assertTrue(reflector.isWatching());
assertTrue(reflector.isRunning());
assertFalse(reflector.getStopFuture().isDone());

reflector.stop();

assertFalse(reflector.isWatching());
assertFalse(reflector.isRunning());
assertTrue(reflector.getStopFuture().isDone());
}

@Test
Expand Down
Expand Up @@ -36,6 +36,7 @@
import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBindingBuilder;
import io.fabric8.kubernetes.client.CustomResourceList;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
import io.fabric8.kubernetes.client.dsl.base.ResourceDefinitionContext;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
Expand Down Expand Up @@ -83,7 +84,8 @@ class DefaultSharedIndexInformerTest {
.withMessage(
"410: The event in requested index is outdated and cleared (the requested history has been cleared [3/1]) [2]")
.build();
static final WatchEvent outdatedEvent = new WatchEventBuilder().withStatusObject(outdatedStatus).build();
static final WatchEvent outdatedEvent = new WatchEventBuilder().withType(Watcher.Action.ERROR.name())
.withStatusObject(outdatedStatus).build();
static final Long WATCH_EVENT_EMIT_TIME = 1L;
static final Long OUTDATED_WATCH_EVENT_EMIT_TIME = 1L;
static final long RESYNC_PERIOD = 5L;
Expand Down Expand Up @@ -844,6 +846,11 @@ void testReconnectAfterOnCloseException() throws InterruptedException {
.andEmit(outdatedEvent)
.done().always();

// re-list errors
server.expect().withPath("/api/v1/pods")
.andReturn(HttpURLConnection.HTTP_FORBIDDEN, new Status())
.times(2);

// re-list
server.expect().withPath("/api/v1/pods")
.andReturn(200,
Expand Down
Expand Up @@ -315,7 +315,7 @@ public void onClose(WatcherException cause) {
}

private static WatchEvent outdatedEvent() {
return new WatchEventBuilder().withStatusObject(
return new WatchEventBuilder().withType(Watcher.Action.ERROR.name()).withStatusObject(
new StatusBuilder().withCode(HttpURLConnection.HTTP_GONE)
.withMessage(
"410: The event in requested index is outdated and cleared (the requested history has been cleared [3/1]) [2]")
Expand Down

0 comments on commit db5ba0e

Please sign in to comment.