Skip to content

Commit

Permalink
Merge pull request #1435 from qinqon/add-cache-filter-option-labels-and-fields
Browse files Browse the repository at this point in the history

✨ Add SelectorsByObject option to cache
  • Loading branch information
k8s-ci-robot committed Apr 27, 2021
2 parents b2c90ab + cd065bf commit 4477c71
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 19 deletions.
2 changes: 2 additions & 0 deletions Makefile
Expand Up @@ -24,6 +24,8 @@
SHELL:=/usr/bin/env bash
.DEFAULT_GOAL:=help

export WHAT ?= ./...

# Use GOPROXY environment variable if set
GOPROXY := $(shell go env GOPROXY)
ifeq ($(GOPROXY),)
Expand Down
2 changes: 1 addition & 1 deletion hack/test-all.sh
Expand Up @@ -20,7 +20,7 @@ source $(dirname ${BASH_SOURCE})/common.sh

header_text "running go test"

go test -race ${MOD_OPT} ./...
go test -race ${MOD_OPT} ${WHAT}

if [[ -n ${ARTIFACTS:-} ]]; then
if grep -Rin '<failure type="Failure">' ${ARTIFACTS}/*; then exit 1; fi
Expand Down
52 changes: 51 additions & 1 deletion pkg/cache/cache.go
Expand Up @@ -86,6 +86,9 @@ type Informer interface {
HasSynced() bool
}

// SelectorsByObject associates a client.Object's GVK with a field/label
// selector. The cache uses the selector to restrict its ListWatch for the
// corresponding GVK, so only matching objects are cached and returned.
type SelectorsByObject map[client.Object]internal.Selector

// Options are the optional arguments for creating a new InformersMap object
type Options struct {
// Scheme is the scheme to use for mapping objects to GroupVersionKinds
Expand All @@ -103,6 +106,13 @@ type Options struct {
// Namespace restricts the cache's ListWatch to the desired namespace
// Default watches all namespaces
Namespace string

// SelectorsByObject restricts the cache's ListWatch to the desired
// fields per GVK at the specified object, the map's value must implement
// Selector [1] using for example a Set [2]
// [1] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Selector
// [2] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Set
SelectorsByObject SelectorsByObject
}

var defaultResyncTime = 10 * time.Hour
Expand All @@ -113,10 +123,38 @@ func New(config *rest.Config, opts Options) (Cache, error) {
if err != nil {
return nil, err
}
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
selectorsByGVK, err := convertToSelectorsByGVK(opts.SelectorsByObject, opts.Scheme)
if err != nil {
return nil, err
}
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace, selectorsByGVK)
return &informerCache{InformersMap: im}, nil
}

// BuilderWithOptions returns a Cache constructor that builds a cache
// honoring the options argument; this is useful to specify options like
// SelectorsByObject at manager/cluster construction time.
// Fields set in the options passed to the returned constructor take
// precedence; the options given here act only as defaults for unset fields.
// WARNING: If SelectorsByObject is specified, filtered-out resources are
// not returned by the cache at all.
func BuilderWithOptions(options Options) NewCacheFunc {
	return func(config *rest.Config, opts Options) (Cache, error) {
		if opts.Scheme == nil {
			opts.Scheme = options.Scheme
		}
		if opts.Mapper == nil {
			opts.Mapper = options.Mapper
		}
		if opts.Resync == nil {
			opts.Resync = options.Resync
		}
		if opts.Namespace == "" {
			opts.Namespace = options.Namespace
		}
		// Fix: previously this assignment was unconditional, silently
		// clobbering a caller-supplied SelectorsByObject. Default it only
		// when unset, consistent with the other fields above.
		if opts.SelectorsByObject == nil {
			opts.SelectorsByObject = options.SelectorsByObject
		}
		return New(config, opts)
	}
}

func defaultOpts(config *rest.Config, opts Options) (Options, error) {
// Use the default Kubernetes Scheme if unset
if opts.Scheme == nil {
Expand All @@ -139,3 +177,15 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) {
}
return opts, nil
}

// convertToSelectorsByGVK translates an object-keyed selector map into a
// GVK-keyed one by resolving each key's GroupVersionKind through the scheme.
// It returns the first GVK-resolution error encountered, if any.
func convertToSelectorsByGVK(selectorsByObject SelectorsByObject, scheme *runtime.Scheme) (internal.SelectorsByGVK, error) {
	out := make(internal.SelectorsByGVK, len(selectorsByObject))
	for obj, sel := range selectorsByObject {
		gvk, err := apiutil.GVKForObject(obj, scheme)
		if err != nil {
			return nil, err
		}
		out[gvk] = sel
	}
	return out, nil
}
131 changes: 124 additions & 7 deletions pkg/cache/cache_test.go
Expand Up @@ -21,11 +21,15 @@ import (
"fmt"

. "github.com/onsi/ginkgo"
. "github.com/onsi/ginkgo/extensions/table"
. "github.com/onsi/gomega"

kcorev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
kscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
Expand All @@ -42,15 +46,17 @@ const testNamespaceThree = "test-namespace-3"

// TODO(community): Pull these helper functions into testenv.
// Restart policy is included to allow indexing on that field.
func createPod(name, namespace string, restartPolicy kcorev1.RestartPolicy) client.Object {
func createPodWithLabels(name, namespace string, restartPolicy kcorev1.RestartPolicy, labels map[string]string) client.Object {
three := int64(3)
if labels == nil {
labels = map[string]string{}
}
labels["test-label"] = name
pod := &kcorev1.Pod{
ObjectMeta: kmetav1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: map[string]string{
"test-label": name,
},
Labels: labels,
},
Spec: kcorev1.PodSpec{
Containers: []kcorev1.Container{{Name: "nginx", Image: "nginx"}},
Expand All @@ -65,6 +71,10 @@ func createPod(name, namespace string, restartPolicy kcorev1.RestartPolicy) clie
return pod
}

// createPod builds a pod carrying only the default "test-label" label by
// delegating to createPodWithLabels with no additional labels.
func createPod(name, namespace string, restartPolicy kcorev1.RestartPolicy) client.Object {
	noExtraLabels := map[string]string{}
	return createPodWithLabels(name, namespace, restartPolicy, noExtraLabels)
}

func deletePod(pod client.Object) {
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -110,8 +120,8 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
// Includes restart policy since these objects are indexed on this field.
knownPod1 = createPod("test-pod-1", testNamespaceOne, kcorev1.RestartPolicyNever)
knownPod2 = createPod("test-pod-2", testNamespaceTwo, kcorev1.RestartPolicyAlways)
knownPod3 = createPod("test-pod-3", testNamespaceTwo, kcorev1.RestartPolicyOnFailure)
knownPod4 = createPod("test-pod-4", testNamespaceThree, kcorev1.RestartPolicyNever)
knownPod3 = createPodWithLabels("test-pod-3", testNamespaceTwo, kcorev1.RestartPolicyOnFailure, map[string]string{"common-label": "common"})
knownPod4 = createPodWithLabels("test-pod-4", testNamespaceThree, kcorev1.RestartPolicyNever, map[string]string{"common-label": "common"})
podGVK := schema.GroupVersionKind{
Kind: "Pod",
Version: "v1",
Expand Down Expand Up @@ -284,6 +294,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
Expect(err).To(HaveOccurred())
Expect(errors.IsTimeout(err)).To(BeTrue())
})

})
Context("with unstructured objects", func() {
It("should be able to list objects that haven't been watched previously", func() {
Expand Down Expand Up @@ -709,6 +720,113 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
Expect(err).To(HaveOccurred())
})
})
type selectorsTestCase struct {
fieldSelectors map[string]string
labelSelectors map[string]string
expectedPods []string
}
DescribeTable(" and cache with selectors", func(tc selectorsTestCase) {
By("creating the cache")
builder := cache.BuilderWithOptions(
cache.Options{
SelectorsByObject: cache.SelectorsByObject{
&kcorev1.Pod{}: {
Label: labels.Set(tc.labelSelectors).AsSelector(),
Field: fields.Set(tc.fieldSelectors).AsSelector(),
},
},
},
)
informer, err := builder(cfg, cache.Options{})
Expect(err).NotTo(HaveOccurred())

By("running the cache and waiting for it to sync")
go func() {
defer GinkgoRecover()
Expect(informer.Start(informerCacheCtx)).To(Succeed())
}()
Expect(informer.WaitForCacheSync(informerCacheCtx)).NotTo(BeFalse())

By("Checking with structured")
obtainedStructuredPodList := kcorev1.PodList{}
Expect(informer.List(context.Background(), &obtainedStructuredPodList)).To(Succeed())
Expect(obtainedStructuredPodList.Items).Should(WithTransform(func(pods []kcorev1.Pod) []string {
obtainedPodNames := []string{}
for _, pod := range pods {
obtainedPodNames = append(obtainedPodNames, pod.Name)
}
return obtainedPodNames
}, ConsistOf(tc.expectedPods)))

By("Checking with unstructured")
obtainedUnstructuredPodList := unstructured.UnstructuredList{}
obtainedUnstructuredPodList.SetGroupVersionKind(schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "PodList",
})
err = informer.List(context.Background(), &obtainedUnstructuredPodList)
Expect(err).To(Succeed())
Expect(obtainedUnstructuredPodList.Items).Should(WithTransform(func(pods []unstructured.Unstructured) []string {
obtainedPodNames := []string{}
for _, pod := range pods {
obtainedPodNames = append(obtainedPodNames, pod.GetName())
}
return obtainedPodNames
}, ConsistOf(tc.expectedPods)))

By("Checking with metadata")
obtainedMetadataPodList := kmetav1.PartialObjectMetadataList{}
obtainedMetadataPodList.SetGroupVersionKind(schema.GroupVersionKind{
Group: "",
Version: "v1",
Kind: "PodList",
})
err = informer.List(context.Background(), &obtainedMetadataPodList)
Expect(err).To(Succeed())
Expect(obtainedMetadataPodList.Items).Should(WithTransform(func(pods []kmetav1.PartialObjectMetadata) []string {
obtainedPodNames := []string{}
for _, pod := range pods {
obtainedPodNames = append(obtainedPodNames, pod.Name)
}
return obtainedPodNames
}, ConsistOf(tc.expectedPods)))
},
Entry("when selectors are empty it has to inform about all the pods", selectorsTestCase{
fieldSelectors: map[string]string{},
labelSelectors: map[string]string{},
expectedPods: []string{"test-pod-1", "test-pod-2", "test-pod-3", "test-pod-4"},
}),
Entry("when field matches one pod it has to inform about it", selectorsTestCase{
fieldSelectors: map[string]string{"metadata.name": "test-pod-2"},
expectedPods: []string{"test-pod-2"},
}),
Entry("when field matches multiple pods it has to infor about all of them", selectorsTestCase{
fieldSelectors: map[string]string{"metadata.namespace": testNamespaceTwo},
expectedPods: []string{"test-pod-2", "test-pod-3"},
}),
Entry("when label matches one pod it has to inform about it", selectorsTestCase{
labelSelectors: map[string]string{"test-label": "test-pod-4"},
expectedPods: []string{"test-pod-4"},
}),
Entry("when label matches multiple pods it has to infor about all of them", selectorsTestCase{
labelSelectors: map[string]string{"common-label": "common"},
expectedPods: []string{"test-pod-3", "test-pod-4"},
}),
Entry("when label and field matches one pod it has to infor about about it", selectorsTestCase{
labelSelectors: map[string]string{"common-label": "common"},
fieldSelectors: map[string]string{"metadata.namespace": testNamespaceTwo},
expectedPods: []string{"test-pod-3"},
}),
Entry("when label does not match it does not has to inform", selectorsTestCase{
labelSelectors: map[string]string{"new-label": "new"},
expectedPods: []string{},
}),
Entry("when field does not match it does not has to inform", selectorsTestCase{
fieldSelectors: map[string]string{"metadata.namespace": "new"},
expectedPods: []string{},
}),
)
})
Describe("as an Informer", func() {
Context("with structured objects", func() {
Expand Down Expand Up @@ -789,7 +907,6 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
Eventually(out).Should(Receive(Equal(pod)))
close(done)
})

It("should be able to index an object field then retrieve objects by that field", func() {
By("creating the cache")
informer, err := cache.New(cfg, cache.Options{})
Expand Down
25 changes: 15 additions & 10 deletions pkg/cache/internal/deleg_map.go
Expand Up @@ -49,12 +49,14 @@ func NewInformersMap(config *rest.Config,
scheme *runtime.Scheme,
mapper meta.RESTMapper,
resync time.Duration,
namespace string) *InformersMap {
namespace string,
selectors SelectorsByGVK,
) *InformersMap {

return &InformersMap{
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace),
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace),
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors),
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors),

Scheme: scheme,
}
Expand Down Expand Up @@ -105,16 +107,19 @@ func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj
}

// newStructuredInformersMap creates a new InformersMap for structured objects.
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createStructuredListWatch)
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
namespace string, selectors SelectorsByGVK) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createStructuredListWatch)
}

// newUnstructuredInformersMap creates a new InformersMap for unstructured objects.
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createUnstructuredListWatch)
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
namespace string, selectors SelectorsByGVK) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createUnstructuredListWatch)
}

// newMetadataInformersMap creates a new InformersMap for metadata-only objects.
func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createMetadataListWatch)
func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
namespace string, selectors SelectorsByGVK) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createMetadataListWatch)
}

0 comments on commit 4477c71

Please sign in to comment.