Skip to content

Commit

Permalink
✨ Allow configuring cache sync timeouts
Browse files Browse the repository at this point in the history
This PR allows users to configure timeout for cache syncs
while starting the controller.
  • Loading branch information
varshaprasad96 committed Nov 9, 2020
1 parent af24f3b commit 3886222
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 3 deletions.
10 changes: 10 additions & 0 deletions pkg/controller/controller.go
Expand Up @@ -19,6 +19,7 @@ package controller
import (
"context"
"fmt"
"time"

"github.com/go-logr/logr"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -47,6 +48,10 @@ type Options struct {
// Log is the logger used for this controller and passed to each reconciliation
// request via the context field.
Log logr.Logger

// CacheSyncTimeout refers to the time limit set to wait for syncing caches.
// Defaults to 10 seconds if not set.
CacheSyncTimeout time.Duration
}

// Controller implements a Kubernetes API. A Controller manages a work queue fed reconcile.Requests
Expand Down Expand Up @@ -112,6 +117,10 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
options.Log = mgr.GetLogger()
}

if options.CacheSyncTimeout == 0 {
options.CacheSyncTimeout = 10 * time.Second
}

// Inject dependencies into Reconciler
if err := mgr.SetFields(options.Reconciler); err != nil {
return nil, err
Expand All @@ -124,6 +133,7 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)
},
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
CacheSyncTimeout: options.CacheSyncTimeout,
SetFields: mgr.SetFields,
Name: name,
Log: options.Log.WithName("controller").WithName(name),
Expand Down
9 changes: 8 additions & 1 deletion pkg/internal/controller/controller.go
Expand Up @@ -79,6 +79,10 @@ type Controller struct {
// undergo a major refactoring and redesign to allow for context to not be stored in a struct.
ctx context.Context

// CacheSyncTimeout refers to the time limit set on waiting cache to sync
// Defaults to 10 seconds if not set.
CacheSyncTimeout time.Duration

// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
startWatches []watchDescription

Expand Down Expand Up @@ -169,7 +173,10 @@ func (c *Controller) Start(ctx context.Context) error {
if !ok {
continue
}
if err := syncingSource.WaitForSync(ctx); err != nil {
ct, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
defer cancel()
if err := syncingSource.WaitForSync(ct); err != nil {
c.Log.Info("skipping it")
// This code is unreachable in case of kube watches since WaitForCacheSync will never return an error
// Leaving it here because that could happen in the future
err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
Expand Down
23 changes: 23 additions & 0 deletions pkg/internal/controller/controller_test.go
Expand Up @@ -122,6 +122,29 @@ var _ = Describe("controller", func() {
close(done)
})

It("should wait for each informer to sync", func(done Done) {
// TODO(directxman12): this test doesn't do what it says it does
ctrl.CacheSyncTimeout = 1 * time.Nanosecond
c, err := cache.New(cfg, cache.Options{})
Expect(err).NotTo(HaveOccurred())
_, err = c.GetInformer(context.TODO(), &appsv1.Deployment{})
Expect(err).NotTo(HaveOccurred())
_, err = c.GetInformer(context.TODO(), &appsv1.ReplicaSet{})
Expect(err).NotTo(HaveOccurred())
ctrl.startWatches = []watchDescription{{
src: source.NewKindWithCache(&appsv1.Deployment{}, &informertest.FakeInformers{}),
}, {
src: source.NewKindWithCache(&appsv1.ReplicaSet{}, &informertest.FakeInformers{}),
}}

// Use a cancelled context so Start doesn't block
// ctx, cancel := context.WithCancel(context.Background())
// cancel()
Expect(ctrl.Start(context.Background())).To(HaveOccurred())

close(done)
})

It("should call Start on sources with the appropriate EventHandler, Queue, and Predicates", func() {
pr1 := &predicate.Funcs{}
pr2 := &predicate.Funcs{}
Expand Down
4 changes: 2 additions & 2 deletions pkg/source/source_test.go
Expand Up @@ -211,7 +211,7 @@ var _ = Describe("Source", func() {
instance := source.Kind{}
f := false
Expect(instance.InjectCache(&informertest.FakeInformers{Synced: &f})).To(Succeed())
err := instance.WaitForSync(nil)
err := instance.WaitForSync(context.Background())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("cache did not sync"))

Expand Down Expand Up @@ -247,7 +247,7 @@ var _ = Describe("Source", func() {
It("should return an error if syncing fails", func(done Done) {
f := false
instance := source.NewKindWithCache(nil, &informertest.FakeInformers{Synced: &f})
err := instance.WaitForSync(nil)
err := instance.WaitForSync(context.Background())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(Equal("cache did not sync"))

Expand Down

0 comments on commit 3886222

Please sign in to comment.