✨ Manager: Start all caches before other Runnables
This PR implements the second half of the ["Move cluster-specific code
out of the manager" proposal][0]: making the Manager start all caches before
other Runnables, not just the cache of the main Cluster.

[0]: https://github.com/kubernetes-sigs/controller-runtime/blob/66537ca5b7439b06f2f3b08901640f934834c9a1/designs/move-cluster-specific-code-out-of-manager.md#L131-L132
alvaroaleman committed Jan 11, 2021
1 parent 66537ca commit 8e59887
Showing 3 changed files with 133 additions and 11 deletions.
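For context, the user-facing effect of the change can be sketched as follows. The example below is hypothetical and not part of the diff; it assumes the public controller-runtime API of this era (manager.New, cluster.New, manager.RunnableFunc) and mirrors the new test cases in manager_test.go: any additional cluster.Cluster added to the Manager now has its cache started and synced before ordinary Runnables are started.

```go
package main

import (
	"context"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/cluster"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

func main() {
	cfg := ctrl.GetConfigOrDie()

	mgr, err := manager.New(cfg, manager.Options{})
	if err != nil {
		panic(err)
	}

	// A second Cluster, e.g. pointing at another API server (the same config
	// is reused here only to keep the sketch short).
	other, err := cluster.New(cfg)
	if err != nil {
		panic(err)
	}
	// Adding the Cluster to the manager now guarantees its cache is started
	// and synced before the Runnables below are started.
	if err := mgr.Add(other); err != nil {
		panic(err)
	}

	// By the time this Runnable starts, both caches have synced, so reads
	// through mgr.GetClient() and other.GetClient() hit warm caches.
	if err := mgr.Add(manager.RunnableFunc(func(ctx context.Context) error {
		<-ctx.Done()
		return nil
	})); err != nil {
		panic(err)
	}

	if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
		panic(err)
	}
}
```

Before this change only the main Cluster's cache was given that treatment; caches of additional Clusters were started alongside the other Runnables.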
23 changes: 23 additions & 0 deletions pkg/cache/informertest/fake_cache.go
@@ -18,6 +18,7 @@ package informertest

import (
"context"
"sync"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -36,6 +37,17 @@ type FakeInformers struct {
Scheme *runtime.Scheme
Error error
Synced *bool
startedLock sync.Mutex
started bool
// WaitForCacheSyncCalled gets set to true after both Start() and WaitForCacheSync()
// were called.
WaitForCacheSyncCalled bool
}

func (c *FakeInformers) isStarted() bool {
c.startedLock.Lock()
defer c.startedLock.Unlock()
return c.started
}

// GetInformerForKind implements Informers
@@ -81,6 +93,12 @@ func (c *FakeInformers) GetInformer(ctx context.Context, obj client.Object) (cac

// WaitForCacheSync implements Informers
func (c *FakeInformers) WaitForCacheSync(ctx context.Context) bool {
defer func() {
for !c.isStarted() {
continue
}
c.WaitForCacheSyncCalled = true
}()
if c.Synced == nil {
return true
}
@@ -122,6 +140,11 @@ func (c *FakeInformers) informerFor(gvk schema.GroupVersionKind, _ runtime.Objec

// Start implements Informers
func (c *FakeInformers) Start(ctx context.Context) error {
defer func() {
c.startedLock.Lock()
defer c.startedLock.Unlock()
c.started = true
}()
return c.Error
}

24 changes: 16 additions & 8 deletions pkg/manager/internal.go
@@ -125,7 +125,7 @@ type controllerManager struct {
// election was configured.
elected chan struct{}

startCache func(ctx context.Context) error
caches []hasCache

// port is the port that the webhook server serves at.
port int
@@ -173,6 +173,11 @@ type controllerManager struct {
internalProceduresStop chan struct{}
}

type hasCache interface {
Runnable
GetCache() cache.Cache
}

// Add sets dependencies on i, and adds it to the list of Runnables to start.
func (cm *controllerManager) Add(r Runnable) error {
cm.mu.Lock()
@@ -192,6 +197,8 @@ func (cm *controllerManager) Add(r Runnable) error {
if leRunnable, ok := r.(LeaderElectionRunnable); ok && !leRunnable.NeedLeaderElection() {
shouldStart = cm.started
cm.nonLeaderElectionRunnables = append(cm.nonLeaderElectionRunnables, r)
} else if hasCache, ok := r.(hasCache); ok {
cm.caches = append(cm.caches, hasCache)
} else {
shouldStart = cm.startedLeader
cm.leaderElectionRunnables = append(cm.leaderElectionRunnables, r)
@@ -423,6 +430,9 @@ func (cm *controllerManager) serveHealthProbes() {
}

func (cm *controllerManager) Start(ctx context.Context) (err error) {
if err := cm.Add(cm.cluster); err != nil {
return fmt.Errorf("failed to add cluster to runnables: %w", err)
}
cm.internalCtx, cm.internalCancel = context.WithCancel(ctx)

// This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request
@@ -590,17 +600,15 @@ func (cm *controllerManager) waitForCache(ctx context.Context) {
return
}

// Start the Cache. Allow the function to start the cache to be mocked out for testing
if cm.startCache == nil {
cm.startCache = cm.cluster.Start
for _, cache := range cm.caches {
cm.startRunnable(cache)
}
cm.startRunnable(RunnableFunc(func(ctx context.Context) error {
return cm.startCache(ctx)
}))

// Wait for the caches to sync.
// TODO(community): Check the return value and write a test
cm.cluster.GetCache().WaitForCacheSync(ctx)
for _, cache := range cm.caches {
cache.GetCache().WaitForCacheSync(ctx)
}
// TODO: This should be the return value of cm.cache.WaitForCacheSync but we abuse
// cm.started as check if we already started the cache so it must always become true.
// Making sure that the cache doesn't get started twice is needed to not get a "close
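For orientation, the ordering that waitForCache now enforces can be reduced to the self-contained toy below. It is illustrative only; the types and helper names are stand-ins, not the manager's real internals, and error handling is omitted.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// toyCache stands in for anything satisfying the hasCache interface from
// pkg/manager/internal.go: it can be started and can report when it has synced.
type toyCache struct {
	name   string
	synced chan struct{}
}

func (c *toyCache) Start(ctx context.Context) error {
	fmt.Println("starting cache:", c.name)
	close(c.synced) // pretend the informers sync immediately
	<-ctx.Done()
	return nil
}

func (c *toyCache) WaitForCacheSync(ctx context.Context) bool {
	<-c.synced
	fmt.Println("cache synced:", c.name)
	return true
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	caches := []*toyCache{
		{name: "main-cluster", synced: make(chan struct{})},
		{name: "additional-cluster", synced: make(chan struct{})},
	}

	var wg sync.WaitGroup
	// 1. Start every cache in the background (cm.startRunnable does this for
	//    each entry in cm.caches).
	for _, c := range caches {
		wg.Add(1)
		go func(c *toyCache) { defer wg.Done(); _ = c.Start(ctx) }(c)
	}
	// 2. Block until all of them report synced.
	for _, c := range caches {
		c.WaitForCacheSync(ctx)
	}
	// 3. Only now would the manager start the remaining Runnables.
	fmt.Println("all caches synced, starting other Runnables")

	cancel()
	wg.Wait()
}
```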
97 changes: 94 additions & 3 deletions pkg/manager/manager_test.go
@@ -43,6 +43,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/cache/informertest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/cluster"
"sigs.k8s.io/controller-runtime/pkg/config/v1alpha1"
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder"
@@ -612,9 +613,7 @@ var _ = Describe("manger.Manager", func() {
}
mgr, ok := m.(*controllerManager)
Expect(ok).To(BeTrue())
mgr.startCache = func(context.Context) error {
return fmt.Errorf("expected error")
}
mgr.caches = []hasCache{&cacheProvider{cache: &informertest.FakeInformers{Error: fmt.Errorf("expected error")}}}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -623,6 +622,84 @@
close(done)
})

It("should start the cache before starting anything else", func(done Done) {
fakeCache := &informertest.FakeInformers{}
options.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) {
return fakeCache, nil
}
m, err := New(cfg, options)
Expect(err).NotTo(HaveOccurred())
for _, cb := range callbacks {
cb(m)
}

runnableWasStarted := make(chan struct{})
Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
defer GinkgoRecover()
if !fakeCache.WaitForCacheSyncCalled {
return errors.New("WaitForCacheSyncCalled wasn't called before Runnable got started")
}

close(runnableWasStarted)
return nil
}))).To(Succeed())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
defer GinkgoRecover()
Expect(m.Start(ctx)).ToNot(HaveOccurred())
}()

<-runnableWasStarted
close(done)
})

It("should start additional clusters before anything else", func(done Done) {
fakeCache := &informertest.FakeInformers{}
options.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) {
return fakeCache, nil
}
m, err := New(cfg, options)
Expect(err).NotTo(HaveOccurred())
for _, cb := range callbacks {
cb(m)
}

additionalClusterCache := &informertest.FakeInformers{}
additionalCluster, err := cluster.New(cfg, func(o *cluster.Options) {
o.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) {
return additionalClusterCache, nil
}
})
Expect(err).NotTo(HaveOccurred())
Expect(m.Add(additionalCluster)).NotTo(HaveOccurred())

runnableWasStarted := make(chan struct{})
Expect(m.Add(RunnableFunc(func(ctx context.Context) error {
defer GinkgoRecover()
if !fakeCache.WaitForCacheSyncCalled {
return errors.New("WaitForCacheSyncCalled wasn't called before Runnable got started")
}
if !additionalClusterCache.WaitForCacheSyncCalled {
return errors.New("the additional clusters WaitForCacheSync wasn't called before Runnable got started")
}

close(runnableWasStarted)
return nil
}))).To(Succeed())

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
defer GinkgoRecover()
Expect(m.Start(ctx)).ToNot(HaveOccurred())
}()

<-runnableWasStarted
close(done)
})

It("should return an error if any Components fail to Start", func(done Done) {
m, err := New(cfg, options)
Expect(err).NotTo(HaveOccurred())
@@ -1625,3 +1702,17 @@ func (f *fakeDeferredLoader) Complete() (v1alpha1.ControllerManagerConfiguration
func (f *fakeDeferredLoader) InjectScheme(scheme *runtime.Scheme) error {
return nil
}

var _ Runnable = &cacheProvider{}

type cacheProvider struct {
cache cache.Cache
}

func (c *cacheProvider) GetCache() cache.Cache {
return c.cache
}

func (c *cacheProvider) Start(ctx context.Context) error {
return c.cache.Start(ctx)
}
