diff --git a/Makefile b/Makefile index 139c6b177b..9aac36dc0b 100644 --- a/Makefile +++ b/Makefile @@ -24,6 +24,8 @@ SHELL:=/usr/bin/env bash .DEFAULT_GOAL:=help +export WHAT ?= ./... + # Use GOPROXY environment variable if set GOPROXY := $(shell go env GOPROXY) ifeq ($(GOPROXY),) diff --git a/hack/test-all.sh b/hack/test-all.sh index 42d64f006f..4abe8fced8 100755 --- a/hack/test-all.sh +++ b/hack/test-all.sh @@ -20,7 +20,7 @@ source $(dirname ${BASH_SOURCE})/common.sh header_text "running go test" -go test -race ${MOD_OPT} ./... +go test -race ${MOD_OPT} ${WHAT} if [[ -n ${ARTIFACTS:-} ]]; then if grep -Rin '' ${ARTIFACTS}/*; then exit 1; fi diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 71dfbd0454..7e8f025e30 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -86,6 +86,9 @@ type Informer interface { HasSynced() bool } +// SelectorsByObject associate a client.Object's GVK to a field/label selector +type SelectorsByObject map[client.Object]internal.Selector + // Options are the optional arguments for creating a new InformersMap object type Options struct { // Scheme is the scheme to use for mapping objects to GroupVersionKinds @@ -103,6 +106,13 @@ type Options struct { // Namespace restricts the cache's ListWatch to the desired namespace // Default watches all namespaces Namespace string + + // SelectorsByObject restricts the cache's ListWatch to the desired + // fields per GVK at the specified object, the map's value must implement + // Selector [1] using for example a Set [2] + // [1] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Selector + // [2] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Set + SelectorsByObject SelectorsByObject } var defaultResyncTime = 10 * time.Hour @@ -113,10 +123,38 @@ func New(config *rest.Config, opts Options) (Cache, error) { if err != nil { return nil, err } - im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace) + selectorsByGVK, err := convertToSelectorsByGVK(opts.SelectorsByObject, opts.Scheme) + if err != nil { + return nil, err + } + im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace, selectorsByGVK) return &informerCache{InformersMap: im}, nil } +// BuilderWithOptions returns a Cache constructor that will build the a cache +// honoring the options argument, this is useful to specify options like +// SelectorsByObject +// WARNING: if SelectorsByObject is specified. filtered out resources are not +// returned. +func BuilderWithOptions(options Options) NewCacheFunc { + return func(config *rest.Config, opts Options) (Cache, error) { + if options.Scheme != nil { + opts.Scheme = options.Scheme + } + if options.Mapper != nil { + opts.Mapper = options.Mapper + } + if options.Resync != nil { + opts.Resync = options.Resync + } + if options.Namespace != "" { + opts.Namespace = options.Namespace + } + opts.SelectorsByObject = options.SelectorsByObject + return New(config, opts) + } +} + func defaultOpts(config *rest.Config, opts Options) (Options, error) { // Use the default Kubernetes Scheme if unset if opts.Scheme == nil { @@ -139,3 +177,15 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) { } return opts, nil } + +func convertToSelectorsByGVK(selectorsByObject SelectorsByObject, scheme *runtime.Scheme) (internal.SelectorsByGVK, error) { + selectorsByGVK := internal.SelectorsByGVK{} + for object, selector := range selectorsByObject { + gvk, err := apiutil.GVKForObject(object, scheme) + if err != nil { + return nil, err + } + selectorsByGVK[gvk] = selector + } + return selectorsByGVK, nil +} diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index d23db63f75..3c513cdfa9 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -21,11 +21,15 @@ import ( "fmt" . "github.com/onsi/ginkgo" + . "github.com/onsi/ginkgo/extensions/table" . "github.com/onsi/gomega" + kcorev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" kscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" @@ -42,15 +46,17 @@ const testNamespaceThree = "test-namespace-3" // TODO(community): Pull these helper functions into testenv. // Restart policy is included to allow indexing on that field. -func createPod(name, namespace string, restartPolicy kcorev1.RestartPolicy) client.Object { +func createPodWithLabels(name, namespace string, restartPolicy kcorev1.RestartPolicy, labels map[string]string) client.Object { three := int64(3) + if labels == nil { + labels = map[string]string{} + } + labels["test-label"] = name pod := &kcorev1.Pod{ ObjectMeta: kmetav1.ObjectMeta{ Name: name, Namespace: namespace, - Labels: map[string]string{ - "test-label": name, - }, + Labels: labels, }, Spec: kcorev1.PodSpec{ Containers: []kcorev1.Container{{Name: "nginx", Image: "nginx"}}, @@ -65,6 +71,10 @@ func createPod(name, namespace string, restartPolicy kcorev1.RestartPolicy) clie return pod } +func createPod(name, namespace string, restartPolicy kcorev1.RestartPolicy) client.Object { + return createPodWithLabels(name, namespace, restartPolicy, nil) +} + func deletePod(pod client.Object) { cl, err := client.New(cfg, client.Options{}) Expect(err).NotTo(HaveOccurred()) @@ -110,8 +120,8 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca // Includes restart policy since these objects are indexed on this field. knownPod1 = createPod("test-pod-1", testNamespaceOne, kcorev1.RestartPolicyNever) knownPod2 = createPod("test-pod-2", testNamespaceTwo, kcorev1.RestartPolicyAlways) - knownPod3 = createPod("test-pod-3", testNamespaceTwo, kcorev1.RestartPolicyOnFailure) - knownPod4 = createPod("test-pod-4", testNamespaceThree, kcorev1.RestartPolicyNever) + knownPod3 = createPodWithLabels("test-pod-3", testNamespaceTwo, kcorev1.RestartPolicyOnFailure, map[string]string{"common-label": "common"}) + knownPod4 = createPodWithLabels("test-pod-4", testNamespaceThree, kcorev1.RestartPolicyNever, map[string]string{"common-label": "common"}) podGVK := schema.GroupVersionKind{ Kind: "Pod", Version: "v1", @@ -789,7 +799,98 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca Eventually(out).Should(Receive(Equal(pod))) close(done) }) + type selectorsTestCase struct { + fieldSelectors map[string]string + labelSelectors map[string]string + expectedPods []string + } + DescribeTable("filter informers at list watch with selectors", func(tc selectorsTestCase) { + By("creating the cache") + builder := cache.BuilderWithOptions( + cache.Options{ + SelectorsByObject: cache.SelectorsByObject{ + &kcorev1.Pod{}: { + Field: fields.Set(tc.fieldSelectors).AsSelector(), + Label: labels.Set(tc.labelSelectors).AsSelector(), + }, + }, + }, + ) + informer, err := builder(cfg, cache.Options{}) + Expect(err).NotTo(HaveOccurred()) + + By("running the cache and waiting for it to sync") + go func() { + defer GinkgoRecover() + Expect(informer.Start(informerCacheCtx)).To(Succeed()) + }() + Expect(informer.WaitForCacheSync(informerCacheCtx)).NotTo(BeFalse()) + + gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"} + sii, err := informer.GetInformerForKind(context.TODO(), gvk) + Expect(err).NotTo(HaveOccurred()) + Expect(sii).NotTo(BeNil()) + Expect(sii.HasSynced()).To(BeTrue()) + + By("adding an event handler listening for object creation which sends the object to a channel") + out := make(chan interface{}) + addFunc := func(obj interface{}) { + By(fmt.Sprintf("%+v", obj)) + out <- obj + } + sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc}) + By("verifying the filter out object is not received on the channel") + obtainedObjs := []interface{}{} + for range tc.expectedPods { + var obtainedObj interface{} + Eventually(out).Should(Receive(&obtainedObj), "should receive something") + obtainedObjs = append(obtainedObjs, obtainedObj) + } + Consistently(out).ShouldNot(Receive(), "should not receive anything else") + Expect(obtainedObjs).Should(WithTransform(func(objs []interface{}) []string { + obtainedPodNames := []string{} + for _, obj := range objs { + obtainedPodNames = append(obtainedPodNames, obj.(client.Object).GetName()) + } + return obtainedPodNames + }, ConsistOf(tc.expectedPods))) + }, + Entry("when selectors are empty it has to inform about all the pods", selectorsTestCase{ + fieldSelectors: map[string]string{}, + labelSelectors: map[string]string{}, + expectedPods: []string{"test-pod-1", "test-pod-2", "test-pod-3", "test-pod-4"}, + }), + Entry("when field matches one pod it has to inform about it", selectorsTestCase{ + fieldSelectors: map[string]string{"metadata.name": "test-pod-2"}, + expectedPods: []string{"test-pod-2"}, + }), + Entry("when field matches multiple pods it has to infor about all of them", selectorsTestCase{ + fieldSelectors: map[string]string{"metadata.namespace": testNamespaceTwo}, + expectedPods: []string{"test-pod-2", "test-pod-3"}, + }), + Entry("when label matches one pod it has to inform about it", selectorsTestCase{ + labelSelectors: map[string]string{"test-label": "test-pod-4"}, + expectedPods: []string{"test-pod-4"}, + }), + Entry("when label matches multiple pods it has to infor about all of them", selectorsTestCase{ + labelSelectors: map[string]string{"common-label": "common"}, + expectedPods: []string{"test-pod-3", "test-pod-4"}, + }), + Entry("when label and field matches one pod it has to infor about about it", selectorsTestCase{ + labelSelectors: map[string]string{"common-label": "common"}, + fieldSelectors: map[string]string{"metadata.namespace": testNamespaceTwo}, + expectedPods: []string{"test-pod-3"}, + }), + Entry("when label does not match it does not has to inform", selectorsTestCase{ + labelSelectors: map[string]string{"new-label": "new"}, + expectedPods: []string{}, + }), + Entry("when field does not match it does not has to inform", selectorsTestCase{ + fieldSelectors: map[string]string{"metadata.namespace": "new"}, + expectedPods: []string{}, + }), + ) It("should be able to index an object field then retrieve objects by that field", func() { By("creating the cache") informer, err := cache.New(cfg, cache.Options{}) diff --git a/pkg/cache/internal/deleg_map.go b/pkg/cache/internal/deleg_map.go index 02bb1919f7..2242d9b674 100644 --- a/pkg/cache/internal/deleg_map.go +++ b/pkg/cache/internal/deleg_map.go @@ -49,12 +49,14 @@ func NewInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, - namespace string) *InformersMap { + namespace string, + selectors SelectorsByGVK, +) *InformersMap { return &InformersMap{ - structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace), - unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace), - metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace), + structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors), + unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors), + metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors), Scheme: scheme, } @@ -105,16 +107,19 @@ func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj } // newStructuredInformersMap creates a new InformersMap for structured objects. -func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap { - return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createStructuredListWatch) +func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, + namespace string, selectors SelectorsByGVK) *specificInformersMap { + return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createStructuredListWatch) } // newUnstructuredInformersMap creates a new InformersMap for unstructured objects. -func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap { - return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createUnstructuredListWatch) +func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, + namespace string, selectors SelectorsByGVK) *specificInformersMap { + return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createUnstructuredListWatch) } // newMetadataInformersMap creates a new InformersMap for metadata-only objects. -func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap { - return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createMetadataListWatch) +func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, + namespace string, selectors SelectorsByGVK) *specificInformersMap { + return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createMetadataListWatch) } diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index 6b57c6fa61..36a07127cf 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -48,6 +48,7 @@ func newSpecificInformersMap(config *rest.Config, mapper meta.RESTMapper, resync time.Duration, namespace string, + selectors SelectorsByGVK, createListWatcher createListWatcherFunc) *specificInformersMap { ip := &specificInformersMap{ config: config, @@ -60,6 +61,7 @@ func newSpecificInformersMap(config *rest.Config, startWait: make(chan struct{}), createListWatcher: createListWatcher, namespace: namespace, + selectors: selectors, } return ip } @@ -120,6 +122,8 @@ type specificInformersMap struct { // namespace is the namespace that all ListWatches are restricted to // default or empty string means all namespaces namespace string + + selectors SelectorsByGVK } // Start calls Run on each of the informers and sets started to true. Blocks on the context. @@ -256,6 +260,7 @@ func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformer // Create a new ListWatch for the obj return &cache.ListWatch{ ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ip.selectors[gvk].ApplyToList(&opts) res := listObj.DeepCopyObject() isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot err := client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do(ctx).Into(res) @@ -263,6 +268,7 @@ func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformer }, // Setup the watch function WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + ip.selectors[gvk].ApplyToList(&opts) // Watch needs to be set to true separately opts.Watch = true isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot @@ -289,6 +295,7 @@ func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInform // Create a new ListWatch for the obj return &cache.ListWatch{ ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ip.selectors[gvk].ApplyToList(&opts) if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot { return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).List(ctx, opts) } @@ -296,6 +303,7 @@ func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInform }, // Setup the watch function WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + ip.selectors[gvk].ApplyToList(&opts) // Watch needs to be set to true separately opts.Watch = true if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot { @@ -327,6 +335,7 @@ func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersM // create the relevant listwatch return &cache.ListWatch{ ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { + ip.selectors[gvk].ApplyToList(&opts) if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot { return client.Resource(mapping.Resource).Namespace(ip.namespace).List(ctx, opts) } @@ -334,6 +343,7 @@ func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersM }, // Setup the watch function WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { + ip.selectors[gvk].ApplyToList(&opts) // Watch needs to be set to true separately opts.Watch = true if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot { diff --git a/pkg/cache/internal/selector.go b/pkg/cache/internal/selector.go new file mode 100644 index 0000000000..03eda629a2 --- /dev/null +++ b/pkg/cache/internal/selector.go @@ -0,0 +1,27 @@ +package internal + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +// SelectorsByGVK associate a GroupVersionKind to a field/label selector +type SelectorsByGVK map[schema.GroupVersionKind]Selector + +// Selector specify the label/field selector to fill in ListOptions +type Selector struct { + Label labels.Selector + Field fields.Selector +} + +// ApplyToList fill in ListOptions LabelSelector and FieldSelector if needed +func (s Selector) ApplyToList(listOpts *metav1.ListOptions) { + if s.Label != nil { + listOpts.LabelSelector = s.Label.String() + } + if s.Field != nil { + listOpts.FieldSelector = s.Field.String() + } +}