Skip to content

Commit

Permalink
fix #3569: removing all api references to customResource
Browse files Browse the repository at this point in the history
further minimizing SharedInformerFactoryImpl concerns and removing the
use of the executor
  • Loading branch information
shawkins committed May 17, 2022
1 parent ecc27c3 commit 202253a
Show file tree
Hide file tree
Showing 52 changed files with 701 additions and 891 deletions.
2 changes: 1 addition & 1 deletion doc/CHEATSHEET.md
Expand Up @@ -2032,7 +2032,7 @@ podInformer.addEventHandler(new ResourceEventHandler<Pod>() {
```
- Create `SharedIndexInformer` for some Custom Resource(in our case, `Dummy` resource provided in our [examples](https://github.com/fabric8io/kubernetes-client/tree/master/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/crds). By default it watches in all namespaces.
```java
SharedIndexInformer<Dummy> dummyInformer = sharedInformerFactory.sharedIndexInformerForCustomResource(Dummy.class, 60 * 1000L);
SharedIndexInformer<Dummy> dummyInformer = sharedInformerFactory.sharedIndexInformerFor(Dummy.class, 60 * 1000L);
dummyInformer.addEventHandler(new ResourceEventHandler<Dummy>() {
@Override
public void onAdd(Dummy dummy) {
Expand Down
18 changes: 17 additions & 1 deletion doc/FAQ.md
Expand Up @@ -16,4 +16,20 @@ Further more you will also have choices in the HttpClient that is utilized.

By default kubenetes-client has a runtime dependency on OkHttp (kubernetes-httpclient-okhttp). If you need to directly manipulate OkHttp, you add a compile dependency to kubernetes-httpclient-okhttp.

If you wish to use another HttpClient implementation typically you will exclude kubernetes-httpclient-okhttp and include the other runtime dependency instead.
If you wish to use another HttpClient implementation typically you will exclude kubernetes-httpclient-okhttp and include the other runtime dependency instead.

### What threading concerns are there?

There has been a lot of changes under the covers with thread utilization in the fabric8 client over the 5.x and 6.x releases. So the exact details of what threads are created / used where will depend on the particular release version.

At the core the thread utilization will depend upon the http client implementation. Per client OkHttp maintains a pool of threads for task execution. It will dedicate 2 threads out of that pool per WebSocket connection. If you have a lot of WebSocket usage (Informer or Watches) with OkHttp, you can expect to see a large number of threads in use - which can potentially exhaust the OkHttp defaults.

With the JDK http client it will only maintain a selector thread and a small worker pool which will be based upon your available processors per client. It does not matter how many Informers or Watches you run, the same worker pool is shared.

It is recommended with either http client that logic you supply via Watchers, ExecListeners, ResourceEventHandlers, Predicates, etc. do not execute long running tasks.

For non-ResourceEventHandlers call backs long-running operation can be a problem. When using the OkHttp client and default settings holding a IO thread inhibits websocket processing that can timeout the ping and may prevent additional requests since the okhttp client defaults to only 5 concurrent requests per host. When using the JDK http client the long running task will inhibit the use of that IO thread for ALL http processing. Note that calling other KubernetesClient operations, especially those with waits, can be long-running. We are working towards providing non-blocking mode for many of these operations, but until that is available consider using a separate task queue for such work.

On top of the http client threads the fabric8 client maintains a task thread pool for scheduled tasks and for potentially long-running tasks that are called from WebSocket operations, such as handling input and output streams and ResourceEventHandler call backs. This thread pool defaults to an unlimited number of cached threads, which will be shutdown when the client is closed - that is a sensible default with either http client as the amount of concurrently running async tasks will typically be low. If you would rather take full control over the threading use KubernetesClientBuilder.withExecutor or KubernetesClientBuilder.withExecutorSupplier - however note that constraining this thread pool too much will result in a build up of event processing queues.

Finally the fabric8 client will use 1 thread per PortForward and an additional thread per socket connection - this may be improved upon in the future.
Expand Up @@ -85,15 +85,13 @@
import io.fabric8.kubernetes.client.dsl.ServiceResource;
import io.fabric8.kubernetes.client.dsl.StorageAPIGroupDSL;
import io.fabric8.kubernetes.client.dsl.V1APIGroupDSL;
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
import io.fabric8.kubernetes.client.dsl.base.ResourceDefinitionContext;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectorBuilder;
import io.fabric8.kubernetes.client.extended.run.RunOperations;
import io.fabric8.kubernetes.client.informers.SharedInformerFactory;

import java.io.InputStream;
import java.util.Collection;
import java.util.concurrent.ExecutorService;

/**
* Main interface for Kubernetes client library.
Expand Down Expand Up @@ -124,25 +122,6 @@ public interface KubernetesClient extends Client {
*/
CertificatesAPIGroupDSL certificates();

/**
* Typed API for managing CustomResources. You would need to provide POJOs for
* CustomResource into this and with it you would be able to instantiate a client
* specific to CustomResource.
*
* <p>
* Note: your CustomResource POJO (T in this context) must implement
* {@link io.fabric8.kubernetes.api.model.Namespaced} if it is a namespace-scoped resource.
* </p>
*
* @param resourceType Class for CustomResource
* @param <T> T type represents CustomResource type. If it's a namespaced resource, it must implement
* {@link io.fabric8.kubernetes.api.model.Namespaced}
* @return returns a MixedOperation object with which you can do basic CustomResource operations
* @deprecated use {@link #resources(Class)} instead
*/
@Deprecated
<T extends CustomResource> MixedOperation<T, KubernetesResourceList<T>, Resource<T>> customResources(Class<T> resourceType);

/**
* Typed API for managing resources. Any properly annotated POJO can be utilized as a resource.
*
Expand All @@ -161,65 +140,15 @@ default <T extends HasMetadata> MixedOperation<T, KubernetesResourceList<T>, Res
return resources(resourceType, null);
}

/**
* Typed API for managing CustomResources. You would need to provide POJOs for
* CustomResource into this and with it you would be able to instantiate a client
* specific to CustomResource.
*
* <p>
* Note: your CustomResource POJO (T in this context) must implement
* {@link io.fabric8.kubernetes.api.model.Namespaced} if it is a namespace-scoped resource.
* </p>
*
* @param resourceType Class for CustomResource
* @param listClass Class for list object for CustomResource
* @param <T> T type represents CustomResource type. If it's a namespace-scoped resource, it must implement
* {@link io.fabric8.kubernetes.api.model.Namespaced}
* @param <L> L type represents CustomResourceList type
* @return returns a MixedOperation object with which you can do basic CustomResource operations
* @deprecated use {@link #resources(Class, Class)} instead
*/
@Deprecated
<T extends CustomResource, L extends KubernetesResourceList<T>> MixedOperation<T, L, Resource<T>> customResources(
Class<T> resourceType, Class<L> listClass);

/**
* Typed API for managing CustomResources. You would need to provide POJOs for
* CustomResource into this and with it you would be able to instantiate a client
* specific to CustomResource.
*
* <p>
* Note: your CustomResource POJO (T in this context) must implement
* <a href=
* "https://github.com/fabric8io/kubernetes-client/blob/master/kubernetes-model-generator/kubernetes-model-core/src/main/java/io/fabric8/kubernetes/api/model/Namespaced.java">
* io.fabric8.kubernetes.api.model.Namespaced
* </a> if it is a Namespaced scoped resource.
* </p>
*
* @deprecated Since 5.x versions of client {@link CustomResourceDefinitionContext} is now configured via annotations
* inside POJOs, no need to provide it explicitly here.
* @param context ResourceDefinitionContext describes the core fields used to search for CustomResources
* @param resourceType Class for CustomResource
* @param listClass Class for list object for CustomResource
* @param <T> T type represents CustomResource type. If it's Namespaced resource, it must implement
* io.fabric8.kubernetes.api.model.Namespaced
* @param <L> L type represents CustomResourceList type
* @return returns a MixedOperation object with which you can do basic CustomResource operations
*/
@Deprecated
<T extends HasMetadata, L extends KubernetesResourceList<T>> MixedOperation<T, L, Resource<T>> customResources(
ResourceDefinitionContext context, Class<T> resourceType, Class<L> listClass);

/**
* Semi-Typed API for managing {@link GenericKubernetesResource}s which can represent any resource.
*
* @param context ResourceDefinitionContext describes the core fields
* @param context ResourceDefinitionContext describes the core metadata
* @return returns a MixedOperation object with which you can do basic operations
* @see #genericKubernetesResources(String, String) if you don't want to supply a complete {@link ResourceDefinitionContext}
*/
default MixedOperation<GenericKubernetesResource, GenericKubernetesResourceList, Resource<GenericKubernetesResource>> genericKubernetesResources(
ResourceDefinitionContext context) {
return customResources(context, GenericKubernetesResource.class, GenericKubernetesResourceList.class);
}
MixedOperation<GenericKubernetesResource, GenericKubernetesResourceList, Resource<GenericKubernetesResource>> genericKubernetesResources(
ResourceDefinitionContext context);

/**
* Semi-typed API for managing resources.
Expand Down Expand Up @@ -539,21 +468,13 @@ NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable<HasMetadata> resourc
* Get an instance of Kubernetes Client informer factory. It allows you to construct and
* cache informers for API types. With it you can subscribe to all the events related to
* your Kubernetes type. It's like watch but a bit organized.
* <p>
* Each call to this method returns a new factory.
*
* @return SharedInformerFactory object
*/
SharedInformerFactory informers();

/**
* Get an instance of Kubernetes Client informer factory. It allows you to construct and
* cache informers for API types. With it you can subscribe to all the events related to
* your Kubernetes type. It's like watch but a bit organized.
*
* @param executorService thread pool for informer factory
* @return SharedInformerFactory object
*/
SharedInformerFactory informers(ExecutorService executorService);

/**
* API entrypoint for <code>LeaderElector</code> implementation for leader election.
*
Expand Down
Expand Up @@ -17,16 +17,33 @@
package io.fabric8.kubernetes.client;

import io.fabric8.kubernetes.client.http.HttpClient;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.utils.Serialization;

import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

/**
* If no {@link Executor} or {@link ExecutorSupplier} is specified, a default {@link ExecutorSupplier} will
* be used which creates an unbounded cached thread pool per client.
*/
public class KubernetesClientBuilder {

@FunctionalInterface
public interface ExecutorSupplier extends Supplier<Executor> {

default void onClose(Executor executor) {

}

}

private Config config;
private HttpClient.Factory factory;
private Class<KubernetesClient> clazz;
private ExecutorSupplier executorSupplier;

public KubernetesClientBuilder() {
// basically the same logic as in KubernetesResourceUtil for finding list types
Expand All @@ -52,7 +69,8 @@ public KubernetesClient build() {
return clazz.getConstructor(Config.class).newInstance(config);
}
HttpClient client = factory.createHttpClient(config);
return clazz.getConstructor(HttpClient.class, Config.class).newInstance(client, config);
return clazz.getConstructor(HttpClient.class, Config.class, ExecutorSupplier.class).newInstance(client, config,
executorSupplier);
} catch (InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException
| NoSuchMethodException | SecurityException e) {
throw KubernetesClientException.launderThrowable(e);
Expand All @@ -79,4 +97,32 @@ public KubernetesClientBuilder withHttpClientFactory(HttpClient.Factory factory)
return this;
}

/**
* Configure the client to use the given executor for async tasks, such as {@link ResourceEventHandler}
* calls and writing to streams.
* <p>
* Only override if you need more control over the number of task threads used by the kubernetes client.
*
* @return this builder
*/
public KubernetesClientBuilder withTaskExecutor(Executor executor) {
this.executorSupplier = () -> executor;
return this;
}

/**
* Configure the client to use the given {@link ExecutorSupplier} for async tasks, such as {@link ResourceEventHandler}
* calls and writing to streams.
* <p>
* There will be a call to {@link ExecutorSupplier#onClose(Executor)} when a client is closed.
* <p>
* Only override if you need more control over the number of task threads used by the kubernetes client.
*
* @return this builder
*/
public KubernetesClientBuilder withTaskExecutorSupplier(ExecutorSupplier executorSupplier) {
this.executorSupplier = executorSupplier;
return this;
}

}
Expand Up @@ -21,12 +21,19 @@ public interface Watcher<T> {

/**
* If the Watcher can reconnect itself after an error
*
* @return
*/
default boolean reconnecting() {
return false;
}

/**
* Handle the given event.
* <p>
* Should not be implemented with long-running logic as this method is called directly from
* an IO thread.
*/
void eventReceived(Action action, T resource);

/**
Expand All @@ -38,13 +45,20 @@ default void onClose() {

/**
* Invoked when the watcher closes due to an Exception.
*
* <p>
* Should not be implemented with long-running logic as this method is called directly from
* an IO thread.
*
* @param cause What caused the watcher to be closed.
*/
void onClose(WatcherException cause);

enum Action {
ADDED, MODIFIED, DELETED, ERROR, BOOKMARK
ADDED,
MODIFIED,
DELETED,
ERROR,
BOOKMARK
}

}
Expand Up @@ -19,6 +19,12 @@

import java.io.IOException;

/**
* Provides callbacks for exec websocket events mainly for logging and testing
* <p>
* long running operations should not be called as these methods are called directly
* from the IO thread.
*/
public interface ExecListener {

public interface Response {
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -67,8 +68,7 @@ public interface Informable<T> {
* <p>
* You are expected to call stop to terminate the underlying Watch.
* <p>
* Additional handlers can be added, but processing of the events will be in the websocket thread,
* so consider non-blocking handler operations for more than one handler.
* The processing of handler events will be in the client's {@link Executor}.
*
* @return a running {@link SharedIndexInformer}
*/
Expand All @@ -86,8 +86,7 @@ default SharedIndexInformer<T> inform() {
* <p>
* You are expected to call stop to terminate the underlying Watch.
* <p>
* Additional handlers can be added, but processing of the events will be in the websocket thread,
* so consider non-blocking handler operations for more than one handler.
* The processing of handler events will be in the client's {@link Executor}.
*
* @param handler to notify
* @return a running {@link SharedIndexInformer}
Expand All @@ -104,8 +103,7 @@ default SharedIndexInformer<T> inform(ResourceEventHandler<? super T> handler) {
* <p>
* You are expected to call stop to terminate the underlying Watch.
* <p>
* Additional handlers can be added, but processing of the events will be in the websocket thread,
* so consider non-blocking handler operations for more than one handler.
* The processing of handler events will be in the client's {@link Executor}.
*
* @param handler to notify
* @param resync the resync period or 0 for no resync
Expand All @@ -118,9 +116,6 @@ default SharedIndexInformer<T> inform(ResourceEventHandler<? super T> handler) {
* and provides a store of all the current resources.
* <p>
* You are expected to call stop to terminate the underlying Watch.
* <p>
* Additional handlers can be added, but processing of the events will be in the websocket thread,
* so consider non-blocking handler operations for more than one handler.
*
* @param resync the resync period or 0 for no resync
* @return a non-running {@link SharedIndexInformer}
Expand All @@ -131,6 +126,8 @@ default SharedIndexInformer<T> inform(ResourceEventHandler<? super T> handler) {
* Return a {@link Future} when the list at this context satisfies the given {@link Predicate}.
* The predicate will be tested against the state of the underlying informer store on every event.
* The returned future should be cancelled by the caller if not waiting for completion to close the underlying informer
* <p>
* The processing of events will be in the IO thread, blocking operations should be avoided.
*
* @param condition the {@link Predicate} to test
* @return a {@link CompletableFuture} of the list of items after the condition is met
Expand Down
Expand Up @@ -23,8 +23,10 @@ public interface Waitable<T, P> {
T waitUntilReady(long amount, TimeUnit timeUnit);

/**
* Wait for the given condition to be true. Only non-blocking conditions should be used.
*
* Wait for the given condition to be true.
* <p>
* The processing of events will be in the IO thread, blocking operations should be avoided.
*
* @param condition
* @param amount
* @param timeUnit
Expand Down
Expand Up @@ -15,10 +15,8 @@
*/
package io.fabric8.kubernetes.client.dsl;

import io.fabric8.kubernetes.client.Watcher;
public interface WatchAndWaitable<T> extends Watchable<T>, Waitable<T, T> {

public interface WatchAndWaitable<T> extends Watchable<Watcher<T>>, Waitable<T, T> {

Watchable<Watcher<T>> withResourceVersion(String resourceVersion);
Watchable<T> withResourceVersion(String resourceVersion);

}

0 comments on commit 202253a

Please sign in to comment.