Skip to content

Commit

Permalink
Merge pull request #1247 from varshaprasad96/add/cache-timeout
Browse files Browse the repository at this point in the history
✨ Allow configuring cache sync timeouts
  • Loading branch information
k8s-ci-robot committed Jan 8, 2021
2 parents 6bcef8a + bbfc18c commit 66537ca
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 4 deletions.
10 changes: 10 additions & 0 deletions pkg/controller/controller.go
Expand Up @@ -19,6 +19,7 @@ package controller
import (
"context"
"fmt"
"time"

"github.com/go-logr/logr"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -47,6 +48,10 @@ type Options struct {
// Log is the logger used for this controller and passed to each reconciliation
// request via the context field.
Log logr.Logger

// CacheSyncTimeout refers to the time limit set to wait for syncing caches.
// Defaults to 2 minutes if not set.
CacheSyncTimeout time.Duration
}

// Controller implements a Kubernetes API. A Controller manages a work queue fed reconcile.Requests
Expand Down Expand Up @@ -104,6 +109,10 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
options.MaxConcurrentReconciles = 1
}

if options.CacheSyncTimeout == 0 {
options.CacheSyncTimeout = 2 * time.Minute
}

if options.RateLimiter == nil {
options.RateLimiter = workqueue.DefaultControllerRateLimiter()
}
Expand All @@ -120,6 +129,7 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
return workqueue.NewNamedRateLimitingQueue(options.RateLimiter, name)
},
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
CacheSyncTimeout: options.CacheSyncTimeout,
SetFields: mgr.SetFields,
Name: name,
Log: options.Log.WithName("controller").WithName(name),
Expand Down
20 changes: 16 additions & 4 deletions pkg/internal/controller/controller.go
Expand Up @@ -79,6 +79,10 @@ type Controller struct {
// undergo a major refactoring and redesign to allow for context to not be stored in a struct.
ctx context.Context

// CacheSyncTimeout refers to the time limit set on waiting for cache to sync
// Defaults to 2 minutes if not set.
CacheSyncTimeout time.Duration

// startWatches maintains a list of sources, handlers, and predicates to start when the controller is started.
startWatches []watchDescription

Expand Down Expand Up @@ -156,7 +160,10 @@ func (c *Controller) Start(ctx context.Context) error {
// caches.
for _, watch := range c.startWatches {
c.Log.Info("Starting EventSource", "source", watch.src)
if err := watch.src.Start(ctx, watch.handler, c.Queue, watch.predicates...); err != nil {

watchStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
defer cancel()
if err := watch.src.Start(watchStartCtx, watch.handler, c.Queue, watch.predicates...); err != nil {
return err
}
}
Expand All @@ -169,9 +176,14 @@ func (c *Controller) Start(ctx context.Context) error {
if !ok {
continue
}
if err := syncingSource.WaitForSync(ctx); err != nil {
// This code is unreachable in case of kube watches since WaitForCacheSync will never return an error
// Leaving it here because that could happen in the future

// use a context with timeout for launching sources and syncing caches.
sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
defer cancel()

// WaitForSync waits for a definitive timeout, and returns if there
// is an error or a timeout
if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
c.Log.Error(err, "Could not wait for Cache to sync")
return err
Expand Down
59 changes: 59 additions & 0 deletions pkg/internal/controller/controller_test.go
Expand Up @@ -122,6 +122,53 @@ var _ = Describe("controller", func() {
close(done)
})

It("should error when cache sync timeout occurs", func(done Done) {
ctrl.CacheSyncTimeout = 10 * time.Nanosecond

c, err := cache.New(cfg, cache.Options{})
Expect(err).NotTo(HaveOccurred())

ctrl.startWatches = []watchDescription{{
src: source.NewKindWithCache(&appsv1.Deployment{}, c),
}}

err = ctrl.Start(context.TODO())
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("cache did not sync"))

close(done)
})

It("should not error when cache sync timeout is of sufficiently high", func(done Done) {
ctrl.CacheSyncTimeout = 1 * time.Second

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

sourceSynced := make(chan struct{})
c, err := cache.New(cfg, cache.Options{})
Expect(err).NotTo(HaveOccurred())
ctrl.startWatches = []watchDescription{{
src: &singnallingSourceWrapper{
SyncingSource: source.NewKindWithCache(&appsv1.Deployment{}, c),
cacheSyncDone: sourceSynced,
},
}}

go func() {
defer GinkgoRecover()
Expect(c.Start(ctx)).To(Succeed())
}()

go func() {
defer GinkgoRecover()
Expect(ctrl.Start(ctx)).To(Succeed())
}()

<-sourceSynced
close(done)
}, 10.0)

It("should call Start on sources with the appropriate EventHandler, Queue, and Predicates", func() {
pr1 := &predicate.Funcs{}
pr2 := &predicate.Funcs{}
Expand Down Expand Up @@ -811,3 +858,15 @@ func (f *fakeReconciler) Reconcile(_ context.Context, r reconcile.Request) (reco
}
return res.Result, res.Err
}

type singnallingSourceWrapper struct {
cacheSyncDone chan struct{}
source.SyncingSource
}

func (s *singnallingSourceWrapper) WaitForSync(ctx context.Context) error {
defer func() {
close(s.cacheSyncDone)
}()
return s.SyncingSource.WaitForSync(ctx)
}

0 comments on commit 66537ca

Please sign in to comment.