Skip to content

Commit

Permalink
Add test cases to check cache sync timeouts
Browse files Browse the repository at this point in the history
Co-authored-by: Alvaro Aleman <alvaroaleman@users.noreply.github.com>
  • Loading branch information
varshaprasad96 and alvaroaleman committed Jan 5, 2021
1 parent cdc2e0e commit bbfc18c
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 33 deletions.
20 changes: 12 additions & 8 deletions pkg/internal/controller/controller.go
Expand Up @@ -79,7 +79,7 @@ type Controller struct {
// undergo a major refactoring and redesign to allow for context to not be stored in a struct.
ctx context.Context

// CacheSyncTimeout refers to the time limit set on waiting cache to sync
// CacheSyncTimeout refers to the time limit set on waiting for cache to sync
// Defaults to 2 minutes if not set.
CacheSyncTimeout time.Duration

Expand Down Expand Up @@ -146,10 +146,6 @@ func (c *Controller) Start(ctx context.Context) error {
// Set the internal context.
c.ctx = ctx

// use a context with timeout for launching sources and syncing caches.
sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
defer cancel()

c.Queue = c.MakeQueue()
defer c.Queue.ShutDown() // needs to be outside the iife so that we shutdown after the stop channel is closed

Expand All @@ -164,7 +160,10 @@ func (c *Controller) Start(ctx context.Context) error {
// caches.
for _, watch := range c.startWatches {
c.Log.Info("Starting EventSource", "source", watch.src)
if err := watch.src.Start(sourceStartCtx, watch.handler, c.Queue, watch.predicates...); err != nil {

watchStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
defer cancel()
if err := watch.src.Start(watchStartCtx, watch.handler, c.Queue, watch.predicates...); err != nil {
return err
}
}
Expand All @@ -177,9 +176,14 @@ func (c *Controller) Start(ctx context.Context) error {
if !ok {
continue
}

// use a context with timeout for launching sources and syncing caches.
sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
defer cancel()

// WaitForSync waits for a definitive timeout, and returns if there
// is an error or a timeout
if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
// This code is unreachable in case of kube watches since WaitForCacheSync will never return an error
// Leaving it here because that could happen in the future
err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
c.Log.Error(err, "Could not wait for Cache to sync")
return err
Expand Down
55 changes: 30 additions & 25 deletions pkg/internal/controller/controller_test.go
Expand Up @@ -124,15 +124,12 @@ var _ = Describe("controller", func() {

It("should error when cache sync timeout occurs", func(done Done) {
ctrl.CacheSyncTimeout = 10 * time.Nanosecond

c, err := cache.New(cfg, cache.Options{})
Expect(err).NotTo(HaveOccurred())
_, err = c.GetInformer(context.TODO(), &appsv1.Deployment{})
Expect(err).NotTo(HaveOccurred())
sync := false

ctrl.startWatches = []watchDescription{{
src: source.NewKindWithCache(&appsv1.Deployment{}, &informertest.FakeInformers{
Synced: &sync,
}),
src: source.NewKindWithCache(&appsv1.Deployment{}, c),
}}

err = ctrl.Start(context.TODO())
Expand All @@ -142,39 +139,35 @@ var _ = Describe("controller", func() {
close(done)
})

It("should not error when cache sync time out is of reasonable value", func(done Done) {
It("should not error when cache sync timeout is of sufficiently high", func(done Done) {
ctrl.CacheSyncTimeout = 1 * time.Second

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sourceSynced := make(chan struct{})
c, err := cache.New(cfg, cache.Options{})
Expect(err).NotTo(HaveOccurred())
ctrl.startWatches = []watchDescription{{
src: source.NewKindWithCache(&appsv1.Deployment{}, c),
src: &singnallingSourceWrapper{
SyncingSource: source.NewKindWithCache(&appsv1.Deployment{}, c),
cacheSyncDone: sourceSynced,
},
}}

By("running the cache and waiting for it to sync")
go func() {
defer GinkgoRecover()
Expect(c.Start(context.TODO())).To(Succeed())
Expect(c.Start(ctx)).To(Succeed())
}()
close(done)
})

It("should error when timeout is set to a very low value such that cache cannot sync", func(done Done) {
ctrl.CacheSyncTimeout = 1 * time.Nanosecond
c, err := cache.New(cfg, cache.Options{})
Expect(err).NotTo(HaveOccurred())
ctrl.startWatches = []watchDescription{{
src: source.NewKindWithCache(&appsv1.Deployment{}, c),
}}

By("running the cache and waiting for it to sync")
go func() {
defer GinkgoRecover()
err = ctrl.Start(context.TODO())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("cache did not sync"))
Expect(ctrl.Start(ctx)).To(Succeed())
}()

<-sourceSynced
close(done)
})
}, 10.0)

It("should call Start on sources with the appropriate EventHandler, Queue, and Predicates", func() {
pr1 := &predicate.Funcs{}
Expand Down Expand Up @@ -865,3 +858,15 @@ func (f *fakeReconciler) Reconcile(_ context.Context, r reconcile.Request) (reco
}
return res.Result, res.Err
}

type singnallingSourceWrapper struct {
cacheSyncDone chan struct{}
source.SyncingSource
}

func (s *singnallingSourceWrapper) WaitForSync(ctx context.Context) error {
defer func() {
close(s.cacheSyncDone)
}()
return s.SyncingSource.WaitForSync(ctx)
}

0 comments on commit bbfc18c

Please sign in to comment.