Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] 馃悰 Avoid deadlock on start #1689

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
82 changes: 58 additions & 24 deletions pkg/manager/internal.go
Expand Up @@ -104,11 +104,23 @@ type controllerManager struct {
// Healthz probe handler
healthzHandler *healthz.Handler

mu sync.Mutex
started bool
startedLeader bool
// addMu protects controllerManager from Add, AddHealthzCheck, AddMetricsExtraHandler, AddReadyzCheck being
// called while the data they collect are being read.
addMu sync.RWMutex

// started tracks if the check has been started.
started bool

// leader runnable started.
startedLeader bool

// healthz started. In other words, we should not add healthz or readyz to the manager
healthzStarted bool
errChan chan error

// cacheMu protects waitForCache from being executed twice concurrently.
cacheMu sync.Mutex

errChan chan error

// controllerOptions are the global controller options.
controllerOptions v1alpha1.ControllerConfigurationSpec
Expand Down Expand Up @@ -192,8 +204,8 @@ type hasCache interface {

// Add sets dependencies on i, and adds it to the list of Runnables to start.
func (cm *controllerManager) Add(r Runnable) error {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.addMu.Lock()
defer cm.addMu.Unlock()
if cm.stopProcedureEngaged {
return errors.New("can't accept new runnable as stop procedure is already engaged")
}
Expand Down Expand Up @@ -248,8 +260,8 @@ func (cm *controllerManager) AddMetricsExtraHandler(path string, handler http.Ha
return fmt.Errorf("overriding builtin %s endpoint is not allowed", defaultMetricsEndpoint)
}

cm.mu.Lock()
defer cm.mu.Unlock()
cm.addMu.Lock()
defer cm.addMu.Unlock()

if _, found := cm.metricsExtraHandlers[path]; found {
return fmt.Errorf("can't register extra handler by duplicate path %q on metrics http server", path)
Expand All @@ -262,8 +274,8 @@ func (cm *controllerManager) AddMetricsExtraHandler(path string, handler http.Ha

// AddHealthzCheck allows you to add Healthz checker.
func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker) error {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.addMu.Lock()
defer cm.addMu.Unlock()

if cm.stopProcedureEngaged {
return errors.New("can't accept new healthCheck as stop procedure is already engaged")
Expand All @@ -283,8 +295,8 @@ func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker)

// AddReadyzCheck allows you to add Readyz checker.
func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker) error {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.addMu.Lock()
defer cm.addMu.Unlock()

if cm.stopProcedureEngaged {
return errors.New("can't accept new ready check as stop procedure is already engaged")
Expand Down Expand Up @@ -367,8 +379,8 @@ func (cm *controllerManager) serveMetrics() {
mux.Handle(defaultMetricsEndpoint, handler)

func() {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.addMu.RLock()
defer cm.addMu.RUnlock()

for path, extraHandler := range cm.metricsExtraHandlers {
mux.Handle(path, extraHandler)
Expand Down Expand Up @@ -401,8 +413,8 @@ func (cm *controllerManager) serveHealthProbes() {
}

func() {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.addMu.RLock()
defer cm.addMu.RUnlock()

if cm.readyzHandler != nil {
mux.Handle(cm.readinessEndpointName, http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler))
Expand All @@ -422,6 +434,9 @@ func (cm *controllerManager) serveHealthProbes() {
}
return nil
}))

// Note: healthzStarted is used by AddMetricsExtraHandler, AddReadyzCheck, but it is safe to change here because
// addMu.RLock() prevents the above functions to be executed concurrently with this operation.
cm.healthzStarted = true
}()

Expand Down Expand Up @@ -535,8 +550,14 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e
if cm.gracefulShutdownTimeout == 0 {
return nil
}
cm.mu.Lock()
defer cm.mu.Unlock()

// NOTE: it is required to get the addMu write lock, so we are sure that we not starting any runnable
// while we start waitForRunnableToEnd.
cm.addMu.Lock()
defer cm.addMu.Unlock()

// Note: stopProcedureEngaged is used by Add, AddMetricsExtraHandler, AddReadyzCheck, but it is safe to change here because
// addMu.RLock() prevents the above functions to be executed concurrently with this operation.
cm.stopProcedureEngaged = true

// we want to close this after the other runnables stop, because we don't
Expand Down Expand Up @@ -574,8 +595,8 @@ func (cm *controllerManager) waitForRunnableToEnd(shutdownCancel context.CancelF
}

func (cm *controllerManager) startNonLeaderElectionRunnables() {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.addMu.RLock()
defer cm.addMu.RUnlock()

// First start any webhook servers, which includes conversion, validation, and defaulting
// webhooks that are registered.
Expand Down Expand Up @@ -605,8 +626,8 @@ func (cm *controllerManager) startNonLeaderElectionRunnables() {
}

func (cm *controllerManager) startLeaderElectionRunnables() {
cm.mu.Lock()
defer cm.mu.Unlock()
cm.addMu.RLock()
defer cm.addMu.RUnlock()

cm.waitForCache(cm.internalCtx)

Expand All @@ -617,10 +638,15 @@ func (cm *controllerManager) startLeaderElectionRunnables() {
cm.startRunnable(c)
}

// Note: startedLeader is used by Add, but it is safe to change here because
// addMu.RLock() prevents the above function to be executed concurrently with this operation.
cm.startedLeader = true
}

func (cm *controllerManager) waitForCache(ctx context.Context) {
cm.cacheMu.Lock()
defer cm.cacheMu.Unlock()

if cm.started {
return
}
Expand All @@ -638,14 +664,22 @@ func (cm *controllerManager) waitForCache(ctx context.Context) {
// cm.started as check if we already started the cache so it must always become true.
// Making sure that the cache doesn't get started twice is needed to not get a "close
// of closed channel" panic

// Note: started is used by Add, so it is required to get an addMu lock/RLock before
// calling this func in order to prevent the above function to be executed concurrently
// with this operation.
cm.started = true
}

func (cm *controllerManager) startLeaderElection() (err error) {
ctx, cancel := context.WithCancel(context.Background())
cm.mu.Lock()

// Note: leaderElectionCancel is used by engageStopProcedure, which already gets a addMu.Rlock;
// thus, in order to prevent the above function to be executed concurrently with this operation, we
// require and addMu.Lock also here.
cm.addMu.Lock()
cm.leaderElectionCancel = cancel
cm.mu.Unlock()
cm.addMu.Unlock()

if cm.onStoppedLeading == nil {
cm.onStoppedLeading = func() {
Expand Down
8 changes: 4 additions & 4 deletions pkg/manager/manager_test.go
Expand Up @@ -1350,8 +1350,8 @@ var _ = Describe("manger.Manager", func() {

// Wait for the Manager to start
Eventually(func() bool {
mgr.mu.Lock()
defer mgr.mu.Unlock()
mgr.addMu.Lock()
defer mgr.addMu.Unlock()
return mgr.started
}).Should(BeTrue())

Expand Down Expand Up @@ -1381,8 +1381,8 @@ var _ = Describe("manger.Manager", func() {

// Wait for the Manager to start
Eventually(func() bool {
mgr.mu.Lock()
defer mgr.mu.Unlock()
mgr.addMu.Lock()
defer mgr.addMu.Unlock()
return mgr.started
}).Should(BeTrue())

Expand Down