From 066bfea7df288d72eba0fcf98e1c21d72d8ec31b Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Wed, 23 Dec 2020 15:45:55 -0500 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20Move=20cluster-specifics=20from=20M?= =?UTF-8?q?anager=20into=20new=20pkg/cluster?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This change is the first step towards implementing the corresponding [proposal][1]. It essentially consists of moving a bunch if code from pkg/manager into pkg/cluster, slightly adjusting some tests in pkg/manager and copying applicable tests from there into pkg/cluster. It is not yet the full implementation, because the `Manager` will only correctly start it's own cache and not other caches before starting the remaining `Runnables`. This behavior matches current behavior when building something that uses multiple caches and will be fixed in a follow-up. [1]: https://github.com/kubernetes-sigs/controller-runtime/blob/master/designs/move-cluster-specific-code-out-of-manager.md --- pkg/cluster/client_builder.go | 62 ++++++ pkg/cluster/cluster.go | 254 ++++++++++++++++++++++++ pkg/cluster/cluster_suite_test.go | 66 +++++++ pkg/cluster/cluster_test.go | 316 ++++++++++++++++++++++++++++++ pkg/cluster/internal.go | 128 ++++++++++++ pkg/manager/client_builder.go | 38 +--- pkg/manager/internal.go | 65 ++---- pkg/manager/manager.go | 140 ++++--------- pkg/manager/manager_test.go | 18 +- 9 files changed, 887 insertions(+), 200 deletions(-) create mode 100644 pkg/cluster/client_builder.go create mode 100644 pkg/cluster/cluster.go create mode 100644 pkg/cluster/cluster_suite_test.go create mode 100644 pkg/cluster/cluster_test.go create mode 100644 pkg/cluster/internal.go diff --git a/pkg/cluster/client_builder.go b/pkg/cluster/client_builder.go new file mode 100644 index 0000000000..791ce16061 --- /dev/null +++ b/pkg/cluster/client_builder.go @@ -0,0 +1,62 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + "k8s.io/client-go/rest" + + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +// ClientBuilder builder is the interface for the client builder. +type ClientBuilder interface { + // WithUncached takes a list of runtime objects (plain or lists) that users don't want to cache + // for this client. This function can be called multiple times, it should append to an internal slice. + WithUncached(objs ...client.Object) ClientBuilder + + // Build returns a new client. + Build(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) +} + +// NewClientBuilder returns a builder to build new clients to be passed when creating a Manager. +func NewClientBuilder() ClientBuilder { + return &newClientBuilder{} +} + +type newClientBuilder struct { + uncached []client.Object +} + +func (n *newClientBuilder) WithUncached(objs ...client.Object) ClientBuilder { + n.uncached = append(n.uncached, objs...) + return n +} + +func (n *newClientBuilder) Build(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) { + // Create the Client for Write operations. + c, err := client.New(config, options) + if err != nil { + return nil, err + } + + return client.NewDelegatingClient(client.NewDelegatingClientInput{ + CacheReader: cache, + Client: c, + UncachedObjects: n.uncached, + }) +} diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go new file mode 100644 index 0000000000..2c7b22382d --- /dev/null +++ b/pkg/cluster/cluster.go @@ -0,0 +1,254 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + "context" + "errors" + "time" + + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + logf "sigs.k8s.io/controller-runtime/pkg/internal/log" + + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder" +) + +// Cluster provides various methods to interact with a cluster. +type Cluster interface { + // SetFields will set any dependencies on an object for which the object has implemented the inject + // interface - e.g. inject.Client. + SetFields(interface{}) error + + // GetConfig returns an initialized Config + GetConfig() *rest.Config + + // GetScheme returns an initialized Scheme + GetScheme() *runtime.Scheme + + // GetClient returns a client configured with the Config. This client may + // not be a fully "direct" client -- it may read from a cache, for + // instance. See Options.NewClient for more information on how the default + // implementation works. + GetClient() client.Client + + // GetFieldIndexer returns a client.FieldIndexer configured with the client + GetFieldIndexer() client.FieldIndexer + + // GetCache returns a cache.Cache + GetCache() cache.Cache + + // GetEventRecorderFor returns a new EventRecorder for the provided name + GetEventRecorderFor(name string) record.EventRecorder + + // GetRESTMapper returns a RESTMapper + GetRESTMapper() meta.RESTMapper + + // GetAPIReader returns a reader that will be configured to use the API server. + // This should be used sparingly and only when the client does not fit your + // use case. + GetAPIReader() client.Reader + + // Start starts the cluster + Start(ctx context.Context) error +} + +// Options are the possible options that can be configured for a Cluster. +type Options struct { + // Scheme is the scheme used to resolve runtime.Objects to GroupVersionKinds / Resources + // Defaults to the kubernetes/client-go scheme.Scheme, but it's almost always better + // idea to pass your own scheme in. See the documentation in pkg/scheme for more information. + Scheme *runtime.Scheme + + // MapperProvider provides the rest mapper used to map go types to Kubernetes APIs + MapperProvider func(c *rest.Config) (meta.RESTMapper, error) + + // Logger is the logger that should be used by this Cluster. + // If none is set, it defaults to log.Log global logger. + Logger logr.Logger + + // SyncPeriod determines the minimum frequency at which watched resources are + // reconciled. A lower period will correct entropy more quickly, but reduce + // responsiveness to change if there are many watched resources. Change this + // value only if you know what you are doing. Defaults to 10 hours if unset. + // there will a 10 percent jitter between the SyncPeriod of all controllers + // so that all controllers will not send list requests simultaneously. + SyncPeriod *time.Duration + + // Namespace if specified restricts the manager's cache to watch objects in + // the desired namespace Defaults to all namespaces + // + // Note: If a namespace is specified, controllers can still Watch for a + // cluster-scoped resource (e.g Node). For namespaced resources the cache + // will only hold objects from the desired namespace. + Namespace string + + // NewCache is the function that will create the cache to be used + // by the manager. If not set this will use the default new cache function. + NewCache cache.NewCacheFunc + + // ClientBuilder is the builder that creates the client to be used by the manager. + // If not set this will create the default DelegatingClient that will + // use the cache for reads and the client for writes. + ClientBuilder ClientBuilder + + // ClientDisableCacheFor tells the client that, if any cache is used, to bypass it + // for the given objects. + ClientDisableCacheFor []client.Object + + // DryRunClient specifies whether the client should be configured to enforce + // dryRun mode. + DryRunClient bool + + // EventBroadcaster records Events emitted by the manager and sends them to the Kubernetes API + // Use this to customize the event correlator and spam filter + // + // Deprecated: using this may cause goroutine leaks if the lifetime of your manager or controllers + // is shorter than the lifetime of your process. + EventBroadcaster record.EventBroadcaster + + // makeBroadcaster allows deferring the creation of the broadcaster to + // avoid leaking goroutines if we never call Start on this manager. It also + // returns whether or not this is a "owned" broadcaster, and as such should be + // stopped with the manager. + makeBroadcaster intrec.EventBroadcasterProducer + + // Dependency injection for testing + newRecorderProvider func(config *rest.Config, scheme *runtime.Scheme, logger logr.Logger, makeBroadcaster intrec.EventBroadcasterProducer) (*intrec.Provider, error) +} + +// Option can be used to manipulate Options +type Option func(*Options) + +// New constructs a brand new cluster +func New(config *rest.Config, opts ...Option) (Cluster, error) { + if config == nil { + return nil, errors.New("must specify Config") + } + + options := Options{} + for _, opt := range opts { + opt(&options) + } + options = setOptionsDefaults(options) + + // Create the mapper provider + mapper, err := options.MapperProvider(config) + if err != nil { + options.Logger.Error(err, "Failed to get API Group-Resources") + return nil, err + } + + // Create the cache for the cached read client and registering informers + cache, err := options.NewCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace}) + if err != nil { + return nil, err + } + + clientOptions := client.Options{Scheme: options.Scheme, Mapper: mapper} + + apiReader, err := client.New(config, clientOptions) + if err != nil { + return nil, err + } + + writeObj, err := options.ClientBuilder. + WithUncached(options.ClientDisableCacheFor...). + Build(cache, config, clientOptions) + if err != nil { + return nil, err + } + + if options.DryRunClient { + writeObj = client.NewDryRunClient(writeObj) + } + + // Create the recorder provider to inject event recorders for the components. + // TODO(directxman12): the log for the event provider should have a context (name, tags, etc) specific + // to the particular controller that it's being injected into, rather than a generic one like is here. + recorderProvider, err := options.newRecorderProvider(config, options.Scheme, options.Logger.WithName("events"), options.makeBroadcaster) + if err != nil { + return nil, err + } + + return &cluster{ + config: config, + scheme: options.Scheme, + cache: cache, + fieldIndexes: cache, + client: writeObj, + apiReader: apiReader, + recorderProvider: recorderProvider, + mapper: mapper, + logger: options.Logger, + }, nil +} + +// setOptionsDefaults set default values for Options fields +func setOptionsDefaults(options Options) Options { + // Use the Kubernetes client-go scheme if none is specified + if options.Scheme == nil { + options.Scheme = scheme.Scheme + } + + if options.MapperProvider == nil { + options.MapperProvider = func(c *rest.Config) (meta.RESTMapper, error) { + return apiutil.NewDynamicRESTMapper(c) + } + } + + // Allow the client builder to be mocked + if options.ClientBuilder == nil { + options.ClientBuilder = NewClientBuilder() + } + + // Allow newCache to be mocked + if options.NewCache == nil { + options.NewCache = cache.New + } + + // Allow newRecorderProvider to be mocked + if options.newRecorderProvider == nil { + options.newRecorderProvider = intrec.NewProvider + } + + // This is duplicated with pkg/manager, we need it here to provide + // the user with an EventBroadcaster and there for the Leader election + if options.EventBroadcaster == nil { + // defer initialization to avoid leaking by default + options.makeBroadcaster = func() (record.EventBroadcaster, bool) { + return record.NewBroadcaster(), true + } + } else { + options.makeBroadcaster = func() (record.EventBroadcaster, bool) { + return options.EventBroadcaster, false + } + } + + if options.Logger == nil { + options.Logger = logf.RuntimeLog.WithName("cluster") + } + + return options +} diff --git a/pkg/cluster/cluster_suite_test.go b/pkg/cluster/cluster_suite_test.go new file mode 100644 index 0000000000..f155ed6971 --- /dev/null +++ b/pkg/cluster/cluster_suite_test.go @@ -0,0 +1,66 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + "net/http" + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/envtest" + "sigs.k8s.io/controller-runtime/pkg/envtest/printer" + logf "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/log/zap" +) + +func TestSource(t *testing.T) { + RegisterFailHandler(Fail) + suiteName := "Cluster Suite" + RunSpecsWithDefaultAndCustomReporters(t, suiteName, []Reporter{printer.NewlineReporter{}, printer.NewProwReporter(suiteName)}) +} + +var testenv *envtest.Environment +var cfg *rest.Config +var clientset *kubernetes.Clientset + +// clientTransport is used to force-close keep-alives in tests that check for leaks +var clientTransport *http.Transport + +var _ = BeforeSuite(func(done Done) { + logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true))) + + testenv = &envtest.Environment{} + + var err error + cfg, err = testenv.Start() + Expect(err).NotTo(HaveOccurred()) + + clientTransport = &http.Transport{} + cfg.Transport = clientTransport + + clientset, err = kubernetes.NewForConfig(cfg) + Expect(err).NotTo(HaveOccurred()) + + close(done) +}, 60) + +var _ = AfterSuite(func() { + Expect(testenv.Stop()).To(Succeed()) +}) diff --git a/pkg/cluster/cluster_test.go b/pkg/cluster/cluster_test.go new file mode 100644 index 0000000000..c407869fa0 --- /dev/null +++ b/pkg/cluster/cluster_test.go @@ -0,0 +1,316 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + "context" + "fmt" + + "github.com/go-logr/logr" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + "go.uber.org/goleak" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/cache/informertest" + "sigs.k8s.io/controller-runtime/pkg/client" + logf "sigs.k8s.io/controller-runtime/pkg/internal/log" + intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder" + "sigs.k8s.io/controller-runtime/pkg/runtime/inject" +) + +type fakeClientBuilder struct { + err error +} + +func (e *fakeClientBuilder) WithUncached(objs ...client.Object) ClientBuilder { + return e +} + +func (e *fakeClientBuilder) Build(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) { + return nil, e.err +} + +var _ = Describe("cluster.Cluster", func() { + Describe("New", func() { + It("should return an error if there is no Config", func() { + c, err := New(nil) + Expect(c).To(BeNil()) + Expect(err.Error()).To(ContainSubstring("must specify Config")) + + }) + + It("should return an error if it can't create a RestMapper", func() { + expected := fmt.Errorf("expected error: RestMapper") + c, err := New(cfg, func(o *Options) { + o.MapperProvider = func(c *rest.Config) (meta.RESTMapper, error) { return nil, expected } + }) + Expect(c).To(BeNil()) + Expect(err).To(Equal(expected)) + + }) + + It("should return an error it can't create a client.Client", func(done Done) { + c, err := New(cfg, func(o *Options) { + o.ClientBuilder = &fakeClientBuilder{err: fmt.Errorf("expected error")} + }) + Expect(c).To(BeNil()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("expected error")) + + close(done) + }) + + It("should return an error it can't create a cache.Cache", func(done Done) { + c, err := New(cfg, func(o *Options) { + o.NewCache = func(config *rest.Config, opts cache.Options) (cache.Cache, error) { + return nil, fmt.Errorf("expected error") + } + }) + Expect(c).To(BeNil()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("expected error")) + + close(done) + }) + + It("should create a client defined in by the new client function", func(done Done) { + c, err := New(cfg, func(o *Options) { + o.ClientBuilder = &fakeClientBuilder{} + }) + Expect(c).ToNot(BeNil()) + Expect(err).ToNot(HaveOccurred()) + Expect(c.GetClient()).To(BeNil()) + + close(done) + }) + + It("should return an error it can't create a recorder.Provider", func(done Done) { + c, err := New(cfg, func(o *Options) { + o.newRecorderProvider = func(_ *rest.Config, _ *runtime.Scheme, _ logr.Logger, _ intrec.EventBroadcasterProducer) (*intrec.Provider, error) { + return nil, fmt.Errorf("expected error") + } + }) + Expect(c).To(BeNil()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("expected error")) + + close(done) + }) + + }) + + Describe("Start", func() { + It("should stop when context is cancelled", func(done Done) { + c, err := New(cfg) + Expect(err).NotTo(HaveOccurred()) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + Expect(c.Start(ctx)).NotTo(HaveOccurred()) + + close(done) + }) + }) + + Describe("SetFields", func() { + It("should inject field values", func(done Done) { + c, err := New(cfg, func(o *Options) { + o.NewCache = func(_ *rest.Config, _ cache.Options) (cache.Cache, error) { + return &informertest.FakeInformers{}, nil + } + }) + Expect(err).NotTo(HaveOccurred()) + + By("Injecting the dependencies") + err = c.SetFields(&injectable{ + scheme: func(scheme *runtime.Scheme) error { + defer GinkgoRecover() + Expect(scheme).To(Equal(c.GetScheme())) + return nil + }, + config: func(config *rest.Config) error { + defer GinkgoRecover() + Expect(config).To(Equal(c.GetConfig())) + return nil + }, + client: func(client client.Client) error { + defer GinkgoRecover() + Expect(client).To(Equal(c.GetClient())) + return nil + }, + cache: func(cache cache.Cache) error { + defer GinkgoRecover() + Expect(cache).To(Equal(c.GetCache())) + return nil + }, + log: func(logger logr.Logger) error { + defer GinkgoRecover() + Expect(logger).To(Equal(logf.RuntimeLog.WithName("cluster"))) + return nil + }, + }) + Expect(err).NotTo(HaveOccurred()) + + By("Returning an error if dependency injection fails") + + expected := fmt.Errorf("expected error") + err = c.SetFields(&injectable{ + client: func(client client.Client) error { + return expected + }, + }) + Expect(err).To(Equal(expected)) + + err = c.SetFields(&injectable{ + scheme: func(scheme *runtime.Scheme) error { + return expected + }, + }) + Expect(err).To(Equal(expected)) + + err = c.SetFields(&injectable{ + config: func(config *rest.Config) error { + return expected + }, + }) + Expect(err).To(Equal(expected)) + + err = c.SetFields(&injectable{ + cache: func(c cache.Cache) error { + return expected + }, + }) + Expect(err).To(Equal(expected)) + + close(done) + }) + }) + + It("should not leak goroutines when stopped", func() { + currentGRs := goleak.IgnoreCurrent() + + c, err := New(cfg) + Expect(err).NotTo(HaveOccurred()) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + Expect(c.Start(ctx)).NotTo(HaveOccurred()) + + // force-close keep-alive connections. These'll time anyway (after + // like 30s or so) but force it to speed up the tests. + clientTransport.CloseIdleConnections() + Eventually(func() error { return goleak.Find(currentGRs) }).Should(Succeed()) + }) + + It("should provide a function to get the Config", func() { + c, err := New(cfg) + Expect(err).NotTo(HaveOccurred()) + cluster, ok := c.(*cluster) + Expect(ok).To(BeTrue()) + Expect(c.GetConfig()).To(Equal(cluster.config)) + }) + + It("should provide a function to get the Client", func() { + c, err := New(cfg) + Expect(err).NotTo(HaveOccurred()) + cluster, ok := c.(*cluster) + Expect(ok).To(BeTrue()) + Expect(c.GetClient()).To(Equal(cluster.client)) + }) + + It("should provide a function to get the Scheme", func() { + c, err := New(cfg) + Expect(err).NotTo(HaveOccurred()) + cluster, ok := c.(*cluster) + Expect(ok).To(BeTrue()) + Expect(c.GetScheme()).To(Equal(cluster.scheme)) + }) + + It("should provide a function to get the FieldIndexer", func() { + c, err := New(cfg) + Expect(err).NotTo(HaveOccurred()) + cluster, ok := c.(*cluster) + Expect(ok).To(BeTrue()) + Expect(c.GetFieldIndexer()).To(Equal(cluster.cache)) + }) + + It("should provide a function to get the EventRecorder", func() { + c, err := New(cfg) + Expect(err).NotTo(HaveOccurred()) + Expect(c.GetEventRecorderFor("test")).NotTo(BeNil()) + }) + It("should provide a function to get the APIReader", func() { + c, err := New(cfg) + Expect(err).NotTo(HaveOccurred()) + Expect(c.GetAPIReader()).NotTo(BeNil()) + }) +}) + +var _ inject.Cache = &injectable{} +var _ inject.Client = &injectable{} +var _ inject.Scheme = &injectable{} +var _ inject.Config = &injectable{} +var _ inject.Logger = &injectable{} + +type injectable struct { + scheme func(scheme *runtime.Scheme) error + client func(client.Client) error + config func(config *rest.Config) error + cache func(cache.Cache) error + log func(logger logr.Logger) error +} + +func (i *injectable) InjectCache(c cache.Cache) error { + if i.cache == nil { + return nil + } + return i.cache(c) +} + +func (i *injectable) InjectConfig(config *rest.Config) error { + if i.config == nil { + return nil + } + return i.config(config) +} + +func (i *injectable) InjectClient(c client.Client) error { + if i.client == nil { + return nil + } + return i.client(c) +} + +func (i *injectable) InjectScheme(scheme *runtime.Scheme) error { + if i.scheme == nil { + return nil + } + return i.scheme(scheme) +} + +func (i *injectable) InjectLogger(log logr.Logger) error { + if i.log == nil { + return nil + } + return i.log(log) +} + +func (i *injectable) Start(<-chan struct{}) error { + return nil +} diff --git a/pkg/cluster/internal.go b/pkg/cluster/internal.go new file mode 100644 index 0000000000..125e1d144e --- /dev/null +++ b/pkg/cluster/internal.go @@ -0,0 +1,128 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cluster + +import ( + "context" + + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/record" + + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder" + "sigs.k8s.io/controller-runtime/pkg/runtime/inject" +) + +type cluster struct { + // config is the rest.config used to talk to the apiserver. Required. + config *rest.Config + + // scheme is the scheme injected into Controllers, EventHandlers, Sources and Predicates. Defaults + // to scheme.scheme. + scheme *runtime.Scheme + + cache cache.Cache + + // TODO(directxman12): Provide an escape hatch to get individual indexers + // client is the client injected into Controllers (and EventHandlers, Sources and Predicates). + client client.Client + + // apiReader is the reader that will make requests to the api server and not the cache. + apiReader client.Reader + + // fieldIndexes knows how to add field indexes over the Cache used by this controller, + // which can later be consumed via field selectors from the injected client. + fieldIndexes client.FieldIndexer + + // recorderProvider is used to generate event recorders that will be injected into Controllers + // (and EventHandlers, Sources and Predicates). + recorderProvider *intrec.Provider + + // mapper is used to map resources to kind, and map kind and version. + mapper meta.RESTMapper + + // Logger is the logger that should be used by this manager. + // If none is set, it defaults to log.Log global logger. + logger logr.Logger +} + +func (c *cluster) SetFields(i interface{}) error { + if _, err := inject.ConfigInto(c.config, i); err != nil { + return err + } + if _, err := inject.ClientInto(c.client, i); err != nil { + return err + } + if _, err := inject.APIReaderInto(c.apiReader, i); err != nil { + return err + } + if _, err := inject.SchemeInto(c.scheme, i); err != nil { + return err + } + if _, err := inject.CacheInto(c.cache, i); err != nil { + return err + } + if _, err := inject.MapperInto(c.mapper, i); err != nil { + return err + } + return nil +} + +func (c *cluster) GetConfig() *rest.Config { + return c.config +} + +func (c *cluster) GetClient() client.Client { + return c.client +} + +func (c *cluster) GetScheme() *runtime.Scheme { + return c.scheme +} + +func (c *cluster) GetFieldIndexer() client.FieldIndexer { + return c.fieldIndexes +} + +func (c *cluster) GetCache() cache.Cache { + return c.cache +} + +func (c *cluster) GetEventRecorderFor(name string) record.EventRecorder { + return c.recorderProvider.GetEventRecorderFor(name) +} + +func (c *cluster) GetRESTMapper() meta.RESTMapper { + return c.mapper +} + +func (c *cluster) GetAPIReader() client.Reader { + return c.apiReader +} + +func (c *cluster) GetLogger() logr.Logger { + return c.logger +} + +func (c *cluster) Start(ctx context.Context) error { + defer c.recorderProvider.Stop(ctx) + return c.cache.Start(ctx) +} diff --git a/pkg/manager/client_builder.go b/pkg/manager/client_builder.go index cc9f0817f0..e2fea8d1f7 100644 --- a/pkg/manager/client_builder.go +++ b/pkg/manager/client_builder.go @@ -17,45 +17,13 @@ limitations under the License. package manager import ( - "k8s.io/client-go/rest" - "sigs.k8s.io/controller-runtime/pkg/cache" - "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/cluster" ) // ClientBuilder builder is the interface for the client builder. -type ClientBuilder interface { - // WithUncached takes a list of runtime objects (plain or lists) that users don't want to cache - // for this client. This function can be called multiple times, it should append to an internal slice. - WithUncached(objs ...client.Object) ClientBuilder - - // Build returns a new client. - Build(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) -} +type ClientBuilder = cluster.ClientBuilder // NewClientBuilder returns a builder to build new clients to be passed when creating a Manager. func NewClientBuilder() ClientBuilder { - return &newClientBuilder{} -} - -type newClientBuilder struct { - uncached []client.Object -} - -func (n *newClientBuilder) WithUncached(objs ...client.Object) ClientBuilder { - n.uncached = append(n.uncached, objs...) - return n -} - -func (n *newClientBuilder) Build(cache cache.Cache, config *rest.Config, options client.Options) (client.Client, error) { - // Create the Client for Write operations. - c, err := client.New(config, options) - if err != nil { - return nil, err - } - - return client.NewDelegatingClient(client.NewDelegatingClientInput{ - CacheReader: cache, - Client: c, - UncachedObjects: n.uncached, - }) + return cluster.NewClientBuilder() } diff --git a/pkg/manager/internal.go b/pkg/manager/internal.go index a9fe180d30..a9a313247b 100644 --- a/pkg/manager/internal.go +++ b/pkg/manager/internal.go @@ -37,6 +37,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/healthz" intrec "sigs.k8s.io/controller-runtime/pkg/internal/recorder" "sigs.k8s.io/controller-runtime/pkg/metrics" @@ -59,12 +60,8 @@ const ( var _ Runnable = &controllerManager{} type controllerManager struct { - // config is the rest.config used to talk to the apiserver. Required. - config *rest.Config - - // scheme is the scheme injected into Controllers, EventHandlers, Sources and Predicates. Defaults - // to scheme.scheme. - scheme *runtime.Scheme + // cluster holds a variety of methods to interact with a cluster. Required. + cluster cluster.Cluster // leaderElectionRunnables is the set of Controllers that the controllerManager injects deps into and Starts. // These Runnables are managed by lead election. @@ -73,19 +70,6 @@ type controllerManager struct { // These Runnables will not be blocked by lead election. nonLeaderElectionRunnables []Runnable - cache cache.Cache - - // TODO(directxman12): Provide an escape hatch to get individual indexers - // client is the client injected into Controllers (and EventHandlers, Sources and Predicates). - client client.Client - - // apiReader is the reader that will make requests to the api server and not the cache. - apiReader client.Reader - - // fieldIndexes knows how to add field indexes over the Cache used by this controller, - // which can later be consumed via field selectors from the injected client. - fieldIndexes client.FieldIndexer - // recorderProvider is used to generate event recorders that will be injected into Controllers // (and EventHandlers, Sources and Predicates). recorderProvider *intrec.Provider @@ -97,9 +81,6 @@ type controllerManager struct { // on shutdown leaderElectionReleaseOnCancel bool - // mapper is used to map resources to kind, and map kind and version. - mapper meta.RESTMapper - // metricsListener is used to serve prometheus metrics metricsListener net.Listener @@ -225,33 +206,19 @@ func (cm *controllerManager) Add(r Runnable) error { } func (cm *controllerManager) SetFields(i interface{}) error { - if _, err := inject.ConfigInto(cm.config, i); err != nil { - return err - } - if _, err := inject.ClientInto(cm.client, i); err != nil { - return err - } - if _, err := inject.APIReaderInto(cm.apiReader, i); err != nil { - return err - } - if _, err := inject.SchemeInto(cm.scheme, i); err != nil { - return err - } - if _, err := inject.CacheInto(cm.cache, i); err != nil { - return err - } if _, err := inject.InjectorInto(cm.SetFields, i); err != nil { return err } if _, err := inject.StopChannelInto(cm.internalProceduresStop, i); err != nil { return err } - if _, err := inject.MapperInto(cm.mapper, i); err != nil { + if _, err := inject.LoggerInto(cm.logger, i); err != nil { return err } - if _, err := inject.LoggerInto(cm.logger, i); err != nil { + if err := cm.cluster.SetFields(i); err != nil { return err } + return nil } @@ -317,35 +284,35 @@ func (cm *controllerManager) AddReadyzCheck(name string, check healthz.Checker) } func (cm *controllerManager) GetConfig() *rest.Config { - return cm.config + return cm.cluster.GetConfig() } func (cm *controllerManager) GetClient() client.Client { - return cm.client + return cm.cluster.GetClient() } func (cm *controllerManager) GetScheme() *runtime.Scheme { - return cm.scheme + return cm.cluster.GetScheme() } func (cm *controllerManager) GetFieldIndexer() client.FieldIndexer { - return cm.fieldIndexes + return cm.cluster.GetFieldIndexer() } func (cm *controllerManager) GetCache() cache.Cache { - return cm.cache + return cm.cluster.GetCache() } func (cm *controllerManager) GetEventRecorderFor(name string) record.EventRecorder { - return cm.recorderProvider.GetEventRecorderFor(name) + return cm.cluster.GetEventRecorderFor(name) } func (cm *controllerManager) GetRESTMapper() meta.RESTMapper { - return cm.mapper + return cm.cluster.GetRESTMapper() } func (cm *controllerManager) GetAPIReader() client.Reader { - return cm.apiReader + return cm.cluster.GetAPIReader() } func (cm *controllerManager) GetWebhookServer() *webhook.Server { @@ -624,7 +591,7 @@ func (cm *controllerManager) waitForCache(ctx context.Context) { // Start the Cache. Allow the function to start the cache to be mocked out for testing if cm.startCache == nil { - cm.startCache = cm.cache.Start + cm.startCache = cm.cluster.Start } cm.startRunnable(RunnableFunc(func(ctx context.Context) error { return cm.startCache(ctx) @@ -632,7 +599,7 @@ func (cm *controllerManager) waitForCache(ctx context.Context) { // Wait for the caches to sync. // TODO(community): Check the return value and write a test - cm.cache.WaitForCacheSync(ctx) + cm.cluster.GetCache().WaitForCacheSync(ctx) // TODO: This should be the return value of cm.cache.WaitForCacheSync but we abuse // cm.started as check if we already started the cache so it must always become true. // Making sure that the cache doesn't get started twice is needed to not get a "close diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index f69030391c..2f676ad7a0 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -28,13 +28,12 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" "k8s.io/client-go/tools/leaderelection/resourcelock" "k8s.io/client-go/tools/record" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/config" "sigs.k8s.io/controller-runtime/pkg/config/v1alpha1" "sigs.k8s.io/controller-runtime/pkg/healthz" @@ -50,6 +49,9 @@ import ( // Manager initializes shared dependencies such as Caches and Clients, and provides them to Runnables. // A Manager is required to create Controllers. type Manager interface { + // Cluster holds a variety of methods to interact with a cluster. + cluster.Cluster + // Add will set requested dependencies on the component, and cause the component to be // started when Start is called. Add will inject any dependencies for which the argument // implements the inject interface - e.g. inject.Client. @@ -62,10 +64,6 @@ type Manager interface { // election was configured. Elected() <-chan struct{} - // SetFields will set any dependencies on an object for which the object has implemented the inject - // interface - e.g. inject.Client. - SetFields(interface{}) error - // AddMetricsExtraHandler adds an extra handler served on path to the http server that serves metrics. // Might be useful to register some diagnostic endpoints e.g. pprof. Note that these endpoints meant to be // sensitive and shouldn't be exposed publicly. @@ -87,35 +85,6 @@ type Manager interface { // lock was lost. Start(ctx context.Context) error - // GetConfig returns an initialized Config - GetConfig() *rest.Config - - // GetScheme returns an initialized Scheme - GetScheme() *runtime.Scheme - - // GetClient returns a client configured with the Config. This client may - // not be a fully "direct" client -- it may read from a cache, for - // instance. See Options.NewClient for more information on how the default - // implementation works. - GetClient() client.Client - - // GetFieldIndexer returns a client.FieldIndexer configured with the client - GetFieldIndexer() client.FieldIndexer - - // GetCache returns a cache.Cache - GetCache() cache.Cache - - // GetEventRecorderFor returns a new EventRecorder for the provided name - GetEventRecorderFor(name string) record.EventRecorder - - // GetRESTMapper returns a RESTMapper - GetRESTMapper() meta.RESTMapper - - // GetAPIReader returns a reader that will be configured to use the API server. - // This should be used sparingly and only when the client does not fit your - // use case. - GetAPIReader() client.Reader - // GetWebhookServer returns a webhook.Server GetWebhookServer() *webhook.Server @@ -303,49 +272,29 @@ type LeaderElectionRunnable interface { // New returns a new Manager for creating Controllers. func New(config *rest.Config, options Options) (Manager, error) { - // Initialize a rest.config if none was specified - if config == nil { - return nil, fmt.Errorf("must specify Config") - } - // Set default values for options fields options = setOptionsDefaults(options) - // Create the mapper provider - mapper, err := options.MapperProvider(config) - if err != nil { - options.Logger.Error(err, "Failed to get API Group-Resources") - return nil, err - } - - // Create the cache for the cached read client and registering informers - cache, err := options.NewCache(config, cache.Options{Scheme: options.Scheme, Mapper: mapper, Resync: options.SyncPeriod, Namespace: options.Namespace}) - if err != nil { - return nil, err - } - - clientOptions := client.Options{Scheme: options.Scheme, Mapper: mapper} - - apiReader, err := client.New(config, clientOptions) - if err != nil { - return nil, err - } - - writeObj, err := options.ClientBuilder. - WithUncached(options.ClientDisableCacheFor...). - Build(cache, config, clientOptions) + cluster, err := cluster.New(config, func(clusterOptions *cluster.Options) { + clusterOptions.Scheme = options.Scheme + clusterOptions.MapperProvider = options.MapperProvider + clusterOptions.Logger = options.Logger + clusterOptions.SyncPeriod = options.SyncPeriod + clusterOptions.Namespace = options.Namespace + clusterOptions.NewCache = options.NewCache + clusterOptions.ClientBuilder = options.ClientBuilder + clusterOptions.ClientDisableCacheFor = options.ClientDisableCacheFor + clusterOptions.DryRunClient = options.DryRunClient + clusterOptions.EventBroadcaster = options.EventBroadcaster + }) if err != nil { return nil, err } - if options.DryRunClient { - writeObj = client.NewDryRunClient(writeObj) - } - // Create the recorder provider to inject event recorders for the components. // TODO(directxman12): the log for the event provider should have a context (name, tags, etc) specific // to the particular controller that it's being injected into, rather than a generic one like is here. - recorderProvider, err := options.newRecorderProvider(config, options.Scheme, options.Logger.WithName("events"), options.makeBroadcaster) + recorderProvider, err := options.newRecorderProvider(config, cluster.GetScheme(), options.Logger.WithName("events"), options.makeBroadcaster) if err != nil { return nil, err } @@ -383,15 +332,9 @@ func New(config *rest.Config, options Options) (Manager, error) { } return &controllerManager{ - config: config, - scheme: options.Scheme, - cache: cache, - fieldIndexes: cache, - client: writeObj, - apiReader: apiReader, + cluster: cluster, recorderProvider: recorderProvider, resourceLock: resourceLock, - mapper: mapper, metricsListener: metricsListener, metricsExtraHandlers: metricsExtraHandlers, logger: options.Logger, @@ -523,25 +466,10 @@ func defaultHealthProbeListener(addr string) (net.Listener, error) { // setOptionsDefaults set default values for Options fields func setOptionsDefaults(options Options) Options { - // Use the Kubernetes client-go scheme if none is specified - if options.Scheme == nil { - options.Scheme = scheme.Scheme - } - - if options.MapperProvider == nil { - options.MapperProvider = func(c *rest.Config) (meta.RESTMapper, error) { - return apiutil.NewDynamicRESTMapper(c) - } - } - - // Allow the client builder to be mocked - if options.ClientBuilder == nil { - options.ClientBuilder = NewClientBuilder() - } - // Allow newCache to be mocked - if options.NewCache == nil { - options.NewCache = cache.New + // Allow newResourceLock to be mocked + if options.newResourceLock == nil { + options.newResourceLock = leaderelection.NewResourceLock } // Allow newRecorderProvider to be mocked @@ -549,9 +477,18 @@ func setOptionsDefaults(options Options) Options { options.newRecorderProvider = intrec.NewProvider } - // Allow newResourceLock to be mocked - if options.newResourceLock == nil { - options.newResourceLock = leaderelection.NewResourceLock + // This is duplicated with pkg/cluster, we need it here + // for the leader election and there to provide the user with + // an EventBroadcaster + if options.EventBroadcaster == nil { + // defer initialization to avoid leaking by default + options.makeBroadcaster = func() (record.EventBroadcaster, bool) { + return record.NewBroadcaster(), true + } + } else { + options.makeBroadcaster = func() (record.EventBroadcaster, bool) { + return options.EventBroadcaster, false + } } if options.newMetricsListener == nil { @@ -570,17 +507,6 @@ func setOptionsDefaults(options Options) Options { options.RetryPeriod = &retryPeriod } - if options.EventBroadcaster == nil { - // defer initialization to avoid leaking by default - options.makeBroadcaster = func() (record.EventBroadcaster, bool) { - return record.NewBroadcaster(), true - } - } else { - options.makeBroadcaster = func() (record.EventBroadcaster, bool) { - return options.EventBroadcaster, false - } - } - if options.ReadinessEndpointName == "" { options.ReadinessEndpointName = defaultReadinessEndpoint } diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 172de46fb2..be5c5c3c0a 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -1318,12 +1318,12 @@ var _ = Describe("manger.Manager", func() { }) Describe("SetFields", func() { It("should inject field values", func(done Done) { - m, err := New(cfg, Options{}) + m, err := New(cfg, Options{ + NewCache: func(_ *rest.Config, _ cache.Options) (cache.Cache, error) { + return &informertest.FakeInformers{}, nil + }, + }) Expect(err).NotTo(HaveOccurred()) - mgr, ok := m.(*controllerManager) - Expect(ok).To(BeTrue()) - - mgr.cache = &informertest.FakeInformers{} By("Injecting the dependencies") err = m.SetFields(&injectable{ @@ -1480,7 +1480,7 @@ var _ = Describe("manger.Manager", func() { Expect(err).NotTo(HaveOccurred()) mgr, ok := m.(*controllerManager) Expect(ok).To(BeTrue()) - Expect(m.GetConfig()).To(Equal(mgr.config)) + Expect(m.GetConfig()).To(Equal(mgr.cluster.GetConfig())) }) It("should provide a function to get the Client", func() { @@ -1488,7 +1488,7 @@ var _ = Describe("manger.Manager", func() { Expect(err).NotTo(HaveOccurred()) mgr, ok := m.(*controllerManager) Expect(ok).To(BeTrue()) - Expect(m.GetClient()).To(Equal(mgr.client)) + Expect(m.GetClient()).To(Equal(mgr.cluster.GetClient())) }) It("should provide a function to get the Scheme", func() { @@ -1496,7 +1496,7 @@ var _ = Describe("manger.Manager", func() { Expect(err).NotTo(HaveOccurred()) mgr, ok := m.(*controllerManager) Expect(ok).To(BeTrue()) - Expect(m.GetScheme()).To(Equal(mgr.scheme)) + Expect(m.GetScheme()).To(Equal(mgr.cluster.GetScheme())) }) It("should provide a function to get the FieldIndexer", func() { @@ -1504,7 +1504,7 @@ var _ = Describe("manger.Manager", func() { Expect(err).NotTo(HaveOccurred()) mgr, ok := m.(*controllerManager) Expect(ok).To(BeTrue()) - Expect(m.GetFieldIndexer()).To(Equal(mgr.fieldIndexes)) + Expect(m.GetFieldIndexer()).To(Equal(mgr.cluster.GetFieldIndexer())) }) It("should provide a function to get the EventRecorder", func() {