From 7f57edcf611232b438f2c5700d225c338235f0a4 Mon Sep 17 00:00:00 2001 From: Kristin Davidson Date: Wed, 23 Mar 2022 18:02:34 -0500 Subject: [PATCH] Add BaseContext to manager Options --- pkg/manager/manager.go | 21 ++++++++++++++++++++- pkg/manager/runnable_group.go | 15 ++++++++------- pkg/manager/runnable_group_test.go | 20 ++++++++++---------- 3 files changed, 38 insertions(+), 18 deletions(-) diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index a198889e20..f6c4d6f144 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -239,6 +239,11 @@ type Options struct { // use the cache for reads and the client for writes. NewClient cluster.NewClientFunc + // BaseContext is the function that provides Context values to Runnables + // managed by the Manager. If a BaseContext function isn't provided, Runnables + // will receive a new Background Context instead. + BaseContext BaseContextFunc + // ClientDisableCacheFor tells the client that, if any cache is used, to bypass it // for the given objects. ClientDisableCacheFor []client.Object @@ -278,6 +283,10 @@ type Options struct { newHealthProbeListener func(addr string) (net.Listener, error) } +// BaseContextFunc is a function used to provide a base Context to Runnables +// managed by a Manager. +type BaseContextFunc func() context.Context + // Runnable allows a component to be started. // It's very important that Start blocks until // it's done running. @@ -377,7 +386,7 @@ func New(config *rest.Config, options Options) (Manager, error) { } errChan := make(chan error) - runnables := newRunnables(errChan) + runnables := newRunnables(options.BaseContext, errChan) return &controllerManager{ stopProcedureEngaged: pointer.Int64(0), @@ -529,6 +538,12 @@ func defaultHealthProbeListener(addr string) (net.Listener, error) { return ln, nil } +// defaultBaseContext is used as the BaseContext value in Options if one +// has not already been set. +func defaultBaseContext() context.Context { + return context.Background() +} + // setOptionsDefaults set default values for Options fields. func setOptionsDefaults(options Options) Options { // Allow newResourceLock to be mocked @@ -592,5 +607,9 @@ func setOptionsDefaults(options Options) Options { options.Logger = log.Log } + if options.BaseContext == nil { + options.BaseContext = defaultBaseContext + } + return options } diff --git a/pkg/manager/runnable_group.go b/pkg/manager/runnable_group.go index ded8aed221..f7b91a209f 100644 --- a/pkg/manager/runnable_group.go +++ b/pkg/manager/runnable_group.go @@ -35,12 +35,12 @@ type runnables struct { } // newRunnables creates a new runnables object. -func newRunnables(errChan chan error) *runnables { +func newRunnables(baseContext BaseContextFunc, errChan chan error) *runnables { return &runnables{ - Webhooks: newRunnableGroup(errChan), - Caches: newRunnableGroup(errChan), - LeaderElection: newRunnableGroup(errChan), - Others: newRunnableGroup(errChan), + Webhooks: newRunnableGroup(baseContext, errChan), + Caches: newRunnableGroup(baseContext, errChan), + LeaderElection: newRunnableGroup(baseContext, errChan), + Others: newRunnableGroup(baseContext, errChan), } } @@ -100,14 +100,15 @@ type runnableGroup struct { wg *sync.WaitGroup } -func newRunnableGroup(errChan chan error) *runnableGroup { +func newRunnableGroup(baseContext BaseContextFunc, errChan chan error) *runnableGroup { r := &runnableGroup{ startReadyCh: make(chan *readyRunnable), errChan: errChan, ch: make(chan *readyRunnable), wg: new(sync.WaitGroup), } - r.ctx, r.cancel = context.WithCancel(context.Background()) + + r.ctx, r.cancel = context.WithCancel(baseContext()) return r } diff --git a/pkg/manager/runnable_group_test.go b/pkg/manager/runnable_group_test.go index 57e0ad6387..db23eeae95 100644 --- a/pkg/manager/runnable_group_test.go +++ b/pkg/manager/runnable_group_test.go @@ -18,19 +18,19 @@ var _ = Describe("runnables", func() { errCh := make(chan error) It("should be able to create a new runnables object", func() { - Expect(newRunnables(errCh)).ToNot(BeNil()) + Expect(newRunnables(defaultBaseContext, errCh)).ToNot(BeNil()) }) It("should add caches to the appropriate group", func() { cache := &cacheProvider{cache: &informertest.FakeInformers{Error: fmt.Errorf("expected error")}} - r := newRunnables(errCh) + r := newRunnables(defaultBaseContext, errCh) Expect(r.Add(cache)).To(Succeed()) Expect(r.Caches.startQueue).To(HaveLen(1)) }) It("should add webhooks to the appropriate group", func() { webhook := &webhook.Server{} - r := newRunnables(errCh) + r := newRunnables(defaultBaseContext, errCh) Expect(r.Add(webhook)).To(Succeed()) Expect(r.Webhooks.startQueue).To(HaveLen(1)) }) @@ -41,7 +41,7 @@ var _ = Describe("runnables", func() { return err }) - r := newRunnables(errCh) + r := newRunnables(defaultBaseContext, errCh) Expect(r.Add(runnable)).To(Succeed()) Expect(r.LeaderElection.startQueue).To(HaveLen(1)) }) @@ -53,7 +53,7 @@ var _ = Describe("runnableGroup", func() { It("should be able to add new runnables before it starts", func() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - rg := newRunnableGroup(errCh) + rg := newRunnableGroup(defaultBaseContext, errCh) Expect(rg.Add(RunnableFunc(func(c context.Context) error { <-ctx.Done() return nil @@ -65,7 +65,7 @@ var _ = Describe("runnableGroup", func() { It("should be able to add new runnables before and after start", func() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - rg := newRunnableGroup(errCh) + rg := newRunnableGroup(defaultBaseContext, errCh) Expect(rg.Add(RunnableFunc(func(c context.Context) error { <-ctx.Done() return nil @@ -81,7 +81,7 @@ var _ = Describe("runnableGroup", func() { It("should be able to add new runnables before and after start concurrently", func() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - rg := newRunnableGroup(errCh) + rg := newRunnableGroup(defaultBaseContext, errCh) go func() { defer GinkgoRecover() @@ -106,7 +106,7 @@ var _ = Describe("runnableGroup", func() { ctx, cancel := context.WithCancel(context.Background()) exited := pointer.Int64(0) - rg := newRunnableGroup(errCh) + rg := newRunnableGroup(defaultBaseContext, errCh) for i := 0; i < 10; i++ { Expect(rg.Add(RunnableFunc(func(c context.Context) error { defer atomic.AddInt64(exited, 1) @@ -131,7 +131,7 @@ var _ = Describe("runnableGroup", func() { It("should be able to wait for all runnables to be ready at different intervals", func() { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() - rg := newRunnableGroup(errCh) + rg := newRunnableGroup(defaultBaseContext, errCh) go func() { defer GinkgoRecover() @@ -157,7 +157,7 @@ var _ = Describe("runnableGroup", func() { It("should not turn ready if some readiness check fail", func() { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - rg := newRunnableGroup(errCh) + rg := newRunnableGroup(defaultBaseContext, errCh) go func() { defer GinkgoRecover()