Skip to content

Commit

Permalink
Add SelectorsByObject option to cache
Browse files Browse the repository at this point in the history
Controller-Runtime controllers use a cache to subscribe to events from
Kubernetes objects and to read those objects more efficiently by avoiding
calls out to the API server. This cache is backed by Kubernetes informers.

The only way to filter this cache is by namespace and resource type.
In cases where a controller is only interested in a small subset of objects
(for example all pods on a node), this might end up not being efficient enough.

This change extends the "pkg/cache" interface, adding the
"BuilderWithOptions" function and the "Options.SelectorsByObject" option.

The SelectorsByObject option restricts the cache's ListWatch to the desired
fields per GVK of the specified object; the map's value must implement
Selector [1], using for example a Set [2].

This is the implementation of the design document [3]

[1] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Selector
[2] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Set
[3] https://github.com/kubernetes-sigs/controller-runtime/blob/master/designs/use-selectors-at-cache.md

Signed-off-by: Quique Llorente <ellorent@redhat.com>
  • Loading branch information
qinqon committed Apr 15, 2021
1 parent 7ef2da0 commit 6d1af5b
Show file tree
Hide file tree
Showing 7 changed files with 213 additions and 18 deletions.
2 changes: 2 additions & 0 deletions Makefile
Expand Up @@ -24,6 +24,8 @@
SHELL:=/usr/bin/env bash
.DEFAULT_GOAL:=help

export WHAT ?= ./...

# Use GOPROXY environment variable if set
GOPROXY := $(shell go env GOPROXY)
ifeq ($(GOPROXY),)
Expand Down
2 changes: 1 addition & 1 deletion hack/test-all.sh
Expand Up @@ -20,7 +20,7 @@ source $(dirname ${BASH_SOURCE})/common.sh

header_text "running go test"

go test -race ${MOD_OPT} ./...
go test -race ${MOD_OPT} ${WHAT}

