diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index a8de1bbeba..b70a5139c2 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -19,6 +19,7 @@ package controller import ( "context" "fmt" + "time" "github.com/go-logr/logr" "k8s.io/client-go/util/workqueue" @@ -47,6 +48,10 @@ type Options struct { // Log is the logger used for this controller and passed to each reconciliation // request via the context field. Log logr.Logger + + // CacheSyncTimeout refers to the time limit set to wait for syncing caches. + // Defaults to 10 seconds if not set. + CacheSyncTimeout time.Duration } // Controller implements a Kubernetes API. A Controller manages a work queue fed reconcile.Requests @@ -112,6 +117,10 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller options.Log = mgr.GetLogger() } + if options.CacheSyncTimeout == 0 { + options.CacheSyncTimeout = 10 * time.Second + } + // Inject dependencies into Reconciler if err := mgr.SetFields(options.Reconciler); err != nil { return nil, err @@ -124,6 +133,7 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name) }, MaxConcurrentReconciles: options.MaxConcurrentReconciles, + CacheSyncTimeout: options.CacheSyncTimeout, SetFields: mgr.SetFields, Name: name, Log: options.Log.WithName("controller").WithName(name), diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index 83866606c2..0ce0966dc0 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -79,6 +79,10 @@ type Controller struct { // undergo a major refactoring and redesign to allow for context to not be stored in a struct. ctx context.Context + // CacheSyncTimeout refers to the time limit set on waiting cache to sync + // Defaults to 10 seconds if not set. + CacheSyncTimeout time.Duration + // startWatches maintains a list of sources, handlers, and predicates to start when the controller is started. startWatches []watchDescription @@ -169,7 +173,10 @@ func (c *Controller) Start(ctx context.Context) error { if !ok { continue } - if err := syncingSource.WaitForSync(ctx); err != nil { + ct, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout) + defer cancel() + if err := syncingSource.WaitForSync(ct); err != nil { + c.Log.Info("skipping it") // This code is unreachable in case of kube watches since WaitForCacheSync will never return an error // Leaving it here because that could happen in the future err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err) diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index 10c6e34d88..4dddcec725 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -122,6 +122,29 @@ var _ = Describe("controller", func() { close(done) }) + It("should wait for each informer to sync", func(done Done) { + // TODO(directxman12): this test doesn't do what it says it does + ctrl.CacheSyncTimeout = 1 * time.Nanosecond + c, err := cache.New(cfg, cache.Options{}) + Expect(err).NotTo(HaveOccurred()) + _, err = c.GetInformer(context.TODO(), &appsv1.Deployment{}) + Expect(err).NotTo(HaveOccurred()) + _, err = c.GetInformer(context.TODO(), &appsv1.ReplicaSet{}) + Expect(err).NotTo(HaveOccurred()) + ctrl.startWatches = []watchDescription{{ + src: source.NewKindWithCache(&appsv1.Deployment{}, &informertest.FakeInformers{}), + }, { + src: source.NewKindWithCache(&appsv1.ReplicaSet{}, &informertest.FakeInformers{}), + }} + + // Use a cancelled context so Start doesn't block + // ctx, cancel := context.WithCancel(context.Background()) + // cancel() + Expect(ctrl.Start(context.Background())).To(HaveOccurred()) + + close(done) + }) + It("should call Start on sources with the appropriate EventHandler, Queue, and Predicates", func() { pr1 := &predicate.Funcs{} pr2 := &predicate.Funcs{} diff --git a/pkg/source/source_test.go b/pkg/source/source_test.go index 1a0c6146d6..e32b3e51e6 100644 --- a/pkg/source/source_test.go +++ b/pkg/source/source_test.go @@ -211,7 +211,7 @@ var _ = Describe("Source", func() { instance := source.Kind{} f := false Expect(instance.InjectCache(&informertest.FakeInformers{Synced: &f})).To(Succeed()) - err := instance.WaitForSync(nil) + err := instance.WaitForSync(context.Background()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(Equal("cache did not sync")) @@ -247,7 +247,7 @@ var _ = Describe("Source", func() { It("should return an error if syncing fails", func(done Done) { f := false instance := source.NewKindWithCache(nil, &informertest.FakeInformers{Synced: &f}) - err := instance.WaitForSync(nil) + err := instance.WaitForSync(context.Background()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(Equal("cache did not sync"))