Skip to content

Commit

Permalink
multiNamespaceCache: support custom newCache funcs per namespace
Browse files Browse the repository at this point in the history
Signed-off-by: Joe Lanford <joe.lanford@gmail.com>
  • Loading branch information
joelanford committed Jul 28, 2022
1 parent f035121 commit f831e3d
Show file tree
Hide file tree
Showing 3 changed files with 333 additions and 20 deletions.
64 changes: 61 additions & 3 deletions pkg/cache/cache.go
Expand Up @@ -22,11 +22,14 @@ import (
"time"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
toolscache "k8s.io/client-go/tools/cache"

"sigs.k8s.io/controller-runtime/pkg/cache/internal"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
Expand Down Expand Up @@ -188,14 +191,69 @@ func BuilderWithOptions(options Options) NewCacheFunc {
if options.Namespace == "" {
options.Namespace = opts.Namespace
}
if opts.Resync == nil {
opts.Resync = options.Resync
for obj, selector := range opts.SelectorsByObject {
options.SelectorsByObject[obj] = combineSelector(options.SelectorsByObject[obj], selector)
}
options.DefaultSelector = combineSelector(options.DefaultSelector, opts.DefaultSelector)
for obj, deepCopy := range opts.UnsafeDisableDeepCopyByObject {
if _, ok := options.UnsafeDisableDeepCopyByObject[obj]; !ok {
options.UnsafeDisableDeepCopyByObject[obj] = deepCopy
}
}
for obj, transform := range opts.TransformByObject {
if _, ok := options.TransformByObject[obj]; !ok {
options.TransformByObject[obj] = transform
}
}
if options.DefaultTransform == nil {
options.DefaultTransform = opts.DefaultTransform
}

return New(config, options)
}
}

func combineSelector(selectors ...ObjectSelector) ObjectSelector {
ls := make([]labels.Selector, 0, len(selectors))
fs := make([]fields.Selector, 0, len(selectors))
for _, s := range selectors {
ls = append(ls, s.Label)
fs = append(fs, s.Field)
}
return ObjectSelector{
Label: combineLabelSelectors(ls...),
Field: combineFieldSelectors(fs...),
}
}

func combineLabelSelectors(ls ...labels.Selector) labels.Selector {
allReqs := labels.Requirements{}
for _, l := range ls {
if l == nil {
continue
}
reqs, _ := l.Requirements()
allReqs = append(allReqs, reqs...)
}
return labels.NewSelector().Add(allReqs...)
}

func combineFieldSelectors(fs ...fields.Selector) fields.Selector {
nonNil := fs[:0]
for _, f := range fs {
if f == nil {
continue
}
nonNil = append(nonNil, f)
}
if len(nonNil) == 0 {
return nil
}
if len(nonNil) == 1 {
return nonNil[0]
}
return fields.AndSelectors(nonNil...)
}

func defaultOpts(config *rest.Config, opts Options) (Options, error) {
// Use the default Kubernetes Scheme if unset
if opts.Scheme == nil {
Expand Down
198 changes: 197 additions & 1 deletion pkg/cache/cache_test.go
Expand Up @@ -26,7 +26,6 @@ import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/ginkgo/extensions/table"
. "github.com/onsi/gomega"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
Expand Down Expand Up @@ -124,6 +123,203 @@ var _ = Describe("Informer Cache without DeepCopy", func() {
CacheTest(cache.New, cache.Options{UnsafeDisableDeepCopyByObject: cache.DisableDeepCopyByObject{cache.ObjectAll{}: true}})
})

var _ = Describe("ByNamespace Cache", func() {
defer GinkgoRecover()
var (
informerCache cache.Cache
informerCacheCtx context.Context
informerCacheCancel context.CancelFunc

pod1a client.Object
pod1b client.Object
pod2a client.Object
pod2b client.Object
pod2c client.Object
pod3a client.Object
pod3b client.Object
)

BeforeEach(func() {
informerCacheCtx, informerCacheCancel = context.WithCancel(context.Background())
Expect(cfg).NotTo(BeNil())
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
err = ensureNamespace(testNamespaceOne, cl)
Expect(err).NotTo(HaveOccurred())
err = ensureNamespace(testNamespaceTwo, cl)
Expect(err).NotTo(HaveOccurred())
err = ensureNamespace(testNamespaceThree, cl)
Expect(err).NotTo(HaveOccurred())
err = ensureNode(testNodeOne, cl)
Expect(err).NotTo(HaveOccurred())

// namespace 1 stuff
pod1a = createPod("pod-1a", testNamespaceOne, corev1.RestartPolicyNever) // matches (everything matches)
pod1b = createPodWithLabels("pod-1b", testNamespaceOne, corev1.RestartPolicyNever, map[string]string{"other-match": "true"}) // matches (everything matches)

// namespace 2 stuff
pod2a = createPodWithLabels("pod-2a", testNamespaceTwo, corev1.RestartPolicyNever, map[string]string{"ns2-match": "false"}) // no match (does not match ns2 label selector)
pod2b = createPodWithLabels("pod-2b", testNamespaceTwo, corev1.RestartPolicyNever, map[string]string{"ns2-match": "true"}) // matches (matches ns2 label selector)
pod2c = createPodWithLabels("pod-2c", testNamespaceTwo, corev1.RestartPolicyNever, map[string]string{"other-match": "true"}) // no match (does not match ns2 label selector)

// namespace 3 stuff
pod3a = createPodWithLabels("pod-3a", testNamespaceThree, corev1.RestartPolicyNever, map[string]string{"other-match": "false"}) // no match (does not match default cache label selector)
pod3b = createPodWithLabels("pod-3b", testNamespaceThree, corev1.RestartPolicyNever, map[string]string{"other-match": "true"}) // matches (matches default cache label selector)

By("creating the informer cache")
informerCache, err = cache.BuilderByNamespace(cache.ByNamespaceOptions{
NewNamespaceCaches: map[string]cache.NewCacheFunc{
// Everything in ns1
testNamespaceOne: cache.New,

// Only things in ns2 with label "ns2-match"="true"
testNamespaceTwo: cache.BuilderWithOptions(cache.Options{
DefaultSelector: cache.ObjectSelector{
Label: labels.Set{"ns2-match": "true"}.AsSelector(),
},
}),
},
// For all other namespaces, match "other-match"="true"
NewDefaultNamespaceCache: cache.BuilderWithOptions(cache.Options{
DefaultSelector: cache.ObjectSelector{
Label: labels.Set{"other-match": "true"}.AsSelector(),
},
}),
// For cluster-scoped objects, only match metadata.name = "test-node-1"
NewClusterCache: cache.BuilderWithOptions(cache.Options{
DefaultSelector: cache.ObjectSelector{
Field: fields.OneTermEqualSelector("metadata.name", testNodeOne),
},
}),
})(cfg, cache.Options{})
Expect(err).NotTo(HaveOccurred())
By("running the cache and waiting for it to sync")
// pass as an arg so that we don't race between close and re-assign
go func(ctx context.Context) {
defer GinkgoRecover()
Expect(informerCache.Start(ctx)).To(Succeed())
}(informerCacheCtx)
Expect(informerCache.WaitForCacheSync(informerCacheCtx)).To(BeTrue())
})
Describe("Get", func() {
It("should get an item from a namespace cache", func() {
pod := &corev1.Pod{}
err := informerCache.Get(informerCacheCtx, client.ObjectKeyFromObject(pod1a), pod)
Expect(err).NotTo(HaveOccurred())
})
It("should get an item from the default namespace cache", func() {
pod := &corev1.Pod{}
err := informerCache.Get(informerCacheCtx, client.ObjectKeyFromObject(pod3b), pod)
Expect(err).NotTo(HaveOccurred())
})
It("should get a cluster-scoped item", func() {
node := &corev1.Node{}
err := informerCache.Get(informerCacheCtx, client.ObjectKey{Name: testNodeOne}, node)
Expect(err).NotTo(HaveOccurred())
})
It("should not find an item from a namespace-specific cache if it is not matched", func() {
pod := &corev1.Pod{}
err := informerCache.Get(informerCacheCtx, client.ObjectKeyFromObject(pod2a), pod)
Expect(apierrors.IsNotFound(err)).To(BeTrue())
})
It("should not find an item from the default namespace cache if it is not matched", func() {
pod := &corev1.Pod{}
err := informerCache.Get(informerCacheCtx, client.ObjectKeyFromObject(pod3a), pod)
Expect(apierrors.IsNotFound(err)).To(BeTrue())
})
It("should not find an item at the cluster-scope if it is not matched", func() {
ns := &corev1.Namespace{}
err := informerCache.Get(informerCacheCtx, client.ObjectKey{Name: testNamespaceOne}, ns)
Expect(apierrors.IsNotFound(err)).To(BeTrue())
})
})
Describe("List", func() {
When("Request is cluster-scoped", func() {
It("Should list all pods and find exactly four", func() {
var pods corev1.PodList
err := informerCache.List(informerCacheCtx, &pods)
Expect(err).NotTo(HaveOccurred())
sort.Slice(pods.Items, func(i, j int) bool {
if pods.Items[i].Namespace != pods.Items[j].Namespace {
return pods.Items[i].Namespace < pods.Items[j].Namespace
}
return pods.Items[i].Name < pods.Items[j].Name
})

Expect(pods.Items).To(HaveLen(4))
Expect(pods.Items[0].Namespace).To(Equal(testNamespaceOne))
Expect(pods.Items[0].Name).To(Equal("pod-1a"))
Expect(pods.Items[1].Namespace).To(Equal(testNamespaceOne))
Expect(pods.Items[1].Name).To(Equal("pod-1b"))
Expect(pods.Items[2].Namespace).To(Equal(testNamespaceTwo))
Expect(pods.Items[2].Name).To(Equal("pod-2b"))
Expect(pods.Items[3].Namespace).To(Equal(testNamespaceThree))
Expect(pods.Items[3].Name).To(Equal("pod-3b"))
})
It("Should list nodes and find exactly one", func() {
var nodes corev1.NodeList
err := informerCache.List(informerCacheCtx, &nodes)
Expect(err).NotTo(HaveOccurred())
Expect(nodes.Items).To(HaveLen(1))
Expect(nodes.Items[0].Namespace).To(Equal(""))
Expect(nodes.Items[0].Name).To(Equal(testNodeOne))
})
It("Should list namespaces and find none", func() {
var namespaces corev1.NamespaceList
err := informerCache.List(informerCacheCtx, &namespaces)
Expect(err).NotTo(HaveOccurred())
Expect(namespaces.Items).To(HaveLen(0))
})
})
When("Request is namespace-scoped", func() {
It("Should list pods in namespace one", func() {
var pods corev1.PodList
err := informerCache.List(informerCacheCtx, &pods, client.InNamespace(testNamespaceOne))
Expect(err).NotTo(HaveOccurred())
sort.Slice(pods.Items, func(i, j int) bool {
if pods.Items[i].Namespace != pods.Items[j].Namespace {
return pods.Items[i].Namespace < pods.Items[j].Namespace
}
return pods.Items[i].Name < pods.Items[j].Name
})

Expect(pods.Items).To(HaveLen(2))
Expect(pods.Items[0].Namespace).To(Equal(testNamespaceOne))
Expect(pods.Items[0].Name).To(Equal("pod-1a"))
Expect(pods.Items[1].Namespace).To(Equal(testNamespaceOne))
Expect(pods.Items[1].Name).To(Equal("pod-1b"))
})
It("Should list pods in namespace two", func() {
var pods corev1.PodList
err := informerCache.List(informerCacheCtx, &pods, client.InNamespace(testNamespaceTwo))
Expect(err).NotTo(HaveOccurred())
Expect(pods.Items).To(HaveLen(1))
Expect(pods.Items[0].Namespace).To(Equal(testNamespaceTwo))
Expect(pods.Items[0].Name).To(Equal("pod-2b"))
})
It("Should list pods in namespace three", func() {
var pods corev1.PodList
err := informerCache.List(informerCacheCtx, &pods, client.InNamespace(testNamespaceThree))
Expect(err).NotTo(HaveOccurred())
Expect(pods.Items).To(HaveLen(1))
Expect(pods.Items[0].Namespace).To(Equal(testNamespaceThree))
Expect(pods.Items[0].Name).To(Equal("pod-3b"))
})
})
})

AfterEach(func() {
deletePod(pod1a)
deletePod(pod1b)
deletePod(pod2a)
deletePod(pod2b)
deletePod(pod2c)
deletePod(pod3a)
deletePod(pod3b)
informerCacheCancel()
})
})

var _ = Describe("Cache with transformers", func() {
var (
informerCache cache.Cache
Expand Down

0 comments on commit f831e3d

Please sign in to comment.