Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SharedInformer and SharedInformerFactory setDebugItems #3121

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
package io.kubernetes.client.informer;

import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.informer.cache.ReflectorRunnable;

/*
* SharedInformer defines basic methods of a informer.
/**
* Defines basic methods of an informer.
*/
public interface SharedInformer<ApiType extends KubernetesObject> {

Expand Down Expand Up @@ -69,4 +70,14 @@ public interface SharedInformer<ApiType extends KubernetesObject> {
* @param transformFunc the transform function
*/
void setTransform(TransformFunc transformFunc);

/**
* Toggles DEBUG of initial and streamed items on the {@link ReflectorRunnable} logger.
*
* <p>Note: This can cause a high volume of logging even in a small cluster, depending on rate of
* change.
*
* @param shouldDebugItems whether initial and streamed items should be logged at DEBUG level.
*/
default void setDebugItems(boolean shouldDebugItems) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public class SharedInformerFactory {

private ApiClient apiClient;

private boolean shouldDebugItems; // guarded by this

/** Constructor w/ default thread pool. */
/** DEPRECATE: In favor of explicit apiClient constructor to avoid misguiding */
@Deprecated
Expand Down Expand Up @@ -170,6 +172,17 @@ SharedIndexInformer<ApiType> sharedIndexInformerFor(
return sharedIndexInformerFor(listerWatcher, apiTypeClass, resyncPeriodInMillis, null);
}

/**
* Toggles {@linkplain SharedInformer#setDebugItems} on any existing or new
* informer created by this factory.
*
* @param shouldDebugItems whether initial and streamed items should be logged at DEBUG level.
*/
public synchronized void setDebugItems(boolean shouldDebugItems) {
this.informers.values().forEach(i -> i.setDebugItems(shouldDebugItems));
this.shouldDebugItems = shouldDebugItems;
}

public synchronized <ApiType extends KubernetesObject, ApiListType extends KubernetesListObject>
SharedIndexInformer<ApiType> sharedIndexInformerFor(
ListerWatcher<ApiType, ApiListType> listerWatcher,
Expand All @@ -180,6 +193,7 @@ SharedIndexInformer<ApiType> sharedIndexInformerFor(
SharedIndexInformer<ApiType> informer =
new DefaultSharedIndexInformer<>(
apiTypeClass, listerWatcher, resyncPeriodInMillis, new Cache<>(), exceptionHandler);
informer.setDebugItems(shouldDebugItems);
this.informers.putIfAbsent(TypeToken.get(apiTypeClass).getType(), informer);
return informer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.informer.ListerWatcher;
import io.kubernetes.client.informer.ResyncRunnable;
import io.kubernetes.client.informer.SharedInformer;
import io.kubernetes.client.util.Threads;
import java.util.Deque;
import java.util.concurrent.Executors;
Expand All @@ -32,7 +33,7 @@

/**
* Controller is a java port of k/client-go's informer#Controller. It plumbs reflector and the queue
* implementation and it runs re-sync function periodically.
* implementation. Then, it runs re-sync function periodically.
*/
public class Controller<
ApiType extends KubernetesObject, ApiListType extends KubernetesListObject> {
Expand Down Expand Up @@ -66,6 +67,8 @@ public class Controller<

private ScheduledFuture reflectorFuture;

private boolean shouldDebugItems; // guarded by this

/* visible for testing */ BiConsumer<Class<ApiType>, Throwable> exceptionHandler;

public Controller(
Expand Down Expand Up @@ -114,6 +117,18 @@ public Controller(
this(apiTypeClass, queue, listerWatcher, popProcessFunc, null, 0);
}

/**
* Implements {@link SharedInformer#setDebugItems}
*
* @param shouldDebugItems whether initial and streamed items should be logged at DEBUG level.
*/
public synchronized void setDebugItems(boolean shouldDebugItems) {
if (reflector != null) {
reflector.setDebugItems(shouldDebugItems);
}
this.shouldDebugItems = shouldDebugItems;
}

public void run() {
log.info("informer#Controller: ready to run resync & reflector runnable");

Expand All @@ -130,6 +145,7 @@ public void run() {
synchronized (this) {
// TODO(yue9944882): proper naming for reflector
reflector = newReflector();
reflector.setDebugItems(shouldDebugItems);
try {
reflectorFuture =
reflectExecutor.scheduleWithFixedDelay(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.informer.EventType;
import io.kubernetes.client.informer.ListerWatcher;
import io.kubernetes.client.informer.SharedInformer;
import io.kubernetes.client.informer.exception.WatchExpiredException;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.models.V1ListMeta;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.util.CallGeneratorParams;
import io.kubernetes.client.util.Strings;
import io.kubernetes.client.util.Watch.Response;
import io.kubernetes.client.util.Watchable;
import java.io.IOException;
import java.net.ConnectException;
Expand All @@ -38,9 +40,9 @@ public class ReflectorRunnable<
ApiType extends KubernetesObject, ApiListType extends KubernetesListObject>
implements Runnable {

public static Duration REFLECTOR_WATCH_CLIENTSIDE_TIMEOUT = Duration.ofMinutes(5);
public static final Duration REFLECTOR_WATCH_CLIENTSIDE_TIMEOUT = Duration.ofMinutes(5);

public static Duration REFLECTOR_WATCH_CLIENTSIDE_MAX_TIMEOUT = Duration.ofMinutes(5 * 2);
public static final Duration REFLECTOR_WATCH_CLIENTSIDE_MAX_TIMEOUT = Duration.ofMinutes(5 * 2);

private static final Logger log = LoggerFactory.getLogger(ReflectorRunnable.class);

Expand All @@ -50,13 +52,16 @@ public class ReflectorRunnable<

private Watchable<ApiType> watch;

private ListerWatcher<ApiType, ApiListType> listerWatcher;
private final ListerWatcher<ApiType, ApiListType> listerWatcher;

private DeltaFIFO store;
private final DeltaFIFO store;

private Class<ApiType> apiTypeClass;
private final Class<ApiType> apiTypeClass;
private final String apiTypeClassName;

private AtomicBoolean isActive = new AtomicBoolean(true);
private final AtomicBoolean isActive = new AtomicBoolean(true);

private volatile boolean shouldDebugItems; // volatile as set and read from different threads.

/* visible for testing */ final BiConsumer<Class<ApiType>, Throwable> exceptionHandler;

Expand All @@ -75,6 +80,7 @@ public ReflectorRunnable(
this.listerWatcher = listerWatcher;
this.store = store;
this.apiTypeClass = apiTypeClass;
this.apiTypeClassName = apiTypeClass.getSimpleName();
this.exceptionHandler =
exceptionHandler == null ? ReflectorRunnable::defaultWatchErrorHandler : exceptionHandler;
}
Expand All @@ -84,7 +90,7 @@ public ReflectorRunnable(
* resource version to watch.
*/
public void run() {
log.info("{}#Start listing and watching...", apiTypeClass);
log.info("{}#Start listing and watching...", apiTypeClassName);

try {
ApiListType list =
Expand All @@ -95,27 +101,24 @@ public void run() {
String resourceVersion = listMeta.getResourceVersion();
List<? extends KubernetesObject> items = list.getItems();

if (log.isDebugEnabled()) {
log.debug("{}#Extract resourceVersion {} list meta", apiTypeClass, resourceVersion);
log.debug("{}#Extract resourceVersion {} list meta", apiTypeClassName, resourceVersion);
if (shouldDebugItems) {
log.debug("{}#Initial items {}", apiTypeClassName, items);
}
this.syncWith(items, resourceVersion);
this.lastSyncResourceVersion = resourceVersion;
this.isLastSyncResourceVersionUnavailable = false;

if (log.isDebugEnabled()) {
log.debug("{}#Start watching with {}...", apiTypeClass, lastSyncResourceVersion);
}
log.debug("{}#Start watching with {}...", apiTypeClassName, lastSyncResourceVersion);
while (true) {
if (!isActive.get()) {
closeWatch();
return;
}

try {
if (log.isDebugEnabled()) {
log.debug(
"{}#Start watch with resource version {}", apiTypeClass, lastSyncResourceVersion);
}
log.debug(
"{}#Start watch with resource version {}", apiTypeClassName, lastSyncResourceVersion);

long jitteredWatchTimeoutSeconds =
Double.valueOf(REFLECTOR_WATCH_CLIENTSIDE_TIMEOUT.getSeconds() * (1 + Math.random()))
Expand All @@ -135,19 +138,22 @@ public void run() {
watch = newWatch;
}
watchHandler(newWatch);
if (shouldDebugItems) {
log.debug("{}#Exhausted items", apiTypeClassName);
}
} catch (WatchExpiredException e) {
// Watch calls were failed due to expired resource-version. Returning
// to unwind the list-watch loops so that we can respawn a new round
// of list-watching.
log.info("{}#Watch expired, will re-list-watch soon", this.apiTypeClass);
log.info("{}#Watch expired, will re-list-watch soon", this.apiTypeClassName);
return;
} catch (Throwable t) {
if (isConnectException(t)) {
// If this is "connection refused" error, it means that most likely
// apiserver is not responsive. It doesn't make sense to re-list all
// objects because most likely we will be able to restart watch where
// we ended. If that's the case wait and resend watch request.
log.info("{}#Watch get connect exception, retry watch", this.apiTypeClass);
log.info("{}#Watch get connect exception, retry watch", this.apiTypeClassName);
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
Expand All @@ -158,7 +164,7 @@ public void run() {
if ((t instanceof RuntimeException)
&& t.getMessage() != null
&& t.getMessage().contains("IO Exception during hasNext")) {
log.info("{}#Read timeout retry list and watch", this.apiTypeClass);
log.info("{}#Read timeout retry list and watch", apiTypeClassName);
// IO timeout should be taken as a normal case
return;
}
Expand All @@ -171,7 +177,8 @@ public void run() {
} catch (ApiException e) {
if (e.getCode() == HttpURLConnection.HTTP_GONE) {
log.info(
"ResourceVersion {} expired, will retry w/o resourceVersion at the next time",
"{}#ResourceVersion {} expired, will retry w/o resourceVersion at the next time",
apiTypeClassName,
getRelistResourceVersion());
isLastSyncResourceVersionUnavailable = true;
} else {
Expand Down Expand Up @@ -229,26 +236,39 @@ private String getRelistResourceVersion() {
return lastSyncResourceVersion;
}

/**
* Implements {@link SharedInformer#setDebugItems}
*
* @param shouldDebugItems whether initial and streamed items should be logged at DEBUG level.
*/
public void setDebugItems(boolean shouldDebugItems) {
this.shouldDebugItems = shouldDebugItems;
}

private void watchHandler(Watchable<ApiType> watch) {
while (watch.hasNext()) {
io.kubernetes.client.util.Watch.Response<ApiType> item = watch.next();
Response<ApiType> item = watch.next();
if (shouldDebugItems) {
log.debug("{}#Next item {}", apiTypeClassName, item.object);
}

Optional<EventType> eventType = EventType.findByType(item.type);
if (!eventType.isPresent()) {
log.error("unrecognized event {}", item);
log.error("{}#Unrecognized event {}", apiTypeClassName, item);
continue;
}
if (eventType.get() == EventType.ERROR) {
if (item.status != null && item.status.getCode() == HttpURLConnection.HTTP_GONE) {
log.info(
"ResourceVersion {} and Watch connection expired: {} , will retry w/o resourceVersion next time",
"{}#ResourceVersion {} and Watch connection expired: {}, will retry w/o resourceVersion next time",
apiTypeClassName,
getRelistResourceVersion(),
item.status.getMessage());
isLastSyncResourceVersionUnavailable = true;
throw new WatchExpiredException();
} else {
String errorMessage =
String.format("got ERROR event and its status: %s", item.status.toString());
String.format("%s#Got ERROR event and its status: %s", apiTypeClassName, item.status);
log.error(errorMessage);
throw new RuntimeException(errorMessage);
}
Expand All @@ -274,9 +294,7 @@ private void watchHandler(Watchable<ApiType> watch) {
// A `Bookmark` means watch has synced here, just update the resourceVersion
}
lastSyncResourceVersion = newResourceVersion;
if (log.isDebugEnabled()) {
log.debug("{}#Receiving resourceVersion {}", apiTypeClass, lastSyncResourceVersion);
}
log.debug("{}#Receiving resourceVersion {}", apiTypeClassName, lastSyncResourceVersion);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,11 @@ public void setTransform(TransformFunc transformFunc) {
this.transform = transformFunc;
}

@Override
public void setDebugItems(boolean shouldDebugItems) {
this.controller.setDebugItems(shouldDebugItems);
}

@Override
public void run() {
if (started) {
Expand Down