From 21128d141a571d4d7c5022a96cbfeae1772c1568 Mon Sep 17 00:00:00 2001 From: Vince Prignano Date: Tue, 12 Oct 2021 06:58:15 -0700 Subject: [PATCH] Refactor manager internal around RunnableGroup(s) Signed-off-by: Vince Prignano --- go.mod | 1 + pkg/manager/internal.go | 321 ++++++++++++----------------- pkg/manager/manager.go | 5 + pkg/manager/manager_test.go | 27 ++- pkg/manager/runnable_group.go | 241 ++++++++++++++++++++++ pkg/manager/runnable_group_test.go | 172 ++++++++++++++++ 6 files changed, 567 insertions(+), 200 deletions(-) create mode 100644 pkg/manager/runnable_group.go create mode 100644 pkg/manager/runnable_group_test.go diff --git a/go.mod b/go.mod index fabf9b8f66..a3909cbadb 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module sigs.k8s.io/controller-runtime go 1.16 require ( + github.com/davecgh/go-spew v1.1.1 github.com/evanphx/json-patch v4.11.0+incompatible github.com/fsnotify/fsnotify v1.4.9 github.com/go-logr/logr v0.4.0 diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index cd01715b4e..836707a33d 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -30,6 +30,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" kerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/rest" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" @@ -64,14 +65,6 @@ type controllerManager struct { // cluster holds a variety of methods to interact with a cluster. Required. cluster cluster.Cluster - // leaderElectionRunnables is the set of Controllers that the controllerManager injects deps into and Starts. - // These Runnables are managed by lead election. - leaderElectionRunnables []Runnable - - // nonLeaderElectionRunnables is the set of webhook servers that the controllerManager injects deps into and Starts. - // These Runnables will not be blocked by lead election. - nonLeaderElectionRunnables []Runnable - // recorderProvider is used to generate event recorders that will be injected into Controllers // (and EventHandlers, Sources and Predicates). recorderProvider *intrec.Provider @@ -105,10 +98,9 @@ type controllerManager struct { healthzHandler *healthz.Handler mu sync.Mutex - started bool - startedLeader bool healthzStarted bool errChan chan error + runnables *runnables // controllerOptions are the global controller options. controllerOptions v1alpha1.ControllerConfigurationSpec @@ -134,8 +126,6 @@ type controllerManager struct { // election was configured. elected chan struct{} - caches []hasCache - // port is the port that the webhook server serves at. port int // host is the hostname that the webhook server binds to. @@ -160,10 +150,6 @@ type controllerManager struct { // between tries of actions. retryPeriod time.Duration - // waitForRunnable is holding the number of runnables currently running so that - // we can wait for them to exit before quitting the manager - waitForRunnable sync.WaitGroup - // gracefulShutdownTimeout is the duration given to runnable to stop // before the manager actually returns on stop. 
gracefulShutdownTimeout time.Duration @@ -194,6 +180,7 @@ type hasCache interface { func (cm *controllerManager) Add(r Runnable) error { cm.mu.Lock() defer cm.mu.Unlock() + if cm.stopProcedureEngaged { return errors.New("can't accept new runnable as stop procedure is already engaged") } @@ -203,31 +190,12 @@ func (cm *controllerManager) Add(r Runnable) error { return err } - var shouldStart bool - - // Add the runnable to the leader election or the non-leaderelection list - if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() { - shouldStart = cm.started - cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r) - } else if hasCache, ok := r.(hasCache); ok { - cm.caches = append(cm.caches, hasCache) - if cm.started { - cm.startRunnable(hasCache) - if !hasCache.GetCache().WaitForCacheSync(cm.internalCtx) { - return fmt.Errorf("could not sync cache") - } + return cm.runnables.Add(r, func(ctx context.Context) bool { + if cache, ok := r.(hasCache); ok { + return cache.GetCache().WaitForCacheSync(cm.internalCtx) } - } else { - shouldStart = cm.startedLeader - cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r) - } - - if shouldStart { - // If already started, start the controller - cm.startRunnable(r) - } - - return nil + return true + }) } // Deprecated: use the equivalent Options field to set a field. This method will be removed in v0.10. @@ -364,85 +332,93 @@ func (cm *controllerManager) GetControllerOptions() v1alpha1.ControllerConfigura return cm.controllerOptions } -func (cm *controllerManager) serveMetrics() { +func (cm *controllerManager) serveMetrics() error { handler := promhttp.HandlerFor(metrics.Registry, promhttp.HandlerOpts{ ErrorHandling: promhttp.HTTPErrorOnError, }) // TODO(JoelSpeed): Use existing Kubernetes machinery for serving metrics mux := http.NewServeMux() mux.Handle(defaultMetricsEndpoint, handler) + for path, extraHandler := range cm.metricsExtraHandlers { + mux.Handle(path, extraHandler) + } - func() { - cm.mu.Lock() - defer cm.mu.Unlock() - - for path, extraHandler := range cm.metricsExtraHandlers { - mux.Handle(path, extraHandler) - } - }() - - server := http.Server{ + server := &http.Server{ Handler: mux, } // Run the server - cm.startRunnable(RunnableFunc(func(_ context.Context) error { + if err := cm.runnables.Add(RunnableFunc(func(_ context.Context) error { cm.logger.Info("Starting metrics server", "path", defaultMetricsEndpoint) if err := server.Serve(cm.metricsListener); err != nil && err != http.ErrServerClosed { return err } return nil - })) + }), nil); err != nil { + return err + } // Shutdown the server when stop is closed - <-cm.internalProceduresStop - if err := server.Shutdown(cm.shutdownCtx); err != nil { - cm.errChan <- err - } + go func() { + <-cm.internalProceduresStop + if err := server.Shutdown(cm.shutdownCtx); err != nil { + cm.errChan <- err + } + }() + + return nil } -func (cm *controllerManager) serveHealthProbes() { +func (cm *controllerManager) serveHealthProbes() error { mux := http.NewServeMux() - server := http.Server{ + server := &http.Server{ Handler: mux, } - func() { - cm.mu.Lock() - defer cm.mu.Unlock() + if cm.readyzHandler != nil { + mux.Handle(cm.readinessEndpointName, http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler)) + // Append '/' suffix to handle subpaths + mux.Handle(cm.readinessEndpointName+"/", http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler)) + } + if cm.healthzHandler != nil { + mux.Handle(cm.livenessEndpointName, 
http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler))
+ // Append '/' suffix to handle subpaths
+ mux.Handle(cm.livenessEndpointName+"/", http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler))
+ }
- if cm.readyzHandler != nil {
- mux.Handle(cm.readinessEndpointName, http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler))
- // Append '/' suffix to handle subpaths
- mux.Handle(cm.readinessEndpointName+"/", http.StripPrefix(cm.readinessEndpointName, cm.readyzHandler))
- }
- if cm.healthzHandler != nil {
- mux.Handle(cm.livenessEndpointName, http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler))
- // Append '/' suffix to handle subpaths
- mux.Handle(cm.livenessEndpointName+"/", http.StripPrefix(cm.livenessEndpointName, cm.healthzHandler))
+ // Run server
+ if err := cm.runnables.Add(RunnableFunc(func(_ context.Context) error {
+ if err := server.Serve(cm.healthProbeListener); err != nil && err != http.ErrServerClosed {
+ return err
}
+ return nil
+ }), nil); err != nil {
+ return err
+ }
+ cm.healthzStarted = true
- // Run server
- cm.startRunnable(RunnableFunc(func(_ context.Context) error {
- if err := server.Serve(cm.healthProbeListener); err != nil && err != http.ErrServerClosed {
- return err
- }
- return nil
- }))
- cm.healthzStarted = true
+ go func() {
+ // Shutdown the server when stop is closed
+ <-cm.internalProceduresStop
+ if err := server.Shutdown(cm.shutdownCtx); err != nil {
+ cm.errChan <- err
+ }
}()
- // Shutdown the server when stop is closed
- <-cm.internalProceduresStop
- if err := server.Shutdown(cm.shutdownCtx); err != nil {
- cm.errChan <- err
- }
+ return nil
}
+// Start starts the manager and blocks indefinitely.
+// There are only two ways to have Start return:
+// an error occurs in one of the internal operations,
+// such as leader election, cache start, webhooks, and so on,
+// or the context is cancelled.
func (cm *controllerManager) Start(ctx context.Context) (err error) {
- if err := cm.Add(cm.cluster); err != nil {
- return fmt.Errorf("failed to add cluster to runnables: %w", err)
+ cm.mu.Lock()
+ {
+ // Initialize the internal context.
+ cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)
}
- cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)
+ cm.mu.Unlock()
// This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request
stopComplete := make(chan struct{})
@@ -463,39 +439,72 @@ func (cm *controllerManager) Start(ctx context.Context) (err error) {
}
}()
- // initialize this here so that we reset the signal channel state on every start
- // Everything that might write into this channel must be started in a new goroutine,
- // because otherwise we might block this routine trying to write into the full channel
- // and will not be able to enter the deferred cm.engageStopProcedure() which drains
- // it.
- cm.errChan = make(chan error)
+ // Add the cluster runnable.
+ if err := cm.Add(cm.cluster); err != nil {
+ return fmt.Errorf("failed to add cluster to runnables: %w", err)
+ }
- // Metrics should be served whether the controller is leader or not.
- // (If we don't serve metrics for non-leaders, prometheus will still scrape
- // the pod but will get a connection refused)
- if cm.metricsListener != nil {
- go cm.serveMetrics()
+ cm.mu.Lock()
+ {
+ // Metrics should be served whether the controller is leader or not.
+ // (If we don't serve metrics for non-leaders, prometheus will still scrape
+ // the pod but will get a connection refused).
+ if cm.metricsListener != nil { + if err := cm.serveMetrics(); err != nil { + cm.mu.Unlock() + return err + } + } + + // Serve health probes. + if cm.healthProbeListener != nil { + if err := cm.serveHealthProbes(); err != nil { + cm.mu.Unlock() + return err + } + } } + cm.mu.Unlock() - // Serve health probes - if cm.healthProbeListener != nil { - go cm.serveHealthProbes() + // First start any webhook servers, which includes conversion, validation, and defaulting + // webhooks that are registered. + // + // WARNING: Webhooks MUST start before any cache is populated, otherwise there is a race condition + // between conversion webhooks and the cache sync (usually initial list) which causes the webhooks + // to never start because no cache can be populated. + if err := cm.runnables.webhooks.StartAndWaitReady(cm.internalCtx); err != nil { + if err != wait.ErrWaitTimeout { + return err + } + } + // Start and wait for caches. + if err := cm.runnables.caches.StartAndWaitReady(cm.internalCtx); err != nil { + if err != wait.ErrWaitTimeout { + return err + } } - go cm.startNonLeaderElectionRunnables() + // Start the non-leaderelection Runnables after the cache has synced. + if err := cm.runnables.others.StartAndWaitReady(cm.internalCtx); err != nil { + if err != wait.ErrWaitTimeout { + return err + } + } - go func() { - if cm.resourceLock != nil { - err := cm.startLeaderElection() - if err != nil { - cm.errChan <- err + // Start the leader election and all required runnables. + if cm.resourceLock != nil { + if err := cm.startLeaderElection(); err != nil { + return err + } + } else { + // Treat not having leader election enabled the same as being elected. + if err := cm.startLeaderElectionRunnables(); err != nil { + if err != wait.ErrWaitTimeout { + return err } - } else { - // Treat not having leader election enabled the same as being elected. - cm.startLeaderElectionRunnables() - close(cm.elected) } - }() + close(cm.elected) + } select { case <-ctx.Done(): @@ -568,7 +577,10 @@ func (cm *controllerManager) waitForRunnableToEnd(shutdownCancel context.CancelF }() go func() { - cm.waitForRunnable.Wait() + cm.runnables.others.StopAndWait() + cm.runnables.caches.StopAndWait() + cm.runnables.leaderElection.StopAndWait() + cm.runnables.webhooks.StopAndWait() shutdownCancel() }() @@ -579,72 +591,8 @@ func (cm *controllerManager) waitForRunnableToEnd(shutdownCancel context.CancelF return nil } -func (cm *controllerManager) startNonLeaderElectionRunnables() { - cm.mu.Lock() - defer cm.mu.Unlock() - - // First start any webhook servers, which includes conversion, validation, and defaulting - // webhooks that are registered. - // - // WARNING: Webhooks MUST start before any cache is populated, otherwise there is a race condition - // between conversion webhooks and the cache sync (usually initial list) which causes the webhooks - // to never start because no cache can be populated. - for _, c := range cm.nonLeaderElectionRunnables { - if _, ok := c.(*webhook.Server); ok { - cm.startRunnable(c) - } - } - - // Start and wait for caches. - cm.waitForCache(cm.internalCtx) - - // Start the non-leaderelection Runnables after the cache has synced - for _, c := range cm.nonLeaderElectionRunnables { - if _, ok := c.(*webhook.Server); ok { - continue - } - - // Controllers block, but we want to return an error if any have an error starting. 
- // Write any Start errors to a channel so we can return them - cm.startRunnable(c) - } -} - -func (cm *controllerManager) startLeaderElectionRunnables() { - cm.mu.Lock() - defer cm.mu.Unlock() - - cm.waitForCache(cm.internalCtx) - - // Start the leader election Runnables after the cache has synced - for _, c := range cm.leaderElectionRunnables { - // Controllers block, but we want to return an error if any have an error starting. - // Write any Start errors to a channel so we can return them - cm.startRunnable(c) - } - - cm.startedLeader = true -} - -func (cm *controllerManager) waitForCache(ctx context.Context) { - if cm.started { - return - } - - for _, cache := range cm.caches { - cm.startRunnable(cache) - } - - // Wait for the caches to sync. - // TODO(community): Check the return value and write a test - for _, cache := range cm.caches { - cache.GetCache().WaitForCacheSync(ctx) - } - // TODO: This should be the return value of cm.cache.WaitForCacheSync but we abuse - // cm.started as check if we already started the cache so it must always become true. - // Making sure that the cache doesn't get started twice is needed to not get a "close - // of closed channel" panic - cm.started = true +func (cm *controllerManager) startLeaderElectionRunnables() error { + return cm.runnables.leaderElection.StartAndWaitReady(cm.internalCtx) } func (cm *controllerManager) startLeaderElection() (err error) { @@ -671,7 +619,10 @@ func (cm *controllerManager) startLeaderElection() (err error) { RetryPeriod: cm.retryPeriod, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: func(_ context.Context) { - cm.startLeaderElectionRunnables() + if err := cm.startLeaderElectionRunnables(); err != nil { + cm.errChan <- err + return + } close(cm.elected) }, OnStoppedLeading: cm.onStoppedLeading, @@ -694,13 +645,3 @@ func (cm *controllerManager) startLeaderElection() (err error) { func (cm *controllerManager) Elected() <-chan struct{} { return cm.elected } - -func (cm *controllerManager) startRunnable(r Runnable) { - cm.waitForRunnable.Add(1) - go func() { - defer cm.waitForRunnable.Done() - if err := r.Start(cm.internalCtx); err != nil { - cm.errChan <- err - } - }() -} diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 2d2733f0a6..8c1bcf3322 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -365,8 +365,13 @@ func New(config *rest.Config, options Options) (Manager, error) { return nil, err } + errChan := make(chan error) + runnables := newRunnables(errChan) + return &controllerManager{ cluster: cluster, + runnables: runnables, + errChan: errChan, recorderProvider: recorderProvider, resourceLock: resourceLock, metricsListener: metricsListener, diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 2cb2c72560..2dbdf35284 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -286,7 +286,7 @@ var _ = Describe("manger.Manager", func() { runnableDone := make(chan struct{}) slowRunnable := RunnableFunc(func(ctx context.Context) error { <-ctx.Done() - time.Sleep(100 * time.Millisecond) + time.Sleep(80 * time.Millisecond) close(runnableDone) return nil }) @@ -306,7 +306,7 @@ var _ = Describe("manger.Manager", func() { Expect(m.Start(ctx)).To(BeNil()) close(mgrDone) }() - <-cm.elected + <-cm.Elected() cancel() select { case <-leaderElectionDone: @@ -435,6 +435,7 @@ var _ = Describe("manger.Manager", func() { Expect(m).To(BeNil()) Expect(err).To(MatchError(ContainSubstring("expected error"))) }) + It("should return an error if namespace 
not set and not running in cluster", func() { m, err := New(cfg, Options{LeaderElection: true, LeaderElectionID: "controller-runtime"}) Expect(m).To(BeNil()) @@ -609,7 +610,7 @@ var _ = Describe("manger.Manager", func() { defer GinkgoRecover() Expect(m.Elected()).ShouldNot(BeClosed()) Expect(m.Start(ctx)).NotTo(HaveOccurred()) - Expect(m.Elected()).Should(BeClosed()) + Eventually(m.Elected()).Should(BeClosed()) }() wgRunnableStarted.Wait() @@ -653,7 +654,9 @@ var _ = Describe("manger.Manager", func() { } mgr, ok := m.(*controllerManager) Expect(ok).To(BeTrue()) - mgr.caches = []hasCache{&cacheProvider{cache: &informertest.FakeInformers{Error: fmt.Errorf("expected error")}}} + Expect(mgr.Add( + &cacheProvider{cache: &informertest.FakeInformers{Error: fmt.Errorf("expected error")}}, + )).To(Succeed()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -672,14 +675,15 @@ var _ = Describe("manger.Manager", func() { } runnableWasStarted := make(chan struct{}) - Expect(m.Add(RunnableFunc(func(ctx context.Context) error { + runnable := RunnableFunc(func(ctx context.Context) error { defer GinkgoRecover() if !fakeCache.wasSynced { return errors.New("runnable got started before cache was synced") } close(runnableWasStarted) return nil - }))).To(Succeed()) + }) + Expect(m.Add(runnable)).To(Succeed()) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -801,8 +805,9 @@ var _ = Describe("manger.Manager", func() { Expect(err).NotTo(HaveOccurred()) Expect(m.Add(fakeCluster)).NotTo(HaveOccurred()) - Expect(fakeCluster.informer.wasStarted).To(BeTrue()) - Expect(fakeCluster.informer.wasSynced).To(BeTrue()) + Eventually(func() bool { + return fakeCluster.informer.wasStarted && fakeCluster.informer.wasSynced + }).Should(BeTrue()) }) It("should wait for runnables to stop", func() { @@ -1392,7 +1397,7 @@ var _ = Describe("manger.Manager", func() { Eventually(func() bool { mgr.mu.Lock() defer mgr.mu.Unlock() - return mgr.started + return mgr.runnables.caches.Started() }).Should(BeTrue()) // Add another component after starting @@ -1423,7 +1428,7 @@ var _ = Describe("manger.Manager", func() { Eventually(func() bool { mgr.mu.Lock() defer mgr.mu.Unlock() - return mgr.started + return mgr.runnables.caches.Started() }).Should(BeTrue()) c1 := make(chan struct{}) @@ -1577,6 +1582,8 @@ var _ = Describe("manger.Manager", func() { defer close(doneCh) Expect(m.Start(ctx)).To(Succeed()) }() + <-m.Elected() + Eventually(func() *corev1.Event { evts, err := clientset.CoreV1().Events("").Search(m.GetScheme(), &ns) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/manager/runnable_group.go b/pkg/manager/runnable_group.go new file mode 100644 index 0000000000..dcdf6d4e38 --- /dev/null +++ b/pkg/manager/runnable_group.go @@ -0,0 +1,241 @@ +package manager + +import ( + "context" + "errors" + "sync" + "time" + + "k8s.io/apimachinery/pkg/util/wait" + "sigs.k8s.io/controller-runtime/pkg/webhook" +) + +var ( + errRunnableGroupStopped = errors.New("can't accept new runnable as stop procedure is already engaged") +) + +type readyRunnable struct { + Runnable + ReadyCheck readyCheck +} + +type readyCheck func(ctx context.Context) bool + +type runnables struct { + webhooks *runnableGroup + caches *runnableGroup + leaderElection *runnableGroup + others *runnableGroup +} + +func newRunnables(errChan chan error) *runnables { + return &runnables{ + webhooks: newRunnableGroup(errChan), + caches: newRunnableGroup(errChan), + leaderElection: newRunnableGroup(errChan), + others: 
newRunnableGroup(errChan), + } +} + +func (r *runnables) Add(fn Runnable, ready readyCheck) error { + switch runnable := fn.(type) { + case hasCache: + return r.caches.Add(fn, ready) + case *webhook.Server: + return r.webhooks.Add(fn, ready) + case LeaderElectionRunnable: + if !runnable.NeedLeaderElection() { + return r.others.Add(fn, ready) + } + return r.leaderElection.Add(fn, ready) + default: + return r.others.Add(fn, ready) + } +} + +func (r *runnables) Start(ctx context.Context) { + r.webhooks.Start(ctx) + r.caches.Start(ctx) + r.leaderElection.Start(ctx) + r.others.Start(ctx) +} + +type runnableGroup struct { + internalCtx context.Context + errChan chan error + + start sync.Mutex + startOnce sync.Once + started bool + + stop sync.RWMutex + stopOnce sync.Once + stopped bool + + ch chan *readyRunnable + wg *sync.WaitGroup + buffer *sync.Map +} + +func newRunnableGroup(errCh chan error) *runnableGroup { + r := &runnableGroup{ + errChan: errCh, + ch: make(chan *readyRunnable, 10), + wg: new(sync.WaitGroup), + buffer: new(sync.Map), + } + + go func() { + for runnable := range r.ch { + // Handle stop. + // If the shutdown has been called we want to avoid + // adding new goroutines to the WaitGroup because Wait() + // panics if Add() is called after it. + { + r.stop.RLock() + if r.stopped { + // Drop any runnables if we're stopped. + errCh <- errRunnableGroupStopped + r.stop.RUnlock() + continue + } + + // Why is this here? + // When StopAndWait is called, if a runnable is in the process + // of being added, we could end up in a situation where + // the WaitGroup is incremented while StopAndWait has called Wait(), + // which would result in a panic. + r.wg.Add(1) + r.stop.RUnlock() + } + + // Start the runnable. + go func(rn *readyRunnable) { + go func() { + // Run the ready check a fixed number of times + // backing off a bit; this is to give time to the runnables + // to start up before their health check returns true. + ready := false + for i := 0; i < 10; i++ { + if ready = rn.ReadyCheck(r.internalCtx); !ready { + continue + } + break + } + if ready { + r.buffer.Store(rn, true) + } + }() + + defer r.wg.Done() + defer r.buffer.Store(rn, true) + if err := rn.Start(r.internalCtx); err != nil { + r.errChan <- err + } + }(runnable) + } + }() + + return r +} + +func (r *runnableGroup) Started() bool { + r.start.Lock() + defer r.start.Unlock() + return r.started +} + +func (r *runnableGroup) StartAndWaitReady(ctx context.Context) error { + r.Start(ctx) + return r.WaitReady(ctx) +} + +func (r *runnableGroup) Start(ctx context.Context) { + r.startOnce.Do(func() { + r.start.Lock() + r.internalCtx = ctx + r.started = true + r.buffer.Range(func(key, _ interface{}) bool { + r.ch <- key.(*readyRunnable) + return true + }) + r.start.Unlock() + }) +} + +// WaitReady polls until the group is ready until the context is cancelled. +func (r *runnableGroup) WaitReady(ctx context.Context) error { + return wait.PollImmediateInfiniteWithContext(ctx, + 100*time.Millisecond, + func(_ context.Context) (bool, error) { + if !r.Started() { + return false, nil + } + ready, total := 0, 0 + r.buffer.Range(func(_, value interface{}) bool { + total++ + if rd, ok := value.(bool); ok && rd { + ready++ + } + return true + }) + return ready == total, nil + }, + ) +} + +// Add should be able to be called before and after Start, but not after shutdown. +// Add should return an error when called during shutdown. 
+func (r *runnableGroup) Add(rn Runnable, ready readyCheck) error { + r.stop.RLock() + if r.stopped { + r.stop.RUnlock() + return errRunnableGroupStopped + } + r.stop.RUnlock() + + // If we don't have a readiness check, always return true. + if ready == nil { + ready = func(_ context.Context) bool { return true } + } + + readyRunnable := &readyRunnable{ + Runnable: rn, + ReadyCheck: ready, + } + + // Store the runnable in the internal buffer. + r.buffer.Store(readyRunnable, false) + + // Handle start. + // If the overall runnable group isn't started yet + // we want to buffer the runnables and let Start() + // queue them up again later. + { + r.start.Lock() + if !r.started { + r.start.Unlock() + return nil + } + r.start.Unlock() + } + + // Enqueue the runnable. + r.ch <- readyRunnable + return nil +} + +// StopAndWait waits for all the runnables to finish before returning. +func (r *runnableGroup) StopAndWait() { + r.stopOnce.Do(func() { + r.stop.Lock() + // Store the stopped variable so we don't accept any new + // runnables for the time being. + r.stopped = true + r.stop.Unlock() + + // Wait for all the runnables to finish. + r.wg.Wait() + close(r.ch) + }) +} diff --git a/pkg/manager/runnable_group_test.go b/pkg/manager/runnable_group_test.go new file mode 100644 index 0000000000..f93a9dda06 --- /dev/null +++ b/pkg/manager/runnable_group_test.go @@ -0,0 +1,172 @@ +package manager + +import ( + "context" + "sync/atomic" + "time" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "k8s.io/utils/pointer" +) + +var _ = Describe("runnableGroup", func() { + errCh := make(chan error) + + Describe("new", func() { + It("should be able to add new runnables before it starts", func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + rg := newRunnableGroup(errCh) + Expect(rg.Add(RunnableFunc(func(c context.Context) error { + <-ctx.Done() + return nil + }), nil)).To(Succeed()) + + Expect(rg.Started()).To(BeFalse()) + }) + + It("should be able to add new runnables before and after start", func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + rg := newRunnableGroup(errCh) + Expect(rg.Add(RunnableFunc(func(c context.Context) error { + <-ctx.Done() + return nil + }), nil)).To(Succeed()) + rg.Start(ctx) + Expect(rg.Started()).To(BeTrue()) + Expect(rg.Add(RunnableFunc(func(c context.Context) error { + <-ctx.Done() + return nil + }), nil)).To(Succeed()) + Expect(rg.WaitReady(ctx)).To(Succeed()) + }) + + It("should be able to add new runnables before and after start concurrently", func() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + rg := newRunnableGroup(errCh) + + go func() { + <-time.After(50 * time.Millisecond) + rg.Start(ctx) + }() + + for i := 0; i < 20; i++ { + go func(i int) { + defer GinkgoRecover() + + <-time.After(time.Duration(i) * 10 * time.Millisecond) + Expect(rg.Add(RunnableFunc(func(c context.Context) error { + <-ctx.Done() + return nil + }), nil)).To(Succeed()) + }(i) + } + Expect(rg.WaitReady(ctx)).To(Succeed()) + Eventually(func() int { + i := 0 + rg.buffer.Range(func(key, value interface{}) bool { + i++ + return true + }) + return i + }).Should(BeNumerically("==", 20)) + }) + + It("should be able to close the group and wait for all runnables to finish", func() { + ctx, cancel := context.WithCancel(context.Background()) + + exited := pointer.Int64(0) + rg := newRunnableGroup(errCh) + for i := 0; i < 10; i++ { + Expect(rg.Add(RunnableFunc(func(c context.Context) error { + defer 
atomic.AddInt64(exited, 1) + <-ctx.Done() + <-time.After(time.Duration(i) * 10 * time.Millisecond) + return nil + }), nil)).To(Succeed()) + } + rg.Start(ctx) + Expect(rg.WaitReady(ctx)).To(Succeed()) + + // Cancel the context, asking the runnables to exit. + cancel() + + // Watch for stop sign. + stopped := make(chan struct{}) + go func() { + rg.StopAndWait() + close(stopped) + }() + <-stopped + + Expect(rg.Add(RunnableFunc(func(c context.Context) error { + return nil + }), nil)).ToNot(Succeed()) + + Expect(*exited).To(BeNumerically("==", 10)) + }) + + It("should be able to wait for all runnables to be ready at different intervals", func() { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + rg := newRunnableGroup(errCh) + + go func() { + <-time.After(50 * time.Millisecond) + rg.Start(ctx) + }() + + for i := 0; i < 20; i++ { + go func(i int) { + defer GinkgoRecover() + + Expect(rg.Add(RunnableFunc(func(c context.Context) error { + <-ctx.Done() + return nil + }), func(_ context.Context) bool { + <-time.After(time.Duration(i) * 10 * time.Millisecond) + return true + })).To(Succeed()) + }(i) + } + Expect(rg.WaitReady(ctx)).To(Succeed()) + Eventually(func() int { + i := 0 + rg.buffer.Range(func(key, value interface{}) bool { + i++ + return true + }) + return i + }).Should(BeNumerically("==", 20)) + }) + + It("should not turn ready if some readiness check fail", func() { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + rg := newRunnableGroup(errCh) + + go func() { + <-time.After(50 * time.Millisecond) + rg.Start(ctx) + }() + + for i := 0; i < 20; i++ { + go func(i int) { + defer GinkgoRecover() + + Expect(rg.Add(RunnableFunc(func(c context.Context) error { + <-ctx.Done() + return nil + }), func(_ context.Context) bool { + <-time.After(time.Duration(i) * 10 * time.Millisecond) + return i%2 == 0 // Return false readiness all uneven indexes. + })).To(Succeed()) + }(i) + } + Expect(rg.WaitReady(ctx)).ToNot(Succeed()) + }) + }) +})
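
Illustrative sketch (not part of the patch): one way the runnables/runnableGroup API added above could be driven, based only on the code in this diff. The function name exampleRunnableGroupUsage is made up for illustration, and the sketch assumes it compiles inside the manager package so the unexported newRunnables constructor, the group fields, and RunnableFunc are all visible.

package manager

import (
	"context"
	"fmt"
	"time"
)

func exampleRunnableGroupUsage() error {
	// Errors from every group are funneled into a single channel,
	// mirroring how New() wires errChan into newRunnables in this patch.
	errChan := make(chan error)
	r := newRunnables(errChan)

	// Add routes a runnable to a group by type: hasCache -> caches,
	// *webhook.Server -> webhooks, LeaderElectionRunnable -> leaderElection
	// or others. A plain RunnableFunc falls through to "others", and a nil
	// readiness check defaults to "always ready".
	if err := r.Add(RunnableFunc(func(ctx context.Context) error {
		<-ctx.Done() // block until asked to stop, like most runnables
		return nil
	}), nil); err != nil {
		return err
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	// Start the group and block until every buffered runnable reports ready.
	if err := r.others.StartAndWaitReady(ctx); err != nil {
		return fmt.Errorf("runnables never became ready: %w", err)
	}

	// Ask the runnables to exit, then stop accepting new ones and wait
	// for the started ones to return.
	cancel()
	r.others.StopAndWait()
	return nil
}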