diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index d670723e66..85d8d6d54c 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -19,6 +19,7 @@ package controller import ( "context" "fmt" + "time" "github.com/go-logr/logr" "k8s.io/client-go/util/workqueue" @@ -47,6 +48,10 @@ type Options struct { // Log is the logger used for this controller and passed to each reconciliation // request via the context field. Log logr.Logger + + // CacheSyncTimeout refers to the time limit set to wait for syncing caches. + // Defaults to 2 minutes if not set. + CacheSyncTimeout time.Duration } // Controller implements a Kubernetes API. A Controller manages a work queue fed reconcile.Requests @@ -104,6 +109,10 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller options.MaxConcurrentReconciles = 1 } + if options.CacheSyncTimeout == 0 { + options.CacheSyncTimeout = 2 * time.Minute + } + if options.RateLimiter == nil { options.RateLimiter = workqueue.DefaultControllerRateLimiter() } @@ -120,6 +129,7 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name) }, MaxConcurrentReconciles: options.MaxConcurrentReconciles, + CacheSyncTimeout: options.CacheSyncTimeout, SetFields: mgr.SetFields, Name: name, Log: options.Log.WithName("controller").WithName(name), diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 113b6ff91b..0a92b20d4d 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -79,6 +79,10 @@ type Controller struct { // undergo a major refactoring and redesign to allow for context to not be stored in a struct. ctx context.Context + // CacheSyncTimeout refers to the time limit set on waiting for cache to sync. + // The 2 minute default is applied by controller.NewUnmanaged; set it explicitly when building a Controller directly. 
+	CacheSyncTimeout time.Duration
+
 	// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
 	startWatches []watchDescription
 
@@ -169,9 +173,15 @@ func (c *Controller) Start(ctx context.Context) error {
 			if !ok {
 				continue
 			}
-			if err := syncingSource.WaitForSync(ctx); err != nil {
-				// This code is unreachable in case of kube watches since WaitForCacheSync will never return an error
-				// Leaving it here because that could happen in the future
+
+			// Only the wait for cache sync is bounded by CacheSyncTimeout; the
+			// sources themselves were started with the controller's own context.
+			syncCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
+			err := syncingSource.WaitForSync(syncCtx)
+			// Release the timeout immediately instead of deferring: a defer in this
+			// loop would not run until the enclosing function returns.
+			cancel()
+			if err != nil {
 				err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
 				c.Log.Error(err, "Could not wait for Cache to sync")
 				return err
diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go
index 10c6e34d88..a5b6f7d852 100644
--- a/pkg/internal/controller/controller_test.go
+++ b/pkg/internal/controller/controller_test.go
@@ -122,6 +122,53 @@ var _ = Describe("controller", func() {
 			close(done)
 		})
 
+		It("should error when cache sync timeout occurs", func(done Done) {
+			ctrl.CacheSyncTimeout = 10 * time.Nanosecond
+
+			c, err := cache.New(cfg, cache.Options{})
+			Expect(err).NotTo(HaveOccurred())
+
+			ctrl.startWatches = []watchDescription{{
+				src: source.NewKindWithCache(&appsv1.Deployment{}, c),
+			}}
+
+			err = ctrl.Start(context.TODO())
+			Expect(err).To(HaveOccurred())
+			Expect(err.Error()).To(ContainSubstring("cache did not sync"))
+
+			close(done)
+		})
+
+		It("should not error when cache sync timeout is of sufficiently high", func(done Done) {
+			ctrl.CacheSyncTimeout = 1 * time.Second
+
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			sourceSynced := make(chan struct{})
+			c, err := cache.New(cfg, cache.Options{})
+			Expect(err).NotTo(HaveOccurred())
+			ctrl.startWatches = []watchDescription{{
+				src: &singnallingSourceWrapper{
+					SyncingSource: source.NewKindWithCache(&appsv1.Deployment{}, c),
+					cacheSyncDone: sourceSynced,
+				},
+			}}
+
+			go func() {
+				defer GinkgoRecover()
+				Expect(c.Start(ctx)).To(Succeed())
+			}()
+
+			go func() {
+				defer GinkgoRecover()
+				Expect(ctrl.Start(ctx)).To(Succeed())
+			}()
+
+			<-sourceSynced
+			close(done)
+		}, 10.0)
+
 		It("should call Start on sources with the appropriate EventHandler, Queue, and Predicates", func() {
 			pr1 := &predicate.Funcs{}
 			pr2 := &predicate.Funcs{}
@@ -811,3 +858,15 @@ func (f *fakeReconciler) Reconcile(_ context.Context, r reconcile.Request) (reco
 	}
 	return res.Result, res.Err
 }
+
+type singnallingSourceWrapper struct {
+	cacheSyncDone chan struct{}
+	source.SyncingSource
+}
+
+func (s *singnallingSourceWrapper) WaitForSync(ctx context.Context) error {
+	defer func() {
+		close(s.cacheSyncDone)
+	}()
+	return s.SyncingSource.WaitForSync(ctx)
+}