diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 2a8f53347e..181f5e8059 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -22,11 +22,14 @@ import ( "time" "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" toolscache "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/cache/internal" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" @@ -188,14 +191,69 @@ func BuilderWithOptions(options Options) NewCacheFunc { if options.Namespace == "" { options.Namespace = opts.Namespace } - if opts.Resync == nil { - opts.Resync = options.Resync + for obj, selector := range opts.SelectorsByObject { + options.SelectorsByObject[obj] = combineSelector(options.SelectorsByObject[obj], selector) + } + options.DefaultSelector = combineSelector(options.DefaultSelector, opts.DefaultSelector) + for obj, deepCopy := range opts.UnsafeDisableDeepCopyByObject { + if _, ok := options.UnsafeDisableDeepCopyByObject[obj]; !ok { + options.UnsafeDisableDeepCopyByObject[obj] = deepCopy + } + } + for obj, transform := range opts.TransformByObject { + if _, ok := options.TransformByObject[obj]; !ok { + options.TransformByObject[obj] = transform + } + } + if options.DefaultTransform == nil { + options.DefaultTransform = opts.DefaultTransform } - return New(config, options) } } +func combineSelector(selectors ...ObjectSelector) ObjectSelector { + ls := make([]labels.Selector, 0, len(selectors)) + fs := make([]fields.Selector, 0, len(selectors)) + for _, s := range selectors { + ls = append(ls, s.Label) + fs = append(fs, s.Field) + } + return ObjectSelector{ + Label: combineLabelSelectors(ls...), + Field: combineFieldSelectors(fs...), + } +} + +func combineLabelSelectors(ls ...labels.Selector) labels.Selector { + allReqs := labels.Requirements{} + for _, l := range ls { + if l == nil { + continue + } + reqs, _ := l.Requirements() + allReqs = append(allReqs, reqs...) + } + return labels.NewSelector().Add(allReqs...) +} + +func combineFieldSelectors(fs ...fields.Selector) fields.Selector { + nonNil := fs[:0] + for _, f := range fs { + if f == nil { + continue + } + nonNil = append(nonNil, f) + } + if len(nonNil) == 0 { + return nil + } + if len(nonNil) == 1 { + return nonNil[0] + } + return fields.AndSelectors(nonNil...) +} + func defaultOpts(config *rest.Config, opts Options) (Options, error) { // Use the default Kubernetes Scheme if unset if opts.Scheme == nil { diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index a84b08e94c..64611f57e1 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -26,7 +26,6 @@ import ( . "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo/extensions/table" . "github.com/onsi/gomega" - corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" @@ -124,6 +123,203 @@ var _ = Describe("Informer Cache without DeepCopy", func() { CacheTest(cache.New, cache.Options{UnsafeDisableDeepCopyByObject: cache.DisableDeepCopyByObject{cache.ObjectAll{}: true}}) }) +var _ = Describe("ByNamespace Cache", func() { + defer GinkgoRecover() + var ( + informerCache cache.Cache + informerCacheCtx context.Context + informerCacheCancel context.CancelFunc + + pod1a client.Object + pod1b client.Object + pod2a client.Object + pod2b client.Object + pod2c client.Object + pod3a client.Object + pod3b client.Object + ) + + BeforeEach(func() { + informerCacheCtx, informerCacheCancel = context.WithCancel(context.Background()) + Expect(cfg).NotTo(BeNil()) + cl, err := client.New(cfg, client.Options{}) + Expect(err).NotTo(HaveOccurred()) + err = ensureNamespace(testNamespaceOne, cl) + Expect(err).NotTo(HaveOccurred()) + err = ensureNamespace(testNamespaceTwo, cl) + Expect(err).NotTo(HaveOccurred()) + err = ensureNamespace(testNamespaceThree, cl) + Expect(err).NotTo(HaveOccurred()) + err = ensureNode(testNodeOne, cl) + Expect(err).NotTo(HaveOccurred()) + + // namespace 1 stuff + pod1a = createPod("pod-1a", testNamespaceOne, corev1.RestartPolicyNever) // matches (everything matches) + pod1b = createPodWithLabels("pod-1b", testNamespaceOne, corev1.RestartPolicyNever, map[string]string{"other-match": "true"}) // matches (everything matches) + + // namespace 2 stuff + pod2a = createPodWithLabels("pod-2a", testNamespaceTwo, corev1.RestartPolicyNever, map[string]string{"ns2-match": "false"}) // no match (does not match ns2 label selector) + pod2b = createPodWithLabels("pod-2b", testNamespaceTwo, corev1.RestartPolicyNever, map[string]string{"ns2-match": "true"}) // matches (matches ns2 label selector) + pod2c = createPodWithLabels("pod-2c", testNamespaceTwo, corev1.RestartPolicyNever, map[string]string{"other-match": "true"}) // no match (does not match ns2 label selector) + + // namespace 3 stuff + pod3a = createPodWithLabels("pod-3a", testNamespaceThree, corev1.RestartPolicyNever, map[string]string{"other-match": "false"}) // no match (does not match default cache label selector) + pod3b = createPodWithLabels("pod-3b", testNamespaceThree, corev1.RestartPolicyNever, map[string]string{"other-match": "true"}) // matches (matches default cache label selector) + + By("creating the informer cache") + informerCache, err = cache.BuilderByNamespace(cache.ByNamespaceOptions{ + NewNamespaceCaches: map[string]cache.NewCacheFunc{ + // Everything in ns1 + testNamespaceOne: cache.New, + + // Only things in ns2 with label "ns2-match"="true" + testNamespaceTwo: cache.BuilderWithOptions(cache.Options{ + DefaultSelector: cache.ObjectSelector{ + Label: labels.Set{"ns2-match": "true"}.AsSelector(), + }, + }), + }, + // For all other namespaces, match "other-match"="true" + NewDefaultNamespaceCache: cache.BuilderWithOptions(cache.Options{ + DefaultSelector: cache.ObjectSelector{ + Label: labels.Set{"other-match": "true"}.AsSelector(), + }, + }), + // For cluster-scoped objects, only match metadata.name = "test-node-1" + NewClusterCache: cache.BuilderWithOptions(cache.Options{ + DefaultSelector: cache.ObjectSelector{ + Field: fields.OneTermEqualSelector("metadata.name", testNodeOne), + }, + }), + })(cfg, cache.Options{}) + Expect(err).NotTo(HaveOccurred()) + By("running the cache and waiting for it to sync") + // pass as an arg so that we don't race between close and re-assign + go func(ctx context.Context) { + defer GinkgoRecover() + Expect(informerCache.Start(ctx)).To(Succeed()) + }(informerCacheCtx) + Expect(informerCache.WaitForCacheSync(informerCacheCtx)).To(BeTrue()) + }) + Describe("Get", func() { + It("should get an item from a namespace cache", func() { + pod := &corev1.Pod{} + err := informerCache.Get(informerCacheCtx, client.ObjectKeyFromObject(pod1a), pod) + Expect(err).NotTo(HaveOccurred()) + }) + It("should get an item from the default namespace cache", func() { + pod := &corev1.Pod{} + err := informerCache.Get(informerCacheCtx, client.ObjectKeyFromObject(pod3b), pod) + Expect(err).NotTo(HaveOccurred()) + }) + It("should get a cluster-scoped item", func() { + node := &corev1.Node{} + err := informerCache.Get(informerCacheCtx, client.ObjectKey{Name: testNodeOne}, node) + Expect(err).NotTo(HaveOccurred()) + }) + It("should not find an item from a namespace-specific cache if it is not matched", func() { + pod := &corev1.Pod{} + err := informerCache.Get(informerCacheCtx, client.ObjectKeyFromObject(pod2a), pod) + Expect(apierrors.IsNotFound(err)).To(BeTrue()) + }) + It("should not find an item from the default namespace cache if it is not matched", func() { + pod := &corev1.Pod{} + err := informerCache.Get(informerCacheCtx, client.ObjectKeyFromObject(pod3a), pod) + Expect(apierrors.IsNotFound(err)).To(BeTrue()) + }) + It("should not find an item at the cluster-scope if it is not matched", func() { + ns := &corev1.Namespace{} + err := informerCache.Get(informerCacheCtx, client.ObjectKey{Name: testNamespaceOne}, ns) + Expect(apierrors.IsNotFound(err)).To(BeTrue()) + }) + }) + Describe("List", func() { + When("Request is cluster-scoped", func() { + It("Should list all pods and find exactly four", func() { + var pods corev1.PodList + err := informerCache.List(informerCacheCtx, &pods) + Expect(err).NotTo(HaveOccurred()) + sort.Slice(pods.Items, func(i, j int) bool { + if pods.Items[i].Namespace != pods.Items[j].Namespace { + return pods.Items[i].Namespace < pods.Items[j].Namespace + } + return pods.Items[i].Name < pods.Items[j].Name + }) + + Expect(pods.Items).To(HaveLen(4)) + Expect(pods.Items[0].Namespace).To(Equal(testNamespaceOne)) + Expect(pods.Items[0].Name).To(Equal("pod-1a")) + Expect(pods.Items[1].Namespace).To(Equal(testNamespaceOne)) + Expect(pods.Items[1].Name).To(Equal("pod-1b")) + Expect(pods.Items[2].Namespace).To(Equal(testNamespaceTwo)) + Expect(pods.Items[2].Name).To(Equal("pod-2b")) + Expect(pods.Items[3].Namespace).To(Equal(testNamespaceThree)) + Expect(pods.Items[3].Name).To(Equal("pod-3b")) + }) + It("Should list nodes and find exactly one", func() { + var nodes corev1.NodeList + err := informerCache.List(informerCacheCtx, &nodes) + Expect(err).NotTo(HaveOccurred()) + Expect(nodes.Items).To(HaveLen(1)) + Expect(nodes.Items[0].Namespace).To(Equal("")) + Expect(nodes.Items[0].Name).To(Equal(testNodeOne)) + }) + It("Should list namespaces and find none", func() { + var namespaces corev1.NamespaceList + err := informerCache.List(informerCacheCtx, &namespaces) + Expect(err).NotTo(HaveOccurred()) + Expect(namespaces.Items).To(HaveLen(0)) + }) + }) + When("Request is namespace-scoped", func() { + It("Should list pods in namespace one", func() { + var pods corev1.PodList + err := informerCache.List(informerCacheCtx, &pods, client.InNamespace(testNamespaceOne)) + Expect(err).NotTo(HaveOccurred()) + sort.Slice(pods.Items, func(i, j int) bool { + if pods.Items[i].Namespace != pods.Items[j].Namespace { + return pods.Items[i].Namespace < pods.Items[j].Namespace + } + return pods.Items[i].Name < pods.Items[j].Name + }) + + Expect(pods.Items).To(HaveLen(2)) + Expect(pods.Items[0].Namespace).To(Equal(testNamespaceOne)) + Expect(pods.Items[0].Name).To(Equal("pod-1a")) + Expect(pods.Items[1].Namespace).To(Equal(testNamespaceOne)) + Expect(pods.Items[1].Name).To(Equal("pod-1b")) + }) + It("Should list pods in namespace two", func() { + var pods corev1.PodList + err := informerCache.List(informerCacheCtx, &pods, client.InNamespace(testNamespaceTwo)) + Expect(err).NotTo(HaveOccurred()) + Expect(pods.Items).To(HaveLen(1)) + Expect(pods.Items[0].Namespace).To(Equal(testNamespaceTwo)) + Expect(pods.Items[0].Name).To(Equal("pod-2b")) + }) + It("Should list pods in namespace three", func() { + var pods corev1.PodList + err := informerCache.List(informerCacheCtx, &pods, client.InNamespace(testNamespaceThree)) + Expect(err).NotTo(HaveOccurred()) + Expect(pods.Items).To(HaveLen(1)) + Expect(pods.Items[0].Namespace).To(Equal(testNamespaceThree)) + Expect(pods.Items[0].Name).To(Equal("pod-3b")) + }) + }) + }) + + AfterEach(func() { + deletePod(pod1a) + deletePod(pod1b) + deletePod(pod2a) + deletePod(pod2b) + deletePod(pod2c) + deletePod(pod3a) + deletePod(pod3b) + informerCacheCancel() + }) +}) + var _ = Describe("Cache with transformers", func() { var ( informerCache cache.Cache diff --git a/pkg/cache/multi_namespace_cache.go b/pkg/cache/multi_namespace_cache.go index 64514c0c55..b2b56c2d3d 100644 --- a/pkg/cache/multi_namespace_cache.go +++ b/pkg/cache/multi_namespace_cache.go @@ -23,10 +23,12 @@ import ( corev1 "k8s.io/api/core/v1" apimeta "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/rest" toolscache "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/internal/objectutil" ) @@ -37,39 +39,88 @@ type NewCacheFunc func(config *rest.Config, opts Options) (Cache, error) // a new global namespaced cache to handle cluster scoped resources. const globalCache = "_cluster-scope" -// MultiNamespacedCacheBuilder - Builder function to create a new multi-namespaced cache. -// This will scope the cache to a list of namespaces. Listing for all namespaces -// will list for all the namespaces that this knows about. By default this will create -// a global cache for cluster scoped resource. Note that this is not intended -// to be used for excluding namespaces, this is better done via a Predicate. Also note that -// you may face performance issues when using this with a high number of namespaces. -func MultiNamespacedCacheBuilder(namespaces []string) NewCacheFunc { +// ByNamespaceOptions is used to configure the functions used to create caches +// on a per-namespace basis. +type ByNamespaceOptions struct { + NewNamespaceCaches map[string]NewCacheFunc + NewClusterCache NewCacheFunc + NewDefaultNamespaceCache NewCacheFunc +} + +// BuilderByNamespace builds a composite cache that delegates to per-namespace +// caches built according to the passed ByNamespaceOptions. If NewDefaultNamespaceCache +// is defined, it will be used as a catch-all for objects not in the namespaces defined in +// NewNamespaceCaches. The default namespace cache is automatically configured with extra +// field selectors to avoid duplicate caching of objects between namespace-specific caches +// and this catch-all cache. NewClusterCache is used to create a cache for cluster-scoped +// objects. If it is undefined, a default cache will be created using the New function. +func BuilderByNamespace(byNamespaceOpts ByNamespaceOptions) NewCacheFunc { return func(config *rest.Config, opts Options) (Cache, error) { opts, err := defaultOpts(config, opts) if err != nil { return nil, err } - caches := map[string]Cache{} - - // create a cache for cluster scoped resources - gCache, err := New(config, opts) + if byNamespaceOpts.NewClusterCache == nil { + byNamespaceOpts.NewClusterCache = New + } + clusterCache, err := byNamespaceOpts.NewClusterCache(config, opts) if err != nil { - return nil, fmt.Errorf("error creating global cache: %w", err) + return nil, err } - for _, ns := range namespaces { + nsToCache := map[string]Cache{} + if byNamespaceOpts.NewDefaultNamespaceCache != nil { + defaultNamespaceCache, err := byNamespaceOpts.NewDefaultNamespaceCache(config, ignoreNamespaces(opts, byNamespaceOpts.NewNamespaceCaches)) + if err != nil { + return nil, err + } + nsToCache[corev1.NamespaceAll] = defaultNamespaceCache + } + + for ns, newCacheFunc := range byNamespaceOpts.NewNamespaceCaches { opts.Namespace = ns - c, err := New(config, opts) + nsToCache[ns], err = newCacheFunc(config, opts) if err != nil { return nil, err } - caches[ns] = c } - return &multiNamespaceCache{namespaceToCache: caches, Scheme: opts.Scheme, RESTMapper: opts.Mapper, clusterCache: gCache}, nil + + return &multiNamespaceCache{ + namespaceToCache: nsToCache, + clusterCache: clusterCache, + RESTMapper: opts.Mapper, + Scheme: opts.Scheme, + }, nil } } +func ignoreNamespaces(opts Options, newObjectCaches map[string]NewCacheFunc) Options { + fieldSelectors := []fields.Selector{} + if opts.DefaultSelector.Field != nil { + fieldSelectors = append(fieldSelectors, opts.DefaultSelector.Field) + } + for ns := range newObjectCaches { + fieldSelectors = append(fieldSelectors, fields.OneTermNotEqualSelector("metadata.namespace", ns)) + } + opts.DefaultSelector.Field = fields.AndSelectors(fieldSelectors...) + return opts +} + +// MultiNamespacedCacheBuilder - Builder function to create a new multi-namespaced cache. +// This will scope the cache to a list of namespaces. Listing for all namespaces +// will list for all the namespaces that this knows about. By default this will create +// a global cache for cluster scoped resource. Note that this is not intended +// to be used for excluding namespaces, this is better done via a Predicate. Also note that +// you may face performance issues when using this with a high number of namespaces. +func MultiNamespacedCacheBuilder(namespaces []string) NewCacheFunc { + byNamespaceOpts := ByNamespaceOptions{NewNamespaceCaches: map[string]NewCacheFunc{}} + for _, ns := range namespaces { + byNamespaceOpts.NewNamespaceCaches[ns] = New + } + return BuilderByNamespace(byNamespaceOpts) +} + // multiNamespaceCache knows how to handle multiple namespaced caches // Use this feature when scoping permissions for your // operator to a list of namespaces instead of watching every namespace @@ -212,6 +263,10 @@ func (c *multiNamespaceCache) Get(ctx context.Context, key client.ObjectKey, obj } cache, ok := c.namespaceToCache[key.Namespace] + if !ok { + // Use the default/catch-all namespace cache if we have one. + cache, ok = c.namespaceToCache[corev1.NamespaceAll] + } if !ok { return fmt.Errorf("unable to get: %v because of unknown namespace for the cache", key) } @@ -235,6 +290,10 @@ func (c *multiNamespaceCache) List(ctx context.Context, list client.ObjectList, if listOpts.Namespace != corev1.NamespaceAll { cache, ok := c.namespaceToCache[listOpts.Namespace] + if !ok { + // Use the default/catch-all namespace cache if we have one. + cache, ok = c.namespaceToCache[corev1.NamespaceAll] + } if !ok { return fmt.Errorf("unable to get: %v because of unknown namespace for the cache", listOpts.Namespace) }