Skip to content

Commit

Permalink
Merge pull request #1846 from atlassian-forks/master
Browse files Browse the repository at this point in the history
✨ [WIP] Add BaseContext to manager Options for use with Runnables
  • Loading branch information
k8s-ci-robot committed Mar 30, 2022
2 parents c46b410 + 7f57edc commit b921952
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 18 deletions.
21 changes: 20 additions & 1 deletion pkg/manager/manager.go
Expand Up @@ -239,6 +239,11 @@ type Options struct {
// use the cache for reads and the client for writes.
NewClient cluster.NewClientFunc

// BaseContext is the function that provides Context values to Runnables
// managed by the Manager. If a BaseContext function isn't provided, Runnables
// will receive a new Background Context instead.
BaseContext BaseContextFunc

// ClientDisableCacheFor tells the client that, if any cache is used, to bypass it
// for the given objects.
ClientDisableCacheFor []client.Object
Expand Down Expand Up @@ -278,6 +283,10 @@ type Options struct {
newHealthProbeListener func(addr string) (net.Listener, error)
}

// BaseContextFunc is a function used to provide a base Context to Runnables
// managed by a Manager.
type BaseContextFunc func() context.Context

// Runnable allows a component to be started.
// It's very important that Start blocks until
// it's done running.
Expand Down Expand Up @@ -377,7 +386,7 @@ func New(config *rest.Config, options Options) (Manager, error) {
}

errChan := make(chan error)
runnables := newRunnables(errChan)
runnables := newRunnables(options.BaseContext, errChan)

return &controllerManager{
stopProcedureEngaged: pointer.Int64(0),
Expand Down Expand Up @@ -529,6 +538,12 @@ func defaultHealthProbeListener(addr string) (net.Listener, error) {
return ln, nil
}

// defaultBaseContext is used as the BaseContext value in Options if one
// has not already been set.
func defaultBaseContext() context.Context {
return context.Background()
}

// setOptionsDefaults set default values for Options fields.
func setOptionsDefaults(options Options) Options {
// Allow newResourceLock to be mocked
Expand Down Expand Up @@ -592,5 +607,9 @@ func setOptionsDefaults(options Options) Options {
options.Logger = log.Log
}

if options.BaseContext == nil {
options.BaseContext = defaultBaseContext
}

return options
}
15 changes: 8 additions & 7 deletions pkg/manager/runnable_group.go
Expand Up @@ -35,12 +35,12 @@ type runnables struct {
}

// newRunnables creates a new runnables object.
func newRunnables(errChan chan error) *runnables {
func newRunnables(baseContext BaseContextFunc, errChan chan error) *runnables {
return &runnables{
Webhooks: newRunnableGroup(errChan),
Caches: newRunnableGroup(errChan),
LeaderElection: newRunnableGroup(errChan),
Others: newRunnableGroup(errChan),
Webhooks: newRunnableGroup(baseContext, errChan),
Caches: newRunnableGroup(baseContext, errChan),
LeaderElection: newRunnableGroup(baseContext, errChan),
Others: newRunnableGroup(baseContext, errChan),
}
}

Expand Down Expand Up @@ -100,14 +100,15 @@ type runnableGroup struct {
wg *sync.WaitGroup
}

func newRunnableGroup(errChan chan error) *runnableGroup {
func newRunnableGroup(baseContext BaseContextFunc, errChan chan error) *runnableGroup {
r := &runnableGroup{
startReadyCh: make(chan *readyRunnable),
errChan: errChan,
ch: make(chan *readyRunnable),
wg: new(sync.WaitGroup),
}
r.ctx, r.cancel = context.WithCancel(context.Background())

r.ctx, r.cancel = context.WithCancel(baseContext())
return r
}

Expand Down
20 changes: 10 additions & 10 deletions pkg/manager/runnable_group_test.go
Expand Up @@ -18,19 +18,19 @@ var _ = Describe("runnables", func() {
errCh := make(chan error)

It("should be able to create a new runnables object", func() {
Expect(newRunnables(errCh)).ToNot(BeNil())
Expect(newRunnables(defaultBaseContext, errCh)).ToNot(BeNil())
})

It("should add caches to the appropriate group", func() {
cache := &cacheProvider{cache: &informertest.FakeInformers{Error: fmt.Errorf("expected error")}}
r := newRunnables(errCh)
r := newRunnables(defaultBaseContext, errCh)
Expect(r.Add(cache)).To(Succeed())
Expect(r.Caches.startQueue).To(HaveLen(1))
})

It("should add webhooks to the appropriate group", func() {
webhook := &webhook.Server{}
r := newRunnables(errCh)
r := newRunnables(defaultBaseContext, errCh)
Expect(r.Add(webhook)).To(Succeed())
Expect(r.Webhooks.startQueue).To(HaveLen(1))
})
Expand All @@ -41,7 +41,7 @@ var _ = Describe("runnables", func() {
return err
})

r := newRunnables(errCh)
r := newRunnables(defaultBaseContext, errCh)
Expect(r.Add(runnable)).To(Succeed())
Expect(r.LeaderElection.startQueue).To(HaveLen(1))
})
Expand All @@ -53,7 +53,7 @@ var _ = Describe("runnableGroup", func() {
It("should be able to add new runnables before it starts", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rg := newRunnableGroup(errCh)
rg := newRunnableGroup(defaultBaseContext, errCh)
Expect(rg.Add(RunnableFunc(func(c context.Context) error {
<-ctx.Done()
return nil
Expand All @@ -65,7 +65,7 @@ var _ = Describe("runnableGroup", func() {
It("should be able to add new runnables before and after start", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rg := newRunnableGroup(errCh)
rg := newRunnableGroup(defaultBaseContext, errCh)
Expect(rg.Add(RunnableFunc(func(c context.Context) error {
<-ctx.Done()
return nil
Expand All @@ -81,7 +81,7 @@ var _ = Describe("runnableGroup", func() {
It("should be able to add new runnables before and after start concurrently", func() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
rg := newRunnableGroup(errCh)
rg := newRunnableGroup(defaultBaseContext, errCh)

go func() {
defer GinkgoRecover()
Expand All @@ -106,7 +106,7 @@ var _ = Describe("runnableGroup", func() {
ctx, cancel := context.WithCancel(context.Background())

exited := pointer.Int64(0)
rg := newRunnableGroup(errCh)
rg := newRunnableGroup(defaultBaseContext, errCh)
for i := 0; i < 10; i++ {
Expect(rg.Add(RunnableFunc(func(c context.Context) error {
defer atomic.AddInt64(exited, 1)
Expand All @@ -131,7 +131,7 @@ var _ = Describe("runnableGroup", func() {
It("should be able to wait for all runnables to be ready at different intervals", func() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
rg := newRunnableGroup(errCh)
rg := newRunnableGroup(defaultBaseContext, errCh)

go func() {
defer GinkgoRecover()
Expand All @@ -157,7 +157,7 @@ var _ = Describe("runnableGroup", func() {
It("should not turn ready if some readiness check fail", func() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
rg := newRunnableGroup(errCh)
rg := newRunnableGroup(defaultBaseContext, errCh)

go func() {
defer GinkgoRecover()
Expand Down

0 comments on commit b921952

Please sign in to comment.