if [[ -n ${ARTIFACTS:-} ]]; then
if grep -Rin '<failure type="Failure">' ${ARTIFACTS}/*; then exit 1; fi
Expand Down
52 changes: 51 additions & 1 deletion pkg/cache/cache.go
Expand Up @@ -86,6 +86,9 @@ type Informer interface {
HasSynced() bool
}

// SelectorsByObject associates a client.Object (resolved to its GVK via the
// scheme) with a field/label selector; the cache's ListWatch for that GVK is
// restricted to objects matching the selector.
type SelectorsByObject map[client.Object]internal.Selector

// Options are the optional arguments for creating a new InformersMap object
type Options struct {
// Scheme is the scheme to use for mapping objects to GroupVersionKinds
Expand All @@ -103,6 +106,13 @@ type Options struct {
// Namespace restricts the cache's ListWatch to the desired namespace
// Default watches all namespaces
Namespace string

// SelectorsByObject restricts the cache's ListWatch to the desired
// fields per GVK at the specified object, the map's value must implement
// Selector [1] using for example a Set [2]
// [1] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Selector
// [2] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Set
SelectorsByObject SelectorsByObject
}

var defaultResyncTime = 10 * time.Hour
Expand All @@ -113,10 +123,38 @@ func New(config *rest.Config, opts Options) (Cache, error) {
if err != nil {
return nil, err
}
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
selectorsByGVK, err := convertToSelectorsByGVK(opts.SelectorsByObject, opts.Scheme)
if err != nil {
return nil, err
}
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace, selectorsByGVK)
return &informerCache{InformersMap: im}, nil
}

// BuilderWithOptions returns a Cache constructor that will build a cache
// honoring the options argument; this is useful to specify options like
// SelectorsByObject.
// WARNING: if SelectorsByObject is specified, filtered out resources are not
// returned.
func BuilderWithOptions(options Options) NewCacheFunc {
	return func(config *rest.Config, opts Options) (Cache, error) {
		// Non-zero fields of options take precedence over the caller-supplied
		// opts; zero values leave the caller's choice intact.
		if options.Scheme != nil {
			opts.Scheme = options.Scheme
		}
		if options.Mapper != nil {
			opts.Mapper = options.Mapper
		}
		if options.Resync != nil {
			opts.Resync = options.Resync
		}
		if options.Namespace != "" {
			opts.Namespace = options.Namespace
		}
		// Guard the override like the fields above so a caller-provided
		// opts.SelectorsByObject is not clobbered by a nil map.
		if options.SelectorsByObject != nil {
			opts.SelectorsByObject = options.SelectorsByObject
		}
		return New(config, opts)
	}
}

func defaultOpts(config *rest.Config, opts Options) (Options, error) {
// Use the default Kubernetes Scheme if unset
if opts.Scheme == nil {
Expand All @@ -139,3 +177,15 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) {
}
return opts, nil
}

// convertToSelectorsByGVK translates a SelectorsByObject map (keyed by
// client.Object) into an internal.SelectorsByGVK map (keyed by the object's
// GroupVersionKind), resolving each key's GVK through the provided scheme.
func convertToSelectorsByGVK(selectorsByObject SelectorsByObject, scheme *runtime.Scheme) (internal.SelectorsByGVK, error) {
	result := make(internal.SelectorsByGVK, len(selectorsByObject))
	for obj, sel := range selectorsByObject {
		gvk, err := apiutil.GVKForObject(obj, scheme)
		if err != nil {
			return nil, err
		}
		result[gvk] = sel
	}
	return result, nil
}
113 changes: 107 additions & 6 deletions pkg/cache/cache_test.go
Expand Up @@ -21,11 +21,15 @@ import (
"fmt"

. "github.com/onsi/ginkgo"
. "github.com/onsi/ginkgo/extensions/table"
. "github.com/onsi/gomega"

kcorev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
kscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
Expand All @@ -42,15 +46,17 @@ const testNamespaceThree = "test-namespace-3"

// TODO(community): Pull these helper functions into testenv.
// Restart policy is included to allow indexing on that field.
func createPod(name, namespace string, restartPolicy kcorev1.RestartPolicy) client.Object {
func createPodWithLabels(name, namespace string, restartPolicy kcorev1.RestartPolicy, labels map[string]string) client.Object {
three := int64(3)
if labels == nil {
labels = map[string]string{}
}
labels["test-label"] = name
pod := &kcorev1.Pod{
ObjectMeta: kmetav1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: map[string]string{
"test-label": name,
},
Labels: labels,
},
Spec: kcorev1.PodSpec{
Containers: []kcorev1.Container{{Name: "nginx", Image: "nginx"}},
Expand All @@ -65,6 +71,10 @@ func createPod(name, namespace string, restartPolicy kcorev1.RestartPolicy) clie
return pod
}

// createPod creates a test pod with only the default "test-label" label,
// delegating to createPodWithLabels with a nil extra-labels map.
func createPod(name, namespace string, restartPolicy kcorev1.RestartPolicy) client.Object {
	return createPodWithLabels(name, namespace, restartPolicy, nil)
}

func deletePod(pod client.Object) {
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -110,8 +120,8 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
// Includes restart policy since these objects are indexed on this field.
knownPod1 = createPod("test-pod-1", testNamespaceOne, kcorev1.RestartPolicyNever)
knownPod2 = createPod("test-pod-2", testNamespaceTwo, kcorev1.RestartPolicyAlways)
knownPod3 = createPod("test-pod-3", testNamespaceTwo, kcorev1.RestartPolicyOnFailure)
knownPod4 = createPod("test-pod-4", testNamespaceThree, kcorev1.RestartPolicyNever)
knownPod3 = createPodWithLabels("test-pod-3", testNamespaceTwo, kcorev1.RestartPolicyOnFailure, map[string]string{"common-label": "common"})
knownPod4 = createPodWithLabels("test-pod-4", testNamespaceThree, kcorev1.RestartPolicyNever, map[string]string{"common-label": "common"})
podGVK := schema.GroupVersionKind{
Kind: "Pod",
Version: "v1",
Expand Down Expand Up @@ -789,7 +799,98 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
Eventually(out).Should(Receive(Equal(pod)))
close(done)
})
// selectorsTestCase describes one table entry for the selector-filtering
// tests: the field/label selectors to configure on the cache, and the names
// of the pods the informer is expected to deliver.
type selectorsTestCase struct {
	fieldSelectors map[string]string // e.g. "metadata.name" -> value
	labelSelectors map[string]string // e.g. "common-label" -> value
	expectedPods   []string          // pod names expected from the event handler
}
DescribeTable("filter informers at list watch with selectors", func(tc selectorsTestCase) {
By("creating the cache")
builder := cache.BuilderWithOptions(
cache.Options{
SelectorsByObject: cache.SelectorsByObject{
&kcorev1.Pod{}: {
Field: fields.Set(tc.fieldSelectors).AsSelector(),
Label: labels.Set(tc.labelSelectors).AsSelector(),
},
},
},
)
informer, err := builder(cfg, cache.Options{})
Expect(err).NotTo(HaveOccurred())

By("running the cache and waiting for it to sync")
go func() {
defer GinkgoRecover()
Expect(informer.Start(informerCacheCtx)).To(Succeed())
}()
Expect(informer.WaitForCacheSync(informerCacheCtx)).NotTo(BeFalse())

gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}
sii, err := informer.GetInformerForKind(context.TODO(), gvk)
Expect(err).NotTo(HaveOccurred())
Expect(sii).NotTo(BeNil())
Expect(sii.HasSynced()).To(BeTrue())

By("adding an event handler listening for object creation which sends the object to a channel")
out := make(chan interface{})
addFunc := func(obj interface{}) {
By(fmt.Sprintf("%+v", obj))
out <- obj
}
sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc})

By("verifying the filter out object is not received on the channel")
obtainedObjs := []interface{}{}
for range tc.expectedPods {
var obtainedObj interface{}
Eventually(out).Should(Receive(&obtainedObj), "should receive something")
obtainedObjs = append(obtainedObjs, obtainedObj)
}
Consistently(out).ShouldNot(Receive(), "should not receive anything else")
Expect(obtainedObjs).Should(WithTransform(func(objs []interface{}) []string {
obtainedPodNames := []string{}
for _, obj := range objs {
obtainedPodNames = append(obtainedPodNames, obj.(client.Object).GetName())
}
return obtainedPodNames
}, ConsistOf(tc.expectedPods)))
},
Entry("when selectors are empty it has to inform about all the pods", selectorsTestCase{
fieldSelectors: map[string]string{},
labelSelectors: map[string]string{},
expectedPods: []string{"test-pod-1", "test-pod-2", "test-pod-3", "test-pod-4"},
}),
Entry("when field matches one pod it has to inform about it", selectorsTestCase{
fieldSelectors: map[string]string{"metadata.name": "test-pod-2"},
expectedPods: []string{"test-pod-2"},
}),
Entry("when field matches multiple pods it has to infor about all of them", selectorsTestCase{
fieldSelectors: map[string]string{"metadata.namespace": testNamespaceTwo},
expectedPods: []string{"test-pod-2", "test-pod-3"},
}),
Entry("when label matches one pod it has to inform about it", selectorsTestCase{
labelSelectors: map[string]string{"test-label": "test-pod-4"},
expectedPods: []string{"test-pod-4"},
}),
Entry("when label matches multiple pods it has to infor about all of them", selectorsTestCase{
labelSelectors: map[string]string{"common-label": "common"},
expectedPods: []string{"test-pod-3", "test-pod-4"},
}),
Entry("when label and field matches one pod it has to infor about about it", selectorsTestCase{
labelSelectors: map[string]string{"common-label": "common"},
fieldSelectors: map[string]string{"metadata.namespace": testNamespaceTwo},
expectedPods: []string{"test-pod-3"},
}),
Entry("when label does not match it does not has to inform", selectorsTestCase{
labelSelectors: map[string]string{"new-label": "new"},
expectedPods: []string{},
}),
Entry("when field does not match it does not has to inform", selectorsTestCase{
fieldSelectors: map[string]string{"metadata.namespace": "new"},
expectedPods: []string{},
}),
)
It("should be able to index an object field then retrieve objects by that field", func() {
By("creating the cache")
informer, err := cache.New(cfg, cache.Options{})
Expand Down
25 changes: 15 additions & 10 deletions pkg/cache/internal/deleg_map.go
Expand Up @@ -49,12 +49,14 @@ func NewInformersMap(config *rest.Config,
scheme *runtime.Scheme,
mapper meta.RESTMapper,
resync time.Duration,
namespace string) *InformersMap {
namespace string,
selectors SelectorsByGVK,
) *InformersMap {

return &InformersMap{
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace),
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace),
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors),
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors),

Scheme: scheme,
}
Expand Down Expand Up @@ -105,16 +107,19 @@ func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj
}

// newStructuredInformersMap creates a new InformersMap for structured objects.
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createStructuredListWatch)
// newStructuredInformersMap creates a new InformersMap for structured objects,
// restricting each GVK's ListWatch by the given selectors.
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
	namespace string, selectors SelectorsByGVK) *specificInformersMap {
	return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createStructuredListWatch)
}

// newUnstructuredInformersMap creates a new InformersMap for unstructured objects.
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createUnstructuredListWatch)
// newUnstructuredInformersMap creates a new InformersMap for unstructured
// objects, restricting each GVK's ListWatch by the given selectors.
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
	namespace string, selectors SelectorsByGVK) *specificInformersMap {
	return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createUnstructuredListWatch)
}

// newMetadataInformersMap creates a new InformersMap for metadata-only objects.
func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createMetadataListWatch)
// newMetadataInformersMap creates a new InformersMap for metadata-only
// objects, restricting each GVK's ListWatch by the given selectors.
func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
	namespace string, selectors SelectorsByGVK) *specificInformersMap {
	return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createMetadataListWatch)
}
10 changes: 10 additions & 0 deletions pkg/cache/internal/informers_map.go
Expand Up @@ -48,6 +48,7 @@ func newSpecificInformersMap(config *rest.Config,
mapper meta.RESTMapper,
resync time.Duration,
namespace string,
selectors SelectorsByGVK,
createListWatcher createListWatcherFunc) *specificInformersMap {
ip := &specificInformersMap{
config: config,
Expand All @@ -60,6 +61,7 @@ func newSpecificInformersMap(config *rest.Config,
startWait: make(chan struct{}),
createListWatcher: createListWatcher,
namespace: namespace,
selectors: selectors,
}
return ip
}
Expand Down Expand Up @@ -120,6 +122,8 @@ type specificInformersMap struct {
// namespace is the namespace that all ListWatches are restricted to
// default or empty string means all namespaces
namespace string

selectors SelectorsByGVK
}

// Start calls Run on each of the informers and sets started to true. Blocks on the context.
Expand Down Expand Up @@ -256,13 +260,15 @@ func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformer
// Create a new ListWatch for the obj
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
ip.selectors[gvk].ApplyToList(&opts)
res := listObj.DeepCopyObject()
isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
err := client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do(ctx).Into(res)
return res, err
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
ip.selectors[gvk].ApplyToList(&opts)
// Watch needs to be set to true separately
opts.Watch = true
isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
Expand All @@ -289,13 +295,15 @@ func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInform
// Create a new ListWatch for the obj
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
ip.selectors[gvk].ApplyToList(&opts)
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).List(ctx, opts)
}
return dynamicClient.Resource(mapping.Resource).List(ctx, opts)
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
ip.selectors[gvk].ApplyToList(&opts)
// Watch needs to be set to true separately
opts.Watch = true
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
Expand Down Expand Up @@ -327,13 +335,15 @@ func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersM
// create the relevant listwatch
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
ip.selectors[gvk].ApplyToList(&opts)
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return client.Resource(mapping.Resource).Namespace(ip.namespace).List(ctx, opts)
}
return client.Resource(mapping.Resource).List(ctx, opts)
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
ip.selectors[gvk].ApplyToList(&opts)
// Watch needs to be set to true separately
opts.Watch = true
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
Expand Down

0 comments on commit 6d1af5b

Please sign in to comment.