diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index d670723e66..85d8d6d54c 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -19,6 +19,7 @@ package controller import ( "context" "fmt" + "time" "github.com/go-logr/logr" "k8s.io/client-go/util/workqueue" @@ -47,6 +48,10 @@ type Options struct { // Log is the logger used for this controller and passed to each reconciliation // request via the context field. Log logr.Logger + + // CacheSyncTimeout refers to the time limit set to wait for syncing caches. + // Defaults to 2 minutes if not set. + CacheSyncTimeout time.Duration } // Controller implements a Kubernetes API. A Controller manages a work queue fed reconcile.Requests @@ -104,6 +109,10 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller options.MaxConcurrentReconciles = 1 } + if options.CacheSyncTimeout == 0 { + options.CacheSyncTimeout = 2 * time.Minute + } + if options.RateLimiter == nil { options.RateLimiter = workqueue.DefaultControllerRateLimiter() } @@ -120,6 +129,7 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name) }, MaxConcurrentReconciles: options.MaxConcurrentReconciles, + CacheSyncTimeout: options.CacheSyncTimeout, SetFields: mgr.SetFields, Name: name, Log: options.Log.WithName("controller").WithName(name), diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 113b6ff91b..fdf495815f 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -79,6 +79,10 @@ type Controller struct { // undergo a major refactoring and redesign to allow for context to not be stored in a struct. ctx context.Context + // CacheSyncTimeout refers to the time limit set on waiting cache to sync + // Defaults to 2 minutes if not set. + CacheSyncTimeout time.Duration + // startWatches maintains a list of sources, handlers, and predicates to start when the controller is started. startWatches []watchDescription @@ -142,6 +146,10 @@ func (c *Controller) Start(ctx context.Context) error { // Set the internal context. c.ctx = ctx + // use a context with timeout for launching sources and syncing caches. + ct, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout) + defer cancel() + c.Queue = c.MakeQueue() defer c.Queue.ShutDown() // needs to be outside the iife so that we shutdown after the stop channel is closed @@ -156,7 +164,7 @@ func (c *Controller) Start(ctx context.Context) error { // caches. for _, watch := range c.startWatches { c.Log.Info("Starting EventSource", "source", watch.src) - if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil { + if err := watch.src.Start(ct, watch.handler, c.Queue, watch.predicates...); err != nil { return err } } @@ -169,7 +177,7 @@ func (c *Controller) Start(ctx context.Context) error { if !ok { continue } - if err := syncingSource.WaitForSync(ctx); err != nil { + if err := syncingSource.WaitForSync(ct); err != nil { // This code is unreachable in case of kube watches since WaitForCacheSync will never return an error // Leaving it here because that could happen in the future err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err) diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index 10c6e34d88..819a67ddf5 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -122,6 +122,60 @@ var _ = Describe("controller", func() { close(done) }) + It("should error when cache sync timeout occurs", func(done Done) { + ctrl.CacheSyncTimeout = 10 * time.Nanosecond + c, err := cache.New(cfg, cache.Options{}) + Expect(err).NotTo(HaveOccurred()) + _, err = c.GetInformer(context.TODO(), &appsv1.Deployment{}) + Expect(err).NotTo(HaveOccurred()) + sync := false + ctrl.startWatches = []watchDescription{{ + src: source.NewKindWithCache(&appsv1.Deployment{}, &informertest.FakeInformers{ + Synced: &sync, + }), + }} + + err = ctrl.Start(context.TODO()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("cache did not sync")) + + close(done) + }) + + It("should not error when cache sync time out is of reasonable value", func(done Done) { + ctrl.CacheSyncTimeout = 1 * time.Second + c, err := cache.New(cfg, cache.Options{}) + Expect(err).NotTo(HaveOccurred()) + ctrl.startWatches = []watchDescription{{ + src: source.NewKindWithCache(&appsv1.Deployment{}, c), + }} + + By("running the cache and waiting for it to sync") + go func() { + defer GinkgoRecover() + Expect(c.Start(context.TODO())).To(Succeed()) + }() + close(done) + }) + + It("should error when timeout is set to a very low value such that cache cannot sync", func(done Done) { + ctrl.CacheSyncTimeout = 1 * time.Nanosecond + c, err := cache.New(cfg, cache.Options{}) + Expect(err).NotTo(HaveOccurred()) + ctrl.startWatches = []watchDescription{{ + src: source.NewKindWithCache(&appsv1.Deployment{}, c), + }} + + By("running the cache and waiting for it to sync") + go func() { + defer GinkgoRecover() + err = ctrl.Start(context.TODO()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("cache did not sync")) + }() + close(done) + }) + It("should call Start on sources with the appropriate EventHandler, Queue, and Predicates", func() { pr1 := &predicate.Funcs{} pr2 := &predicate.Funcs{}