From 82fc2564cf3065ceabd9d8c5c97b9bb9b2eef99a Mon Sep 17 00:00:00 2001 From: Vince Prignano Date: Fri, 5 Feb 2021 10:47:34 -0800 Subject: [PATCH] :warning: Support global controller options in component config This change adds support for our v1alpha1 ComponentConfig types to expose configuration options for controllers. The only two current exposed options are concurrency (done via a GroupKind map) and the cache sync timeout option. This is a breaking change, given that we're adding a new required method to the Manager's interface. Signed-off-by: Vince Prignano --- pkg/builder/controller.go | 16 ++++++++++ pkg/builder/controller_test.go | 29 +++++++++++++++++ pkg/config/v1alpha1/types.go | 32 ++++++++++++++++++- pkg/config/v1alpha1/zz_generated.deepcopy.go | 33 ++++++++++++++++++++ pkg/manager/internal.go | 8 +++++ pkg/manager/manager.go | 19 +++++++++++ 6 files changed, 136 insertions(+), 1 deletion(-) diff --git a/pkg/builder/controller.go b/pkg/builder/controller.go index 11bbea3c1d..0a6a7884e3 100644 --- a/pkg/builder/controller.go +++ b/pkg/builder/controller.go @@ -287,6 +287,8 @@ func (blder *Builder) getControllerName(gvk schema.GroupVersionKind) string { } func (blder *Builder) doController(r reconcile.Reconciler) error { + globalOpts := blder.mgr.GetControllerOptions() + ctrlOptions := blder.ctrlOptions if ctrlOptions.Reconciler == nil { ctrlOptions.Reconciler = r @@ -299,6 +301,20 @@ func (blder *Builder) doController(r reconcile.Reconciler) error { return err } + // Setup concurrency. + if ctrlOptions.MaxConcurrentReconciles == 0 { + groupKind := gvk.GroupKind().String() + + if concurrency, ok := globalOpts.GroupKindConcurrency[groupKind]; ok && concurrency > 0 { + ctrlOptions.MaxConcurrentReconciles = concurrency + } + } + + // Setup cache sync timeout. + if ctrlOptions.CacheSyncTimeout == 0 && globalOpts.CacheSyncTimeout != nil { + ctrlOptions.CacheSyncTimeout = *globalOpts.CacheSyncTimeout + } + // Setup the logger. if ctrlOptions.Log == nil { ctrlOptions.Log = blder.mgr.GetLogger() diff --git a/pkg/builder/controller_test.go b/pkg/builder/controller_test.go index 09ff137638..47bf031b8f 100644 --- a/pkg/builder/controller_test.go +++ b/pkg/builder/controller_test.go @@ -36,6 +36,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/config/v1alpha1" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -172,6 +173,34 @@ var _ = Describe("application", func() { Expect(instance).NotTo(BeNil()) }) + It("should override max concurrent reconcilers during creation of controller, when using", func() { + const maxConcurrentReconciles = 10 + newController = func(name string, mgr manager.Manager, options controller.Options) ( + controller.Controller, error) { + if options.MaxConcurrentReconciles == maxConcurrentReconciles { + return controller.New(name, mgr, options) + } + return nil, fmt.Errorf("max concurrent reconcilers expected %d but found %d", maxConcurrentReconciles, options.MaxConcurrentReconciles) + } + + By("creating a controller manager") + m, err := manager.New(cfg, manager.Options{ + Controller: v1alpha1.ControllerConfigurationSpec{ + GroupKindConcurrency: map[string]int{ + "ReplicaSet.apps": maxConcurrentReconciles, + }, + }, + }) + Expect(err).NotTo(HaveOccurred()) + + instance, err := ControllerManagedBy(m). + For(&appsv1.ReplicaSet{}). + Owns(&appsv1.ReplicaSet{}). + Build(noop) + Expect(err).NotTo(HaveOccurred()) + Expect(instance).NotTo(BeNil()) + }) + It("should override rate limiter during creation of controller", func() { rateLimiter := workqueue.DefaultItemBasedRateLimiter() newController = func(name string, mgr manager.Manager, options controller.Options) (controller.Controller, error) { diff --git a/pkg/config/v1alpha1/types.go b/pkg/config/v1alpha1/types.go index 25c406375b..e13f1c0090 100644 --- a/pkg/config/v1alpha1/types.go +++ b/pkg/config/v1alpha1/types.go @@ -17,6 +17,8 @@ limitations under the License. package v1alpha1 import ( + "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" configv1alpha1 "k8s.io/component-base/config/v1alpha1" @@ -50,9 +52,14 @@ type ControllerManagerConfigurationSpec struct { // GracefulShutdownTimeout is the duration given to runnable to stop before the manager actually returns on stop. // To disable graceful shutdown, set to time.Duration(0) // To use graceful shutdown without timeout, set to a negative duration, e.G. time.Duration(-1) - // The graceful shutdown is skipped for safety reasons in case the leadere election lease is lost. + // The graceful shutdown is skipped for safety reasons in case the leader election lease is lost. GracefulShutdownTimeout *metav1.Duration `json:"gracefulShutDown,omitempty"` + // Controller contains global configuration options for controllers + // registered within this manager. + // +optional + Controller *ControllerConfigurationSpec `json:"controller,omitempty"` + // Metrics contains thw controller metrics configuration // +optional Metrics ControllerMetrics `json:"metrics,omitempty"` @@ -66,6 +73,29 @@ type ControllerManagerConfigurationSpec struct { Webhook ControllerWebhook `json:"webhook,omitempty"` } +// ControllerConfigurationSpec defines the global configuration for +// controllers registered with the manager. +type ControllerConfigurationSpec struct { + // GroupKindConcurrency is a map from a Kind to the number of concurrent reconciliation + // allowed for that controller. + // + // When a controller is registered within this manager using the builder utilities, + // users have to specify the type the controller reconciles in the For(...) call. + // If the object's kind passed matches one of the keys in this map, the concurrency + // for that controller is set to the number specified. + // + // The key is expected to be consistent in form with GroupKind.String(), + // e.g. ReplicaSet in apps group (regardless of version) would be `ReplicaSet.apps`. + // + // +optional + GroupKindConcurrency map[string]int `json:"groupKindConcurrency,omitempty"` + + // CacheSyncTimeout refers to the time limit set to wait for syncing caches. + // Defaults to 2 minutes if not set. + // +optional + CacheSyncTimeout *time.Duration `json:"cacheSyncTimeout,omitempty"` +} + // ControllerMetrics defines the metrics configs type ControllerMetrics struct { // BindAddress is the TCP address that the controller should bind to diff --git a/pkg/config/v1alpha1/zz_generated.deepcopy.go b/pkg/config/v1alpha1/zz_generated.deepcopy.go index 5deb12fad7..752fa9754c 100644 --- a/pkg/config/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/config/v1alpha1/zz_generated.deepcopy.go @@ -8,8 +8,36 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" configv1alpha1 "k8s.io/component-base/config/v1alpha1" + timex "time" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ControllerConfigurationSpec) DeepCopyInto(out *ControllerConfigurationSpec) { + *out = *in + if in.GroupKindConcurrency != nil { + in, out := &in.GroupKindConcurrency, &out.GroupKindConcurrency + *out = make(map[string]int, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + if in.CacheSyncTimeout != nil { + in, out := &in.CacheSyncTimeout, &out.CacheSyncTimeout + *out = new(timex.Duration) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ControllerConfigurationSpec. +func (in *ControllerConfigurationSpec) DeepCopy() *ControllerConfigurationSpec { + if in == nil { + return nil + } + out := new(ControllerConfigurationSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ControllerHealth) DeepCopyInto(out *ControllerHealth) { *out = *in @@ -68,6 +96,11 @@ func (in *ControllerManagerConfigurationSpec) DeepCopyInto(out *ControllerManage *out = new(v1.Duration) **out = **in } + if in.Controller != nil { + in, out := &in.Controller, &out.Controller + *out = new(ControllerConfigurationSpec) + (*in).DeepCopyInto(*out) + } out.Metrics = in.Metrics out.Health = in.Health in.Webhook.DeepCopyInto(&out.Webhook) diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index d253b9a6ad..207bec6e02 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -38,6 +38,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/cluster" + "sigs.k8s.io/controller-runtime/pkg/config/v1alpha1" "sigs.k8s.io/controller-runtime/pkg/healthz" intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder" "sigs.k8s.io/controller-runtime/pkg/metrics" @@ -108,6 +109,9 @@ type controllerManager struct { healthzStarted bool errChan chan error + // controllerOptions are the global controller options. + controllerOptions v1alpha1.ControllerConfigurationSpec + // Logger is the logger that should be used by this manager. // If none is set, it defaults to log.Log global logger. logger logr.Logger @@ -355,6 +359,10 @@ func (cm *controllerManager) GetLogger() logr.Logger { return cm.logger } +func (cm *controllerManager) GetControllerOptions() v1alpha1.ControllerConfigurationSpec { + return cm.controllerOptions +} + func (cm *controllerManager) serveMetrics() { handler := promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{ ErrorHandling: promhttp.HTTPErrorOnError, diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 2f676ad7a0..b790105a69 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -90,6 +90,9 @@ type Manager interface { // GetLogger returns this manager's logger. GetLogger() logr.Logger + + // GetControllerOptions returns controller global configuration options. + GetControllerOptions() v1alpha1.ControllerConfigurationSpec } // Options are the arguments for creating a new Manager @@ -230,6 +233,11 @@ type Options struct { // The graceful shutdown is skipped for safety reasons in case the leader election lease is lost. GracefulShutdownTimeout *time.Duration + // Controller contains global configuration options for controllers + // registered within this manager. + // +optional + Controller v1alpha1.ControllerConfigurationSpec + // makeBroadcaster allows deferring the creation of the broadcaster to // avoid leaking goroutines if we never call Start on this manager. It also // returns whether or not this is a "owned" broadcaster, and as such should be @@ -337,6 +345,7 @@ func New(config *rest.Config, options Options) (Manager, error) { resourceLock: resourceLock, metricsListener: metricsListener, metricsExtraHandlers: metricsExtraHandlers, + controllerOptions: options.Controller, logger: options.Logger, elected: make(chan struct{}), port: options.Port, @@ -407,6 +416,16 @@ func (o Options) AndFrom(loader config.ControllerManagerConfiguration) (Options, o.CertDir = newObj.Webhook.CertDir } + if newObj.Controller != nil { + if o.Controller.CacheSyncTimeout == nil && newObj.Controller.CacheSyncTimeout != nil { + o.Controller.CacheSyncTimeout = newObj.Controller.CacheSyncTimeout + } + + if len(o.Controller.GroupKindConcurrency) == 0 && len(newObj.Controller.GroupKindConcurrency) > 0 { + o.Controller.GroupKindConcurrency = newObj.Controller.GroupKindConcurrency + } + } + return o, nil }