From cd065bf2f63c3057db307990c21316483fa60ce8 Mon Sep 17 00:00:00 2001 From: Quique Llorente Date: Tue, 2 Mar 2021 14:07:25 +0100 Subject: [PATCH] Add SelectorsByObject option to cache Controller-Runtime controllers use a cache to subscribe to events from Kubernetes objects and to read those objects more efficiently by avoiding to call out to the API. This cache is backed by Kubernetes informers. The only way to filter this cache is by namespace and resource type. In cases where a controller is only interested in a small subset of objects (for example all pods on a node), this might end up not being efficient enough. This change increase the "pkg/cache" interface adding the "BuildWithOptins" function and the "Options.SelectorsByObject" option. The SelectorsByObject restricts the cache's ListWatch to the desired fields per GVK at the specified object, the map's value must implement Selector [1] using for example a Set [2] This is the implementation of the design document [3] [1] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Selector [2] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Set [3] https://github.com/kubernetes-sigs/controller-runtime/blob/master/designs/use-selectors-at-cache.md Signed-off-by: Quique Llorente --- Makefile | 2 + hack/test-all.sh | 2 +- pkg/cache/cache.go | 52 ++++++++++- pkg/cache/cache_test.go | 131 ++++++++++++++++++++++++++-- pkg/cache/internal/deleg_map.go | 25 +++--- pkg/cache/internal/informers_map.go | 12 +++ pkg/cache/internal/selector.go | 27 ++++++ 7 files changed, 232 insertions(+), 19 deletions(-) create mode 100644 pkg/cache/internal/selector.go diff --git a/Makefile b/Makefile index 139c6b177b..9aac36dc0b 100644 --- a/Makefile +++ b/Makefile @@ -24,6 +24,8 @@ SHELL:=/usr/bin/env bash .DEFAULT_GOAL:=help +export WHAT ?= ./... + # Use GOPROXY environment variable if set GOPROXY := $(shell go env GOPROXY) ifeq ($(GOPROXY),) diff --git a/hack/test-all.sh b/hack/test-all.sh index 42d64f006f..4abe8fced8 100755 --- a/hack/test-all.sh +++ b/hack/test-all.sh @@ -20,7 +20,7 @@ source $(dirname ${BASH_SOURCE})/common.sh header_text "running go test" -go test -race ${MOD_OPT} ./... +go test -race ${MOD_OPT} ${WHAT} if [[ -n ${ARTIFACTS:-} ]]; then if grep -Rin '' ${ARTIFACTS}/*; then exit 1; fi diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 71dfbd0454..dee523fe23 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -86,6 +86,9 @@ type Informer interface { HasSynced() bool } +// SelectorsByObject associate a client.Object's GVK to a field/label selector +type SelectorsByObject map[client.Object]internal.Selector + // Options are the optional arguments for creating a new InformersMap object type Options struct { // Scheme is the scheme to use for mapping objects to GroupVersionKinds @@ -103,6 +106,13 @@ type Options struct { // Namespace restricts the cache's ListWatch to the desired namespace // Default watches all namespaces Namespace string + + // SelectorsByObject restricts the cache's ListWatch to the desired + // fields per GVK at the specified object, the map's value must implement + // Selector [1] using for example a Set [2] + // [1] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Selector + // [2] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Set + SelectorsByObject SelectorsByObject } var defaultResyncTime = 10 * time.Hour @@ -113,10 +123,38 @@ func New(config *rest.Config, opts Options) (Cache, error) { if err != nil { return nil, err } - im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace) + selectorsByGVK, err := convertToSelectorsByGVK(opts.SelectorsByObject, opts.Scheme) + if err != nil { + return nil, err + } + im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace, selectorsByGVK) return &informerCache{InformersMap: im}, nil } +// BuilderWithOptions returns a Cache constructor that will build the a cache +// honoring the options argument, this is useful to specify options like +// SelectorsByObject +// WARNING: if SelectorsByObject is specified. filtered out resources are not +// returned. +func BuilderWithOptions(options Options) NewCacheFunc { + return func(config *rest.Config, opts Options) (Cache, error) { + if opts.Scheme == nil { + opts.Scheme = options.Scheme + } + if opts.Mapper == nil { + opts.Mapper = options.Mapper + } + if opts.Resync == nil { + opts.Resync = options.Resync + } + if opts.Namespace == "" { + opts.Namespace = options.Namespace + } + opts.SelectorsByObject = options.SelectorsByObject + return New(config, opts) + } +} + func defaultOpts(config *rest.Config, opts Options) (Options, error) { // Use the default Kubernetes Scheme if unset if opts.Scheme == nil { @@ -139,3 +177,15 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) { } return opts, nil } + +func convertToSelectorsByGVK(selectorsByObject SelectorsByObject, scheme *runtime.Scheme) (internal.SelectorsByGVK, error) { + selectorsByGVK := internal.SelectorsByGVK{} + for object, selector := range selectorsByObject { + gvk, err := apiutil.GVKForObject(object, scheme) + if err != nil { + return nil, err + } + selectorsByGVK[gvk] = selector + } + return selectorsByGVK, nil +} diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index d23db63f75..4c866fa822 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -21,11 +21,15 @@ import ( "fmt" . "github.com/onsi/ginkgo" + . "github.com/onsi/ginkgo/extensions/table" . "github.com/onsi/gomega" + kcorev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" kscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" @@ -42,15 +46,17 @@ const testNamespaceThree = "test-namespace-3" // TODO(community): Pull these helper functions into testenv. // Restart policy is included to allow indexing on that field. -func createPod(name, namespace string, restartPolicy kcorev1.RestartPolicy) client.Object { +func createPodWithLabels(name, namespace string, restartPolicy kcorev1.RestartPolicy, labels map[string]string) client.Object { three := int64(3) + if labels == nil { + labels = map[string]string{} + } + labels["test-label"] = name pod := &kcorev1.Pod{ ObjectMeta: kmetav1.ObjectMeta{ Name: name, Namespace: namespace, - Labels: map[string]string{ - "test-label": name, - }, + Labels: labels, }, Spec: kcorev1.PodSpec{ Containers: []kcorev1.Container{{Name: "nginx", Image: "nginx"}}, @@ -65,6 +71,10 @@ func createPod(name, namespace string, restartPolicy kcorev1.RestartPolicy) clie return pod } +func createPod(name, namespace string, restartPolicy kcorev1.RestartPolicy) client.Object { + return createPodWithLabels(name, namespace, restartPolicy, nil) +} + func deletePod(pod client.Object) { cl, err := client.New(cfg, client.Options{}) Expect(err).NotTo(HaveOccurred()) @@ -110,8 +120,8 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca // Includes restart policy since these objects are indexed on this field. knownPod1 = createPod("test-pod-1", testNamespaceOne, kcorev1.RestartPolicyNever) knownPod2 = createPod("test-pod-2", testNamespaceTwo, kcorev1.RestartPolicyAlways) - knownPod3 = createPod("test-pod-3", testNamespaceTwo, kcorev1.RestartPolicyOnFailure) - knownPod4 = createPod("test-pod-4", testNamespaceThree, kcorev1.RestartPolicyNever) + knownPod3 = createPodWithLabels("test-pod-3", testNamespaceTwo, kcorev1.RestartPolicyOnFailure, map[string]string{"common-label": "common"}) + knownPod4 = createPodWithLabels("test-pod-4", testNamespaceThree, kcorev1.RestartPolicyNever, map[string]string{"common-label": "common"}) podGVK := schema.GroupVersionKind{ Kind: "Pod", Version: "v1", @@ -284,6 +294,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca Expect(err).To(HaveOccurred()) Expect(errors.IsTimeout(err)).To(BeTrue()) }) + }) Context("with unstructured objects", func() { It("should be able to list objects that haven't been watched previously", func() { @@ -709,6 +720,113 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca Expect(err).To(HaveOccurred()) }) }) + type selectorsTestCase struct { + fieldSelectors map[string]string + labelSelectors map[string]string + expectedPods []string + } + DescribeTable(" and cache with selectors", func(tc selectorsTestCase) { + By("creating the cache") + builder := cache.BuilderWithOptions( + cache.Options{ + SelectorsByObject: cache.SelectorsByObject{ + &kcorev1.Pod{}: { + Label: labels.Set(tc.labelSelectors).AsSelector(), + Field: fields.Set(tc.fieldSelectors).AsSelector(), + }, + }, + }, + ) + informer, err := builder(cfg, cache.Options{}) + Expect(err).NotTo(HaveOccurred()) + + By("running the cache and waiting for it to sync") + go func() { + defer GinkgoRecover() + Expect(informer.Start(informerCacheCtx)).To(Succeed()) + }() + Expect(informer.WaitForCacheSync(informerCacheCtx)).NotTo(BeFalse()) + + By("Checking with structured") + obtainedStructuredPodList := kcorev1.PodList{} + Expect(informer.List(context.Background(), &obtainedStructuredPodList)).To(Succeed()) + Expect(obtainedStructuredPodList.Items).Should(WithTransform(func(pods []kcorev1.Pod) []string { + obtainedPodNames := []string{} + for _, pod := range pods { + obtainedPodNames = append(obtainedPodNames, pod.Name) + } + return obtainedPodNames + }, ConsistOf(tc.expectedPods))) + + By("Checking with unstructured") + obtainedUnstructuredPodList := unstructured.UnstructuredList{} + obtainedUnstructuredPodList.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "PodList", + }) + err = informer.List(context.Background(), &obtainedUnstructuredPodList) + Expect(err).To(Succeed()) + Expect(obtainedUnstructuredPodList.Items).Should(WithTransform(func(pods []unstructured.Unstructured) []string { + obtainedPodNames := []string{} + for _, pod := range pods { + obtainedPodNames = append(obtainedPodNames, pod.GetName()) + } + return obtainedPodNames + }, ConsistOf(tc.expectedPods))) + + By("Checking with metadata") + obtainedMetadataPodList := kmetav1.PartialObjectMetadataList{} + obtainedMetadataPodList.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "", + Version: "v1", + Kind: "PodList", + }) + err = informer.List(context.Background(), &obtainedMetadataPodList) + Expect(err).To(Succeed()) + Expect(obtainedMetadataPodList.Items).Should(WithTransform(func(pods []kmetav1.PartialObjectMetadata) []string { + obtainedPodNames := []string{} + for _, pod := range pods { + obtainedPodNames = append(obtainedPodNames, pod.Name) + } + return obtainedPodNames + }, ConsistOf(tc.expectedPods))) + }, + Entry("when selectors are empty it has to inform about all the pods", selectorsTestCase{ + fieldSelectors: map[string]string{}, + labelSelectors: map[string]string{}, + expectedPods: []string{"test-pod-1", "test-pod-2", "test-pod-3", "test-pod-4"}, + }), + Entry("when field matches one pod it has to inform about it", selectorsTestCase{ + fieldSelectors: map[string]string{"metadata.name": "test-pod-2"}, + expectedPods: []string{"test-pod-2"}, + }), + Entry("when field matches multiple pods it has to infor about all of them", selectorsTestCase{ + fieldSelectors: map[string]string{"metadata.namespace": testNamespaceTwo}, + expectedPods: []string{"test-pod-2", "test-pod-3"}, + }), + Entry("when label matches one pod it has to inform about it", selectorsTestCase{ + labelSelectors: map[string]string{"test-label": "test-pod-4"}, + expectedPods: []string{"test-pod-4"}, + }), + Entry("when label matches multiple pods it has to infor about all of them", selectorsTestCase{ + labelSelectors: map[string]string{"common-label": "common"}, + expectedPods: []string{"test-pod-3", "test-pod-4"}, + }), + Entry("when label and field matches one pod it has to infor about about it", selectorsTestCase{ + labelSelectors: map[string]string{"common-label": "common"}, + fieldSelectors: map[string]string{"metadata.namespace": testNamespaceTwo}, + expectedPods: []string{"test-pod-3"}, + }), + Entry("when label does not match it does not has to inform", selectorsTestCase{ + labelSelectors: map[string]string{"new-label": "new"}, + expectedPods: []string{}, + }), + Entry("when field does not match it does not has to inform", selectorsTestCase{ + fieldSelectors: map[string]string{"metadata.namespace": "new"}, + expectedPods: []string{}, + }), + ) }) Describe("as an Informer", func() { Context("with structured objects", func() { @@ -789,7 +907,6 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca Eventually(out).Should(Receive(Equal(pod))) close(done) }) - It("should be able to index an object field then retrieve objects by that field", func() { By("creating the cache") informer, err := cache.New(cfg, cache.Options{}) diff --git a/pkg/cache/internal/deleg_map.go b/pkg/cache/internal/deleg_map.go index 02bb1919f7..2242d9b674 100644 --- a/pkg/cache/internal/deleg_map.go +++ b/pkg/cache/internal/deleg_map.go @@ -49,12 +49,14 @@ func NewInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, - namespace string) *InformersMap { + namespace string, + selectors SelectorsByGVK, +) *InformersMap { return &InformersMap{ - structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace), - unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace), - metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace), + structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors), + unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors), + metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors), Scheme: scheme, } @@ -105,16 +107,19 @@ func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj } // newStructuredInformersMap creates a new InformersMap for structured objects. -func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap { - return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createStructuredListWatch) +func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, + namespace string, selectors SelectorsByGVK) *specificInformersMap { + return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createStructuredListWatch) } // newUnstructuredInformersMap creates a new InformersMap for unstructured objects. -func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap { - return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createUnstructuredListWatch) +func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, + namespace string, selectors SelectorsByGVK) *specificInformersMap { + return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createUnstructuredListWatch) } // newMetadataInformersMap creates a new InformersMap for metadata-only objects. -func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap { - return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createMetadataListWatch) +func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, + namespace string, selectors SelectorsByGVK) *specificInformersMap { + return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createMetadataListWatch) } diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index 6b57c6fa61..bac342b54d 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -48,6 +48,7 @@ func newSpecificInformersMap(config *rest.Config, mapper meta.RESTMapper, resync time.Duration, namespace string, + selectors SelectorsByGVK, createListWatcher createListWatcherFunc) *specificInformersMap { ip := &specificInformersMap{ config: config, @@ -60,6 +61,7 @@ func newSpecificInformersMap(config *rest.Config, startWait: make(chan struct{}), createListWatcher: createListWatcher, namespace: namespace, + selectors: selectors, } return ip } @@ -120,6 +122,10 @@ type specificInformersMap struct { // namespace is the namespace that all ListWatches are restricted to // default or empty string means all namespaces namespace string + + // selectors are the label or field selectors that will be added to the + // ListWatch ListOptions. + selectors SelectorsByGVK } // Start calls Run on each of the informers and sets started to true. Blocks on the context. @@ -256,6 +262,7 @@ func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformer // Create a new ListWatch for the obj return &cache.ListWatch{ ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ip.selectors[gvk].ApplyToList(&opts) res := listObj.DeepCopyObject() isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot err := client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do(ctx).Into(res) @@ -263,6 +270,7 @@ func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformer }, // Setup the watch function WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + ip.selectors[gvk].ApplyToList(&opts) // Watch needs to be set to true separately opts.Watch = true isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot @@ -289,6 +297,7 @@ func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInform // Create a new ListWatch for the obj return &cache.ListWatch{ ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ip.selectors[gvk].ApplyToList(&opts) if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot { return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).List(ctx, opts) } @@ -296,6 +305,7 @@ func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInform }, // Setup the watch function WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + ip.selectors[gvk].ApplyToList(&opts) // Watch needs to be set to true separately opts.Watch = true if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot { @@ -327,6 +337,7 @@ func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersM // create the relevant listwatch return &cache.ListWatch{ ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ip.selectors[gvk].ApplyToList(&opts) if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot { return client.Resource(mapping.Resource).Namespace(ip.namespace).List(ctx, opts) } @@ -334,6 +345,7 @@ func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersM }, // Setup the watch function WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + ip.selectors[gvk].ApplyToList(&opts) // Watch needs to be set to true separately opts.Watch = true if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot { diff --git a/pkg/cache/internal/selector.go b/pkg/cache/internal/selector.go new file mode 100644 index 0000000000..03eda629a2 --- /dev/null +++ b/pkg/cache/internal/selector.go @@ -0,0 +1,27 @@ +package internal + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +// SelectorsByGVK associate a GroupVersionKind to a field/label selector +type SelectorsByGVK map[schema.GroupVersionKind]Selector + +// Selector specify the label/field selector to fill in ListOptions +type Selector struct { + Label labels.Selector + Field fields.Selector +} + +// ApplyToList fill in ListOptions LabelSelector and FieldSelector if needed +func (s Selector) ApplyToList(listOpts *metav1.ListOptions) { + if s.Label != nil { + listOpts.LabelSelector = s.Label.String() + } + if s.Field != nil { + listOpts.FieldSelector = s.Field.String() + } +}