✨ Manager: Start all caches before other Runnables #1327

Merged
merged 1 commit into from Jan 21, 2021
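
Summary of the change: previously the manager started only its own cluster's cache through the `startCache` hook, so an additional `cluster.Cluster` added as a Runnable got no cache-sync guarantee. With this PR, every Runnable that exposes `GetCache() cache.Cache` (notably `cluster.Cluster`) is collected into `cm.caches`, started first, and waited on with `WaitForCacheSync` before the remaining Runnables run. Below is a minimal usage sketch of the ordering this guarantees, modeled on the new test further down; reusing one kubeconfig for both clusters and using plain `log.Fatal` are choices made here for brevity, not part of the PR.

```go
package main

import (
	"context"
	"log"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/cluster"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

func main() {
	cfg := ctrl.GetConfigOrDie()

	mgr, err := manager.New(cfg, manager.Options{})
	if err != nil {
		log.Fatal(err)
	}

	// cluster.Cluster implements GetCache(), so after this PR the manager
	// starts and syncs its cache before any other Runnables.
	additionalCluster, err := cluster.New(cfg)
	if err != nil {
		log.Fatal(err)
	}
	if err := mgr.Add(additionalCluster); err != nil {
		log.Fatal(err)
	}

	// By the time this Runnable starts, both the manager's own cache and the
	// additional cluster's cache have synced.
	if err := mgr.Add(manager.RunnableFunc(func(ctx context.Context) error {
		// Safe to read from mgr.GetCache() and additionalCluster.GetCache() here.
		<-ctx.Done()
		return nil
	})); err != nil {
		log.Fatal(err)
	}

	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		log.Fatal(err)
	}
}
```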
24 changes: 16 additions & 8 deletions pkg/manager/internal.go
@@ -125,7 +125,7 @@ type controllerManager struct {
// election was configured.
elected chan struct{}

startCache func(ctx context.Context) error
caches []hasCache

// port is the port that the webhook server serves at.
port int
@@ -173,6 +173,11 @@ type controllerManager struct {
internalProceduresStop chan struct{}
}

type hasCache interface {
Runnable
GetCache() cache.Cache
}

// Add sets dependencies on i, and adds it to the list of Runnables to start.
func (cm *controllerManager) Add(r Runnable) error {
cm.mu.Lock()
@@ -192,6 +197,8 @@ func (cm *controllerManager) Add(r Runnable) error {
if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() {
shouldStart = cm.started
cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r)
} else if hasCache, ok := r.(hasCache); ok {
cm.caches = append(cm.caches, hasCache)
} else {
shouldStart = cm.startedLeader
cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r)
@@ -423,6 +430,9 @@ func (cm *controllerManager) serveHealthProbes() {
}

func (cm *controllerManager) Start(ctx context.Context) (err error) {
if err := cm.Add(cm.cluster); err != nil {
return fmt.Errorf("failed to add cluster to runnables: %w", err)
}
cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)

// This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request
@@ -590,17 +600,15 @@ func (cm *controllerManager) waitForCache(ctx context.Context) {
return
}

// Start the Cache. Allow the function to start the cache to be mocked out for testing
if cm.startCache == nil {
cm.startCache = cm.cluster.Start
for _, cache := range cm.caches {
cm.startRunnable(cache)
}
cm.startRunnable(RunnableFunc(func(ctx context.Context) error {
return cm.startCache(ctx)
}))

// Wait for the caches to sync.
// TODO(community): Check the return value and write a test
cm.cluster.GetCache().WaitForCacheSync(ctx)
for _, cache := range cm.caches {
cache.GetCache().WaitForCacheSync(ctx)
}
// TODO: This should be the return value of cm.cache.WaitForCacheSync but we abuse
// cm.started as check if we already started the cache so it must always become true.
// Making sure that the cache doesn't get started twice is needed to not get a "close
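
The unexported `hasCache` interface is satisfied structurally: any Runnable that also offers `GetCache() cache.Cache` is routed into `cm.caches` by `Add` and started and synced ahead of the leader-election and non-leader-election Runnables, and the manager's own cluster now goes through the same path via the `cm.Add(cm.cluster)` call in `Start`. A minimal sketch of such a Runnable follows; `cacheBackedRunnable` is an illustrative name, closely mirroring the `cacheProvider` test helper further down.

```go
package example

import (
	"context"

	"sigs.k8s.io/controller-runtime/pkg/cache"
)

// cacheBackedRunnable wraps any cache.Cache. Because it has both Start and
// GetCache, the manager's Add treats it as a cache: it is started before the
// other Runnables and WaitForCacheSync is called on it during waitForCache.
type cacheBackedRunnable struct {
	cache cache.Cache
}

func (r *cacheBackedRunnable) GetCache() cache.Cache { return r.cache }

func (r *cacheBackedRunnable) Start(ctx context.Context) error {
	return r.cache.Start(ctx)
}
```

Adding such a value with `mgr.Add(&cacheBackedRunnable{cache: c})` would therefore land it in `cm.caches` rather than in either of the Runnable lists.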
130 changes: 127 additions & 3 deletions pkg/manager/manager_test.go
@@ -43,6 +43,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/config/v1alpha1"
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
@@ -612,9 +613,7 @@ var _ = Describe("manger.Manager", func() {
}
mgr, ok := m.(*controllerManager)
Expect(ok).To(BeTrue())
mgr.startCache = func(context.Context) error {
return fmt.Errorf("expected error")
}
mgr.caches = []hasCache{&cacheProvider{cache: &informertest.FakeInformers{Error: fmt.Errorf("expected error")}}}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -623,6 +622,82 @@
close(done)
})

It("should start the cache before starting anything else", func(done Done) {
fakeCache := &startSignalingInformer{Cache: &informertest.FakeInformers{}}
options.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) {
return fakeCache, nil
}
m, err := New(cfg, options)
Expect(err).NotTo(HaveOccurred())
for _, cb := range callbacks {
cb(m)
}

runnableWasStarted := make(chan struct{})
Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
defer GinkgoRecover()
if !fakeCache.wasSynced {
return errors.New("runnable got started before cache was synced")
}
close(runnableWasStarted)
return nil
}))).To(Succeed())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
defer GinkgoRecover()
Expect(m.Start(ctx)).ToNot(HaveOccurred())
}()

<-runnableWasStarted
close(done)
})

It("should start additional clusters before anything else", func(done Done) {
fakeCache := &startSignalingInformer{Cache: &informertest.FakeInformers{}}
options.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) {
return fakeCache, nil
}
m, err := New(cfg, options)
Expect(err).NotTo(HaveOccurred())
for _, cb := range callbacks {
cb(m)
}

additionalClusterCache := &startSignalingInformer{Cache: &informertest.FakeInformers{}}
additionalCluster, err := cluster.New(cfg, func(o *cluster.Options) {
o.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) {
return additionalClusterCache, nil
}
})
Expect(err).NotTo(HaveOccurred())
Expect(m.Add(additionalCluster)).NotTo(HaveOccurred())

runnableWasStarted := make(chan struct{})
Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
defer GinkgoRecover()
if !fakeCache.wasSynced {
return errors.New("WaitForCacheSyncCalled wasn't called before Runnable got started")
}
if !additionalClusterCache.wasSynced {
return errors.New("the additional clusters WaitForCacheSync wasn't called before Runnable got started")
}
close(runnableWasStarted)
return nil
}))).To(Succeed())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
defer GinkgoRecover()
Expect(m.Start(ctx)).ToNot(HaveOccurred())
}()

<-runnableWasStarted
close(done)
})

It("should return an error if any Components fail to Start", func(done Done) {
m, err := New(cfg, options)
Expect(err).NotTo(HaveOccurred())
@@ -1625,3 +1700,52 @@ func (f *fakeDeferredLoader) Complete() (v1alpha1.ControllerManagerConfiguration
func (f *fakeDeferredLoader) InjectScheme(scheme *runtime.Scheme) error {
return nil
}

var _ Runnable = &cacheProvider{}

type cacheProvider struct {
cache cache.Cache
}

func (c *cacheProvider) GetCache() cache.Cache {
return c.cache
}

func (c *cacheProvider) Start(ctx context.Context) error {
return c.cache.Start(ctx)
}

type startSignalingInformer struct {
// The manager calls Start and WaitForCacheSync in
// parallel, so we have to protect wasStarted with a Mutex
// and block in WaitForCacheSync until it is true.
wasStartedLock sync.Mutex
wasStarted bool
// was synced will be true once Start was called and
// WaitForCacheSync returned, just like a real cache.
wasSynced bool
cache.Cache
}

func (c *startSignalingInformer) started() bool {
c.wasStartedLock.Lock()
defer c.wasStartedLock.Unlock()
return c.wasStarted
}

func (c *startSignalingInformer) Start(ctx context.Context) error {
c.wasStartedLock.Lock()
c.wasStarted = true
c.wasStartedLock.Unlock()
return c.Cache.Start(ctx)
}

func (c *startSignalingInformer) WaitForCacheSync(ctx context.Context) bool {
defer func() {
for !c.started() {
continue
}
c.wasSynced = true
}()
return c.Cache.WaitForCacheSync(ctx)
}