diff --git a/pkg/builder/controller.go b/pkg/builder/controller.go index 11bbea3c1d..0a6a7884e3 100644 --- a/pkg/builder/controller.go +++ b/pkg/builder/controller.go @@ -287,6 +287,8 @@ func (blder *Builder) getControllerName(gvk schema.GroupVersionKind) string { } func (blder *Builder) doController(r reconcile.Reconciler) error { + globalOpts := blder.mgr.GetControllerOptions() + ctrlOptions := blder.ctrlOptions if ctrlOptions.Reconciler == nil { ctrlOptions.Reconciler = r @@ -299,6 +301,20 @@ func (blder *Builder) doController(r reconcile.Reconciler) error { return err } + // Setup concurrency. + if ctrlOptions.MaxConcurrentReconciles == 0 { + groupKind := gvk.GroupKind().String() + + if concurrency, ok := globalOpts.GroupKindConcurrency[groupKind]; ok && concurrency > 0 { + ctrlOptions.MaxConcurrentReconciles = concurrency + } + } + + // Setup cache sync timeout. + if ctrlOptions.CacheSyncTimeout == 0 && globalOpts.CacheSyncTimeout != nil { + ctrlOptions.CacheSyncTimeout = *globalOpts.CacheSyncTimeout + } + // Setup the logger. if ctrlOptions.Log == nil { ctrlOptions.Log = blder.mgr.GetLogger() diff --git a/pkg/builder/controller_test.go b/pkg/builder/controller_test.go index 09ff137638..47bf031b8f 100644 --- a/pkg/builder/controller_test.go +++ b/pkg/builder/controller_test.go @@ -36,6 +36,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/config/v1alpha1" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -172,6 +173,34 @@ var _ = Describe("application", func() { Expect(instance).NotTo(BeNil()) }) + It("should override max concurrent reconcilers during creation of controller, when using", func() { + const maxConcurrentReconciles = 10 + newController = func(name string, mgr manager.Manager, options controller.Options) ( + controller.Controller, error) { + if options.MaxConcurrentReconciles == maxConcurrentReconciles { + return controller.New(name, mgr, options) + } + return nil, fmt.Errorf("max concurrent reconcilers expected %d but found %d", maxConcurrentReconciles, options.MaxConcurrentReconciles) + } + + By("creating a controller manager") + m, err := manager.New(cfg, manager.Options{ + Controller: v1alpha1.ControllerConfigurationSpec{ + GroupKindConcurrency: map[string]int{ + "ReplicaSet.apps": maxConcurrentReconciles, + }, + }, + }) + Expect(err).NotTo(HaveOccurred()) + + instance, err := ControllerManagedBy(m). + For(&appsv1.ReplicaSet{}). + Owns(&appsv1.ReplicaSet{}). + Build(noop) + Expect(err).NotTo(HaveOccurred()) + Expect(instance).NotTo(BeNil()) + }) + It("should override rate limiter during creation of controller", func() { rateLimiter := workqueue.DefaultItemBasedRateLimiter() newController = func(name string, mgr manager.Manager, options controller.Options) (controller.Controller, error) { diff --git a/pkg/config/v1alpha1/types.go b/pkg/config/v1alpha1/types.go index 25c406375b..e13f1c0090 100644 --- a/pkg/config/v1alpha1/types.go +++ b/pkg/config/v1alpha1/types.go @@ -17,6 +17,8 @@ limitations under the License. package v1alpha1 import ( + "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" configv1alpha1 "k8s.io/component-base/config/v1alpha1" @@ -50,9 +52,14 @@ type ControllerManagerConfigurationSpec struct { // GracefulShutdownTimeout is the duration given to runnable to stop before the manager actually returns on stop. // To disable graceful shutdown, set to time.Duration(0) // To use graceful shutdown without timeout, set to a negative duration, e.G. time.Duration(-1) - // The graceful shutdown is skipped for safety reasons in case the leadere election lease is lost. + // The graceful shutdown is skipped for safety reasons in case the leader election lease is lost. GracefulShutdownTimeout *metav1.Duration `json:"gracefulShutDown,omitempty"` + // Controller contains global configuration options for controllers + // registered within this manager. + // +optional + Controller *ControllerConfigurationSpec `json:"controller,omitempty"` + // Metrics contains thw controller metrics configuration // +optional Metrics ControllerMetrics `json:"metrics,omitempty"` @@ -66,6 +73,29 @@ type ControllerManagerConfigurationSpec struct { Webhook ControllerWebhook `json:"webhook,omitempty"` } +// ControllerConfigurationSpec defines the global configuration for +// controllers registered with the manager. +type ControllerConfigurationSpec struct { + // GroupKindConcurrency is a map from a Kind to the number of concurrent reconciliation + // allowed for that controller. + // + // When a controller is registered within this manager using the builder utilities, + // users have to specify the type the controller reconciles in the For(...) call. + // If the object's kind passed matches one of the keys in this map, the concurrency + // for that controller is set to the number specified. + // + // The key is expected to be consistent in form with GroupKind.String(), + // e.g. ReplicaSet in apps group (regardless of version) would be `ReplicaSet.apps`. + // + // +optional + GroupKindConcurrency map[string]int `json:"groupKindConcurrency,omitempty"` + + // CacheSyncTimeout refers to the time limit set to wait for syncing caches. + // Defaults to 2 minutes if not set. + // +optional + CacheSyncTimeout *time.Duration `json:"cacheSyncTimeout,omitempty"` +} + // ControllerMetrics defines the metrics configs type ControllerMetrics struct { // BindAddress is the TCP address that the controller should bind to diff --git a/pkg/config/v1alpha1/zz_generated.deepcopy.go b/pkg/config/v1alpha1/zz_generated.deepcopy.go index 5deb12fad7..752fa9754c 100644 --- a/pkg/config/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/config/v1alpha1/zz_generated.deepcopy.go @@ -8,8 +8,36 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" configv1alpha1 "k8s.io/component-base/config/v1alpha1" + timex "time" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ControllerConfigurationSpec) DeepCopyInto(out *ControllerConfigurationSpec) { + *out = *in + if in.GroupKindConcurrency != nil { + in, out := &in.GroupKindConcurrency, &out.GroupKindConcurrency + *out = make(map[string]int, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } + if in.CacheSyncTimeout != nil { + in, out := &in.CacheSyncTimeout, &out.CacheSyncTimeout + *out = new(timex.Duration) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ControllerConfigurationSpec. +func (in *ControllerConfigurationSpec) DeepCopy() *ControllerConfigurationSpec { + if in == nil { + return nil + } + out := new(ControllerConfigurationSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ControllerHealth) DeepCopyInto(out *ControllerHealth) { *out = *in @@ -68,6 +96,11 @@ func (in *ControllerManagerConfigurationSpec) DeepCopyInto(out *ControllerManage *out = new(v1.Duration) **out = **in } + if in.Controller != nil { + in, out := &in.Controller, &out.Controller + *out = new(ControllerConfigurationSpec) + (*in).DeepCopyInto(*out) + } out.Metrics = in.Metrics out.Health = in.Health in.Webhook.DeepCopyInto(&out.Webhook) diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index d253b9a6ad..207bec6e02 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -38,6 +38,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/cluster" + "sigs.k8s.io/controller-runtime/pkg/config/v1alpha1" "sigs.k8s.io/controller-runtime/pkg/healthz" intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder" "sigs.k8s.io/controller-runtime/pkg/metrics" @@ -108,6 +109,9 @@ type controllerManager struct { healthzStarted bool errChan chan error + // controllerOptions are the global controller options. + controllerOptions v1alpha1.ControllerConfigurationSpec + // Logger is the logger that should be used by this manager. // If none is set, it defaults to log.Log global logger. logger logr.Logger @@ -355,6 +359,10 @@ func (cm *controllerManager) GetLogger() logr.Logger { return cm.logger } +func (cm *controllerManager) GetControllerOptions() v1alpha1.ControllerConfigurationSpec { + return cm.controllerOptions +} + func (cm *controllerManager) serveMetrics() { handler := promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{ ErrorHandling: promhttp.HTTPErrorOnError, diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 2f676ad7a0..b790105a69 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -90,6 +90,9 @@ type Manager interface { // GetLogger returns this manager's logger. GetLogger() logr.Logger + + // GetControllerOptions returns controller global configuration options. + GetControllerOptions() v1alpha1.ControllerConfigurationSpec } // Options are the arguments for creating a new Manager @@ -230,6 +233,11 @@ type Options struct { // The graceful shutdown is skipped for safety reasons in case the leader election lease is lost. GracefulShutdownTimeout *time.Duration + // Controller contains global configuration options for controllers + // registered within this manager. + // +optional + Controller v1alpha1.ControllerConfigurationSpec + // makeBroadcaster allows deferring the creation of the broadcaster to // avoid leaking goroutines if we never call Start on this manager. It also // returns whether or not this is a "owned" broadcaster, and as such should be @@ -337,6 +345,7 @@ func New(config *rest.Config, options Options) (Manager, error) { resourceLock: resourceLock, metricsListener: metricsListener, metricsExtraHandlers: metricsExtraHandlers, + controllerOptions: options.Controller, logger: options.Logger, elected: make(chan struct{}), port: options.Port, @@ -407,6 +416,16 @@ func (o Options) AndFrom(loader config.ControllerManagerConfiguration) (Options, o.CertDir = newObj.Webhook.CertDir } + if newObj.Controller != nil { + if o.Controller.CacheSyncTimeout == nil && newObj.Controller.CacheSyncTimeout != nil { + o.Controller.CacheSyncTimeout = newObj.Controller.CacheSyncTimeout + } + + if len(o.Controller.GroupKindConcurrency) == 0 && len(newObj.Controller.GroupKindConcurrency) > 0 { + o.Controller.GroupKindConcurrency = newObj.Controller.GroupKindConcurrency + } + } + return o, nil }