diff --git a/pkg/internal/controller/controller.go b/pkg/internal/controller/controller.go index b8ce007041..0a92b20d4d 100644 --- a/pkg/internal/controller/controller.go +++ b/pkg/internal/controller/controller.go @@ -79,7 +79,7 @@ type Controller struct { // undergo a major refactoring and redesign to allow for context to not be stored in a struct. ctx context.Context - // CacheSyncTimeout refers to the time limit set on waiting cache to sync + // CacheSyncTimeout refers to the time limit set on waiting for cache to sync // Defaults to 2 minutes if not set. CacheSyncTimeout time.Duration @@ -146,10 +146,6 @@ func (c *Controller) Start(ctx context.Context) error { // Set the internal context. c.ctx = ctx - // use a context with timeout for launching sources and syncing caches. - sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout) - defer cancel() - c.Queue = c.MakeQueue() defer c.Queue.ShutDown() // needs to be outside the iife so that we shutdown after the stop channel is closed @@ -164,7 +160,10 @@ func (c *Controller) Start(ctx context.Context) error { // caches. for _, watch := range c.startWatches { c.Log.Info("Starting EventSource", "source", watch.src) - if err := watch.src.Start(sourceStartCtx, watch.handler, c.Queue, watch.predicates...); err != nil { + + watchStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout) + defer cancel() + if err := watch.src.Start(watchStartCtx, watch.handler, c.Queue, watch.predicates...); err != nil { return err } } @@ -177,9 +176,14 @@ func (c *Controller) Start(ctx context.Context) error { if !ok { continue } + + // use a context with timeout for launching sources and syncing caches. + sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout) + defer cancel() + + // WaitForSync waits for a definitive timeout, and returns if there + // is an error or a timeout if err := syncingSource.WaitForSync(sourceStartCtx); err != nil { - // This code is unreachable in case of kube watches since WaitForCacheSync will never return an error - // Leaving it here because that could happen in the future err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err) c.Log.Error(err, "Could not wait for Cache to sync") return err diff --git a/pkg/internal/controller/controller_test.go b/pkg/internal/controller/controller_test.go index 819a67ddf5..a5b6f7d852 100644 --- a/pkg/internal/controller/controller_test.go +++ b/pkg/internal/controller/controller_test.go @@ -124,15 +124,12 @@ var _ = Describe("controller", func() { It("should error when cache sync timeout occurs", func(done Done) { ctrl.CacheSyncTimeout = 10 * time.Nanosecond + c, err := cache.New(cfg, cache.Options{}) Expect(err).NotTo(HaveOccurred()) - _, err = c.GetInformer(context.TODO(), &appsv1.Deployment{}) - Expect(err).NotTo(HaveOccurred()) - sync := false + ctrl.startWatches = []watchDescription{{ - src: source.NewKindWithCache(&appsv1.Deployment{}, &informertest.FakeInformers{ - Synced: &sync, - }), + src: source.NewKindWithCache(&appsv1.Deployment{}, c), }} err = ctrl.Start(context.TODO()) @@ -142,39 +139,35 @@ var _ = Describe("controller", func() { close(done) }) - It("should not error when cache sync time out is of reasonable value", func(done Done) { + It("should not error when cache sync timeout is of sufficiently high", func(done Done) { ctrl.CacheSyncTimeout = 1 * time.Second + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sourceSynced := make(chan struct{}) c, err := cache.New(cfg, cache.Options{}) Expect(err).NotTo(HaveOccurred()) ctrl.startWatches = []watchDescription{{ - src: source.NewKindWithCache(&appsv1.Deployment{}, c), + src: &singnallingSourceWrapper{ + SyncingSource: source.NewKindWithCache(&appsv1.Deployment{}, c), + cacheSyncDone: sourceSynced, + }, }} - By("running the cache and waiting for it to sync") go func() { defer GinkgoRecover() - Expect(c.Start(context.TODO())).To(Succeed()) + Expect(c.Start(ctx)).To(Succeed()) }() - close(done) - }) - It("should error when timeout is set to a very low value such that cache cannot sync", func(done Done) { - ctrl.CacheSyncTimeout = 1 * time.Nanosecond - c, err := cache.New(cfg, cache.Options{}) - Expect(err).NotTo(HaveOccurred()) - ctrl.startWatches = []watchDescription{{ - src: source.NewKindWithCache(&appsv1.Deployment{}, c), - }} - - By("running the cache and waiting for it to sync") go func() { defer GinkgoRecover() - err = ctrl.Start(context.TODO()) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("cache did not sync")) + Expect(ctrl.Start(ctx)).To(Succeed()) }() + + <-sourceSynced close(done) - }) + }, 10.0) It("should call Start on sources with the appropriate EventHandler, Queue, and Predicates", func() { pr1 := &predicate.Funcs{} @@ -865,3 +858,15 @@ func (f *fakeReconciler) Reconcile(_ context.Context, r reconcile.Request) (reco } return res.Result, res.Err } + +type singnallingSourceWrapper struct { + cacheSyncDone chan struct{} + source.SyncingSource +} + +func (s *singnallingSourceWrapper) WaitForSync(ctx context.Context) error { + defer func() { + close(s.cacheSyncDone) + }() + return s.SyncingSource.WaitForSync(ctx) +}