diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 7c25bd3c60..7db65e373e 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -104,11 +104,23 @@ type controllerManager struct { // Healthz probe handler healthzHandler *healthz.Handler - mu sync.Mutex - started bool - startedLeader bool + // addMu protects controllerManager from Add, AddHealthzCheck, AddMetricsExtraHandler, AddReadyzCheck being + // called while the data they collect are being read. + addMu sync.RWMutex + + // started tracks if the check has been started. + started bool + + // leader runnable started. + startedLeader bool + + // healthz started. In other words, we should not add healthz or readyz to the manager healthzStarted bool - errChan chan error + + // cacheMu protects waitForCache from being executed twice concurrently. + cacheMu sync.Mutex + + errChan chan error // controllerOptions are the global controller options. controllerOptions v1alpha1.ControllerConfigurationSpec @@ -192,8 +204,8 @@ type hasCache interface { // Add sets dependencies on i, and adds it to the list of Runnables to start. func (cm *controllerManager) Add(r Runnable) error { - cm.mu.Lock() - defer cm.mu.Unlock() + cm.addMu.Lock() + defer cm.addMu.Unlock() if cm.stopProcedureEngaged { return errors.New("can't accept new runnable as stop procedure is already engaged") } @@ -248,8 +260,8 @@ func (cm *controllerManager) AddMetricsExtraHandler(path string, handler http.Ha return fmt.Errorf("overriding builtin %s endpoint is not allowed", defaultMetricsEndpoint) } - cm.mu.Lock() - defer cm.mu.Unlock() + cm.addMu.Lock() + defer cm.addMu.Unlock() if _, found := cm.metricsExtraHandlers[path]; found { return fmt.Errorf("can't register extra handler by duplicate path %q on metrics http server", path) @@ -262,8 +274,8 @@ func (cm *controllerManager) AddMetricsExtraHandler(path string, handler http.Ha // AddHealthzCheck allows you to add Healthz checker. func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker) error { - cm.mu.Lock() - defer cm.mu.Unlock() + cm.addMu.Lock() + defer cm.addMu.Unlock() if cm.stopProcedureEngaged { return errors.New("can't accept new healthCheck as stop procedure is already engaged") @@ -283,8 +295,8 @@ func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker) // AddReadyzCheck allows you to add Readyz checker. func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker) error { - cm.mu.Lock() - defer cm.mu.Unlock() + cm.addMu.Lock() + defer cm.addMu.Unlock() if cm.stopProcedureEngaged { return errors.New("can't accept new ready check as stop procedure is already engaged") @@ -367,8 +379,8 @@ func (cm *controllerManager) serveMetrics() { mux.Handle(defaultMetricsEndpoint, handler) func() { - cm.mu.Lock() - defer cm.mu.Unlock() + cm.addMu.RLock() + defer cm.addMu.RUnlock() for path, extraHandler := range cm.metricsExtraHandlers { mux.Handle(path, extraHandler) @@ -401,8 +413,8 @@ func (cm *controllerManager) serveHealthProbes() { } func() { - cm.mu.Lock() - defer cm.mu.Unlock() + cm.addMu.RLock() + defer cm.addMu.RUnlock() if cm.readyzHandler != nil { mux.Handle(cm.readinessEndpointName, http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler)) @@ -422,6 +434,9 @@ func (cm *controllerManager) serveHealthProbes() { } return nil })) + + // Note: healthzStarted is used by AddMetricsExtraHandler, AddReadyzCheck, but it is safe to change here because + // addMu.RLock() prevents the above functions to be executed concurrently with this operation. cm.healthzStarted = true }() @@ -535,8 +550,14 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e if cm.gracefulShutdownTimeout == 0 { return nil } - cm.mu.Lock() - defer cm.mu.Unlock() + + // NOTE: it is required to get the addMu write lock, so we are sure that we not starting any runnable + // while we start waitForRunnableToEnd. + cm.addMu.Lock() + defer cm.addMu.Unlock() + + // Note: stopProcedureEngaged is used by Add, AddMetricsExtraHandler, AddReadyzCheck, but it is safe to change here because + // addMu.RLock() prevents the above functions to be executed concurrently with this operation. cm.stopProcedureEngaged = true // we want to close this after the other runnables stop, because we don't @@ -574,8 +595,8 @@ func (cm *controllerManager) waitForRunnableToEnd(shutdownCancel context.CancelF } func (cm *controllerManager) startNonLeaderElectionRunnables() { - cm.mu.Lock() - defer cm.mu.Unlock() + cm.addMu.RLock() + defer cm.addMu.RUnlock() // First start any webhook servers, which includes conversion, validation, and defaulting // webhooks that are registered. @@ -605,8 +626,8 @@ func (cm *controllerManager) startNonLeaderElectionRunnables() { } func (cm *controllerManager) startLeaderElectionRunnables() { - cm.mu.Lock() - defer cm.mu.Unlock() + cm.addMu.RLock() + defer cm.addMu.RUnlock() cm.waitForCache(cm.internalCtx) @@ -617,10 +638,15 @@ func (cm *controllerManager) startLeaderElectionRunnables() { cm.startRunnable(c) } + // Note: startedLeader is used by Add, but it is safe to change here because + // addMu.RLock() prevents the above function to be executed concurrently with this operation. cm.startedLeader = true } func (cm *controllerManager) waitForCache(ctx context.Context) { + cm.cacheMu.Lock() + defer cm.cacheMu.Unlock() + if cm.started { return } @@ -638,14 +664,22 @@ func (cm *controllerManager) waitForCache(ctx context.Context) { // cm.started as check if we already started the cache so it must always become true. // Making sure that the cache doesn't get started twice is needed to not get a "close // of closed channel" panic + + // Note: started is used by Add, so it is required to get an addMu lock/RLock before + // calling this func in order to prevent the above function to be executed concurrently + // with this operation. cm.started = true } func (cm *controllerManager) startLeaderElection() (err error) { ctx, cancel := context.WithCancel(context.Background()) - cm.mu.Lock() + + // Note: leaderElectionCancel is used by engageStopProcedure, which already gets a addMu.Rlock; + // thus, in order to prevent the above function to be executed concurrently with this operation, we + // require and addMu.Lock also here. + cm.addMu.Lock() cm.leaderElectionCancel = cancel - cm.mu.Unlock() + cm.addMu.Unlock() if cm.onStoppedLeading == nil { cm.onStoppedLeading = func() { diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index b697751799..ca55ed1c8f 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -1350,8 +1350,8 @@ var _ = Describe("manger.Manager", func() { // Wait for the Manager to start Eventually(func() bool { - mgr.mu.Lock() - defer mgr.mu.Unlock() + mgr.addMu.Lock() + defer mgr.addMu.Unlock() return mgr.started }).Should(BeTrue()) @@ -1381,8 +1381,8 @@ var _ = Describe("manger.Manager", func() { // Wait for the Manager to start Eventually(func() bool { - mgr.mu.Lock() - defer mgr.mu.Unlock() + mgr.addMu.Lock() + defer mgr.addMu.Unlock() return mgr.started }).Should(BeTrue())