Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ [WIP] Add BaseContext to manager Options for use with Runnables #1846

Merged
merged 1 commit into from Mar 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 20 additions & 1 deletion pkg/manager/manager.go
Expand Up @@ -239,6 +239,11 @@ type Options struct {
// use the cache for reads and the client for writes.
NewClient cluster.NewClientFunc

// BaseContext is the function that provides Context values to Runnables
// managed by the Manager. If a BaseContext function isn't provided, Runnables
// will receive a new Background Context instead.
BaseContext BaseContextFunc

// ClientDisableCacheFor tells the client that, if any cache is used, to bypass it
// for the given objects.
ClientDisableCacheFor []client.Object
Expand Down Expand Up @@ -278,6 +283,10 @@ type Options struct {
newHealthProbeListener func(addr string) (net.Listener, error)
}

// BaseContextFunc is a function used to provide a base Context to Runnables
// managed by a Manager.
type BaseContextFunc func() context.Context

// Runnable allows a component to be started.
// It's very important that Start blocks until
// it's done running.
Expand Down Expand Up @@ -377,7 +386,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
}

errChan := make(chan error)
runnables := newRunnables(errChan)
runnables := newRunnables(options.BaseContext, errChan)

return &controllerManager{
stopProcedureEngaged: pointer.Int64(0),
Expand Down Expand Up @@ -529,6 +538,12 @@ func defaultHealthProbeListener(addr string) (net.Listener, error) {
return ln, nil
}

// defaultBaseContext is used as the BaseContext value in Options if one
// has not already been set.
func defaultBaseContext() context.Context {
return context.Background()
}

// setOptionsDefaults set default values for Options fields.
func setOptionsDefaults(options Options) Options {
// Allow newResourceLock to be mocked
Expand Down Expand Up @@ -592,5 +607,9 @@ func setOptionsDefaults(options Options) Options {
options.Logger = log.Log
}

if options.BaseContext == nil {
options.BaseContext = defaultBaseContext
}

return options
}
15 changes: 8 additions & 7 deletions pkg/manager/runnable_group.go
Expand Up @@ -35,12 +35,12 @@ type runnables struct {
}

// newRunnables creates a new runnables object.
func newRunnables(errChan chan error) *runnables {
func newRunnables(baseContext BaseContextFunc, errChan chan error) *runnables {
return &runnables{
Webhooks: newRunnableGroup(errChan),
Caches: newRunnableGroup(errChan),
LeaderElection: newRunnableGroup(errChan),
Others: newRunnableGroup(errChan),
Webhooks: newRunnableGroup(baseContext, errChan),
Caches: newRunnableGroup(baseContext, errChan),
LeaderElection: newRunnableGroup(baseContext, errChan),
Others: newRunnableGroup(baseContext, errChan),
}
}

Expand Down Expand Up @@ -100,14 +100,15 @@ type runnableGroup struct {
wg *sync.WaitGroup
}

func newRunnableGroup(errChan chan error) *runnableGroup {
func newRunnableGroup(baseContext BaseContextFunc, errChan chan error) *runnableGroup {
r := &runnableGroup{
startReadyCh: make(chan *readyRunnable),
errChan: errChan,
ch: make(chan *readyRunnable),
wg: new(sync.WaitGroup),
}
r.ctx, r.cancel = context.WithCancel(context.Background())

r.ctx, r.cancel = context.WithCancel(baseContext())
return r
}

Expand Down
20 changes: 10 additions & 10 deletions pkg/manager/runnable_group_test.go
Expand Up @@ -18,19 +18,19 @@ var _ = Describe("runnables", func() {
errCh := make(chan error)

It("should be able to create a new runnables object", func() {
Expect(newRunnables(errCh)).ToNot(BeNil())
Expect(newRunnables(defaultBaseContext, errCh)).ToNot(BeNil())
})

It("should add caches to the appropriate group", func() {
cache := &cacheProvider{cache: &informertest.FakeInformers{Error: fmt.Errorf("expected error")}}
r := newRunnables(errCh)
r := newRunnables(defaultBaseContext, errCh)
Expect(r.Add(cache)).To(Succeed())
Expect(r.Caches.startQueue).To(HaveLen(1))
})

It("should add webhooks to the appropriate group", func() {
webhook := &webhook.Server{}
r := newRunnables(errCh)
r := newRunnables(defaultBaseContext, errCh)
Expect(r.Add(webhook)).To(Succeed())
Expect(r.Webhooks.startQueue).To(HaveLen(1))
})
Expand All @@ -41,7 +41,7 @@ var _ = Describe("runnables", func() {
return err
})

r := newRunnables(errCh)
r := newRunnables(defaultBaseContext, errCh)
Expect(r.Add(runnable)).To(Succeed())
Expect(r.LeaderElection.startQueue).To(HaveLen(1))
})
Expand All @@ -53,7 +53,7 @@ var _ = Describe("runnableGroup", func() {
It("should be able to add new runnables before it starts", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rg := newRunnableGroup(errCh)
rg := newRunnableGroup(defaultBaseContext, errCh)
Expect(rg.Add(RunnableFunc(func(c context.Context) error {
<-ctx.Done()
return nil
Expand All @@ -65,7 +65,7 @@ var _ = Describe("runnableGroup", func() {
It("should be able to add new runnables before and after start", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rg := newRunnableGroup(errCh)
rg := newRunnableGroup(defaultBaseContext, errCh)
Expect(rg.Add(RunnableFunc(func(c context.Context) error {
<-ctx.Done()
return nil
Expand All @@ -81,7 +81,7 @@ var _ = Describe("runnableGroup", func() {
It("should be able to add new runnables before and after start concurrently", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rg := newRunnableGroup(errCh)
rg := newRunnableGroup(defaultBaseContext, errCh)

go func() {
defer GinkgoRecover()
Expand All @@ -106,7 +106,7 @@ var _ = Describe("runnableGroup", func() {
ctx, cancel := context.WithCancel(context.Background())

exited := pointer.Int64(0)
rg := newRunnableGroup(errCh)
rg := newRunnableGroup(defaultBaseContext, errCh)
for i := 0; i < 10; i++ {
Expect(rg.Add(RunnableFunc(func(c context.Context) error {
defer atomic.AddInt64(exited, 1)
Expand All @@ -131,7 +131,7 @@ var _ = Describe("runnableGroup", func() {
It("should be able to wait for all runnables to be ready at different intervals", func() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
rg := newRunnableGroup(errCh)
rg := newRunnableGroup(defaultBaseContext, errCh)

go func() {
defer GinkgoRecover()
Expand All @@ -157,7 +157,7 @@ var _ = Describe("runnableGroup", func() {
It("should not turn ready if some readiness check fail", func() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
rg := newRunnableGroup(errCh)
rg := newRunnableGroup(defaultBaseContext, errCh)

go func() {
defer GinkgoRecover()
Expand Down