Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

⚠️ Support global controller options in component config #1371

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 16 additions & 0 deletions pkg/builder/controller.go
Expand Up @@ -287,6 +287,8 @@ func (blder *Builder) getControllerName(gvk schema.GroupVersionKind) string {
}

func (blder *Builder) doController(r reconcile.Reconciler) error {
globalOpts := blder.mgr.GetControllerOptions()

ctrlOptions := blder.ctrlOptions
if ctrlOptions.Reconciler == nil {
ctrlOptions.Reconciler = r
Expand All @@ -299,6 +301,20 @@ func (blder *Builder) doController(r reconcile.Reconciler) error {
return err
}

// Setup concurrency.
if ctrlOptions.MaxConcurrentReconciles == 0 {
groupKind := gvk.GroupKind().String()

if concurrency, ok := globalOpts.GroupKindConcurrency[groupKind]; ok && concurrency > 0 {
ctrlOptions.MaxConcurrentReconciles = concurrency
}
}

// Setup cache sync timeout.
if ctrlOptions.CacheSyncTimeout == 0 && globalOpts.CacheSyncTimeout != nil {
ctrlOptions.CacheSyncTimeout = *globalOpts.CacheSyncTimeout
}

// Setup the logger.
if ctrlOptions.Log == nil {
ctrlOptions.Log = blder.mgr.GetLogger()
Expand Down
29 changes: 29 additions & 0 deletions pkg/builder/controller_test.go
Expand Up @@ -36,6 +36,7 @@ import (

"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/config/v1alpha1"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand Down Expand Up @@ -172,6 +173,34 @@ var _ = Describe("application", func() {
Expect(instance).NotTo(BeNil())
})

It("should override max concurrent reconcilers during creation of controller, when using", func() {
const maxConcurrentReconciles = 10
newController = func(name string, mgr manager.Manager, options controller.Options) (
controller.Controller, error) {
if options.MaxConcurrentReconciles == maxConcurrentReconciles {
return controller.New(name, mgr, options)
}
return nil, fmt.Errorf("max concurrent reconcilers expected %d but found %d", maxConcurrentReconciles, options.MaxConcurrentReconciles)
}

By("creating a controller manager")
m, err := manager.New(cfg, manager.Options{
Controller: v1alpha1.ControllerConfigurationSpec{
GroupKindConcurrency: map[string]int{
"ReplicaSet.apps": maxConcurrentReconciles,
},
},
})
Expect(err).NotTo(HaveOccurred())

instance, err := ControllerManagedBy(m).
For(&appsv1.ReplicaSet{}).
Owns(&appsv1.ReplicaSet{}).
Build(noop)
Expect(err).NotTo(HaveOccurred())
Expect(instance).NotTo(BeNil())
})

It("should override rate limiter during creation of controller", func() {
rateLimiter := workqueue.DefaultItemBasedRateLimiter()
newController = func(name string, mgr manager.Manager, options controller.Options) (controller.Controller, error) {
Expand Down
32 changes: 31 additions & 1 deletion pkg/config/v1alpha1/types.go
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package v1alpha1

import (
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

configv1alpha1 "k8s.io/component-base/config/v1alpha1"
Expand Down Expand Up @@ -50,9 +52,14 @@ type ControllerManagerConfigurationSpec struct {
// GracefulShutdownTimeout is the duration given to runnable to stop before the manager actually returns on stop.
// To disable graceful shutdown, set to time.Duration(0)
// To use graceful shutdown without timeout, set to a negative duration, e.G. time.Duration(-1)
// The graceful shutdown is skipped for safety reasons in case the leadere election lease is lost.
// The graceful shutdown is skipped for safety reasons in case the leader election lease is lost.
GracefulShutdownTimeout *metav1.Duration `json:"gracefulShutDown,omitempty"`

// Controller contains global configuration options for controllers
// registered within this manager.
// +optional
Controller *ControllerConfigurationSpec `json:"controller,omitempty"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the description, you called this out as a breaking change but all the fields have optional? Am I missing something?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The manager's interface new method addition is a breaking change

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see. For the most part the docs (at least the ones in Kubebuilder on master) point to embedding the core type so hopefully, anyone that has gone and implemented this would be able to receive the update to resolve this but for folks that aren't that makes sense. luckily we're still alpha 🎉


// Metrics contains thw controller metrics configuration
// +optional
Metrics ControllerMetrics `json:"metrics,omitempty"`
Expand All @@ -66,6 +73,29 @@ type ControllerManagerConfigurationSpec struct {
Webhook ControllerWebhook `json:"webhook,omitempty"`
}

// ControllerConfigurationSpec defines the global configuration for
// controllers registered with the manager.
type ControllerConfigurationSpec struct {
// GroupKindConcurrency is a map from a Kind to the number of concurrent reconciliation
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Love this idea, this will make it much easier to make per type configs.

// allowed for that controller.
//
// When a controller is registered within this manager using the builder utilities,
// users have to specify the type the controller reconciles in the For(...) call.
// If the object's kind passed matches one of the keys in this map, the concurrency
// for that controller is set to the number specified.
//
// The key is expected to be consistent in form with GroupKind.String(),
// e.g. ReplicaSet in apps group (regardless of version) would be `ReplicaSet.apps`.
//
// +optional
GroupKindConcurrency map[string]int `json:"groupKindConcurrency,omitempty"`

// CacheSyncTimeout refers to the time limit set to wait for syncing caches.
// Defaults to 2 minutes if not set.
// +optional
CacheSyncTimeout *time.Duration `json:"cacheSyncTimeout,omitempty"`
}

// ControllerMetrics defines the metrics configs
type ControllerMetrics struct {
// BindAddress is the TCP address that the controller should bind to
Expand Down
33 changes: 33 additions & 0 deletions pkg/config/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions pkg/manager/internal.go
Expand Up @@ -38,6 +38,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/config/v1alpha1"
"sigs.k8s.io/controller-runtime/pkg/healthz"
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
"sigs.k8s.io/controller-runtime/pkg/metrics"
Expand Down Expand Up @@ -108,6 +109,9 @@ type controllerManager struct {
healthzStarted bool
errChan chan error

// controllerOptions are the global controller options.
controllerOptions v1alpha1.ControllerConfigurationSpec

// Logger is the logger that should be used by this manager.
// If none is set, it defaults to log.Log global logger.
logger logr.Logger
Expand Down Expand Up @@ -355,6 +359,10 @@ func (cm *controllerManager) GetLogger() logr.Logger {
return cm.logger
}

func (cm *controllerManager) GetControllerOptions() v1alpha1.ControllerConfigurationSpec {
return cm.controllerOptions
}

func (cm *controllerManager) serveMetrics() {
handler := promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{
ErrorHandling: promhttp.HTTPErrorOnError,
Expand Down
19 changes: 19 additions & 0 deletions pkg/manager/manager.go
Expand Up @@ -90,6 +90,9 @@ type Manager interface {

// GetLogger returns this manager's logger.
GetLogger() logr.Logger

// GetControllerOptions returns controller global configuration options.
GetControllerOptions() v1alpha1.ControllerConfigurationSpec
}

// Options are the arguments for creating a new Manager
Expand Down Expand Up @@ -230,6 +233,11 @@ type Options struct {
// The graceful shutdown is skipped for safety reasons in case the leader election lease is lost.
GracefulShutdownTimeout *time.Duration

// Controller contains global configuration options for controllers
// registered within this manager.
// +optional
Controller v1alpha1.ControllerConfigurationSpec

// makeBroadcaster allows deferring the creation of the broadcaster to
// avoid leaking goroutines if we never call Start on this manager. It also
// returns whether or not this is a "owned" broadcaster, and as such should be
Expand Down Expand Up @@ -337,6 +345,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
resourceLock: resourceLock,
metricsListener: metricsListener,
metricsExtraHandlers: metricsExtraHandlers,
controllerOptions: options.Controller,
logger: options.Logger,
elected: make(chan struct{}),
port: options.Port,
Expand Down Expand Up @@ -407,6 +416,16 @@ func (o Options) AndFrom(loader config.ControllerManagerConfiguration) (Options,
o.CertDir = newObj.Webhook.CertDir
}

if newObj.Controller != nil {
if o.Controller.CacheSyncTimeout == nil && newObj.Controller.CacheSyncTimeout != nil {
o.Controller.CacheSyncTimeout = newObj.Controller.CacheSyncTimeout
}

if len(o.Controller.GroupKindConcurrency) == 0 && len(newObj.Controller.GroupKindConcurrency) > 0 {
o.Controller.GroupKindConcurrency = newObj.Controller.GroupKindConcurrency
}
}

return o, nil
}

Expand Down