diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go
index 78202943f1..3937f106dd 100644
--- a/pkg/manager/internal.go
+++ b/pkg/manager/internal.go
@@ -125,7 +125,7 @@ type controllerManager struct {
 	// election was configured.
 	elected chan struct{}
 
-	startCache func(ctx context.Context) error
+	caches []hasCache
 
 	// port is the port that the webhook server serves at.
 	port int
@@ -173,6 +173,11 @@ type controllerManager struct {
 	internalProceduresStop chan struct{}
 }
 
+type hasCache interface {
+	Runnable
+	GetCache() cache.Cache
+}
+
 // Add sets dependencies on i, and adds it to the list of Runnables to start.
 func (cm *controllerManager) Add(r Runnable) error {
 	cm.mu.Lock()
@@ -192,6 +197,8 @@ func (cm *controllerManager) Add(r Runnable) error {
 	if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() {
 		shouldStart = cm.started
 		cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r)
+	} else if hasCache, ok := r.(hasCache); ok {
+		cm.caches = append(cm.caches, hasCache)
 	} else {
 		shouldStart = cm.startedLeader
 		cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r)
@@ -423,6 +430,9 @@ func (cm *controllerManager) serveHealthProbes() {
 }
 
 func (cm *controllerManager) Start(ctx context.Context) (err error) {
+	if err := cm.Add(cm.cluster); err != nil {
+		return fmt.Errorf("failed to add cluster to runnables: %w", err)
+	}
 	cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)
 
 	// This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request
@@ -590,17 +600,15 @@ func (cm *controllerManager) waitForCache(ctx context.Context) {
 		return
 	}
 
-	// Start the Cache. Allow the function to start the cache to be mocked out for testing
-	if cm.startCache == nil {
-		cm.startCache = cm.cluster.Start
+	for _, cache := range cm.caches {
+		cm.startRunnable(cache)
 	}
-	cm.startRunnable(RunnableFunc(func(ctx context.Context) error {
-		return cm.startCache(ctx)
-	}))
 	// Wait for the caches to sync.
 	// TODO(community): Check the return value and write a test
-	cm.cluster.GetCache().WaitForCacheSync(ctx)
+	for _, cache := range cm.caches {
+		cache.GetCache().WaitForCacheSync(ctx)
+	}
 
 	// TODO: This should be the return value of cm.cache.WaitForCacheSync but we abuse
 	// cm.started as check if we already started the cache so it must always become true.
 	// Making sure that the cache doesn't get started twice is needed to not get a "close
diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go
index be5c5c3c0a..8161ad2a70 100644
--- a/pkg/manager/manager_test.go
+++ b/pkg/manager/manager_test.go
@@ -43,6 +43,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/cache"
 	"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
 	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/cluster"
 	"sigs.k8s.io/controller-runtime/pkg/config/v1alpha1"
 	logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
 	intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
@@ -612,9 +613,7 @@ var _ = Describe("manger.Manager", func() {
 			}
 			mgr, ok := m.(*controllerManager)
 			Expect(ok).To(BeTrue())
-			mgr.startCache = func(context.Context) error {
-				return fmt.Errorf("expected error")
-			}
+			mgr.caches = []hasCache{&cacheProvider{cache: &informertest.FakeInformers{Error: fmt.Errorf("expected error")}}}
 
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
@@ -623,6 +622,82 @@ var _ = Describe("manger.Manager", func() {
 			close(done)
 		})
 
+		It("should start the cache before starting anything else", func(done Done) {
+			fakeCache := &startSignalingInformer{Cache: &informertest.FakeInformers{}}
+			options.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) {
+				return fakeCache, nil
+			}
+			m, err := New(cfg, options)
+			Expect(err).NotTo(HaveOccurred())
+			for _, cb := range callbacks {
+				cb(m)
+			}
+
+			runnableWasStarted := make(chan struct{})
+			Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
+				defer GinkgoRecover()
+				if !fakeCache.wasSynced {
+					return errors.New("runnable got started before cache was synced")
+				}
+				close(runnableWasStarted)
+				return nil
+			}))).To(Succeed())
+
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			go func() {
+				defer GinkgoRecover()
+				Expect(m.Start(ctx)).ToNot(HaveOccurred())
+			}()
+
+			<-runnableWasStarted
+			close(done)
+		})
+
+		It("should start additional clusters before anything else", func(done Done) {
+			fakeCache := &startSignalingInformer{Cache: &informertest.FakeInformers{}}
+			options.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) {
+				return fakeCache, nil
+			}
+			m, err := New(cfg, options)
+			Expect(err).NotTo(HaveOccurred())
+			for _, cb := range callbacks {
+				cb(m)
+			}
+
+			additionalClusterCache := &startSignalingInformer{Cache: &informertest.FakeInformers{}}
+			additionalCluster, err := cluster.New(cfg, func(o *cluster.Options) {
+				o.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) {
+					return additionalClusterCache, nil
+				}
+			})
+			Expect(err).NotTo(HaveOccurred())
+			Expect(m.Add(additionalCluster)).NotTo(HaveOccurred())
+
+			runnableWasStarted := make(chan struct{})
+			Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
+				defer GinkgoRecover()
+				if !fakeCache.wasSynced {
+					return errors.New("WaitForCacheSync wasn't called before the Runnable got started")
+				}
+				if !additionalClusterCache.wasSynced {
+					return errors.New("the additional cluster's WaitForCacheSync wasn't called before the Runnable got started")
+				}
+				close(runnableWasStarted)
+				return nil
+			}))).To(Succeed())
+
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			go func() {
+				defer GinkgoRecover()
+				Expect(m.Start(ctx)).ToNot(HaveOccurred())
+			}()
+
+			<-runnableWasStarted
+			close(done)
+		})
+
 		It("should return an error if any Components fail to Start", func(done Done) {
 			m, err := New(cfg, options)
 			Expect(err).NotTo(HaveOccurred())
@@ -1625,3 +1700,52 @@ func (f *fakeDeferredLoader) Complete() (v1alpha1.ControllerManagerConfiguration
 func (f *fakeDeferredLoader) InjectScheme(scheme *runtime.Scheme) error {
 	return nil
 }
+
+var _ Runnable = &cacheProvider{}
+
+type cacheProvider struct {
+	cache cache.Cache
+}
+
+func (c *cacheProvider) GetCache() cache.Cache {
+	return c.cache
+}
+
+func (c *cacheProvider) Start(ctx context.Context) error {
+	return c.cache.Start(ctx)
+}
+
+type startSignalingInformer struct {
+	// The manager calls Start and WaitForCacheSync in
+	// parallel, so we have to protect wasStarted with a Mutex
+	// and block in WaitForCacheSync until it is true.
+	wasStartedLock sync.Mutex
+	wasStarted     bool
+	// wasSynced will be true once Start was called and
+	// WaitForCacheSync returned, just like a real cache.
+	wasSynced bool
+	cache.Cache
+}
+
+func (c *startSignalingInformer) started() bool {
+	c.wasStartedLock.Lock()
+	defer c.wasStartedLock.Unlock()
+	return c.wasStarted
+}
+
+func (c *startSignalingInformer) Start(ctx context.Context) error {
+	c.wasStartedLock.Lock()
+	c.wasStarted = true
+	c.wasStartedLock.Unlock()
+	return c.Cache.Start(ctx)
+}
+
+func (c *startSignalingInformer) WaitForCacheSync(ctx context.Context) bool {
+	defer func() {
+		for !c.started() {
+			continue
+		}
+		c.wasSynced = true
+	}()
+	return c.Cache.WaitForCacheSync(ctx)
+}
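
Usage sketch (not part of the diff above): a minimal, hypothetical illustration of the behaviour this change enables, assuming only the public manager and cluster packages of controller-runtime. Any Runnable added to the manager that also exposes GetCache(), such as an additional cluster.Cluster, now has its cache started and synced before the remaining runnables are started. The kubeconfig reuse below is purely for brevity.

package main

import (
	"context"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/cluster"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

func main() {
	// Assumption: the extra cluster reuses the manager's kubeconfig for brevity;
	// a real setup would usually point it at a different API server.
	cfg := ctrl.GetConfigOrDie()

	mgr, err := manager.New(cfg, manager.Options{})
	if err != nil {
		panic(err)
	}

	// The additional cluster exposes GetCache(), so the manager now starts and
	// syncs its cache alongside its own cache before other runnables.
	extraCluster, err := cluster.New(cfg)
	if err != nil {
		panic(err)
	}
	if err := mgr.Add(extraCluster); err != nil {
		panic(err)
	}

	// This runnable is only started once all caches above have synced.
	if err := mgr.Add(manager.RunnableFunc(func(ctx context.Context) error {
		<-ctx.Done()
		return nil
	})); err != nil {
		panic(err)
	}

	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		panic(err)
	}
}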