From 0f46012940198449f517d8cadfa27255fc785cf4 Mon Sep 17 00:00:00 2001 From: Vince Prignano Date: Fri, 2 Oct 2020 08:30:19 -0700 Subject: [PATCH] :warning: Propagate context on Manager.Start(...) This change reshuffles how the manager accepts and operates on a context. With this change, the user experience is greatly improved, users can now use `ctrl.SetupSignalHandler()` to create a context, enrich it if they want to, and pass it to `manager.Start`. In addition, this PR changes how the context and stop channel are handled internally to ensure proper cancellation. Signed-off-by: Vince Prignano --- ...ve-cluster-specific-code-out-of-manager.md | 2 +- example_test.go | 3 +- examples/builtins/main.go | 3 +- examples/crd/main.go | 2 +- examples/scratch-env/main.go | 6 +- go.mod | 2 +- go.sum | 2 + pkg/builder/controller_test.go | 51 ++-- pkg/builder/example_test.go | 2 +- pkg/builder/example_webhook_test.go | 3 +- pkg/builder/webhook_test.go | 8 +- pkg/controller/controller_integration_test.go | 12 +- pkg/controller/controller_test.go | 19 +- pkg/envtest/webhook_test.go | 2 +- .../recorder/recorder_integration_test.go | 18 +- pkg/manager/example_test.go | 4 +- pkg/manager/internal.go | 20 +- pkg/manager/manager.go | 12 +- pkg/manager/manager_test.go | 257 +++++++++--------- pkg/manager/signals/signal.go | 10 +- pkg/manager/signals/signal_test.go | 4 +- pkg/webhook/example_test.go | 2 +- 22 files changed, 224 insertions(+), 220 deletions(-) diff --git a/designs/move-cluster-specific-code-out-of-manager.md b/designs/move-cluster-specific-code-out-of-manager.md index 8bd7efebc3..67b7a419a5 100644 --- a/designs/move-cluster-specific-code-out-of-manager.md +++ b/designs/move-cluster-specific-code-out-of-manager.md @@ -203,7 +203,7 @@ func NewSecretMirrorReconciler(mgr manager.Manager, mirrorCluster cluster.Cluste func main(){ - mgr, err := manager.New(context.Background(), cfg1, manager.Options{}) + mgr, err := manager.New( cfg1, manager.Options{}) if err != nil { panic(err) } diff --git a/example_test.go b/example_test.go index 9e529db18b..5b23a88fd6 100644 --- a/example_test.go +++ b/example_test.go @@ -38,7 +38,7 @@ import ( func Example() { var log = controllers.Log.WithName("builder-examples") - manager, err := controllers.NewManager(context.Background(), controllers.GetConfigOrDie(), controllers.Options{}) + manager, err := controllers.NewManager(controllers.GetConfigOrDie(), controllers.Options{}) if err != nil { log.Error(err, "could not create manager") os.Exit(1) @@ -79,7 +79,6 @@ func Example_updateLeaderElectionDurations() { renewDeadline := 80 * time.Second retryPeriod := 20 * time.Second manager, err := controllers.NewManager( - context.Background(), controllers.GetConfigOrDie(), controllers.Options{ LeaseDuration: &leaseDuration, diff --git a/examples/builtins/main.go b/examples/builtins/main.go index 2dafd6906f..ff1f0dfa3b 100644 --- a/examples/builtins/main.go +++ b/examples/builtins/main.go @@ -17,7 +17,6 @@ limitations under the License. package main import ( - "context" "os" appsv1 "k8s.io/api/apps/v1" @@ -43,7 +42,7 @@ func main() { // Setup a Manager entryLog.Info("setting up manager") - mgr, err := manager.New(context.Background(), config.GetConfigOrDie(), manager.Options{}) + mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{}) if err != nil { entryLog.Error(err, "unable to set up overall controller manager") os.Exit(1) diff --git a/examples/crd/main.go b/examples/crd/main.go index 058dc463bc..1f6cd5fac2 100644 --- a/examples/crd/main.go +++ b/examples/crd/main.go @@ -104,7 +104,7 @@ func (r *reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu func main() { ctrl.SetLogger(zap.New()) - mgr, err := ctrl.NewManager(context.Background(), ctrl.GetConfigOrDie(), ctrl.Options{}) + mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{}) if err != nil { setupLog.Error(err, "unable to start manager") os.Exit(1) diff --git a/examples/scratch-env/main.go b/examples/scratch-env/main.go index 79d0057b21..32c6ffcd48 100644 --- a/examples/scratch-env/main.go +++ b/examples/scratch-env/main.go @@ -3,11 +3,12 @@ package main import ( goflag "flag" "fmt" - flag "github.com/spf13/pflag" "io" "io/ioutil" "os" + flag "github.com/spf13/pflag" + "k8s.io/client-go/tools/clientcmd" kcapi "k8s.io/client-go/tools/clientcmd/api" ctrl "sigs.k8s.io/controller-runtime" @@ -102,7 +103,8 @@ func runMain() int { log.Info("Wrote kubeconfig") } - <-ctrl.SetupSignalHandler() + ctx := ctrl.SetupSignalHandler() + <-ctx.Done() log.Info("Shutting down apiserver & etcd") err = env.Stop() diff --git a/go.mod b/go.mod index 881e3194ab..3cf8801920 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,6 @@ require ( k8s.io/apiextensions-apiserver v0.19.2 k8s.io/apimachinery v0.19.2 k8s.io/client-go v0.19.2 - k8s.io/utils v0.0.0-20200729134348-d5654de09c73 + k8s.io/utils v0.0.0-20200912215256-4140de9c8800 sigs.k8s.io/yaml v1.2.0 ) diff --git a/go.sum b/go.sum index 1185f052cf..418b319378 100644 --- a/go.sum +++ b/go.sum @@ -632,6 +632,8 @@ k8s.io/kube-openapi v0.0.0-20200805222855-6aeccd4b50c6 h1:+WnxoVtG8TMiudHBSEtrVL k8s.io/kube-openapi v0.0.0-20200805222855-6aeccd4b50c6/go.mod h1:UuqjUnNftUyPE5H64/qeyjQoUZhGpeFDVdxjTeEVN2o= k8s.io/utils v0.0.0-20200729134348-d5654de09c73 h1:uJmqzgNWG7XyClnU/mLPBWwfKKF1K8Hf8whTseBgJcg= k8s.io/utils v0.0.0-20200729134348-d5654de09c73/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= +k8s.io/utils v0.0.0-20200912215256-4140de9c8800 h1:9ZNvfPvVIEsp/T1ez4GQuzCcCTEQWhovSofhqR73A6g= +k8s.io/utils v0.0.0-20200912215256-4140de9c8800/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.0.9/go.mod h1:dzAXnQbTRyDlZPJX2SUPEqvnB+j7AJjtlox7PEwigU0= sigs.k8s.io/structured-merge-diff/v4 v4.0.1 h1:YXTMot5Qz/X1iBRJhAt+vI+HVttY0WkSqqhKxQ0xVbA= diff --git a/pkg/builder/controller_test.go b/pkg/builder/controller_test.go index 057efa4a36..06a62a7e2b 100644 --- a/pkg/builder/controller_test.go +++ b/pkg/builder/controller_test.go @@ -61,17 +61,10 @@ func (l *testLogger) WithValues(_ ...interface{}) logr.Logger { } var _ = Describe("application", func() { - var stop chan struct{} - BeforeEach(func() { - stop = make(chan struct{}) newController = controller.New }) - AfterEach(func() { - close(stop) - }) - noop := reconcile.Func(func(context.Context, reconcile.Request) (reconcile.Result, error) { return reconcile.Result{}, nil }) @@ -79,7 +72,7 @@ var _ = Describe("application", func() { Describe("New", func() { It("should return success if given valid objects", func() { By("creating a controller manager") - m, err := manager.New(context.Background(), cfg, manager.Options{}) + m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) instance, err := ControllerManagedBy(m). @@ -92,7 +85,7 @@ var _ = Describe("application", func() { It("should return error if given two apiType objects in For function", func() { By("creating a controller manager") - m, err := manager.New(context.Background(), cfg, manager.Options{}) + m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) instance, err := ControllerManagedBy(m). @@ -106,7 +99,7 @@ var _ = Describe("application", func() { It("should return an error if For function is not called", func() { By("creating a controller manager") - m, err := manager.New(context.Background(), cfg, manager.Options{}) + m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) instance, err := ControllerManagedBy(m). @@ -118,7 +111,7 @@ var _ = Describe("application", func() { It("should return an error if there is no GVK for an object, and thus we can't default the controller name", func() { By("creating a controller manager") - m, err := manager.New(context.Background(), cfg, manager.Options{}) + m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) By("creating a controller with a bad For type") @@ -141,7 +134,7 @@ var _ = Describe("application", func() { } By("creating a controller manager") - m, err := manager.New(context.Background(), cfg, manager.Options{}) + m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) instance, err := ControllerManagedBy(m). @@ -164,7 +157,7 @@ var _ = Describe("application", func() { } By("creating a controller manager") - m, err := manager.New(context.Background(), cfg, manager.Options{}) + m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) instance, err := ControllerManagedBy(m). @@ -186,7 +179,7 @@ var _ = Describe("application", func() { } By("creating a controller manager") - m, err := manager.New(context.Background(), cfg, manager.Options{}) + m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) instance, err := ControllerManagedBy(m). @@ -209,7 +202,7 @@ var _ = Describe("application", func() { } By("creating a controller manager") - m, err := manager.New(context.Background(), cfg, manager.Options{}) + m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) instance, err := ControllerManagedBy(m). @@ -230,7 +223,7 @@ var _ = Describe("application", func() { } By("creating a controller manager") - m, err := manager.New(context.Background(), cfg, manager.Options{}) + m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) instance, err := ControllerManagedBy(m). @@ -244,7 +237,7 @@ var _ = Describe("application", func() { It("should allow multiple controllers for the same kind", func() { By("creating a controller manager") - m, err := manager.New(context.Background(), cfg, manager.Options{}) + m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) By("registering the type in the Scheme") @@ -273,18 +266,21 @@ var _ = Describe("application", func() { Describe("Start with ControllerManagedBy", func() { It("should Reconcile Owns objects", func(done Done) { - m, err := manager.New(context.Background(), cfg, manager.Options{}) + m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) bldr := ControllerManagedBy(m). For(&appsv1.Deployment{}). Owns(&appsv1.ReplicaSet{}) - doReconcileTest("3", stop, bldr, m, false) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + doReconcileTest(ctx, "3", bldr, m, false) close(done) }, 10) It("should Reconcile Watches objects", func(done Done) { - m, err := manager.New(context.Background(), cfg, manager.Options{}) + m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) bldr := ControllerManagedBy(m). @@ -292,14 +288,17 @@ var _ = Describe("application", func() { Watches( // Equivalent of Owns &source.Kind{Type: &appsv1.ReplicaSet{}}, &handler.EnqueueRequestForOwner{OwnerType: &appsv1.Deployment{}, IsController: true}) - doReconcileTest("4", stop, bldr, m, true) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + doReconcileTest(ctx, "4", bldr, m, true) close(done) }, 10) }) Describe("Set custom predicates", func() { It("should execute registered predicates only for assigned kind", func(done Done) { - m, err := manager.New(context.Background(), cfg, manager.Options{}) + m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) var ( @@ -347,7 +346,9 @@ var _ = Describe("application", func() { Owns(&appsv1.ReplicaSet{}, WithPredicates(replicaSetPrct)). WithEventFilter(allPrct) - doReconcileTest("5", stop, bldr, m, true) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + doReconcileTest(ctx, "5", bldr, m, true) Expect(deployPrctExecuted).To(BeTrue(), "Deploy predicated should be called at least once") Expect(replicaSetPrctExecuted).To(BeTrue(), "ReplicaSet predicated should be called at least once") @@ -359,7 +360,7 @@ var _ = Describe("application", func() { }) -func doReconcileTest(nameSuffix string, stop chan struct{}, blder *Builder, mgr manager.Manager, complete bool) { +func doReconcileTest(ctx context.Context, nameSuffix string, blder *Builder, mgr manager.Manager, complete bool) { deployName := "deploy-name-" + nameSuffix rsName := "rs-name-" + nameSuffix @@ -389,7 +390,7 @@ func doReconcileTest(nameSuffix string, stop chan struct{}, blder *Builder, mgr By("Starting the application") go func() { defer GinkgoRecover() - Expect(mgr.Start(stop)).NotTo(HaveOccurred()) + Expect(mgr.Start(ctx)).NotTo(HaveOccurred()) By("Stopping the application") }() diff --git a/pkg/builder/example_test.go b/pkg/builder/example_test.go index 207867d470..8dd7249516 100644 --- a/pkg/builder/example_test.go +++ b/pkg/builder/example_test.go @@ -45,7 +45,7 @@ func ExampleBuilder() { var log = logf.Log.WithName("builder-examples") - mgr, err := manager.New(context.Background(), config.GetConfigOrDie(), manager.Options{}) + mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{}) if err != nil { log.Error(err, "could not create manager") os.Exit(1) diff --git a/pkg/builder/example_webhook_test.go b/pkg/builder/example_webhook_test.go index f2dacd4dbc..63333a2478 100644 --- a/pkg/builder/example_webhook_test.go +++ b/pkg/builder/example_webhook_test.go @@ -17,7 +17,6 @@ limitations under the License. package builder_test import ( - "context" "os" "sigs.k8s.io/controller-runtime/pkg/builder" @@ -40,7 +39,7 @@ var _ admission.Validator = &examplegroup.ChaosPod{} func ExampleWebhookBuilder() { var log = logf.Log.WithName("webhookbuilder-example") - mgr, err := manager.New(context.Background(), config.GetConfigOrDie(), manager.Options{}) + mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{}) if err != nil { log.Error(err, "could not create manager") os.Exit(1) diff --git a/pkg/builder/webhook_test.go b/pkg/builder/webhook_test.go index 4ccca4f1eb..e2d5d052e0 100644 --- a/pkg/builder/webhook_test.go +++ b/pkg/builder/webhook_test.go @@ -51,7 +51,7 @@ var _ = Describe("webhook", func() { Describe("New", func() { It("should scaffold a defaulting webhook if the type implements the Defaulter interface", func() { By("creating a controller manager") - m, err := manager.New(context.Background(), cfg, manager.Options{}) + m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) By("registering the type in the Scheme") @@ -123,7 +123,7 @@ var _ = Describe("webhook", func() { It("should scaffold a validating webhook if the type implements the Validator interface", func() { By("creating a controller manager") - m, err := manager.New(context.Background(), cfg, manager.Options{}) + m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) By("registering the type in the Scheme") @@ -196,7 +196,7 @@ var _ = Describe("webhook", func() { It("should scaffold defaulting and validating webhooks if the type implements both Defaulter and Validator interfaces", func() { By("creating a controller manager") - m, err := manager.New(context.Background(), cfg, manager.Options{}) + m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) By("registering the type in the Scheme") @@ -273,7 +273,7 @@ var _ = Describe("webhook", func() { By("creating a controller manager") ctx, cancel := context.WithCancel(context.Background()) - m, err := manager.New(ctx, cfg, manager.Options{}) + m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) By("registering the type in the Scheme") diff --git a/pkg/controller/controller_integration_test.go b/pkg/controller/controller_integration_test.go index 0f31c33fb7..762b3d9fbb 100644 --- a/pkg/controller/controller_integration_test.go +++ b/pkg/controller/controller_integration_test.go @@ -38,25 +38,19 @@ import ( var _ = Describe("controller", func() { var reconciled chan reconcile.Request - var stop chan struct{} ctx := context.Background() BeforeEach(func() { - stop = make(chan struct{}) reconciled = make(chan reconcile.Request) Expect(cfg).NotTo(BeNil()) }) - AfterEach(func() { - close(stop) - }) - Describe("controller", func() { // TODO(directxman12): write a whole suite of controller-client interaction tests It("should reconcile", func(done Done) { By("Creating the Manager") - cm, err := manager.New(ctx, cfg, manager.Options{}) + cm, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) By("Creating the Controller") @@ -84,9 +78,11 @@ var _ = Describe("controller", func() { Expect(err).To(Equal(&cache.ErrCacheNotStarted{})) By("Starting the Manager") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go func() { defer GinkgoRecover() - Expect(cm.Start(stop)).NotTo(HaveOccurred()) + Expect(cm.Start(ctx)).NotTo(HaveOccurred()) }() deployment := &appsv1.Deployment{ diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index cf37a989f5..5b68ae2299 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -38,7 +38,7 @@ var _ = Describe("controller.Controller", func() { Describe("New", func() { It("should return an error if Name is not Specified", func(done Done) { - m, err := manager.New(context.Background(), cfg, manager.Options{}) + m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) c, err := controller.New("", m, controller.Options{Reconciler: rec}) Expect(c).To(BeNil()) @@ -48,7 +48,7 @@ var _ = Describe("controller.Controller", func() { }) It("should return an error if Reconciler is not Specified", func(done Done) { - m, err := manager.New(context.Background(), cfg, manager.Options{}) + m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) c, err := controller.New("foo", m, controller.Options{}) @@ -59,7 +59,7 @@ var _ = Describe("controller.Controller", func() { }) It("NewController should return an error if injecting Reconciler fails", func(done Done) { - m, err := manager.New(context.Background(), cfg, manager.Options{}) + m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) c, err := controller.New("foo", m, controller.Options{Reconciler: &failRec{}}) @@ -71,7 +71,7 @@ var _ = Describe("controller.Controller", func() { }) It("should not return an error if two controllers are registered with different names", func(done Done) { - m, err := manager.New(context.Background(), cfg, manager.Options{}) + m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) c1, err := controller.New("c1", m, controller.Options{Reconciler: rec}) @@ -88,16 +88,15 @@ var _ = Describe("controller.Controller", func() { It("should not leak goroutines when stopped", func() { currentGRs := goleak.IgnoreCurrent() - m, err := manager.New(context.Background(), cfg, manager.Options{}) + m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) _, err = controller.New("new-controller", m, controller.Options{Reconciler: rec}) Expect(err).NotTo(HaveOccurred()) - s := make(chan struct{}) - close(s) - - Expect(m.Start(s)).To(Succeed()) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + Expect(m.Start(ctx)).To(Succeed()) // force-close keep-alive connections. These'll time anyway (after // like 30s or so) but force it to speed up the tests. @@ -108,7 +107,7 @@ var _ = Describe("controller.Controller", func() { It("should not create goroutines if never started", func() { currentGRs := goleak.IgnoreCurrent() - m, err := manager.New(context.Background(), cfg, manager.Options{}) + m, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) _, err = controller.New("new-controller", m, controller.Options{Reconciler: rec}) diff --git a/pkg/envtest/webhook_test.go b/pkg/envtest/webhook_test.go index 4a71e6bd00..1064190745 100644 --- a/pkg/envtest/webhook_test.go +++ b/pkg/envtest/webhook_test.go @@ -22,7 +22,7 @@ var _ = Describe("Test", func() { Describe("Webhook", func() { It("should reject create request for webhook that rejects all requests", func(done Done) { - m, err := manager.New(context.Background(), env.Config, manager.Options{ + m, err := manager.New(env.Config, manager.Options{ Port: env.WebhookInstallOptions.LocalServingPort, Host: env.WebhookInstallOptions.LocalServingHost, CertDir: env.WebhookInstallOptions.LocalServingCertDir, diff --git a/pkg/internal/recorder/recorder_integration_test.go b/pkg/internal/recorder/recorder_integration_test.go index 25b05fa1e3..a67d0e1ed5 100644 --- a/pkg/internal/recorder/recorder_integration_test.go +++ b/pkg/internal/recorder/recorder_integration_test.go @@ -36,22 +36,10 @@ import ( ) var _ = Describe("recorder", func() { - var stop chan struct{} - ctx := context.Background() - - BeforeEach(func() { - stop = make(chan struct{}) - Expect(cfg).NotTo(BeNil()) - }) - - AfterEach(func() { - close(stop) - }) - Describe("recorder", func() { It("should publish events", func(done Done) { By("Creating the Manager") - cm, err := manager.New(ctx, cfg, manager.Options{}) + cm, err := manager.New(cfg, manager.Options{}) Expect(err).NotTo(HaveOccurred()) By("Creating the Controller") @@ -72,9 +60,11 @@ var _ = Describe("recorder", func() { Expect(err).NotTo(HaveOccurred()) By("Starting the Manager") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go func() { defer GinkgoRecover() - Expect(cm.Start(stop)).NotTo(HaveOccurred()) + Expect(cm.Start(ctx)).NotTo(HaveOccurred()) }() deployment := &appsv1.Deployment{ diff --git a/pkg/manager/example_test.go b/pkg/manager/example_test.go index 5b1cbf98a5..3d54ca3144 100644 --- a/pkg/manager/example_test.go +++ b/pkg/manager/example_test.go @@ -41,7 +41,7 @@ func ExampleNew() { os.Exit(1) } - mgr, err := manager.New(context.Background(), cfg, manager.Options{}) + mgr, err := manager.New(cfg, manager.Options{}) if err != nil { log.Error(err, "unable to set up manager") os.Exit(1) @@ -57,7 +57,7 @@ func ExampleNew_multinamespaceCache() { os.Exit(1) } - mgr, err := manager.New(context.Background(), cfg, manager.Options{ + mgr, err := manager.New(cfg, manager.Options{ NewCache: cache.MultiNamespacedCacheBuilder([]string{"namespace1", "namespace2"}), }) if err != nil { diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index 07f4af7799..7830360d31 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -57,6 +57,7 @@ const ( defaultMetricsEndpoint = "/metrics" ) +var _ Runnable = &controllerManager{} var log = logf.RuntimeLog.WithName("manager") type controllerManager struct { @@ -187,6 +188,10 @@ type controllerManager struct { internalCtx context.Context internalCancel context.CancelFunc + + // internalProceduresStop channel is used internally to the manager when coordinating + // the proper shutdown of servers. This channel is also used for dependency injection. + internalProceduresStop chan struct{} } // Add sets dependencies on i, and adds it to the list of Runnables to start. @@ -240,7 +245,7 @@ func (cm *controllerManager) SetFields(i interface{}) error { if _, err := inject.InjectorInto(cm.SetFields, i); err != nil { return err } - if _, err := inject.StopChannelInto(cm.internalCtx.Done(), i); err != nil { + if _, err := inject.StopChannelInto(cm.internalProceduresStop, i); err != nil { return err } if _, err := inject.MapperInto(cm.mapper, i); err != nil { @@ -408,7 +413,7 @@ func (cm *controllerManager) serveMetrics() { })) // Shutdown the server when stop is closed - <-cm.internalCtx.Done() + <-cm.internalProceduresStop if err := server.Shutdown(cm.shutdownCtx); err != nil { cm.errChan <- err } @@ -445,13 +450,15 @@ func (cm *controllerManager) serveHealthProbes() { cm.mu.Unlock() // Shutdown the server when stop is closed - <-cm.internalCtx.Done() + <-cm.internalProceduresStop if err := server.Shutdown(cm.shutdownCtx); err != nil { cm.errChan <- err } } -func (cm *controllerManager) Start(stop <-chan struct{}) (err error) { +func (cm *controllerManager) Start(ctx context.Context) (err error) { + cm.internalCtx, cm.internalCancel = context.WithCancel(ctx) + // This chan indicates that stop is complete, in other words all runnables have returned or timeout on stop request stopComplete := make(chan struct{}) defer close(stopComplete) @@ -506,7 +513,7 @@ func (cm *controllerManager) Start(stop <-chan struct{}) (err error) { }() select { - case <-stop: + case <-ctx.Done(): // We are done return nil case err := <-cm.errChan: @@ -527,7 +534,8 @@ func (cm *controllerManager) engageStopProcedure(stopComplete <-chan struct{}) e } defer shutdownCancel() - // Cancel the internal context and wait for the stop procedures to complete. + // Cancel the internal stop channel and wait for the procedures to stop and complete. + close(cm.internalProceduresStop) cm.internalCancel() // Start draining the errors before acquiring the lock to make sure we don't deadlock diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 90e34dffc1..966f1d701d 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -74,12 +74,13 @@ type Manager interface { // AddReadyzCheck allows you to add Readyz checker AddReadyzCheck(name string, check healthz.Checker) error - // Start starts all registered Controllers and blocks until the Stop channel is closed. + // Start starts all registered Controllers and blocks until the context is cancelled. // Returns an error if there is an error starting any controller. + // // If LeaderElection is used, the binary must be exited immediately after this returns, // otherwise components that need leader election might continue to run after the leader // lock was lost. - Start(<-chan struct{}) error + Start(ctx context.Context) error // GetConfig returns an initialized Config GetConfig() *rest.Config @@ -295,7 +296,7 @@ type LeaderElectionRunnable interface { } // New returns a new Manager for creating Controllers. -func New(ctx context.Context, config *rest.Config, options Options) (Manager, error) { +func New(config *rest.Config, options Options) (Manager, error) { // Initialize a rest.config if none was specified if config == nil { return nil, fmt.Errorf("must specify Config") @@ -371,8 +372,6 @@ func New(ctx context.Context, config *rest.Config, options Options) (Manager, er return nil, err } - internalCtx, internalCancel := context.WithCancel(ctx) - return &controllerManager{ config: config, scheme: options.Scheme, @@ -397,8 +396,7 @@ func New(ctx context.Context, config *rest.Config, options Options) (Manager, er readinessEndpointName: options.ReadinessEndpointName, livenessEndpointName: options.LivenessEndpointName, gracefulShutdownTimeout: *options.GracefulShutdownTimeout, - internalCtx: internalCtx, - internalCancel: internalCancel, + internalProceduresStop: make(chan struct{}), }, nil } diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 0b0efee3d3..38c831e8ec 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -52,19 +52,9 @@ import ( ) var _ = Describe("manger.Manager", func() { - var stop chan struct{} - - BeforeEach(func() { - stop = make(chan struct{}) - }) - - AfterEach(func() { - close(stop) - }) - Describe("New", func() { It("should return an error if there is no Config", func() { - m, err := New(context.Background(), nil, Options{}) + m, err := New(nil, Options{}) Expect(m).To(BeNil()) Expect(err.Error()).To(ContainSubstring("must specify Config")) @@ -72,7 +62,7 @@ var _ = Describe("manger.Manager", func() { It("should return an error if it can't create a RestMapper", func() { expected := fmt.Errorf("expected error: RestMapper") - m, err := New(context.Background(), cfg, Options{ + m, err := New(cfg, Options{ MapperProvider: func(c *rest.Config) (meta.RESTMapper, error) { return nil, expected }, }) Expect(m).To(BeNil()) @@ -81,7 +71,7 @@ var _ = Describe("manger.Manager", func() { }) It("should return an error it can't create a client.Client", func(done Done) { - m, err := New(context.Background(), cfg, Options{ + m, err := New(cfg, Options{ NewClient: func(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) { return nil, fmt.Errorf("expected error") }, @@ -94,7 +84,7 @@ var _ = Describe("manger.Manager", func() { }) It("should return an error it can't create a cache.Cache", func(done Done) { - m, err := New(context.Background(), cfg, Options{ + m, err := New(cfg, Options{ NewCache: func(config *rest.Config, opts cache.Options) (cache.Cache, error) { return nil, fmt.Errorf("expected error") }, @@ -107,7 +97,7 @@ var _ = Describe("manger.Manager", func() { }) It("should create a client defined in by the new client function", func(done Done) { - m, err := New(context.Background(), cfg, Options{ + m, err := New(cfg, Options{ NewClient: func(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) { return nil, nil }, @@ -120,7 +110,7 @@ var _ = Describe("manger.Manager", func() { }) It("should return an error it can't create a recorder.Provider", func(done Done) { - m, err := New(context.Background(), cfg, Options{ + m, err := New(cfg, Options{ newRecorderProvider: func(_ *rest.Config, _ *runtime.Scheme, _ logr.Logger, _ intrec.EventBroadcasterProducer) (*intrec.Provider, error) { return nil, fmt.Errorf("expected error") }, @@ -134,7 +124,7 @@ var _ = Describe("manger.Manager", func() { It("should lazily initialize a webhook server if needed", func(done Done) { By("creating a manager with options") - m, err := New(context.Background(), cfg, Options{Port: 9440, Host: "foo.com"}) + m, err := New(cfg, Options{Port: 9440, Host: "foo.com"}) Expect(err).NotTo(HaveOccurred()) Expect(m).NotTo(BeNil()) @@ -149,7 +139,7 @@ var _ = Describe("manger.Manager", func() { Context("with leader election enabled", func() { It("should only cancel the leader election after all runnables are done", func() { - m, err := New(context.Background(), cfg, Options{ + m, err := New(cfg, Options{ LeaderElection: true, LeaderElectionNamespace: "default", LeaderElectionID: "test-leader-election-id-2", @@ -174,15 +164,15 @@ var _ = Describe("manger.Manager", func() { close(leaderElectionDone) } - mgrStopChan := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) mgrDone := make(chan struct{}) go func() { defer GinkgoRecover() - Expect(m.Start(mgrStopChan)).To(BeNil()) + Expect(m.Start(ctx)).To(BeNil()) close(mgrDone) }() <-cm.elected - close(mgrStopChan) + cancel() select { case <-leaderElectionDone: Expect(errors.New("leader election was cancelled before runnables were done")).ToNot(HaveOccurred()) @@ -194,7 +184,7 @@ var _ = Describe("manger.Manager", func() { }) It("should disable gracefulShutdown when stopping to lead", func() { - m, err := New(context.Background(), cfg, Options{ + m, err := New(cfg, Options{ LeaderElection: true, LeaderElectionNamespace: "default", LeaderElectionID: "test-leader-election-id-3", @@ -203,10 +193,12 @@ var _ = Describe("manger.Manager", func() { }) Expect(err).To(BeNil()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() mgrDone := make(chan struct{}) go func() { defer GinkgoRecover() - err := m.Start(make(chan struct{})) + err := m.Start(ctx) Expect(err).ToNot(BeNil()) Expect(err.Error()).To(Equal("leader election lost")) close(mgrDone) @@ -221,7 +213,7 @@ var _ = Describe("manger.Manager", func() { }) It("should default ID to controller-runtime if ID is not set", func() { var rl resourcelock.Interface - m1, err := New(context.Background(), cfg, Options{ + m1, err := New(cfg, Options{ LeaderElection: true, LeaderElectionNamespace: "default", LeaderElectionID: "test-leader-election-id", @@ -241,7 +233,7 @@ var _ = Describe("manger.Manager", func() { Expect(ok).To(BeTrue()) m1cm.onStoppedLeading = func() {} - m2, err := New(context.Background(), cfg, Options{ + m2, err := New(cfg, Options{ LeaderElection: true, LeaderElectionNamespace: "default", LeaderElectionID: "test-leader-election-id", @@ -268,12 +260,12 @@ var _ = Describe("manger.Manager", func() { return nil }))).To(Succeed()) - m1Stop := make(chan struct{}) - defer close(m1Stop) + ctx1, cancel1 := context.WithCancel(context.Background()) + defer cancel1() go func() { defer GinkgoRecover() Expect(m1.Elected()).ShouldNot(BeClosed()) - Expect(m1.Start(m1Stop)).NotTo(HaveOccurred()) + Expect(m1.Start(ctx1)).NotTo(HaveOccurred()) Expect(m1.Elected()).Should(BeClosed()) }() <-c1 @@ -285,22 +277,22 @@ var _ = Describe("manger.Manager", func() { return nil }))).To(Succeed()) - m2Stop := make(chan struct{}) + ctx2, cancel := context.WithCancel(context.Background()) m2done := make(chan struct{}) go func() { defer GinkgoRecover() - Expect(m2.Start(m2Stop)).NotTo(HaveOccurred()) + Expect(m2.Start(ctx2)).NotTo(HaveOccurred()) close(m2done) }() Consistently(m2.Elected()).ShouldNot(Receive()) Consistently(c2).ShouldNot(Receive()) - close(m2Stop) + cancel() <-m2done }) It("should return an error if it can't create a ResourceLock", func() { - m, err := New(context.Background(), cfg, Options{ + m, err := New(cfg, Options{ newResourceLock: func(_ *rest.Config, _ recorder.Provider, _ leaderelection.Options) (resourcelock.Interface, error) { return nil, fmt.Errorf("expected error") }, @@ -309,7 +301,7 @@ var _ = Describe("manger.Manager", func() { Expect(err).To(MatchError(ContainSubstring("expected error"))) }) It("should return an error if namespace not set and not running in cluster", func() { - m, err := New(context.Background(), cfg, Options{LeaderElection: true, LeaderElectionID: "controller-runtime"}) + m, err := New(cfg, Options{LeaderElection: true, LeaderElectionID: "controller-runtime"}) Expect(m).To(BeNil()) Expect(err).To(HaveOccurred()) Expect(err.Error()).To(ContainSubstring("unable to find leader election namespace: not running in-cluster, please specify LeaderElectionNamespace")) @@ -319,7 +311,7 @@ var _ = Describe("manger.Manager", func() { // ConfigMap lock to a controller-runtime version that has this new default. Many users of controller-runtime skip // versions, so we should be extremely conservative here. It("should default to ConfigMapsLeasesResourceLock", func() { - m, err := New(context.Background(), cfg, Options{LeaderElection: true, LeaderElectionID: "controller-runtime", LeaderElectionNamespace: "my-ns"}) + m, err := New(cfg, Options{LeaderElection: true, LeaderElectionID: "controller-runtime", LeaderElectionNamespace: "my-ns"}) Expect(m).ToNot(BeNil()) Expect(err).ToNot(HaveOccurred()) cm, ok := m.(*controllerManager) @@ -333,7 +325,7 @@ var _ = Describe("manger.Manager", func() { }) It("should use the specified ResourceLock", func() { - m, err := New(context.Background(), cfg, Options{ + m, err := New(cfg, Options{ LeaderElection: true, LeaderElectionResourceLock: resourcelock.LeasesResourceLock, LeaderElectionID: "controller-runtime", @@ -350,7 +342,7 @@ var _ = Describe("manger.Manager", func() { It("should create a listener for the metrics if a valid address is provided", func() { var listener net.Listener - m, err := New(context.Background(), cfg, Options{ + m, err := New(cfg, Options{ MetricsBindAddress: ":0", newMetricsListener: func(addr string) (net.Listener, error) { var err error @@ -369,7 +361,7 @@ var _ = Describe("manger.Manager", func() { Expect(err).ShouldNot(HaveOccurred()) var listener net.Listener - m, err := New(context.Background(), cfg, Options{ + m, err := New(cfg, Options{ MetricsBindAddress: ln.Addr().String(), newMetricsListener: func(addr string) (net.Listener, error) { var err error @@ -386,7 +378,7 @@ var _ = Describe("manger.Manager", func() { It("should create a listener for the health probes if a valid address is provided", func() { var listener net.Listener - m, err := New(context.Background(), cfg, Options{ + m, err := New(cfg, Options{ HealthProbeBindAddress: ":0", newHealthProbeListener: func(addr string) (net.Listener, error) { var err error @@ -405,7 +397,7 @@ var _ = Describe("manger.Manager", func() { Expect(err).ShouldNot(HaveOccurred()) var listener net.Listener - m, err := New(context.Background(), cfg, Options{ + m, err := New(cfg, Options{ HealthProbeBindAddress: ln.Addr().String(), newHealthProbeListener: func(addr string) (net.Listener, error) { var err error @@ -424,7 +416,7 @@ var _ = Describe("manger.Manager", func() { Describe("Start", func() { var startSuite = func(options Options, callbacks ...func(Manager)) { It("should Start each Component", func(done Done) { - m, err := New(context.Background(), cfg, options) + m, err := New(cfg, options) Expect(err).NotTo(HaveOccurred()) for _, cb := range callbacks { cb(m) @@ -443,10 +435,12 @@ var _ = Describe("manger.Manager", func() { return nil }))).To(Succeed()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go func() { defer GinkgoRecover() Expect(m.Elected()).ShouldNot(BeClosed()) - Expect(m.Start(stop)).NotTo(HaveOccurred()) + Expect(m.Start(ctx)).NotTo(HaveOccurred()) Expect(m.Elected()).Should(BeClosed()) }() @@ -454,21 +448,21 @@ var _ = Describe("manger.Manager", func() { close(done) }) - It("should stop when stop is called", func(done Done) { - m, err := New(context.Background(), cfg, options) + It("should stop when context is cancelled", func(done Done) { + m, err := New(cfg, options) Expect(err).NotTo(HaveOccurred()) for _, cb := range callbacks { cb(m) } - s := make(chan struct{}) - close(s) - Expect(m.Start(s)).NotTo(HaveOccurred()) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + Expect(m.Start(ctx)).NotTo(HaveOccurred()) close(done) }) It("should return an error if it can't start the cache", func(done Done) { - m, err := New(context.Background(), cfg, options) + m, err := New(cfg, options) Expect(err).NotTo(HaveOccurred()) for _, cb := range callbacks { cb(m) @@ -478,13 +472,16 @@ var _ = Describe("manger.Manager", func() { mgr.startCache = func(context.Context) error { return fmt.Errorf("expected error") } - Expect(m.Start(stop)).To(MatchError(ContainSubstring("expected error"))) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + Expect(m.Start(ctx)).To(MatchError(ContainSubstring("expected error"))) close(done) }) It("should return an error if any Components fail to Start", func(done Done) { - m, err := New(context.Background(), cfg, options) + m, err := New(cfg, options) Expect(err).NotTo(HaveOccurred()) for _, cb := range callbacks { cb(m) @@ -507,7 +504,9 @@ var _ = Describe("manger.Manager", func() { }))).To(Succeed()) defer GinkgoRecover() - err = m.Start(stop) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err = m.Start(ctx) Expect(err).ToNot(BeNil()) Expect(err.Error()).To(Equal("expected error")) @@ -515,7 +514,7 @@ var _ = Describe("manger.Manager", func() { }) It("should wait for runnables to stop", func(done Done) { - m, err := New(context.Background(), cfg, options) + m, err := New(cfg, options) Expect(err).NotTo(HaveOccurred()) for _, cb := range callbacks { cb(m) @@ -548,27 +547,27 @@ var _ = Describe("manger.Manager", func() { }))).To(Succeed()) defer GinkgoRecover() - s := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) var wgManagerRunning sync.WaitGroup wgManagerRunning.Add(1) go func() { defer GinkgoRecover() defer wgManagerRunning.Done() - Expect(m.Start(s)).NotTo(HaveOccurred()) + Expect(m.Start(ctx)).NotTo(HaveOccurred()) Eventually(func() int64 { return atomic.LoadInt64(&runnableDoneCount) }).Should(BeEquivalentTo(2)) }() wgRunnableRunning.Wait() - close(s) + cancel() wgManagerRunning.Wait() close(done) }) It("should return an error if any Components fail to Start and wait for runnables to stop", func(done Done) { - m, err := New(context.Background(), cfg, options) + m, err := New(cfg, options) Expect(err).NotTo(HaveOccurred()) for _, cb := range callbacks { cb(m) @@ -595,14 +594,16 @@ var _ = Describe("manger.Manager", func() { return nil }))).To(Succeed()) - Expect(m.Start(stop)).To(HaveOccurred()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + Expect(m.Start(ctx)).To(HaveOccurred()) Expect(runnableDoneCount).To(Equal(2)) close(done) }) It("should refuse to add runnable if stop procedure is already engaged", func(done Done) { - m, err := New(context.Background(), cfg, options) + m, err := New(cfg, options) Expect(err).NotTo(HaveOccurred()) for _, cb := range callbacks { cb(m) @@ -618,12 +619,12 @@ var _ = Describe("manger.Manager", func() { return nil }))).To(Succeed()) - s := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) go func() { - Expect(m.Start(s)).NotTo(HaveOccurred()) + Expect(m.Start(ctx)).NotTo(HaveOccurred()) }() wgRunnableRunning.Wait() - close(s) + cancel() time.Sleep(100 * time.Millisecond) // give some time for the stop chan closure to be caught by the manager Expect(m.Add(RunnableFunc(func(context.Context) error { defer GinkgoRecover() @@ -634,7 +635,7 @@ var _ = Describe("manger.Manager", func() { }) It("should return both runnables and stop errors when both error", func(done Done) { - m, err := New(context.Background(), cfg, options) + m, err := New(cfg, options) Expect(err).NotTo(HaveOccurred()) for _, cb := range callbacks { cb(m) @@ -656,7 +657,9 @@ var _ = Describe("manger.Manager", func() { return nil } }))) - err = m.Start(make(chan struct{})) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err = m.Start(ctx) Expect(err).ToNot(BeNil()) eMsg := "[not feeling like that, failed waiting for all runnables to end within grace period of 1ns: context deadline exceeded]" Expect(err.Error()).To(Equal(eMsg)) @@ -667,7 +670,7 @@ var _ = Describe("manger.Manager", func() { }) It("should return only stop errors if runnables dont error", func(done Done) { - m, err := New(context.Background(), cfg, options) + m, err := New(cfg, options) Expect(err).NotTo(HaveOccurred()) for _, cb := range callbacks { cb(m) @@ -690,13 +693,13 @@ var _ = Describe("manger.Manager", func() { return nil } }))).NotTo(HaveOccurred()) - stop := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) managerStopDone := make(chan struct{}) - go func() { err = m.Start(stop); close(managerStopDone) }() + go func() { err = m.Start(ctx); close(managerStopDone) }() // Use the 'elected' channel to find out if startup was done, otherwise we stop // before we started the Runnable and see flakes, mostly in low-CPU envs like CI <-m.(*controllerManager).elected - close(stop) + cancel() <-managerStopDone Expect(err).ToNot(BeNil()) Expect(err.Error()).To(Equal("failed waiting for all runnables to end within grace period of 1ns: context deadline exceeded")) @@ -707,7 +710,7 @@ var _ = Describe("manger.Manager", func() { }) It("should return only runnables error if stop doesn't error", func(done Done) { - m, err := New(context.Background(), cfg, options) + m, err := New(cfg, options) Expect(err).NotTo(HaveOccurred()) for _, cb := range callbacks { cb(m) @@ -715,7 +718,9 @@ var _ = Describe("manger.Manager", func() { Expect(m.Add(RunnableFunc(func(context.Context) error { return runnableError{} }))) - err = m.Start(make(chan struct{})) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + err = m.Start(ctx) Expect(err).ToNot(BeNil()) Expect(err.Error()).To(Equal("not feeling like that")) Expect(errors.Is(err, context.DeadlineExceeded)).ToNot(BeTrue()) @@ -725,7 +730,7 @@ var _ = Describe("manger.Manager", func() { }) It("should not wait for runnables if gracefulShutdownTimeout is 0", func(done Done) { - m, err := New(context.Background(), cfg, options) + m, err := New(cfg, options) Expect(err).NotTo(HaveOccurred()) for _, cb := range callbacks { cb(m) @@ -740,14 +745,14 @@ var _ = Describe("manger.Manager", func() { return nil }))).ToNot(HaveOccurred()) - managerStop := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) managerStopDone := make(chan struct{}) go func() { - Expect(m.Start(managerStop)).NotTo(HaveOccurred()) + Expect(m.Start(ctx)).NotTo(HaveOccurred()) close(managerStopDone) }() <-m.(*controllerManager).elected - close(managerStop) + cancel() <-managerStopDone <-runnableStopped @@ -799,13 +804,13 @@ var _ = Describe("manger.Manager", func() { It("should stop serving metrics when stop is called", func(done Done) { opts.MetricsBindAddress = ":0" - m, err := New(context.Background(), cfg, opts) + m, err := New(cfg, opts) Expect(err).NotTo(HaveOccurred()) - s := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) go func() { defer GinkgoRecover() - Expect(m.Start(s)).NotTo(HaveOccurred()) + Expect(m.Start(ctx)).NotTo(HaveOccurred()) close(done) }() @@ -815,7 +820,7 @@ var _ = Describe("manger.Manager", func() { Expect(err).NotTo(HaveOccurred()) // Shutdown the server - close(s) + cancel() // Expect the metrics server to shutdown Eventually(func() error { @@ -826,14 +831,14 @@ var _ = Describe("manger.Manager", func() { It("should serve metrics endpoint", func(done Done) { opts.MetricsBindAddress = ":0" - m, err := New(context.Background(), cfg, opts) + m, err := New(cfg, opts) Expect(err).NotTo(HaveOccurred()) - s := make(chan struct{}) - defer close(s) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go func() { defer GinkgoRecover() - Expect(m.Start(s)).NotTo(HaveOccurred()) + Expect(m.Start(ctx)).NotTo(HaveOccurred()) close(done) }() @@ -845,14 +850,14 @@ var _ = Describe("manger.Manager", func() { It("should not serve anything other than metrics endpoint by default", func(done Done) { opts.MetricsBindAddress = ":0" - m, err := New(context.Background(), cfg, opts) + m, err := New(cfg, opts) Expect(err).NotTo(HaveOccurred()) - s := make(chan struct{}) - defer close(s) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go func() { defer GinkgoRecover() - Expect(m.Start(s)).NotTo(HaveOccurred()) + Expect(m.Start(ctx)).NotTo(HaveOccurred()) close(done) }() @@ -872,14 +877,14 @@ var _ = Describe("manger.Manager", func() { Expect(err).NotTo(HaveOccurred()) opts.MetricsBindAddress = ":0" - m, err := New(context.Background(), cfg, opts) + m, err := New(cfg, opts) Expect(err).NotTo(HaveOccurred()) - s := make(chan struct{}) - defer close(s) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go func() { defer GinkgoRecover() - Expect(m.Start(s)).NotTo(HaveOccurred()) + Expect(m.Start(ctx)).NotTo(HaveOccurred()) close(done) }() @@ -903,7 +908,7 @@ var _ = Describe("manger.Manager", func() { It("should serve extra endpoints", func(done Done) { opts.MetricsBindAddress = ":0" - m, err := New(context.Background(), cfg, opts) + m, err := New(cfg, opts) Expect(err).NotTo(HaveOccurred()) err = m.AddMetricsExtraHandler("/debug", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { @@ -917,11 +922,11 @@ var _ = Describe("manger.Manager", func() { })) Expect(err).To(HaveOccurred()) - s := make(chan struct{}) - defer close(s) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go func() { defer GinkgoRecover() - Expect(m.Start(s)).NotTo(HaveOccurred()) + Expect(m.Start(ctx)).NotTo(HaveOccurred()) close(done) }() @@ -960,13 +965,13 @@ var _ = Describe("manger.Manager", func() { It("should stop serving health probes when stop is called", func(done Done) { opts.HealthProbeBindAddress = ":0" - m, err := New(context.Background(), cfg, opts) + m, err := New(cfg, opts) Expect(err).NotTo(HaveOccurred()) - s := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) go func() { defer GinkgoRecover() - Expect(m.Start(s)).NotTo(HaveOccurred()) + Expect(m.Start(ctx)).NotTo(HaveOccurred()) close(done) }() @@ -976,7 +981,7 @@ var _ = Describe("manger.Manager", func() { Expect(err).NotTo(HaveOccurred()) // Shutdown the server - close(s) + cancel() // Expect the health probes server to shutdown Eventually(func() error { @@ -987,7 +992,7 @@ var _ = Describe("manger.Manager", func() { It("should serve readiness endpoint", func(done Done) { opts.HealthProbeBindAddress = ":0" - m, err := New(context.Background(), cfg, opts) + m, err := New(cfg, opts) Expect(err).NotTo(HaveOccurred()) res := fmt.Errorf("not ready yet") @@ -995,11 +1000,11 @@ var _ = Describe("manger.Manager", func() { err = m.AddReadyzCheck(namedCheck, func(_ *http.Request) error { return res }) Expect(err).NotTo(HaveOccurred()) - s := make(chan struct{}) - defer close(s) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go func() { defer GinkgoRecover() - Expect(m.Start(s)).NotTo(HaveOccurred()) + Expect(m.Start(ctx)).NotTo(HaveOccurred()) close(done) }() @@ -1038,7 +1043,7 @@ var _ = Describe("manger.Manager", func() { It("should serve liveness endpoint", func(done Done) { opts.HealthProbeBindAddress = ":0" - m, err := New(context.Background(), cfg, opts) + m, err := New(cfg, opts) Expect(err).NotTo(HaveOccurred()) res := fmt.Errorf("not alive") @@ -1046,11 +1051,11 @@ var _ = Describe("manger.Manager", func() { err = m.AddHealthzCheck(namedCheck, func(_ *http.Request) error { return res }) Expect(err).NotTo(HaveOccurred()) - s := make(chan struct{}) - defer close(s) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go func() { defer GinkgoRecover() - Expect(m.Start(s)).NotTo(HaveOccurred()) + Expect(m.Start(ctx)).NotTo(HaveOccurred()) close(done) }() @@ -1091,7 +1096,7 @@ var _ = Describe("manger.Manager", func() { Describe("Add", func() { It("should immediately start the Component if the Manager has already Started another Component", func(done Done) { - m, err := New(context.Background(), cfg, Options{}) + m, err := New(cfg, Options{}) Expect(err).NotTo(HaveOccurred()) mgr, ok := m.(*controllerManager) Expect(ok).To(BeTrue()) @@ -1104,9 +1109,11 @@ var _ = Describe("manger.Manager", func() { return nil }))).To(Succeed()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go func() { defer GinkgoRecover() - Expect(m.Start(stop)).NotTo(HaveOccurred()) + Expect(m.Start(ctx)).NotTo(HaveOccurred()) }() // Wait for the Manager to start @@ -1130,14 +1137,16 @@ var _ = Describe("manger.Manager", func() { }) It("should immediately start the Component if the Manager has already Started", func(done Done) { - m, err := New(context.Background(), cfg, Options{}) + m, err := New(cfg, Options{}) Expect(err).NotTo(HaveOccurred()) mgr, ok := m.(*controllerManager) Expect(ok).To(BeTrue()) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() go func() { defer GinkgoRecover() - Expect(m.Start(stop)).NotTo(HaveOccurred()) + Expect(m.Start(ctx)).NotTo(HaveOccurred()) }() // Wait for the Manager to start @@ -1159,14 +1168,14 @@ var _ = Describe("manger.Manager", func() { }) It("should fail if SetFields fails", func() { - m, err := New(context.Background(), cfg, Options{}) + m, err := New(cfg, Options{}) Expect(err).NotTo(HaveOccurred()) Expect(m.Add(&failRec{})).To(HaveOccurred()) }) }) Describe("SetFields", func() { It("should inject field values", func(done Done) { - m, err := New(context.Background(), cfg, Options{}) + m, err := New(cfg, Options{}) Expect(err).NotTo(HaveOccurred()) mgr, ok := m.(*controllerManager) Expect(ok).To(BeTrue()) @@ -1264,12 +1273,12 @@ var _ = Describe("manger.Manager", func() { It("should not leak goroutines when stopped", func() { currentGRs := goleak.IgnoreCurrent() - m, err := New(context.Background(), cfg, Options{}) + m, err := New(cfg, Options{}) Expect(err).NotTo(HaveOccurred()) - s := make(chan struct{}) - close(s) - Expect(m.Start(s)).NotTo(HaveOccurred()) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + Expect(m.Start(ctx)).NotTo(HaveOccurred()) // force-close keep-alive connections. These'll time anyway (after // like 30s or so) but force it to speed up the tests. @@ -1280,7 +1289,7 @@ var _ = Describe("manger.Manager", func() { It("should not leak goroutines if the default event broadcaster is used & events are emitted", func() { currentGRs := goleak.IgnoreCurrent() - m, err := New(context.Background(), cfg, Options{ /* implicit: default setting for EventBroadcaster */ }) + m, err := New(cfg, Options{ /* implicit: default setting for EventBroadcaster */ }) Expect(err).NotTo(HaveOccurred()) By("adding a runnable that emits an event") @@ -1294,12 +1303,12 @@ var _ = Describe("manger.Manager", func() { }))).To(Succeed()) By("starting the manager & waiting till we've sent our event") - stopCh := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) doneCh := make(chan struct{}) go func() { defer GinkgoRecover() defer close(doneCh) - Expect(m.Start(stopCh)).To(Succeed()) + Expect(m.Start(ctx)).To(Succeed()) }() Eventually(func() *corev1.Event { evts, err := clientset.CoreV1().Events("").Search(m.GetScheme(), &ns) @@ -1314,7 +1323,7 @@ var _ = Describe("manger.Manager", func() { }).ShouldNot(BeNil()) By("making sure there's no extra go routines still running after we stop") - close(stopCh) + cancel() <-doneCh // force-close keep-alive connections. These'll time anyway (after @@ -1324,7 +1333,7 @@ var _ = Describe("manger.Manager", func() { }) It("should provide a function to get the Config", func() { - m, err := New(context.Background(), cfg, Options{}) + m, err := New(cfg, Options{}) Expect(err).NotTo(HaveOccurred()) mgr, ok := m.(*controllerManager) Expect(ok).To(BeTrue()) @@ -1332,7 +1341,7 @@ var _ = Describe("manger.Manager", func() { }) It("should provide a function to get the Client", func() { - m, err := New(context.Background(), cfg, Options{}) + m, err := New(cfg, Options{}) Expect(err).NotTo(HaveOccurred()) mgr, ok := m.(*controllerManager) Expect(ok).To(BeTrue()) @@ -1340,7 +1349,7 @@ var _ = Describe("manger.Manager", func() { }) It("should provide a function to get the Scheme", func() { - m, err := New(context.Background(), cfg, Options{}) + m, err := New(cfg, Options{}) Expect(err).NotTo(HaveOccurred()) mgr, ok := m.(*controllerManager) Expect(ok).To(BeTrue()) @@ -1348,7 +1357,7 @@ var _ = Describe("manger.Manager", func() { }) It("should provide a function to get the FieldIndexer", func() { - m, err := New(context.Background(), cfg, Options{}) + m, err := New(cfg, Options{}) Expect(err).NotTo(HaveOccurred()) mgr, ok := m.(*controllerManager) Expect(ok).To(BeTrue()) @@ -1356,12 +1365,12 @@ var _ = Describe("manger.Manager", func() { }) It("should provide a function to get the EventRecorder", func() { - m, err := New(context.Background(), cfg, Options{}) + m, err := New(cfg, Options{}) Expect(err).NotTo(HaveOccurred()) Expect(m.GetEventRecorderFor("test")).NotTo(BeNil()) }) It("should provide a function to get the APIReader", func() { - m, err := New(context.Background(), cfg, Options{}) + m, err := New(cfg, Options{}) Expect(err).NotTo(HaveOccurred()) Expect(m.GetAPIReader()).NotTo(BeNil()) }) diff --git a/pkg/manager/signals/signal.go b/pkg/manager/signals/signal.go index 08eaef7b42..9a85558f82 100644 --- a/pkg/manager/signals/signal.go +++ b/pkg/manager/signals/signal.go @@ -17,6 +17,7 @@ limitations under the License. package signals import ( + "context" "os" "os/signal" ) @@ -26,18 +27,19 @@ var onlyOneSignalHandler = make(chan struct{}) // SetupSignalHandler registers for SIGTERM and SIGINT. A stop channel is returned // which is closed on one of these signals. If a second signal is caught, the program // is terminated with exit code 1. -func SetupSignalHandler() (stopCh <-chan struct{}) { +func SetupSignalHandler() context.Context { close(onlyOneSignalHandler) // panics when called twice - stop := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + c := make(chan os.Signal, 2) signal.Notify(c, shutdownSignals...) go func() { <-c - close(stop) + cancel() <-c os.Exit(1) // second signal. Exit directly. }() - return stop + return ctx } diff --git a/pkg/manager/signals/signal_test.go b/pkg/manager/signals/signal_test.go index 8ac66ceaf9..2776e13a6d 100644 --- a/pkg/manager/signals/signal_test.go +++ b/pkg/manager/signals/signal_test.go @@ -32,7 +32,7 @@ var _ = Describe("runtime signal", func() { Context("SignalHandler Test", func() { It("test signal handler", func() { - stop := SetupSignalHandler() + ctx := SetupSignalHandler() task := &Task{ ticker: time.NewTicker(time.Second * 2), } @@ -47,7 +47,7 @@ var _ = Describe("runtime signal", func() { select { case sig := <-c: fmt.Printf("Got %s signal. Aborting...\n", sig) - case _, ok := <-stop: + case _, ok := <-ctx.Done(): Expect(ok).To(BeFalse()) } }) diff --git a/pkg/webhook/example_test.go b/pkg/webhook/example_test.go index e27b03c2cb..b225fea89b 100644 --- a/pkg/webhook/example_test.go +++ b/pkg/webhook/example_test.go @@ -46,7 +46,7 @@ func Example() { // Create a manager // Note: GetConfigOrDie will os.Exit(1) w/o any message if no kube-config can be found - mgr, err := ctrl.NewManager(context.Background(), ctrl.GetConfigOrDie(), ctrl.Options{}) + mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{}) if err != nil { panic(err) }