Skip to content

Commit

Permalink
refactor: non leader controllers management (#4831)
Browse files Browse the repository at this point in the history
  • Loading branch information
eddycharly committed Oct 6, 2022
1 parent 74172f2 commit 1509fa6
Show file tree
Hide file tree
Showing 16 changed files with 173 additions and 127 deletions.
13 changes: 12 additions & 1 deletion cmd/kyverno/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,25 @@ package main
import (
"context"

"github.com/go-logr/logr"
"github.com/kyverno/kyverno/pkg/controllers"
)

type controller struct {
name string
controller controllers.Controller
workers int
}

func (c *controller) run(ctx context.Context) {
func newController(name string, c controllers.Controller, w int) controller {
return controller{
name: name,
controller: c,
workers: w,
}
}

func (c controller) run(ctx context.Context, logger logr.Logger) {
logger.Info("start controller...", "name", c.name)
c.controller.Run(ctx, c.workers)
}
200 changes: 121 additions & 79 deletions cmd/kyverno/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,53 @@ func sanityChecks(dynamicClient dclient.Interface) error {
return nil
}

func createNonLeaderControllers(
kubeInformer kubeinformers.SharedInformerFactory,
kubeKyvernoInformer kubeinformers.SharedInformerFactory,
kyvernoInformer kyvernoinformer.SharedInformerFactory,
kubeClient kubernetes.Interface,
kyvernoClient versioned.Interface,
dynamicClient dclient.Interface,
configuration config.Configuration,
policyCache policycache.Cache,
eventGenerator event.Interface,
manager *openapi.Controller,
) ([]controller, func() error) {
policyCacheController := policycachecontroller.NewController(
policyCache,
kyvernoInformer.Kyverno().V1().ClusterPolicies(),
kyvernoInformer.Kyverno().V1().Policies(),
)
openApiController := openapi.NewCRDSync(
dynamicClient,
manager,
)
configurationController := configcontroller.NewController(
configuration,
kubeKyvernoInformer.Core().V1().ConfigMaps(),
)
updateRequestController := background.NewController(
kyvernoClient,
dynamicClient,
kyvernoInformer.Kyverno().V1().ClusterPolicies(),
kyvernoInformer.Kyverno().V1().Policies(),
kyvernoInformer.Kyverno().V1beta1().UpdateRequests(),
kubeInformer.Core().V1().Namespaces(),
kubeKyvernoInformer.Core().V1().Pods(),
eventGenerator,
configuration,
)
return []controller{
newController(policycachecontroller.ControllerName, policyCacheController, policycachecontroller.Workers),
newController("openapi-controller", openApiController, 1),
newController(configcontroller.ControllerName, configurationController, configcontroller.Workers),
newController("update-request-controller", updateRequestController, genWorkers),
},
func() error {
return policyCacheController.WarmUp()
}
}

func main() {
// parse flags
if err := parseFlags(); err != nil {
Expand Down Expand Up @@ -358,14 +405,6 @@ func main() {
kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(kyvernoClient, resyncPeriod)
metadataInformer := metadatainformers.NewSharedInformerFactory(metadataClient, 15*time.Minute)

// utils
kyvernoV1 := kyvernoInformer.Kyverno().V1()
kyvernoV1beta1 := kyvernoInformer.Kyverno().V1beta1()

// EVENT GENERATOR
// - generate event with retry mechanism
eventGenerator := event.NewEventGenerator(dynamicClient, kyvernoV1.ClusterPolicies(), kyvernoV1.Policies(), maxQueuedEvents, logging.WithName("EventGenerator"))

webhookCfg := webhookconfig.NewRegister(
signalCtx,
clientConfig,
Expand All @@ -375,27 +414,41 @@ func main() {
kubeInformer.Admissionregistration().V1().MutatingWebhookConfigurations(),
kubeInformer.Admissionregistration().V1().ValidatingWebhookConfigurations(),
kubeKyvernoInformer.Apps().V1().Deployments(),
kyvernoV1.ClusterPolicies(),
kyvernoV1.Policies(),
kyvernoInformer.Kyverno().V1().ClusterPolicies(),
kyvernoInformer.Kyverno().V1().Policies(),
metricsConfig,
serverIP,
int32(webhookTimeout),
autoUpdateWebhooks,
logging.GlobalLogger(),
)

webhookMonitor, err := webhookconfig.NewMonitor(kubeClient, logging.GlobalLogger())
configuration, err := config.NewConfiguration(
kubeClient,
webhookCfg.UpdateWebhookChan,
)
if err != nil {
logger.Error(err, "failed to initialize webhookMonitor")
logger.Error(err, "failed to initialize configuration")
os.Exit(1)
}
openApiManager, err := openapi.NewOpenAPIController()
if err != nil {
logger.Error(err, "Failed to create openapi manager")
os.Exit(1)
}
policyCache := policycache.NewCache()
eventGenerator := event.NewEventGenerator(
dynamicClient,
kyvernoInformer.Kyverno().V1().ClusterPolicies(),
kyvernoInformer.Kyverno().V1().Policies(),
maxQueuedEvents,
logging.WithName("EventGenerator"),
)

configuration, err := config.NewConfiguration(kubeClient, webhookCfg.UpdateWebhookChan)
webhookMonitor, err := webhookconfig.NewMonitor(kubeClient, logging.GlobalLogger())
if err != nil {
logger.Error(err, "failed to initialize configuration")
logger.Error(err, "failed to initialize webhookMonitor")
os.Exit(1)
}
configurationController := configcontroller.NewController(configuration, kubeKyvernoInformer.Core().V1().ConfigMaps())

// POLICY CONTROLLER
// - reconciliation policy and policy violation
Expand All @@ -404,9 +457,9 @@ func main() {
policyCtrl, err := policy.NewPolicyController(
kyvernoClient,
dynamicClient,
kyvernoV1.ClusterPolicies(),
kyvernoV1.Policies(),
kyvernoV1beta1.UpdateRequests(),
kyvernoInformer.Kyverno().V1().ClusterPolicies(),
kyvernoInformer.Kyverno().V1().Policies(),
kyvernoInformer.Kyverno().V1beta1().UpdateRequests(),
configuration,
eventGenerator,
kubeInformer.Core().V1().Namespaces(),
Expand All @@ -419,22 +472,7 @@ func main() {
os.Exit(1)
}

urgen := webhookgenerate.NewGenerator(kyvernoClient, kyvernoV1beta1.UpdateRequests())

urc := background.NewController(
kyvernoClient,
dynamicClient,
kyvernoV1.ClusterPolicies(),
kyvernoV1.Policies(),
kyvernoV1beta1.UpdateRequests(),
kubeInformer.Core().V1().Namespaces(),
kubeKyvernoInformer.Core().V1().Pods(),
eventGenerator,
configuration,
)

policyCache := policycache.NewCache()
policyCacheController := policycachecontroller.NewController(policyCache, kyvernoV1.ClusterPolicies(), kyvernoV1.Policies())
urgen := webhookgenerate.NewGenerator(kyvernoClient, kyvernoInformer.Kyverno().V1beta1().UpdateRequests())

certRenewer, err := tls.NewCertRenewer(
metrics.ObjectClient[*corev1.Secret](
Expand Down Expand Up @@ -472,16 +510,13 @@ func main() {
kubeInformer.Admissionregistration().V1().ValidatingWebhookConfigurations(),
)

// the webhook server runs across all instances
openAPIController := startOpenAPIController(signalCtx, logger, dynamicClient)

// WEBHOOK
// - https server to provide endpoints called based on rules defined in Mutating & Validation webhook configuration
// - reports the results based on the response from the policy engine:
// -- annotations on resources with update details on mutation JSON patches
// -- generate policy violation resource
// -- generate events on policy and resource
policyHandlers := webhookspolicy.NewHandlers(dynamicClient, openAPIController)
policyHandlers := webhookspolicy.NewHandlers(dynamicClient, openApiManager)
resourceHandlers := webhooksresource.NewHandlers(
dynamicClient,
kyvernoClient,
Expand All @@ -491,10 +526,10 @@ func main() {
kubeInformer.Core().V1().Namespaces().Lister(),
kubeInformer.Rbac().V1().RoleBindings().Lister(),
kubeInformer.Rbac().V1().ClusterRoleBindings().Lister(),
kyvernoV1beta1.UpdateRequests().Lister().UpdateRequests(config.KyvernoNamespace()),
kyvernoInformer.Kyverno().V1beta1().UpdateRequests().Lister().UpdateRequests(config.KyvernoNamespace()),
urgen,
eventGenerator,
openAPIController,
openApiManager,
admissionReports,
)

Expand All @@ -518,6 +553,7 @@ func main() {
// start them once by the leader
registerWrapperRetry := common.RetryFunc(time.Second, webhookRegistrationTimeout, webhookCfg.Register, "failed to register webhook", logger)
run := func(context.Context) {
logger := logger.WithName("leader")
if err := certRenewer.InitTLSPemPair(); err != nil {
logger.Error(err, "tls initialization error")
os.Exit(1)
Expand Down Expand Up @@ -564,7 +600,7 @@ func main() {
}

for i := range reportControllers {
go reportControllers[i].run(signalCtx)
go reportControllers[i].run(signalCtx, logger.WithName("controllers"))
}
}

Expand Down Expand Up @@ -596,26 +632,40 @@ func main() {
defer signalCancel()
<-signalCtx.Done()
}()

// create non leader controllers
nonLeaderControllers, nonLeaderBootstrap := createNonLeaderControllers(
kubeInformer,
kubeKyvernoInformer,
kyvernoInformer,
kubeClient,
kyvernoClient,
dynamicClient,
configuration,
policyCache,
eventGenerator,
openApiManager,
)
// start informers and wait for cache sync
if !startInformersAndWaitForCacheSync(signalCtx, kyvernoInformer, kubeInformer, kubeKyvernoInformer) {
logger.Error(err, "Failed to wait for cache sync")
logger.Error(err, "failed to wait for cache sync")
os.Exit(1)
}

// warmup policy cache
if err := policyCacheController.WarmUp(); err != nil {
logger.Error(err, "Failed to warm up policy cache")
os.Exit(1)
// bootstrap non leader controllers
if nonLeaderBootstrap != nil {
if err := nonLeaderBootstrap(); err != nil {
logger.Error(err, "failed to bootstrap non leader controllers")
os.Exit(1)
}
}

// init events handlers
// start Kyverno controllers
go policyCacheController.Run(signalCtx, policycachecontroller.Workers)
go urc.Run(signalCtx, genWorkers)
go le.Run(signalCtx)
go configurationController.Run(signalCtx, configcontroller.Workers)
// start event generator
go eventGenerator.Run(signalCtx, 3)

// start leader election
go le.Run(signalCtx)
// start non leader controllers
for _, controller := range nonLeaderControllers {
go controller.run(signalCtx, logger.WithName("controllers"))
}
// start monitor (only when running in cluster)
if serverIP == "" {
go webhookMonitor.Run(signalCtx, webhookCfg, certRenewer, eventGenerator)
}
Expand All @@ -631,21 +681,6 @@ func main() {
logger.V(2).Info("Kyverno shutdown successful")
}

func startOpenAPIController(ctx context.Context, logger logr.Logger, client dclient.Interface) *openapi.Controller {
logger = logger.WithName("open-api")
openAPIController, err := openapi.NewOpenAPIController()
if err != nil {
logger.Error(err, "Failed to create openAPIController")
os.Exit(1)
}
// Sync openAPI definitions of resources
openAPISync := openapi.NewCRDSync(client, openAPIController)
// start openAPI controller, this is used in admission review
// thus is required in each instance
openAPISync.Run(ctx, 1)
return openAPIController
}

func setupReportControllers(
backgroundScan bool,
admissionReports bool,
Expand All @@ -663,28 +698,35 @@ func setupReportControllers(
kyvernoV1.Policies(),
kyvernoV1.ClusterPolicies(),
)
ctrls = append(ctrls, controller{resourceReportController, resourcereportcontroller.Workers})
ctrls = append(ctrls, controller{
ctrls = append(ctrls, newController(
resourcereportcontroller.ControllerName,
resourceReportController,
resourcereportcontroller.Workers,
))
ctrls = append(ctrls, newController(
aggregatereportcontroller.ControllerName,
aggregatereportcontroller.NewController(
kyvernoClient,
metadataFactory,
resourceReportController,
reportsChunkSize,
),
aggregatereportcontroller.Workers,
})
))
if admissionReports {
ctrls = append(ctrls, controller{
ctrls = append(ctrls, newController(
admissionreportcontroller.ControllerName,
admissionreportcontroller.NewController(
kyvernoClient,
metadataFactory,
resourceReportController,
),
admissionreportcontroller.Workers,
})
))
}
if backgroundScan {
ctrls = append(ctrls, controller{
ctrls = append(ctrls, newController(
backgroundscancontroller.ControllerName,
backgroundscancontroller.NewController(
client,
kyvernoClient,
Expand All @@ -695,7 +737,7 @@ func setupReportControllers(
resourceReportController,
),
backgroundscancontroller.Workers,
})
))
}
}
return ctrls
Expand Down
9 changes: 5 additions & 4 deletions pkg/controllers/config/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ import (

const (
// Workers is the number of workers for this controller
Workers = 3
maxRetries = 10
Workers = 3
ControllerName = "config-controller"
maxRetries = 10
)

type controller struct {
Expand All @@ -33,14 +34,14 @@ func NewController(configuration config.Configuration, configmapInformer corev1i
c := controller{
configuration: configuration,
configmapLister: configmapInformer.Lister(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), controllerName),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName),
}
controllerutils.AddDefaultEventHandlers(logger.V(3), configmapInformer.Informer(), c.queue)
return &c
}

func (c *controller) Run(ctx context.Context, workers int) {
controllerutils.Run(ctx, controllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile)
controllerutils.Run(ctx, ControllerName, logger.V(3), c.queue, workers, maxRetries, c.reconcile)
}

func (c *controller) reconcile(ctx context.Context, logger logr.Logger, key, namespace, name string) error {
Expand Down
4 changes: 1 addition & 3 deletions pkg/controllers/config/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,4 @@ package config

import "github.com/kyverno/kyverno/pkg/logging"

const controllerName = "config-controller"

var logger = logging.WithName(controllerName)
var logger = logging.WithName(ControllerName)

0 comments on commit 1509fa6

Please sign in to comment.