From 66670ea64aa8f78462e7fc81aad3376087b9f30b Mon Sep 17 00:00:00 2001 From: Vince Prignano Date: Tue, 12 Oct 2021 10:50:42 -0700 Subject: [PATCH] Refactor manager internal around RunnableGroup(s) Signed-off-by: Vince Prignano --- pkg/manager/internal.go | 403 +++++++++++------------------ pkg/manager/manager.go | 8 + pkg/manager/manager_test.go | 61 +++-- pkg/manager/runnable_group.go | 242 +++++++++++++++++ pkg/manager/runnable_group_test.go | 172 ++++++++++++ pkg/webhook/server.go | 5 +- 6 files changed, 621 insertions(+), 270 deletions(-) create mode 100644 pkg/manager/runnable_group.go create mode 100644 pkg/manager/runnable_group_test.go diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index cd01715b4e..e4a19d0ec1 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -23,6 +23,7 @@ import ( "net" "net/http" "sync" + "sync/atomic" "time" "github.com/go-logr/logr" @@ -30,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" kerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/rest" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" @@ -61,17 +63,14 @@ const ( var _ Runnable = &controllerManager{} type controllerManager struct { + started *int64 + ready *int64 + errChan chan error + runnables *runnables + // cluster holds a variety of methods to interact with a cluster. Required. cluster cluster.Cluster - // leaderElectionRunnables is the set of Controllers that the controllerManager injects deps into and Starts. - // These Runnables are managed by lead election. - leaderElectionRunnables []Runnable - - // nonLeaderElectionRunnables is the set of webhook servers that the controllerManager injects deps into and Starts. - // These Runnables will not be blocked by lead election. - nonLeaderElectionRunnables []Runnable - // recorderProvider is used to generate event recorders that will be injected into Controllers // (and EventHandlers, Sources and Predicates). recorderProvider *intrec.Provider @@ -104,12 +103,6 @@ type controllerManager struct { // Healthz probe handler healthzHandler *healthz.Handler - mu sync.Mutex - started bool - startedLeader bool - healthzStarted bool - errChan chan error - // controllerOptions are the global controller options. controllerOptions v1alpha1.ControllerConfigurationSpec @@ -117,25 +110,20 @@ type controllerManager struct { // If none is set, it defaults to log.Log global logger. logger logr.Logger - // leaderElectionCancel is used to cancel the leader election. It is distinct from internalStopper, - // because for safety reasons we need to os.Exit() when we lose the leader election, meaning that - // it must be deferred until after gracefulShutdown is done. - leaderElectionCancel context.CancelFunc - // leaderElectionStopped is an internal channel used to signal the stopping procedure that the // LeaderElection.Run(...) function has returned and the shutdown can proceed. leaderElectionStopped chan struct{} - // stop procedure engaged. In other words, we should not add anything else to the manager - stopProcedureEngaged bool + // leaderElectionCancel is used to cancel the leader election. It is distinct from internalStopper, + // because for safety reasons we need to os.Exit() when we lose the leader election, meaning that + // it must be deferred until after gracefulShutdown is done. 
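+	// (By contrast, internalCancel only asks the running Runnables to stop.)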
+	leaderElectionCancel context.CancelFunc
 
 	// elected is closed when this manager becomes the leader of a group of
 	// managers, either because it won a leader election or because no leader
 	// election was configured.
 	elected chan struct{}
 
-	caches []hasCache
-
 	// port is the port that the webhook server serves at.
 	port int
 	// host is the hostname that the webhook server binds to.
@@ -160,10 +148,6 @@ type controllerManager struct {
 	// between tries of actions.
 	retryPeriod time.Duration
 
-	// waitForRunnable is holding the number of runnables currently running so that
-	// we can wait for them to exit before quitting the manager
-	waitForRunnable sync.WaitGroup
-
 	// gracefulShutdownTimeout is the duration given to runnable to stop
 	// before the manager actually returns on stop.
 	gracefulShutdownTimeout time.Duration
@@ -192,42 +176,17 @@ type hasCache interface {
 
 // Add sets dependencies on i, and adds it to the list of Runnables to start.
 func (cm *controllerManager) Add(r Runnable) error {
-	cm.mu.Lock()
-	defer cm.mu.Unlock()
-	if cm.stopProcedureEngaged {
-		return errors.New("can't accept new runnable as stop procedure is already engaged")
-	}
-
 	// Set dependencies on the object
 	if err := cm.SetFields(r); err != nil {
 		return err
 	}
 
-	var shouldStart bool
-
-	// Add the runnable to the leader election or the non-leaderelection list
-	if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() {
-		shouldStart = cm.started
-		cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r)
-	} else if hasCache, ok := r.(hasCache); ok {
-		cm.caches = append(cm.caches, hasCache)
-		if cm.started {
-			cm.startRunnable(hasCache)
-			if !hasCache.GetCache().WaitForCacheSync(cm.internalCtx) {
-				return fmt.Errorf("could not sync cache")
-			}
+	return cm.runnables.Add(r, func(ctx context.Context) bool {
+		if cache, ok := r.(hasCache); ok {
+			return cache.GetCache().WaitForCacheSync(cm.internalCtx)
 		}
-	} else {
-		shouldStart = cm.startedLeader
-		cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r)
-	}
-
-	if shouldStart {
-		// If already started, start the controller
-		cm.startRunnable(r)
-	}
-
-	return nil
+		return true
+	})
 }
 
 // Deprecated: use the equivalent Options field to set a field. This method will be removed in v0.10.
@@ -250,13 +209,14 @@ func (cm *controllerManager) SetFields(i interface{}) error {
 
 // AddMetricsExtraHandler adds extra handler served on path to the http server that serves metrics.
 func (cm *controllerManager) AddMetricsExtraHandler(path string, handler http.Handler) error {
+	if atomic.LoadInt64(cm.started) > 0 {
+		return fmt.Errorf("unable to add new metrics handler because metrics endpoint has already been created")
+	}
+
 	if path == defaultMetricsEndpoint {
 		return fmt.Errorf("overriding builtin %s endpoint is not allowed", defaultMetricsEndpoint)
 	}
 
-	cm.mu.Lock()
-	defer cm.mu.Unlock()
-
 	if _, found := cm.metricsExtraHandlers[path]; found {
 		return fmt.Errorf("can't register extra handler by duplicate path %q on metrics http server", path)
 	}
@@ -268,14 +228,7 @@ func (cm *controllerManager) AddMetricsExtraHandler(path string, handler http.Ha
 }
 
 // AddHealthzCheck allows you to add Healthz checker.
 func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker) error {
-	cm.mu.Lock()
-	defer cm.mu.Unlock()
-
-	if cm.stopProcedureEngaged {
-		return errors.New("can't accept new healthCheck as stop procedure is already engaged")
-	}
-
-	if cm.healthzStarted {
+	if atomic.LoadInt64(cm.started) > 0 {
 		return fmt.Errorf("unable to add new checker because healthz endpoint has already been created")
 	}
 
@@ -289,15 +242,8 @@ func (cm *controllerManager) AddHealthzCheck(name string, check healthz.Checker)
 
 // AddReadyzCheck allows you to add Readyz checker.
 func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker) error {
-	cm.mu.Lock()
-	defer cm.mu.Unlock()
-
-	if cm.stopProcedureEngaged {
-		return errors.New("can't accept new ready check as stop procedure is already engaged")
-	}
-
-	if cm.healthzStarted {
+	if atomic.LoadInt64(cm.started) > 0 {
 		return fmt.Errorf("unable to add new checker because readyz endpoint has already been created")
 	}
 
 	if cm.readyzHandler == nil {
@@ -371,77 +317,82 @@ func (cm *controllerManager) serveMetrics() {
 	// TODO(JoelSpeed): Use existing Kubernetes machinery for serving metrics
 	mux := http.NewServeMux()
 	mux.Handle(defaultMetricsEndpoint, handler)
+	for path, extraHandler := range cm.metricsExtraHandlers {
+		mux.Handle(path, extraHandler)
+	}
 
-	func() {
-		cm.mu.Lock()
-		defer cm.mu.Unlock()
-
-		for path, extraHandler := range cm.metricsExtraHandlers {
-			mux.Handle(path, extraHandler)
-		}
-	}()
-
-	server := http.Server{
-		Handler: mux,
+	server := &http.Server{
+		Handler:           mux,
+		MaxHeaderBytes:    1 << 20,
+		IdleTimeout:       90 * time.Second, // matches http.DefaultTransport keep-alive timeout
+		ReadHeaderTimeout: 32 * time.Second,
 	}
-	// Run the server
-	cm.startRunnable(RunnableFunc(func(_ context.Context) error {
+
+	// Run the server in a goroutine; it is shut down when stop is closed.
+	go func() {
 		cm.logger.Info("Starting metrics server", "path", defaultMetricsEndpoint)
 		if err := server.Serve(cm.metricsListener); err != nil && err != http.ErrServerClosed {
-			return err
+			cm.logger.Error(err, "error serving on metrics server")
 		}
-		return nil
-	}))
+	}()
 
-	// Shutdown the server when stop is closed
 	<-cm.internalProceduresStop
-	if err := server.Shutdown(cm.shutdownCtx); err != nil {
-		cm.errChan <- err
+	<-cm.shutdownCtx.Done()
+	ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second)
+	defer cancel()
+	if err := server.Shutdown(ctx); err != nil && !errors.Is(err, context.Canceled) {
+		cm.logger.Error(err, "error shutting down metrics server")
 	}
 }
 
 func (cm *controllerManager) serveHealthProbes() {
 	mux := http.NewServeMux()
-	server := http.Server{
-		Handler: mux,
+	server := &http.Server{
+		Handler:           mux,
+		MaxHeaderBytes:    1 << 20,
+		IdleTimeout:       90 * time.Second, // matches http.DefaultTransport keep-alive timeout
+		ReadHeaderTimeout: 32 * time.Second,
 	}
 
-	func() {
-		cm.mu.Lock()
-		defer cm.mu.Unlock()
+	if cm.readyzHandler != nil {
+		mux.Handle(cm.readinessEndpointName, http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler))
+		// Append '/' suffix to handle subpaths
+		mux.Handle(cm.readinessEndpointName+"/", http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler))
+	}
+	if cm.healthzHandler != nil {
+		mux.Handle(cm.livenessEndpointName, http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler))
+		// Append '/' suffix to handle subpaths
+		mux.Handle(cm.livenessEndpointName+"/", http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler))
+	}
 
-		if cm.readyzHandler != nil {
-			mux.Handle(cm.readinessEndpointName, http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler))
-			// Append '/' suffix to handle subpaths
-			mux.Handle(cm.readinessEndpointName+"/", http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler))
-		}
-		if cm.healthzHandler != nil {
-			mux.Handle(cm.livenessEndpointName, http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler))
-			// Append '/' suffix to handle subpaths
-			mux.Handle(cm.livenessEndpointName+"/", http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler))
+	go func() {
+		// Run server.
+		cm.logger.Info("Starting health probe server")
+		if err := server.Serve(cm.healthProbeListener); err != nil && err != http.ErrServerClosed {
+			cm.logger.Error(err, "error serving health probe server")
 		}
-
-		// Run server
-		cm.startRunnable(RunnableFunc(func(_ context.Context) error {
-			if err := server.Serve(cm.healthProbeListener); err != nil && err != http.ErrServerClosed {
-				return err
-			}
-			return nil
-		}))
-		cm.healthzStarted = true
 	}()
 
-	// Shutdown the server when stop is closed
 	<-cm.internalProceduresStop
-	if err := server.Shutdown(cm.shutdownCtx); err != nil {
-		cm.errChan <- err
+	<-cm.shutdownCtx.Done()
+	ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second)
+	defer cancel()
+	if err := server.Shutdown(ctx); err != nil && !errors.Is(err, context.Canceled) {
+		cm.logger.Error(err, "error shutting down health probes server")
 	}
 }
 
+// Start starts the manager and waits indefinitely.
+// There are only two ways for Start to return:
+// an error has occurred in one of the internal operations,
+// such as leader election, cache start, webhooks, and so on,
+// or the context is cancelled.
 func (cm *controllerManager) Start(ctx context.Context) (err error) {
-	if err := cm.Add(cm.cluster); err != nil {
-		return fmt.Errorf("failed to add cluster to runnables: %w", err)
+	if !atomic.CompareAndSwapInt64(cm.started, 0, 1) {
+		return errors.New("manager already started")
 	}
+
+	// Initialize the internal context.
 	cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)
 
 	// This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request
@@ -463,40 +414,69 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
 		}
 	}()
 
-	// initialize this here so that we reset the signal channel state on every start
-	// Everything that might write into this channel must be started in a new goroutine,
-	// because otherwise we might block this routine trying to write into the full channel
-	// and will not be able to enter the deferred cm.engageStopProcedure() which drains
-	// it.
-	cm.errChan = make(chan error)
+	// Add the cluster runnable.
+	if err := cm.Add(cm.cluster); err != nil {
+		return fmt.Errorf("failed to add cluster to runnables: %w", err)
+	}
 
 	// Metrics should be served whether the controller is leader or not.
 	// (If we don't serve metrics for non-leaders, prometheus will still scrape
-	// the pod but will get a connection refused)
+	// the pod but will get a connection refused).
 	if cm.metricsListener != nil {
 		go cm.serveMetrics()
 	}
 
-	// Serve health probes
+	// Serve health probes.
 	if cm.healthProbeListener != nil {
 		go cm.serveHealthProbes()
 	}
 
-	go cm.startNonLeaderElectionRunnables()
+	// First start any webhook servers, which includes conversion, validation, and defaulting
+	// webhooks that are registered.
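+	// (For example, a conversion webhook registered via
+	// mgr.GetWebhookServer().Register("/convert", handler), with an illustrative
+	// path and handler, is served by the *webhook.Server runnable started here.)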
+ // + // WARNING: Webhooks MUST start before any cache is populated, otherwise there is a race condition + // between conversion webhooks and the cache sync (usually initial list) which causes the webhooks + // to never start because no cache can be populated. + if err := cm.runnables.webhooks.StartAndWaitReady(cm.internalCtx); err != nil { + if err != wait.ErrWaitTimeout { + return err + } + } - go func() { - if cm.resourceLock != nil { - err := cm.startLeaderElection() - if err != nil { - cm.errChan <- err - } - } else { - // Treat not having leader election enabled the same as being elected. - cm.startLeaderElectionRunnables() - close(cm.elected) + // Start and wait for caches. + if err := cm.runnables.caches.StartAndWaitReady(cm.internalCtx); err != nil { + if err != wait.ErrWaitTimeout { + return err } - }() + } + // Start the non-leaderelection Runnables after the cache has synced. + if err := cm.runnables.others.StartAndWaitReady(cm.internalCtx); err != nil { + if err != wait.ErrWaitTimeout { + return err + } + } + + // Start the leader election and all required runnables. + { + ctx, cancel := context.WithCancel(context.Background()) + cm.leaderElectionCancel = cancel + go func() { + if cm.resourceLock != nil { + if err := cm.startLeaderElection(ctx); err != nil { + cm.errChan <- err + } + } else { + // Treat not having leader election enabled the same as being elected. + if err := cm.startLeaderElectionRunnables(); err != nil { + cm.errChan <- err + } + close(cm.elected) + } + }() + } + + atomic.StoreInt64(cm.ready, 1) select { case <-ctx.Done(): // We are done @@ -519,15 +499,18 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e } defer shutdownCancel() - // Cancel the internal stop channel and wait for the procedures to stop and complete. - close(cm.internalProceduresStop) - cm.internalCancel() + var closeOnce sync.Once // Start draining the errors before acquiring the lock to make sure we don't deadlock // if something that has the lock is blocked on trying to write into the unbuffered // channel after something else already wrote into it. go func() { for { + closeOnce.Do(func() { + // Cancel the internal stop channel and wait for the procedures to stop and complete. + close(cm.internalProceduresStop) + cm.internalCancel() + }) select { case err, ok := <-cm.errChan: if ok { @@ -538,13 +521,10 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e } } }() + if cm.gracefulShutdownTimeout == 0 { return nil } - cm.mu.Lock() - defer cm.mu.Unlock() - cm.stopProcedureEngaged = true - // we want to close this after the other runnables stop, because we don't // want things like leader election to try and emit events on a closed // channel @@ -557,7 +537,7 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e func (cm *controllerManager) waitForRunnableToEnd(shutdownCancel context.CancelFunc) (retErr error) { // Cancel leader election only after we waited. It will os.Exit() the app for safety. 
defer func() { - if retErr == nil && cm.leaderElectionCancel != nil { + if retErr == nil && cm.resourceLock != nil { // After asking the context to be cancelled, make sure // we wait for the leader stopped channel to be closed, otherwise // we might encounter race conditions between this code @@ -568,7 +548,10 @@ func (cm *controllerManager) waitForRunnableToEnd(shutdownCancel context.CancelF }() go func() { - cm.waitForRunnable.Wait() + cm.runnables.others.StopAndWait() + cm.runnables.caches.StopAndWait() + cm.runnables.leaderElection.StopAndWait() + cm.runnables.webhooks.StopAndWait() shutdownCancel() }() @@ -579,91 +562,11 @@ func (cm *controllerManager) waitForRunnableToEnd(shutdownCancel context.CancelF return nil } -func (cm *controllerManager) startNonLeaderElectionRunnables() { - cm.mu.Lock() - defer cm.mu.Unlock() - - // First start any webhook servers, which includes conversion, validation, and defaulting - // webhooks that are registered. - // - // WARNING: Webhooks MUST start before any cache is populated, otherwise there is a race condition - // between conversion webhooks and the cache sync (usually initial list) which causes the webhooks - // to never start because no cache can be populated. - for _, c := range cm.nonLeaderElectionRunnables { - if _, ok := c.(*webhook.Server); ok { - cm.startRunnable(c) - } - } - - // Start and wait for caches. - cm.waitForCache(cm.internalCtx) - - // Start the non-leaderelection Runnables after the cache has synced - for _, c := range cm.nonLeaderElectionRunnables { - if _, ok := c.(*webhook.Server); ok { - continue - } - - // Controllers block, but we want to return an error if any have an error starting. - // Write any Start errors to a channel so we can return them - cm.startRunnable(c) - } -} - -func (cm *controllerManager) startLeaderElectionRunnables() { - cm.mu.Lock() - defer cm.mu.Unlock() - - cm.waitForCache(cm.internalCtx) - - // Start the leader election Runnables after the cache has synced - for _, c := range cm.leaderElectionRunnables { - // Controllers block, but we want to return an error if any have an error starting. - // Write any Start errors to a channel so we can return them - cm.startRunnable(c) - } - - cm.startedLeader = true -} - -func (cm *controllerManager) waitForCache(ctx context.Context) { - if cm.started { - return - } - - for _, cache := range cm.caches { - cm.startRunnable(cache) - } - - // Wait for the caches to sync. - // TODO(community): Check the return value and write a test - for _, cache := range cm.caches { - cache.GetCache().WaitForCacheSync(ctx) - } - // TODO: This should be the return value of cm.cache.WaitForCacheSync but we abuse - // cm.started as check if we already started the cache so it must always become true. - // Making sure that the cache doesn't get started twice is needed to not get a "close - // of closed channel" panic - cm.started = true +func (cm *controllerManager) startLeaderElectionRunnables() error { + return cm.runnables.leaderElection.StartAndWaitReady(cm.internalCtx) } -func (cm *controllerManager) startLeaderElection() (err error) { - ctx, cancel := context.WithCancel(context.Background()) - cm.mu.Lock() - cm.leaderElectionCancel = cancel - cm.mu.Unlock() - - if cm.onStoppedLeading == nil { - cm.onStoppedLeading = func() { - // Make sure graceful shutdown is skipped if we lost the leader lock without - // intending to. - cm.gracefulShutdownTimeout = time.Duration(0) - // Most implementations of leader election log.Fatal() here. 
- // Since Start is wrapped in log.Fatal when called, we can just return - // an error here which will cause the program to exit. - cm.errChan <- errors.New("leader election lost") - } - } +func (cm *controllerManager) startLeaderElection(ctx context.Context) (err error) { l, err := leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ Lock: cm.resourceLock, LeaseDuration: cm.leaseDuration, @@ -671,10 +574,24 @@ func (cm *controllerManager) startLeaderElection() (err error) { RetryPeriod: cm.retryPeriod, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: func(_ context.Context) { - cm.startLeaderElectionRunnables() + if err := cm.startLeaderElectionRunnables(); err != nil { + cm.errChan <- err + return + } close(cm.elected) }, - OnStoppedLeading: cm.onStoppedLeading, + OnStoppedLeading: func() { + if cm.onStoppedLeading != nil { + cm.onStoppedLeading() + } + // Make sure graceful shutdown is skipped if we lost the leader lock without + // intending to. + cm.gracefulShutdownTimeout = time.Duration(0) + // Most implementations of leader election log.Fatal() here. + // Since Start is wrapped in log.Fatal when called, we can just return + // an error here which will cause the program to exit. + cm.errChan <- errors.New("leader election lost") + }, }, ReleaseOnCancel: cm.leaderElectionReleaseOnCancel, }) @@ -694,13 +611,3 @@ func (cm *controllerManager) startLeaderElection() (err error) { func (cm *controllerManager) Elected() <-chan struct{} { return cm.elected } - -func (cm *controllerManager) startRunnable(r Runnable) { - cm.waitForRunnable.Add(1) - go func() { - defer cm.waitForRunnable.Done() - if err := r.Start(cm.internalCtx); err != nil { - cm.errChan <- err - } - }() -} diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 2d2733f0a6..9f8b9c492c 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -31,6 +31,7 @@ import ( "k8s.io/client-go/rest" "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/client-go/tools/record" + "k8s.io/utils/pointer" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/cluster" @@ -365,8 +366,15 @@ func New(config *rest.Config, options Options) (Manager, error) { return nil, err } + errChan := make(chan error) + runnables := newRunnables(errChan) + return &controllerManager{ + started: pointer.Int64(0), + ready: pointer.Int64(0), cluster: cluster, + runnables: runnables, + errChan: errChan, recorderProvider: recorderProvider, resourceLock: resourceLock, metricsListener: metricsListener, diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 2cb2c72560..4ebd966d89 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -306,7 +306,7 @@ var _ = Describe("manger.Manager", func() { Expect(m.Start(ctx)).To(BeNil()) close(mgrDone) }() - <-cm.elected + <-cm.Elected() cancel() select { case <-leaderElectionDone: @@ -401,8 +401,8 @@ var _ = Describe("manger.Manager", func() { defer GinkgoRecover() Expect(m1.Elected()).ShouldNot(BeClosed()) Expect(m1.Start(ctx1)).NotTo(HaveOccurred()) - Expect(m1.Elected()).Should(BeClosed()) }() + <-m1.Elected() <-c1 c2 := make(chan struct{}) @@ -435,6 +435,7 @@ var _ = Describe("manger.Manager", func() { Expect(m).To(BeNil()) Expect(err).To(MatchError(ContainSubstring("expected error"))) }) + It("should return an error if namespace not set and not running in cluster", func() { m, err := New(cfg, Options{LeaderElection: true, LeaderElectionID: 
"controller-runtime"}) Expect(m).To(BeNil()) @@ -609,9 +610,9 @@ var _ = Describe("manger.Manager", func() { defer GinkgoRecover() Expect(m.Elected()).ShouldNot(BeClosed()) Expect(m.Start(ctx)).NotTo(HaveOccurred()) - Expect(m.Elected()).Should(BeClosed()) }() + <-m.Elected() wgRunnableStarted.Wait() }) @@ -653,7 +654,9 @@ var _ = Describe("manger.Manager", func() { } mgr, ok := m.(*controllerManager) Expect(ok).To(BeTrue()) - mgr.caches = []hasCache{&cacheProvider{cache: &informertest.FakeInformers{Error: fmt.Errorf("expected error")}}} + Expect(mgr.Add( + &cacheProvider{cache: &informertest.FakeInformers{Error: fmt.Errorf("expected error")}}, + )).To(Succeed()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -672,14 +675,15 @@ var _ = Describe("manger.Manager", func() { } runnableWasStarted := make(chan struct{}) - Expect(m.Add(RunnableFunc(func(ctx context.Context) error { + runnable := RunnableFunc(func(ctx context.Context) error { defer GinkgoRecover() if !fakeCache.wasSynced { return errors.New("runnable got started before cache was synced") } close(runnableWasStarted) return nil - }))).To(Succeed()) + }) + Expect(m.Add(runnable)).To(Succeed()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -801,8 +805,11 @@ var _ = Describe("manger.Manager", func() { Expect(err).NotTo(HaveOccurred()) Expect(m.Add(fakeCluster)).NotTo(HaveOccurred()) - Expect(fakeCluster.informer.wasStarted).To(BeTrue()) - Expect(fakeCluster.informer.wasSynced).To(BeTrue()) + Eventually(func() bool { + fakeCluster.informer.mu.Lock() + defer fakeCluster.informer.mu.Unlock() + return fakeCluster.informer.wasStarted && fakeCluster.informer.wasSynced + }).Should(BeTrue()) }) It("should wait for runnables to stop", func() { @@ -1141,6 +1148,7 @@ var _ = Describe("manger.Manager", func() { endpoint := fmt.Sprintf("http://%s/should-not-exist", listener.Addr().String()) resp, err := http.Get(endpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(404)) }) @@ -1167,6 +1175,7 @@ var _ = Describe("manger.Manager", func() { metricsEndpoint := fmt.Sprintf("http://%s/metrics", listener.Addr().String()) resp, err := http.Get(metricsEndpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(200)) data, err := ioutil.ReadAll(resp.Body) @@ -1208,6 +1217,7 @@ var _ = Describe("manger.Manager", func() { endpoint := fmt.Sprintf("http://%s/debug", listener.Addr().String()) resp, err := http.Get(endpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(http.StatusOK)) body, err := ioutil.ReadAll(resp.Body) @@ -1261,7 +1271,7 @@ var _ = Describe("manger.Manager", func() { Eventually(func() error { _, err = http.Get(endpoint) return err - }).ShouldNot(Succeed()) + }, 10*time.Second).ShouldNot(Succeed()) }) It("should serve readiness endpoint", func() { @@ -1286,12 +1296,14 @@ var _ = Describe("manger.Manager", func() { // Controller is not ready resp, err := http.Get(readinessEndpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(http.StatusInternalServerError)) // Controller is ready res = nil resp, err = http.Get(readinessEndpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(http.StatusOK)) // Check readiness path without trailing slash without redirect @@ -1304,6 +1316,7 @@ var _ = Describe("manger.Manager", func() { } resp, err = 
httpClient.Get(readinessEndpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(http.StatusOK)) // Check readiness path for individual check @@ -1311,6 +1324,7 @@ var _ = Describe("manger.Manager", func() { res = nil resp, err = http.Get(readinessEndpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(http.StatusOK)) }) @@ -1336,12 +1350,14 @@ var _ = Describe("manger.Manager", func() { // Controller is not ready resp, err := http.Get(livenessEndpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(http.StatusInternalServerError)) // Controller is ready res = nil resp, err = http.Get(livenessEndpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(http.StatusOK)) // Check liveness path without trailing slash without redirect @@ -1354,6 +1370,7 @@ var _ = Describe("manger.Manager", func() { } resp, err = httpClient.Get(livenessEndpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(http.StatusOK)) // Check readiness path for individual check @@ -1361,6 +1378,7 @@ var _ = Describe("manger.Manager", func() { res = nil resp, err = http.Get(livenessEndpoint) Expect(err).NotTo(HaveOccurred()) + defer resp.Body.Close() Expect(resp.StatusCode).To(Equal(http.StatusOK)) }) }) @@ -1390,9 +1408,7 @@ var _ = Describe("manger.Manager", func() { // Wait for the Manager to start Eventually(func() bool { - mgr.mu.Lock() - defer mgr.mu.Unlock() - return mgr.started + return mgr.runnables.caches.Started() }).Should(BeTrue()) // Add another component after starting @@ -1421,9 +1437,7 @@ var _ = Describe("manger.Manager", func() { // Wait for the Manager to start Eventually(func() bool { - mgr.mu.Lock() - defer mgr.mu.Unlock() - return mgr.started + return mgr.runnables.caches.Started() }).Should(BeTrue()) c1 := make(chan struct{}) @@ -1577,6 +1591,8 @@ var _ = Describe("manger.Manager", func() { defer close(doneCh) Expect(m.Start(ctx)).To(Succeed()) }() + <-m.Elected() + Eventually(func() *corev1.Event { evts, err := clientset.CoreV1().Events("").Search(m.GetScheme(), &ns) Expect(err).NotTo(HaveOccurred()) @@ -1765,11 +1781,12 @@ func (c *cacheProvider) Start(ctx context.Context) error { } type startSignalingInformer struct { + mu sync.Mutex + // The manager calls Start and WaitForCacheSync in // parallel, so we have to protect wasStarted with a Mutex // and block in WaitForCacheSync until it is true. - wasStartedLock sync.Mutex - wasStarted bool + wasStarted bool // was synced will be true once Start was called and // WaitForCacheSync returned, just like a real cache. 
 	wasSynced bool
@@ -1777,15 +1794,15 @@ type startSignalingInformer struct {
 }
 
 func (c *startSignalingInformer) started() bool {
-	c.wasStartedLock.Lock()
-	defer c.wasStartedLock.Unlock()
+	c.mu.Lock()
+	defer c.mu.Unlock()
 	return c.wasStarted
 }
 
 func (c *startSignalingInformer) Start(ctx context.Context) error {
-	c.wasStartedLock.Lock()
+	c.mu.Lock()
 	c.wasStarted = true
-	c.wasStartedLock.Unlock()
+	c.mu.Unlock()
 	return c.Cache.Start(ctx)
 }
 
@@ -1794,7 +1811,9 @@ func (c *startSignalingInformer) WaitForCacheSync(ctx context.Context) bool {
 		for !c.started() {
 			continue
 		}
+		c.mu.Lock()
 		c.wasSynced = true
+		c.mu.Unlock()
 	}()
 	return c.Cache.WaitForCacheSync(ctx)
 }
diff --git a/pkg/manager/runnable_group.go b/pkg/manager/runnable_group.go
new file mode 100644
index 0000000000..e18a4eb288
--- /dev/null
+++ b/pkg/manager/runnable_group.go
@@ -0,0 +1,242 @@
+package manager
+
+import (
+	"context"
+	"errors"
+	"sync"
+	"time"
+
+	"k8s.io/apimachinery/pkg/util/wait"
+	"sigs.k8s.io/controller-runtime/pkg/webhook"
+)
+
+var (
+	errRunnableGroupStopped = errors.New("can't accept new runnable as stop procedure is already engaged")
+)
+
+type readyRunnable struct {
+	Runnable
+	ReadyCheck readyCheck
+}
+
+type readyCheck func(ctx context.Context) bool
+
+type runnables struct {
+	webhooks       *runnableGroup
+	caches         *runnableGroup
+	leaderElection *runnableGroup
+	others         *runnableGroup
+}
+
+func newRunnables(errChan chan error) *runnables {
+	return &runnables{
+		webhooks:       newRunnableGroup(errChan),
+		caches:         newRunnableGroup(errChan),
+		leaderElection: newRunnableGroup(errChan),
+		others:         newRunnableGroup(errChan),
+	}
+}
+
+func (r *runnables) Add(fn Runnable, ready readyCheck) error {
+	switch runnable := fn.(type) {
+	case hasCache:
+		return r.caches.Add(fn, ready)
+	case *webhook.Server:
+		return r.webhooks.Add(fn, ready)
+	case LeaderElectionRunnable:
+		if !runnable.NeedLeaderElection() {
+			return r.others.Add(fn, ready)
+		}
+		return r.leaderElection.Add(fn, ready)
+	default:
+		return r.leaderElection.Add(fn, ready)
+	}
+}
+
+func (r *runnables) Start(ctx context.Context) {
+	r.webhooks.Start(ctx)
+	r.caches.Start(ctx)
+	r.leaderElection.Start(ctx)
+	r.others.Start(ctx)
+}
+
+type runnableGroup struct {
+	internalCtx context.Context
+	errChan     chan error
+
+	start     sync.Mutex
+	startOnce sync.Once
+	started   bool
+
+	stop     sync.RWMutex
+	stopOnce sync.Once
+	stopped  bool
+
+	ch     chan *readyRunnable
+	wg     *sync.WaitGroup
+	buffer *sync.Map
+}
+
+func newRunnableGroup(errChan chan error) *runnableGroup {
+	r := &runnableGroup{
+		errChan: errChan,
+		ch:      make(chan *readyRunnable),
+		wg:      new(sync.WaitGroup),
+		buffer:  new(sync.Map),
+	}
+	return r
+}
+
+func (r *runnableGroup) Started() bool {
+	r.start.Lock()
+	defer r.start.Unlock()
+	return r.started
+}
+
+func (r *runnableGroup) StartAndWaitReady(ctx context.Context) error {
+	r.Start(ctx)
+	return r.WaitReady(ctx)
+}
+
+func (r *runnableGroup) Start(ctx context.Context) {
+	r.startOnce.Do(func() {
+		go r.reconcile()
+
+		r.start.Lock()
+		r.internalCtx = ctx
+		r.started = true
+		r.buffer.Range(func(key, _ interface{}) bool {
+			r.ch <- key.(*readyRunnable)
+			return true
+		})
+		r.start.Unlock()
+	})
+}
+
+func (r *runnableGroup) reconcile() {
+	for runnable := range r.ch {
+		// Handle stop.
+		// If shutdown has been engaged we must avoid adding new
+		// goroutines to the WaitGroup, because calling Add()
+		// concurrently with Wait() panics.
+		{
+			r.stop.RLock()
+			if r.stopped {
+				// Drop any runnables if we're stopped.
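+				// The runnable was already stored in the Add buffer, so surface
+				// errRunnableGroupStopped on the error channel instead of
+				// dropping it silently.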
+				r.errChan <- errRunnableGroupStopped
+				r.stop.RUnlock()
+				continue
+			}
+
+			// Why is this here?
+			// When StopAndWait is called, if a runnable is in the process
+			// of being added, we could end up in a situation where
+			// the WaitGroup is incremented while StopAndWait has called Wait(),
+			// which would result in a panic.
+			r.wg.Add(1)
+			r.stop.RUnlock()
+		}
+
+		// Start the runnable.
+		go func(rn *readyRunnable) {
+			go func() {
+				// Run the ready check up to a fixed number of times. Checks
+				// like a cache's WaitForCacheSync block internally, which gives
+				// the runnables time to start up before they report ready.
+				ready := false
+				for i := 0; i < 10; i++ {
+					if ready = rn.ReadyCheck(r.internalCtx); !ready {
+						continue
+					}
+					break
+				}
+				if ready {
+					r.buffer.Store(rn, true)
+				}
+			}()
+
+			defer r.wg.Done()
+			defer r.buffer.Store(rn, true)
+			if err := rn.Start(r.internalCtx); err != nil {
+				r.errChan <- err
+			}
+		}(runnable)
+	}
+}
+
+// WaitReady polls until the group is ready or until the context is cancelled.
+func (r *runnableGroup) WaitReady(ctx context.Context) error {
+	return wait.PollImmediateInfiniteWithContext(ctx,
+		100*time.Millisecond,
+		func(_ context.Context) (bool, error) {
+			if !r.Started() {
+				return false, nil
+			}
+			ready, total := 0, 0
+			r.buffer.Range(func(_, value interface{}) bool {
+				total++
+				if rd, ok := value.(bool); ok && rd {
+					ready++
+				}
+				return true
+			})
+			return ready == total, nil
+		},
+	)
+}
+
+// Add can be called before or after Start, but not after shutdown;
+// it returns an error once the stop procedure has been engaged.
+func (r *runnableGroup) Add(rn Runnable, ready readyCheck) error {
+	r.stop.RLock()
+	if r.stopped {
+		r.stop.RUnlock()
+		return errRunnableGroupStopped
+	}
+	r.stop.RUnlock()
+
+	// If no readiness check was given, default to one that always returns true.
+	if ready == nil {
+		ready = func(_ context.Context) bool { return true }
+	}
+
+	readyRunnable := &readyRunnable{
+		Runnable:   rn,
+		ReadyCheck: ready,
+	}
+
+	// Store the runnable in the internal buffer.
+	r.buffer.Store(readyRunnable, false)
+
+	// Handle start.
+	// If the overall runnable group isn't started yet
+	// we want to buffer the runnables and let Start()
+	// queue them up again later.
+	{
+		r.start.Lock()
+		if !r.started {
+			r.start.Unlock()
+			return nil
+		}
+		r.start.Unlock()
+	}
+
+	// Enqueue the runnable.
+	r.ch <- readyRunnable
+	return nil
+}
+
+// StopAndWait waits for all the runnables to finish before returning.
+func (r *runnableGroup) StopAndWait() {
+	r.stopOnce.Do(func() {
+		r.stop.Lock()
+		// Set stopped so that we don't accept any
+		// new runnables from now on.
+		r.stopped = true
+		r.stop.Unlock()
+
+		// Wait for all the runnables to finish.
+		r.wg.Wait()
+		close(r.ch)
+	})
+}
diff --git a/pkg/manager/runnable_group_test.go b/pkg/manager/runnable_group_test.go
new file mode 100644
index 0000000000..f93a9dda06
--- /dev/null
+++ b/pkg/manager/runnable_group_test.go
@@ -0,0 +1,172 @@
+package manager
+
+import (
+	"context"
+	"sync/atomic"
+	"time"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+	"k8s.io/utils/pointer"
+)
+
+var _ = Describe("runnableGroup", func() {
+	errCh := make(chan error)
+
+	Describe("new", func() {
+		It("should be able to add new runnables before it starts", func() {
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			rg := newRunnableGroup(errCh)
+			Expect(rg.Add(RunnableFunc(func(c context.Context) error {
+				<-ctx.Done()
+				return nil
+			}), nil)).To(Succeed())
+
+			Expect(rg.Started()).To(BeFalse())
+		})
+
+		It("should be able to add new runnables before and after start", func() {
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			rg := newRunnableGroup(errCh)
+			Expect(rg.Add(RunnableFunc(func(c context.Context) error {
+				<-ctx.Done()
+				return nil
+			}), nil)).To(Succeed())
+			rg.Start(ctx)
+			Expect(rg.Started()).To(BeTrue())
+			Expect(rg.Add(RunnableFunc(func(c context.Context) error {
+				<-ctx.Done()
+				return nil
+			}), nil)).To(Succeed())
+			Expect(rg.WaitReady(ctx)).To(Succeed())
+		})
+
+		It("should be able to add new runnables before and after start concurrently", func() {
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			rg := newRunnableGroup(errCh)
+
+			go func() {
+				<-time.After(50 * time.Millisecond)
+				rg.Start(ctx)
+			}()
+
+			for i := 0; i < 20; i++ {
+				go func(i int) {
+					defer GinkgoRecover()
+
+					<-time.After(time.Duration(i) * 10 * time.Millisecond)
+					Expect(rg.Add(RunnableFunc(func(c context.Context) error {
+						<-ctx.Done()
+						return nil
+					}), nil)).To(Succeed())
+				}(i)
+			}
+			Expect(rg.WaitReady(ctx)).To(Succeed())
+			Eventually(func() int {
+				i := 0
+				rg.buffer.Range(func(key, value interface{}) bool {
+					i++
+					return true
+				})
+				return i
+			}).Should(BeNumerically("==", 20))
+		})
+
+		It("should be able to close the group and wait for all runnables to finish", func() {
+			ctx, cancel := context.WithCancel(context.Background())
+
+			exited := pointer.Int64(0)
+			rg := newRunnableGroup(errCh)
+			for i := 0; i < 10; i++ {
+				Expect(rg.Add(RunnableFunc(func(c context.Context) error {
+					defer atomic.AddInt64(exited, 1)
+					<-ctx.Done()
+					<-time.After(time.Duration(i) * 10 * time.Millisecond)
+					return nil
+				}), nil)).To(Succeed())
+			}
+			rg.Start(ctx)
+			Expect(rg.WaitReady(ctx)).To(Succeed())
+
+			// Cancel the context, asking the runnables to exit.
+			cancel()
+
+			// Wait for the group to stop.
+			stopped := make(chan struct{})
+			go func() {
+				rg.StopAndWait()
+				close(stopped)
+			}()
+			<-stopped
+
+			Expect(rg.Add(RunnableFunc(func(c context.Context) error {
+				return nil
+			}), nil)).ToNot(Succeed())
+
+			Expect(*exited).To(BeNumerically("==", 10))
+		})
+
+		It("should be able to wait for all runnables to be ready at different intervals", func() {
+			ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+			defer cancel()
+			rg := newRunnableGroup(errCh)
+
+			go func() {
+				<-time.After(50 * time.Millisecond)
+				rg.Start(ctx)
+			}()
+
+			for i := 0; i < 20; i++ {
+				go func(i int) {
+					defer GinkgoRecover()
+
+					Expect(rg.Add(RunnableFunc(func(c context.Context) error {
+						<-ctx.Done()
+						return nil
+					}), func(_ context.Context) bool {
+						<-time.After(time.Duration(i) * 10 * time.Millisecond)
+						return true
+					})).To(Succeed())
+				}(i)
+			}
+			Expect(rg.WaitReady(ctx)).To(Succeed())
+			Eventually(func() int {
+				i := 0
+				rg.buffer.Range(func(key, value interface{}) bool {
+					i++
+					return true
+				})
+				return i
+			}).Should(BeNumerically("==", 20))
+		})
+
+		It("should not turn ready if some readiness checks fail", func() {
+			ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+			defer cancel()
+			rg := newRunnableGroup(errCh)
+
+			go func() {
+				<-time.After(50 * time.Millisecond)
+				rg.Start(ctx)
+			}()
+
+			for i := 0; i < 20; i++ {
+				go func(i int) {
+					defer GinkgoRecover()
+
+					Expect(rg.Add(RunnableFunc(func(c context.Context) error {
+						<-ctx.Done()
+						return nil
+					}), func(_ context.Context) bool {
+						<-time.After(time.Duration(i) * 10 * time.Millisecond)
+						return i%2 == 0 // Report not ready for all odd indexes.
+					})).To(Succeed())
+				}(i)
+			}
+			Expect(rg.WaitReady(ctx)).ToNot(Succeed())
+		})
+	})
+})
diff --git a/pkg/webhook/server.go b/pkg/webhook/server.go
index 1db38113f7..15b81ba4c3 100644
--- a/pkg/webhook/server.go
+++ b/pkg/webhook/server.go
@@ -262,7 +262,10 @@ func (s *Server) Start(ctx context.Context) error {
 	log.Info("Serving webhook server", "host", s.Host, "port", s.Port)
 
 	srv := &http.Server{
-		Handler: s.WebhookMux,
+		Handler:           s.WebhookMux,
+		MaxHeaderBytes:    1 << 20,
+		IdleTimeout:       90 * time.Second, // matches http.DefaultTransport keep-alive timeout
+		ReadHeaderTimeout: 32 * time.Second,
 	}
 
 	idleConnsClosed := make(chan struct{})
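
Note on the resulting dispatch (a minimal usage sketch, not part of the patch;
it assumes only the public API: ctrl.NewManager, manager.RunnableFunc, Manager.Add,
and the LeaderElectionRunnable interface). A runnable opts out of leader election
by returning false from NeedLeaderElection; anything that implements neither
LeaderElectionRunnable nor hasCache lands in the leader-election group:

	package main

	import (
		"context"

		ctrl "sigs.k8s.io/controller-runtime"
		"sigs.k8s.io/controller-runtime/pkg/manager"
	)

	// alwaysRun is a hypothetical runnable that runs on every replica:
	// NeedLeaderElection() == false routes it into the "others" group,
	// which starts as soon as the caches have synced.
	type alwaysRun struct{}

	func (alwaysRun) Start(ctx context.Context) error {
		<-ctx.Done() // run until the manager shuts down
		return nil
	}

	func (alwaysRun) NeedLeaderElection() bool { return false }

	func main() {
		mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), manager.Options{})
		if err != nil {
			panic(err)
		}

		// Dispatched into the "others" group; never gated on leader election.
		if err := mgr.Add(alwaysRun{}); err != nil {
			panic(err)
		}

		// A bare RunnableFunc implements neither LeaderElectionRunnable nor
		// hasCache, so it joins the leader-election group and starts only
		// once this replica is elected (or immediately when leader election
		// is disabled).
		if err := mgr.Add(manager.RunnableFunc(func(ctx context.Context) error {
			<-ctx.Done()
			return nil
		})); err != nil {
			panic(err)
		}

		if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
			panic(err)
		}
	}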