Merge pull request #1327 from alvaroaleman/start-clusters
✨ Manager: Start all caches before other Runnables
k8s-ci-robot committed Jan 21, 2021
2 parents 73c52e8 + 70df377 commit ad68b43
Showing 2 changed files with 143 additions and 11 deletions.
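
For context, here is a minimal sketch, not part of this commit, of the usage pattern the change enables: an additional cluster.Cluster is added to a Manager, and the manager starts and syncs every cache, including the additional cluster's, before any other Runnable runs. The sketch reuses the same rest.Config for both clusters and uses log.Fatal for error handling purely for brevity; a real multi-cluster setup would build the second cluster from its own kubeconfig.

package main

import (
	"context"
	"log"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/cluster"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

func main() {
	cfg := ctrl.GetConfigOrDie()

	mgr, err := manager.New(cfg, manager.Options{})
	if err != nil {
		log.Fatal(err)
	}

	// The same config is reused here only to keep the sketch short; a real
	// multi-cluster setup would load a second kubeconfig for this cluster.
	other, err := cluster.New(cfg)
	if err != nil {
		log.Fatal(err)
	}

	// cluster.Cluster exposes GetCache(), so with this change Add treats it as a
	// cache-backed Runnable whose cache is started and synced before other Runnables.
	if err := mgr.Add(other); err != nil {
		log.Fatal(err)
	}

	// By the time this Runnable starts, both caches have been started and synced.
	if err := mgr.Add(manager.RunnableFunc(func(ctx context.Context) error {
		// Cached reads via mgr.GetClient() and other.GetClient() are safe here.
		<-ctx.Done()
		return nil
	})); err != nil {
		log.Fatal(err)
	}

	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		log.Fatal(err)
	}
}

Because cluster.Cluster exposes GetCache(), the manager's Add places it in the new caches list rather than among the ordinary leader-election Runnables, which is what the diff below implements.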
24 changes: 16 additions & 8 deletions pkg/manager/internal.go
@@ -125,7 +125,7 @@ type controllerManager struct {
// election was configured.
elected chan struct{}

-startCache func(ctx context.Context) error
+caches []hasCache

// port is the port that the webhook server serves at.
port int
@@ -173,6 +173,11 @@ type controllerManager struct {
internalProceduresStop chan struct{}
}

type hasCache interface {
Runnable
GetCache() cache.Cache
}

// Add sets dependencies on i, and adds it to the list of Runnables to start.
func (cm *controllerManager) Add(r Runnable) error {
cm.mu.Lock()
@@ -192,6 +197,8 @@ func (cm *controllerManager) Add(r Runnable) error {
if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() {
shouldStart = cm.started
cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r)
} else if hasCache, ok := r.(hasCache); ok {
cm.caches = append(cm.caches, hasCache)
} else {
shouldStart = cm.startedLeader
cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r)
@@ -423,6 +430,9 @@ func (cm *controllerManager) serveHealthProbes() {
}

func (cm *controllerManager) Start(ctx context.Context) (err error) {
if err := cm.Add(cm.cluster); err != nil {
return fmt.Errorf("failed to add cluster to runnables: %w", err)
}
cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)

// This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request
@@ -590,17 +600,15 @@ func (cm *controllerManager) waitForCache(ctx context.Context) {
return
}

-// Start the Cache. Allow the function to start the cache to be mocked out for testing
-if cm.startCache == nil {
-	cm.startCache = cm.cluster.Start
+for _, cache := range cm.caches {
+	cm.startRunnable(cache)
}
-cm.startRunnable(RunnableFunc(func(ctx context.Context) error {
-	return cm.startCache(ctx)
-}))

// Wait for the caches to sync.
// TODO(community): Check the return value and write a test
-cm.cluster.GetCache().WaitForCacheSync(ctx)
+for _, cache := range cm.caches {
+	cache.GetCache().WaitForCacheSync(ctx)
+}
// TODO: This should be the return value of cm.cache.WaitForCacheSync but we abuse
// cm.started as check if we already started the cache so it must always become true.
// Making sure that the cache doesn't get started twice is needed to not get a "close
130 changes: 127 additions & 3 deletions pkg/manager/manager_test.go
@@ -43,6 +43,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/config/v1alpha1"
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
@@ -612,9 +613,7 @@ var _ = Describe("manger.Manager", func() {
}
mgr, ok := m.(*controllerManager)
Expect(ok).To(BeTrue())
-mgr.startCache = func(context.Context) error {
-	return fmt.Errorf("expected error")
-}
+mgr.caches = []hasCache{&cacheProvider{cache: &informertest.FakeInformers{Error: fmt.Errorf("expected error")}}}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -623,6 +622,82 @@
close(done)
})

It("should start the cache before starting anything else", func(done Done) {
fakeCache := &startSignalingInformer{Cache: &informertest.FakeInformers{}}
options.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) {
return fakeCache, nil
}
m, err := New(cfg, options)
Expect(err).NotTo(HaveOccurred())
for _, cb := range callbacks {
cb(m)
}

runnableWasStarted := make(chan struct{})
Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
defer GinkgoRecover()
if !fakeCache.wasSynced {
return errors.New("runnable got started before cache was synced")
}
close(runnableWasStarted)
return nil
}))).To(Succeed())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
defer GinkgoRecover()
Expect(m.Start(ctx)).ToNot(HaveOccurred())
}()

<-runnableWasStarted
close(done)
})

It("should start additional clusters before anything else", func(done Done) {
fakeCache := &startSignalingInformer{Cache: &informertest.FakeInformers{}}
options.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) {
return fakeCache, nil
}
m, err := New(cfg, options)
Expect(err).NotTo(HaveOccurred())
for _, cb := range callbacks {
cb(m)
}

additionalClusterCache := &startSignalingInformer{Cache: &informertest.FakeInformers{}}
additionalCluster, err := cluster.New(cfg, func(o *cluster.Options) {
o.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) {
return additionalClusterCache, nil
}
})
Expect(err).NotTo(HaveOccurred())
Expect(m.Add(additionalCluster)).NotTo(HaveOccurred())

runnableWasStarted := make(chan struct{})
Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
defer GinkgoRecover()
if !fakeCache.wasSynced {
return errors.New("WaitForCacheSync wasn't called before the Runnable got started")
}
if !additionalClusterCache.wasSynced {
return errors.New("the additional cluster's WaitForCacheSync wasn't called before the Runnable got started")
}
close(runnableWasStarted)
return nil
}))).To(Succeed())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
defer GinkgoRecover()
Expect(m.Start(ctx)).ToNot(HaveOccurred())
}()

<-runnableWasStarted
close(done)
})

It("should return an error if any Components fail to Start", func(done Done) {
m, err := New(cfg, options)
Expect(err).NotTo(HaveOccurred())
@@ -1625,3 +1700,52 @@ func (f *fakeDeferredLoader) Complete() (v1alpha1.ControllerManagerConfiguration
func (f *fakeDeferredLoader) InjectScheme(scheme *runtime.Scheme) error {
return nil
}

var _ Runnable = &cacheProvider{}

type cacheProvider struct {
cache cache.Cache
}

func (c *cacheProvider) GetCache() cache.Cache {
return c.cache
}

func (c *cacheProvider) Start(ctx context.Context) error {
return c.cache.Start(ctx)
}

type startSignalingInformer struct {
// The manager calls Start and WaitForCacheSync in
// parallel, so we have to protect wasStarted with a Mutex
// and block in WaitForCacheSync until it is true.
wasStartedLock sync.Mutex
wasStarted bool
// wasSynced will be true once Start was called and
// WaitForCacheSync returned, just like a real cache.
wasSynced bool
cache.Cache
}

func (c *startSignalingInformer) started() bool {
c.wasStartedLock.Lock()
defer c.wasStartedLock.Unlock()
return c.wasStarted
}

func (c *startSignalingInformer) Start(ctx context.Context) error {
c.wasStartedLock.Lock()
c.wasStarted = true
c.wasStartedLock.Unlock()
return c.Cache.Start(ctx)
}

func (c *startSignalingInformer) WaitForCacheSync(ctx context.Context) bool {
defer func() {
for !c.started() {
continue
}
c.wasSynced = true
}()
return c.Cache.WaitForCacheSync(ctx)
}
