Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
joelanford committed Aug 18, 2022
1 parent 6182d4b commit 73b74e8
Show file tree
Hide file tree
Showing 2 changed files with 261 additions and 16 deletions.
186 changes: 186 additions & 0 deletions pkg/cache/cache_test.go
Expand Up @@ -122,6 +122,192 @@ var _ = Describe("Multi-Namespace Informer Cache", func() {
var _ = Describe("Informer Cache without DeepCopy", func() {
CacheTest(cache.New, cache.Options{UnsafeDisableDeepCopyByObject: cache.DisableDeepCopyByObject{cache.ObjectAll{}: true}})
})
var _ = Describe("ByNamespace Cache", func() {
defer GinkgoRecover()
var (
informerCache cache.Cache
informerCacheCtx context.Context
informerCacheCancel context.CancelFunc
pod1a client.Object
pod1b client.Object
pod2a client.Object
pod2b client.Object
pod2c client.Object
pod3a client.Object
pod3b client.Object
)
BeforeEach(func() {
informerCacheCtx, informerCacheCancel = context.WithCancel(context.Background())
Expect(cfg).NotTo(BeNil())
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
err = ensureNamespace(testNamespaceOne, cl)
Expect(err).NotTo(HaveOccurred())
err = ensureNamespace(testNamespaceTwo, cl)
Expect(err).NotTo(HaveOccurred())
err = ensureNamespace(testNamespaceThree, cl)
Expect(err).NotTo(HaveOccurred())
err = ensureNode(testNodeOne, cl)
Expect(err).NotTo(HaveOccurred())
// namespace 1 stuff
pod1a = createPod("pod-1a", testNamespaceOne, corev1.RestartPolicyNever) // matches (everything matches)
pod1b = createPodWithLabels("pod-1b", testNamespaceOne, corev1.RestartPolicyNever, map[string]string{"other-match": "true"}) // matches (everything matches)
// namespace 2 stuff
pod2a = createPodWithLabels("pod-2a", testNamespaceTwo, corev1.RestartPolicyNever, map[string]string{"ns2-match": "false"}) // no match (does not match ns2 label selector)
pod2b = createPodWithLabels("pod-2b", testNamespaceTwo, corev1.RestartPolicyNever, map[string]string{"ns2-match": "true"}) // matches (matches ns2 label selector)
pod2c = createPodWithLabels("pod-2c", testNamespaceTwo, corev1.RestartPolicyNever, map[string]string{"other-match": "true"}) // no match (does not match ns2 label selector)
// namespace 3 stuff
pod3a = createPodWithLabels("pod-3a", testNamespaceThree, corev1.RestartPolicyNever, map[string]string{"other-match": "false"}) // no match (does not match default cache label selector)
pod3b = createPodWithLabels("pod-3b", testNamespaceThree, corev1.RestartPolicyNever, map[string]string{"other-match": "true"}) // matches (matches default cache label selector)
By("creating the informer cache")
informerCache, err = cache.BuilderByNamespace(cache.ByNamespaceOptions{
NewNamespaceCaches: map[string]cache.NewCacheFunc{
// Everything in ns1
testNamespaceOne: cache.New,
// Only things in ns2 with label "ns2-match"="true"
testNamespaceTwo: cache.BuilderWithOptions(cache.Options{
DefaultSelector: cache.ObjectSelector{
Label: labels.Set{"ns2-match": "true"}.AsSelector(),
},
}),
},
// For all other namespaces, match "other-match"="true"
NewDefaultNamespaceCache: cache.BuilderWithOptions(cache.Options{
DefaultSelector: cache.ObjectSelector{
Label: labels.Set{"other-match": "true"}.AsSelector(),
},
}),
// For cluster-scoped objects, only match metadata.name = "test-node-1"
NewClusterCache: cache.BuilderWithOptions(cache.Options{
DefaultSelector: cache.ObjectSelector{
Field: fields.OneTermEqualSelector("metadata.name", testNodeOne),
},
}),
})(cfg, cache.Options{})
Expect(err).NotTo(HaveOccurred())
By("running the cache and waiting for it to sync")
// pass as an arg so that we don't race between close and re-assign
go func(ctx context.Context) {
defer GinkgoRecover()
Expect(informerCache.Start(ctx)).To(Succeed())
}(informerCacheCtx)
Expect(informerCache.WaitForCacheSync(informerCacheCtx)).To(BeTrue())
})
Describe("Get", func() {
It("should get an item from a namespace cache", func() {
pod := &corev1.Pod{}
err := informerCache.Get(informerCacheCtx, client.ObjectKeyFromObject(pod1a), pod)
Expect(err).NotTo(HaveOccurred())
})
It("should get an item from the default namespace cache", func() {
pod := &corev1.Pod{}
err := informerCache.Get(informerCacheCtx, client.ObjectKeyFromObject(pod3b), pod)
Expect(err).NotTo(HaveOccurred())
})
It("should get a cluster-scoped item", func() {
node := &corev1.Node{}
err := informerCache.Get(informerCacheCtx, client.ObjectKey{Name: testNodeOne}, node)
Expect(err).NotTo(HaveOccurred())
})
It("should not find an item from a namespace-specific cache if it is not matched", func() {
pod := &corev1.Pod{}
err := informerCache.Get(informerCacheCtx, client.ObjectKeyFromObject(pod2a), pod)
Expect(apierrors.IsNotFound(err)).To(BeTrue())
})
It("should not find an item from the default namespace cache if it is not matched", func() {
pod := &corev1.Pod{}
err := informerCache.Get(informerCacheCtx, client.ObjectKeyFromObject(pod3a), pod)
Expect(apierrors.IsNotFound(err)).To(BeTrue())
})
It("should not find an item at the cluster-scope if it is not matched", func() {
ns := &corev1.Namespace{}
err := informerCache.Get(informerCacheCtx, client.ObjectKey{Name: testNamespaceOne}, ns)
Expect(apierrors.IsNotFound(err)).To(BeTrue())
})
})
Describe("List", func() {
When("Request is cluster-scoped", func() {
It("Should list all pods and find exactly four", func() {
var pods corev1.PodList
err := informerCache.List(informerCacheCtx, &pods)
Expect(err).NotTo(HaveOccurred())
sort.Slice(pods.Items, func(i, j int) bool {
if pods.Items[i].Namespace != pods.Items[j].Namespace {
return pods.Items[i].Namespace < pods.Items[j].Namespace
}
return pods.Items[i].Name < pods.Items[j].Name
})
Expect(pods.Items).To(HaveLen(4))
Expect(pods.Items[0].Namespace).To(Equal(testNamespaceOne))
Expect(pods.Items[0].Name).To(Equal("pod-1a"))
Expect(pods.Items[1].Namespace).To(Equal(testNamespaceOne))
Expect(pods.Items[1].Name).To(Equal("pod-1b"))
Expect(pods.Items[2].Namespace).To(Equal(testNamespaceTwo))
Expect(pods.Items[2].Name).To(Equal("pod-2b"))
Expect(pods.Items[3].Namespace).To(Equal(testNamespaceThree))
Expect(pods.Items[3].Name).To(Equal("pod-3b"))
})
It("Should list nodes and find exactly one", func() {
var nodes corev1.NodeList
err := informerCache.List(informerCacheCtx, &nodes)
Expect(err).NotTo(HaveOccurred())
Expect(nodes.Items).To(HaveLen(1))
Expect(nodes.Items[0].Namespace).To(Equal(""))
Expect(nodes.Items[0].Name).To(Equal(testNodeOne))
})
It("Should list namespaces and find none", func() {
var namespaces corev1.NamespaceList
err := informerCache.List(informerCacheCtx, &namespaces)
Expect(err).NotTo(HaveOccurred())
Expect(namespaces.Items).To(HaveLen(0))
})
})
When("Request is namespace-scoped", func() {
It("Should list pods in namespace one", func() {
var pods corev1.PodList
err := informerCache.List(informerCacheCtx, &pods, client.InNamespace(testNamespaceOne))
Expect(err).NotTo(HaveOccurred())
sort.Slice(pods.Items, func(i, j int) bool {
if pods.Items[i].Namespace != pods.Items[j].Namespace {
return pods.Items[i].Namespace < pods.Items[j].Namespace
}
return pods.Items[i].Name < pods.Items[j].Name
})
Expect(pods.Items).To(HaveLen(2))
Expect(pods.Items[0].Namespace).To(Equal(testNamespaceOne))
Expect(pods.Items[0].Name).To(Equal("pod-1a"))
Expect(pods.Items[1].Namespace).To(Equal(testNamespaceOne))
Expect(pods.Items[1].Name).To(Equal("pod-1b"))
})
It("Should list pods in namespace two", func() {
var pods corev1.PodList
err := informerCache.List(informerCacheCtx, &pods, client.InNamespace(testNamespaceTwo))
Expect(err).NotTo(HaveOccurred())
Expect(pods.Items).To(HaveLen(1))
Expect(pods.Items[0].Namespace).To(Equal(testNamespaceTwo))
Expect(pods.Items[0].Name).To(Equal("pod-2b"))
})
It("Should list pods in namespace three", func() {
var pods corev1.PodList
err := informerCache.List(informerCacheCtx, &pods, client.InNamespace(testNamespaceThree))
Expect(err).NotTo(HaveOccurred())
Expect(pods.Items).To(HaveLen(1))
Expect(pods.Items[0].Namespace).To(Equal(testNamespaceThree))
Expect(pods.Items[0].Name).To(Equal("pod-3b"))
})
})
})
AfterEach(func() {
deletePod(pod1a)
deletePod(pod1b)
deletePod(pod2a)
deletePod(pod2b)
deletePod(pod2c)
deletePod(pod3a)
deletePod(pod3b)
informerCacheCancel()
})
})

var _ = Describe("Cache with transformers", func() {
var (
Expand Down
91 changes: 75 additions & 16 deletions pkg/cache/multi_namespace_cache.go
Expand Up @@ -23,10 +23,12 @@ import (

corev1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
toolscache "k8s.io/client-go/tools/cache"

"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/internal/objectutil"
)
Expand All @@ -37,39 +39,88 @@ type NewCacheFunc func(config *rest.Config, opts Options) (Cache, error)
// a new global namespaced cache to handle cluster scoped resources.
const globalCache = "_cluster-scope"

// MultiNamespacedCacheBuilder - Builder function to create a new multi-namespaced cache.
// This will scope the cache to a list of namespaces. Listing for all namespaces
// will list for all the namespaces that this knows about. By default this will create
// a global cache for cluster scoped resource. Note that this is not intended
// to be used for excluding namespaces, this is better done via a Predicate. Also note that
// you may face performance issues when using this with a high number of namespaces.
func MultiNamespacedCacheBuilder(namespaces []string) NewCacheFunc {
// ByNamespaceOptions is used to configure the functions used to create caches
// on a per-namespace basis.
type ByNamespaceOptions struct {
NewNamespaceCaches map[string]NewCacheFunc
NewClusterCache NewCacheFunc
NewDefaultNamespaceCache NewCacheFunc
}

// BuilderByNamespace builds a composite cache that delegates to per-namespace
// caches built according to the passed ByNamespaceOptions. If NewDefaultNamespaceCache
// is defined, it will be used as a catch-all for objects not in the namespaces defined in
// NewNamespaceCaches. The default namespace cache is automatically configured with extra
// field selectors to avoid duplicate caching of objects between namespace-specific caches
// and this catch-all cache. NewClusterCache is used to create a cache for cluster-scoped
// objects. If it is undefined, a default cache will be created using the New function.
func BuilderByNamespace(byNamespaceOpts ByNamespaceOptions) NewCacheFunc {
return func(config *rest.Config, opts Options) (Cache, error) {
opts, err := defaultOpts(config, opts)
if err != nil {
return nil, err
}

caches := map[string]Cache{}

// create a cache for cluster scoped resources
gCache, err := New(config, opts)
if byNamespaceOpts.NewClusterCache == nil {
byNamespaceOpts.NewClusterCache = New
}
clusterCache, err := byNamespaceOpts.NewClusterCache(config, opts)
if err != nil {
return nil, fmt.Errorf("error creating global cache: %w", err)
return nil, err
}

for _, ns := range namespaces {
nsToCache := map[string]Cache{}
if byNamespaceOpts.NewDefaultNamespaceCache != nil {
defaultNamespaceCache, err := byNamespaceOpts.NewDefaultNamespaceCache(config, ignoreNamespaces(opts, byNamespaceOpts.NewNamespaceCaches))
if err != nil {
return nil, err
}
nsToCache[corev1.NamespaceAll] = defaultNamespaceCache
}

for ns, newCacheFunc := range byNamespaceOpts.NewNamespaceCaches {
opts.Namespace = ns
c, err := New(config, opts)
nsToCache[ns], err = newCacheFunc(config, opts)
if err != nil {
return nil, err
}
caches[ns] = c
}
return &multiNamespaceCache{namespaceToCache: caches, Scheme: opts.Scheme, RESTMapper: opts.Mapper, clusterCache: gCache}, nil

return &multiNamespaceCache{
namespaceToCache: nsToCache,
clusterCache: clusterCache,
RESTMapper: opts.Mapper,
Scheme: opts.Scheme,
}, nil
}
}

func ignoreNamespaces(opts Options, newObjectCaches map[string]NewCacheFunc) Options {
fieldSelectors := []fields.Selector{}
if opts.DefaultSelector.Field != nil {
fieldSelectors = append(fieldSelectors, opts.DefaultSelector.Field)
}
for ns := range newObjectCaches {
fieldSelectors = append(fieldSelectors, fields.OneTermNotEqualSelector("metadata.namespace", ns))
}
opts.DefaultSelector.Field = fields.AndSelectors(fieldSelectors...)
return opts
}

// MultiNamespacedCacheBuilder - Builder function to create a new multi-namespaced cache.
// This will scope the cache to a list of namespaces. Listing for all namespaces
// will list for all the namespaces that this knows about. By default this will create
// a global cache for cluster scoped resource. Note that this is not intended
// to be used for excluding namespaces, this is better done via a Predicate. Also note that
// you may face performance issues when using this with a high number of namespaces.
func MultiNamespacedCacheBuilder(namespaces []string) NewCacheFunc {
byNamespaceOpts := ByNamespaceOptions{NewNamespaceCaches: map[string]NewCacheFunc{}}
for _, ns := range namespaces {
byNamespaceOpts.NewNamespaceCaches[ns] = New
}
return BuilderByNamespace(byNamespaceOpts)
}

// multiNamespaceCache knows how to handle multiple namespaced caches
// Use this feature when scoping permissions for your
// operator to a list of namespaces instead of watching every namespace
Expand Down Expand Up @@ -212,6 +263,10 @@ func (c *multiNamespaceCache) Get(ctx context.Context, key client.ObjectKey, obj
}

cache, ok := c.namespaceToCache[key.Namespace]
if !ok {
// Use the default/catch-all namespace cache if we have one.
cache, ok = c.namespaceToCache[corev1.NamespaceAll]
}
if !ok {
return fmt.Errorf("unable to get: %v because of unknown namespace for the cache", key)
}
Expand All @@ -235,6 +290,10 @@ func (c *multiNamespaceCache) List(ctx context.Context, list client.ObjectList,

if listOpts.Namespace != corev1.NamespaceAll {
cache, ok := c.namespaceToCache[listOpts.Namespace]
if !ok {
// Use the default/catch-all namespace cache if we have one.
cache, ok = c.namespaceToCache[corev1.NamespaceAll]
}
if !ok {
return fmt.Errorf("unable to get: %v because of unknown namespace for the cache", listOpts.Namespace)
}
Expand Down

0 comments on commit 73b74e8

Please sign in to comment.