Skip to content

Commit

Permalink
⚠️ Propagate context on Manager.Start(...)
Browse files Browse the repository at this point in the history
This change reshuffles how the manager accepts and operates on
a context. With this change, the user experience is greatly improved,
users can now use `ctrl.SetupSignalHandler()` to create a context, enrich
it if they want to, and pass it to `manager.Start`.

In addition, this PR changes how the context and stop channel are
handled internally to ensure proper cancellation.

Signed-off-by: Vince Prignano <vincepri@vmware.com>
  • Loading branch information
vincepri committed Oct 2, 2020
1 parent af2f5b1 commit 0f46012
Show file tree
Hide file tree
Showing 22 changed files with 224 additions and 220 deletions.
2 changes: 1 addition & 1 deletion designs/move-cluster-specific-code-out-of-manager.md
Expand Up @@ -203,7 +203,7 @@ func NewSecretMirrorReconciler(mgr manager.Manager, mirrorCluster cluster.Cluste

func main(){

mgr, err := manager.New(context.Background(), cfg1, manager.Options{})
mgr, err := manager.New( cfg1, manager.Options{})
if err != nil {
panic(err)
}
Expand Down
3 changes: 1 addition & 2 deletions example_test.go
Expand Up @@ -38,7 +38,7 @@ import (
func Example() {
var log = controllers.Log.WithName("builder-examples")

manager, err := controllers.NewManager(context.Background(), controllers.GetConfigOrDie(), controllers.Options{})
manager, err := controllers.NewManager(controllers.GetConfigOrDie(), controllers.Options{})
if err != nil {
log.Error(err, "could not create manager")
os.Exit(1)
Expand Down Expand Up @@ -79,7 +79,6 @@ func Example_updateLeaderElectionDurations() {
renewDeadline := 80 * time.Second
retryPeriod := 20 * time.Second
manager, err := controllers.NewManager(
context.Background(),
controllers.GetConfigOrDie(),
controllers.Options{
LeaseDuration: &leaseDuration,
Expand Down
3 changes: 1 addition & 2 deletions examples/builtins/main.go
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package main

import (
"context"
"os"

appsv1 "k8s.io/api/apps/v1"
Expand All @@ -43,7 +42,7 @@ func main() {

// Setup a Manager
entryLog.Info("setting up manager")
mgr, err := manager.New(context.Background(), config.GetConfigOrDie(), manager.Options{})
mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{})
if err != nil {
entryLog.Error(err, "unable to set up overall controller manager")
os.Exit(1)
Expand Down
2 changes: 1 addition & 1 deletion examples/crd/main.go
Expand Up @@ -104,7 +104,7 @@ func (r *reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
func main() {
ctrl.SetLogger(zap.New())

mgr, err := ctrl.NewManager(context.Background(), ctrl.GetConfigOrDie(), ctrl.Options{})
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
if err != nil {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
Expand Down
6 changes: 4 additions & 2 deletions examples/scratch-env/main.go
Expand Up @@ -3,11 +3,12 @@ package main
import (
goflag "flag"
"fmt"
flag "github.com/spf13/pflag"
"io"
"io/ioutil"
"os"

flag "github.com/spf13/pflag"

"k8s.io/client-go/tools/clientcmd"
kcapi "k8s.io/client-go/tools/clientcmd/api"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -102,7 +103,8 @@ func runMain() int {
log.Info("Wrote kubeconfig")
}

<-ctrl.SetupSignalHandler()
ctx := ctrl.SetupSignalHandler()
<-ctx.Done()

log.Info("Shutting down apiserver & etcd")
err = env.Stop()
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -24,6 +24,6 @@ require (
k8s.io/apiextensions-apiserver v0.19.2
k8s.io/apimachinery v0.19.2
k8s.io/client-go v0.19.2
k8s.io/utils v0.0.0-20200729134348-d5654de09c73
k8s.io/utils v0.0.0-20200912215256-4140de9c8800
sigs.k8s.io/yaml v1.2.0
)
2 changes: 2 additions & 0 deletions go.sum
Expand Up @@ -632,6 +632,8 @@ k8s.io/kube-openapi v0.0.0-20200805222855-6aeccd4b50c6 h1:+WnxoVtG8TMiudHBSEtrVL
k8s.io/kube-openapi v0.0.0-20200805222855-6aeccd4b50c6/go.mod h1:UuqjUnNftUyPE5H64/qeyjQoUZhGpeFDVdxjTeEVN2o=
k8s.io/utils v0.0.0-20200729134348-d5654de09c73 h1:uJmqzgNWG7XyClnU/mLPBWwfKKF1K8Hf8whTseBgJcg=
k8s.io/utils v0.0.0-20200729134348-d5654de09c73/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20200912215256-4140de9c8800 h1:9ZNvfPvVIEsp/T1ez4GQuzCcCTEQWhovSofhqR73A6g=
k8s.io/utils v0.0.0-20200912215256-4140de9c8800/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.9/go.mod h1:dzAXnQbTRyDlZPJX2SUPEqvnB+j7AJjtlox7PEwigU0=
sigs.k8s.io/structured-merge-diff/v4 v4.0.1 h1:YXTMot5Qz/X1iBRJhAt+vI+HVttY0WkSqqhKxQ0xVbA=
Expand Down
51 changes: 26 additions & 25 deletions pkg/builder/controller_test.go
Expand Up @@ -61,25 +61,18 @@ func (l *testLogger) WithValues(_ ...interface{}) logr.Logger {
}

var _ = Describe("application", func() {
var stop chan struct{}

BeforeEach(func() {
stop = make(chan struct{})
newController = controller.New
})

AfterEach(func() {
close(stop)
})

noop := reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) {
return reconcile.Result{}, nil
})

Describe("New", func() {
It("should return success if given valid objects", func() {
By("creating a controller manager")
m, err := manager.New(context.Background(), cfg, manager.Options{})
m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

instance, err := ControllerManagedBy(m).
Expand All @@ -92,7 +85,7 @@ var _ = Describe("application", func() {

It("should return error if given two apiType objects in For function", func() {
By("creating a controller manager")
m, err := manager.New(context.Background(), cfg, manager.Options{})
m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

instance, err := ControllerManagedBy(m).
Expand All @@ -106,7 +99,7 @@ var _ = Describe("application", func() {

It("should return an error if For function is not called", func() {
By("creating a controller manager")
m, err := manager.New(context.Background(), cfg, manager.Options{})
m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

instance, err := ControllerManagedBy(m).
Expand All @@ -118,7 +111,7 @@ var _ = Describe("application", func() {

It("should return an error if there is no GVK for an object, and thus we can't default the controller name", func() {
By("creating a controller manager")
m, err := manager.New(context.Background(), cfg, manager.Options{})
m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

By("creating a controller with a bad For type")
Expand All @@ -141,7 +134,7 @@ var _ = Describe("application", func() {
}

By("creating a controller manager")
m, err := manager.New(context.Background(), cfg, manager.Options{})
m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

instance, err := ControllerManagedBy(m).
Expand All @@ -164,7 +157,7 @@ var _ = Describe("application", func() {
}

By("creating a controller manager")
m, err := manager.New(context.Background(), cfg, manager.Options{})
m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

instance, err := ControllerManagedBy(m).
Expand All @@ -186,7 +179,7 @@ var _ = Describe("application", func() {
}

By("creating a controller manager")
m, err := manager.New(context.Background(), cfg, manager.Options{})
m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

instance, err := ControllerManagedBy(m).
Expand All @@ -209,7 +202,7 @@ var _ = Describe("application", func() {
}

By("creating a controller manager")
m, err := manager.New(context.Background(), cfg, manager.Options{})
m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

instance, err := ControllerManagedBy(m).
Expand All @@ -230,7 +223,7 @@ var _ = Describe("application", func() {
}

By("creating a controller manager")
m, err := manager.New(context.Background(), cfg, manager.Options{})
m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

instance, err := ControllerManagedBy(m).
Expand All @@ -244,7 +237,7 @@ var _ = Describe("application", func() {

It("should allow multiple controllers for the same kind", func() {
By("creating a controller manager")
m, err := manager.New(context.Background(), cfg, manager.Options{})
m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

By("registering the type in the Scheme")
Expand Down Expand Up @@ -273,33 +266,39 @@ var _ = Describe("application", func() {

Describe("Start with ControllerManagedBy", func() {
It("should Reconcile Owns objects", func(done Done) {
m, err := manager.New(context.Background(), cfg, manager.Options{})
m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

bldr := ControllerManagedBy(m).
For(&appsv1.Deployment{}).
Owns(&appsv1.ReplicaSet{})
doReconcileTest("3", stop, bldr, m, false)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
doReconcileTest(ctx, "3", bldr, m, false)
close(done)
}, 10)

It("should Reconcile Watches objects", func(done Done) {
m, err := manager.New(context.Background(), cfg, manager.Options{})
m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

bldr := ControllerManagedBy(m).
For(&appsv1.Deployment{}).
Watches( // Equivalent of Owns
&source.Kind{Type: &appsv1.ReplicaSet{}},
&handler.EnqueueRequestForOwner{OwnerType: &appsv1.Deployment{}, IsController: true})
doReconcileTest("4", stop, bldr, m, true)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
doReconcileTest(ctx, "4", bldr, m, true)
close(done)
}, 10)
})

Describe("Set custom predicates", func() {
It("should execute registered predicates only for assigned kind", func(done Done) {
m, err := manager.New(context.Background(), cfg, manager.Options{})
m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

var (
Expand Down Expand Up @@ -347,7 +346,9 @@ var _ = Describe("application", func() {
Owns(&appsv1.ReplicaSet{}, WithPredicates(replicaSetPrct)).
WithEventFilter(allPrct)

doReconcileTest("5", stop, bldr, m, true)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
doReconcileTest(ctx, "5", bldr, m, true)

Expect(deployPrctExecuted).To(BeTrue(), "Deploy predicated should be called at least once")
Expect(replicaSetPrctExecuted).To(BeTrue(), "ReplicaSet predicated should be called at least once")
Expand All @@ -359,7 +360,7 @@ var _ = Describe("application", func() {

})

func doReconcileTest(nameSuffix string, stop chan struct{}, blder *Builder, mgr manager.Manager, complete bool) {
func doReconcileTest(ctx context.Context, nameSuffix string, blder *Builder, mgr manager.Manager, complete bool) {
deployName := "deploy-name-" + nameSuffix
rsName := "rs-name-" + nameSuffix

Expand Down Expand Up @@ -389,7 +390,7 @@ func doReconcileTest(nameSuffix string, stop chan struct{}, blder *Builder, mgr
By("Starting the application")
go func() {
defer GinkgoRecover()
Expect(mgr.Start(stop)).NotTo(HaveOccurred())
Expect(mgr.Start(ctx)).NotTo(HaveOccurred())
By("Stopping the application")
}()

Expand Down
2 changes: 1 addition & 1 deletion pkg/builder/example_test.go
Expand Up @@ -45,7 +45,7 @@ func ExampleBuilder() {

var log = logf.Log.WithName("builder-examples")

mgr, err := manager.New(context.Background(), config.GetConfigOrDie(), manager.Options{})
mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{})
if err != nil {
log.Error(err, "could not create manager")
os.Exit(1)
Expand Down
3 changes: 1 addition & 2 deletions pkg/builder/example_webhook_test.go
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package builder_test

import (
"context"
"os"

"sigs.k8s.io/controller-runtime/pkg/builder"
Expand All @@ -40,7 +39,7 @@ var _ admission.Validator = &examplegroup.ChaosPod{}
func ExampleWebhookBuilder() {
var log = logf.Log.WithName("webhookbuilder-example")

mgr, err := manager.New(context.Background(), config.GetConfigOrDie(), manager.Options{})
mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{})
if err != nil {
log.Error(err, "could not create manager")
os.Exit(1)
Expand Down
8 changes: 4 additions & 4 deletions pkg/builder/webhook_test.go
Expand Up @@ -51,7 +51,7 @@ var _ = Describe("webhook", func() {
Describe("New", func() {
It("should scaffold a defaulting webhook if the type implements the Defaulter interface", func() {
By("creating a controller manager")
m, err := manager.New(context.Background(), cfg, manager.Options{})
m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

By("registering the type in the Scheme")
Expand Down Expand Up @@ -123,7 +123,7 @@ var _ = Describe("webhook", func() {

It("should scaffold a validating webhook if the type implements the Validator interface", func() {
By("creating a controller manager")
m, err := manager.New(context.Background(), cfg, manager.Options{})
m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

By("registering the type in the Scheme")
Expand Down Expand Up @@ -196,7 +196,7 @@ var _ = Describe("webhook", func() {

It("should scaffold defaulting and validating webhooks if the type implements both Defaulter and Validator interfaces", func() {
By("creating a controller manager")
m, err := manager.New(context.Background(), cfg, manager.Options{})
m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

By("registering the type in the Scheme")
Expand Down Expand Up @@ -273,7 +273,7 @@ var _ = Describe("webhook", func() {
By("creating a controller manager")
ctx, cancel := context.WithCancel(context.Background())

m, err := manager.New(ctx, cfg, manager.Options{})
m, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

By("registering the type in the Scheme")
Expand Down
12 changes: 4 additions & 8 deletions pkg/controller/controller_integration_test.go
Expand Up @@ -38,25 +38,19 @@ import (

var _ = Describe("controller", func() {
var reconciled chan reconcile.Request
var stop chan struct{}
ctx := context.Background()

BeforeEach(func() {
stop = make(chan struct{})
reconciled = make(chan reconcile.Request)
Expect(cfg).NotTo(BeNil())
})

AfterEach(func() {
close(stop)
})

Describe("controller", func() {
// TODO(directxman12): write a whole suite of controller-client interaction tests

It("should reconcile", func(done Done) {
By("Creating the Manager")
cm, err := manager.New(ctx, cfg, manager.Options{})
cm, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

By("Creating the Controller")
Expand Down Expand Up @@ -84,9 +78,11 @@ var _ = Describe("controller", func() {
Expect(err).To(Equal(&cache.ErrCacheNotStarted{}))

By("Starting the Manager")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
defer GinkgoRecover()
Expect(cm.Start(stop)).NotTo(HaveOccurred())
Expect(cm.Start(ctx)).NotTo(HaveOccurred())
}()

deployment := &appsv1.Deployment{
Expand Down

0 comments on commit 0f46012

Please sign in to comment.