
Add FieldSelectorByResource option to cache
All instances of a given resource are cached by controller-runtime; for
some use cases this consumes a lot of memory and CPU. This change adds an
option to the cache so resources can be filtered by field.

Signed-off-by: Quique Llorente <ellorent@redhat.com>
qinqon committed Apr 5, 2021
1 parent 10ae090 commit ff46f6f
Showing 4 changed files with 138 additions and 21 deletions.
20 changes: 19 additions & 1 deletion pkg/cache/cache.go
@@ -22,6 +22,7 @@ import (
"time"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes/scheme"
@@ -103,6 +104,13 @@ type Options struct {
// Namespace restricts the cache's ListWatch to the desired namespace
// Default watches all namespaces
Namespace string

// FieldSelectorByResource restricts the cache's ListWatch to the desired
// fields per resource. The map's values must implement Selector [1],
// for example via a Set [2].
// [1] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Selector
// [2] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Set
FieldSelectorByResource map[schema.GroupResource]fields.Selector
}

var defaultResyncTime = 10 * time.Hour
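
For illustration (not part of the diff), a map entry satisfying Selector can be built with helpers from k8s.io/apimachinery/pkg/fields; the resource choice and node name below are made-up examples, not values from this commit:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	// Illustrative: cache only Pods scheduled to one (hypothetical) node.
	// "spec.nodeName" is one of the field selectors the API server
	// supports for Pods.
	selectors := map[schema.GroupResource]fields.Selector{
		{Group: "", Resource: "pods"}: fields.OneTermEqualSelector("spec.nodeName", "node-a"),
	}
	fmt.Println(selectors[schema.GroupResource{Resource: "pods"}].String()) // spec.nodeName=node-a
}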
@@ -113,10 +121,20 @@ func New(config *rest.Config, opts Options) (Cache, error) {
if err != nil {
return nil, err
}
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace)
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace, opts.FieldSelectorByResource)
return &informerCache{InformersMap: im}, nil
}

// BuilderWithFieldSelectors returns a Cache constructor that filters the
// cache's contents using fieldSelectorByResource.
// WARNING: filtered-out resources are not returned by the cache.
func BuilderWithFieldSelectors(fieldSelectorByResource map[schema.GroupResource]fields.Selector) NewCacheFunc {
return func(config *rest.Config, opts Options) (Cache, error) {
opts.FieldSelectorByResource = fieldSelectorByResource
return New(config, opts)
}
}

func defaultOpts(config *rest.Config, opts Options) (Options, error) {
// Use the default Kubernetes Scheme if unset
if opts.Scheme == nil {
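
For context (not part of the diff), a minimal sketch of wiring the new builder into a manager. This assumes manager.Options exposes a NewCache field accepting the NewCacheFunc returned by BuilderWithFieldSelectors; the pod name is illustrative:

package main

import (
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/rest"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/cache"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

// newFilteredManager builds a manager whose cache lists/watches only Pods
// named "foo"; all other Pods are invisible to cached reads (see the
// WARNING on BuilderWithFieldSelectors above).
func newFilteredManager(cfg *rest.Config) (manager.Manager, error) {
	return ctrl.NewManager(cfg, manager.Options{
		NewCache: cache.BuilderWithFieldSelectors(map[schema.GroupResource]fields.Selector{
			{Group: "", Resource: "pods"}: fields.SelectorFromSet(fields.Set{"metadata.name": "foo"}),
		}),
	})
}

func main() {
	mgr, err := newFilteredManager(ctrl.GetConfigOrDie())
	if err != nil {
		panic(err)
	}
	_ = mgr
}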
73 changes: 73 additions & 0 deletions pkg/cache/cache_test.go
@@ -26,6 +26,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
kmetav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime/schema"
kscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
@@ -789,7 +790,79 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
Eventually(out).Should(Receive(Equal(pod)))
close(done)
})
It("should be able to filter informers at list watch level by field", func() {
By("creating the cache")
builder := cache.BuilderWithFieldSelectors(
map[schema.GroupResource]fields.Selector{
{Group: "", Resource: "pods"}: fields.SelectorFromSet(fields.Set{"metadata.name": "foo"}),
},
)
informer, err := builder(cfg, cache.Options{})
Expect(err).NotTo(HaveOccurred())

By("running the cache and waiting for it to sync")
go func() {
defer GinkgoRecover()
Expect(informer.Start(informerCacheCtx)).To(Succeed())
}()
Expect(informer.WaitForCacheSync(informerCacheCtx)).NotTo(BeFalse())

gvk := schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Pod"}
sii, err := informer.GetInformerForKind(context.TODO(), gvk)
Expect(err).NotTo(HaveOccurred())
Expect(sii).NotTo(BeNil())
Expect(sii.HasSynced()).To(BeTrue())

By("adding an event handler listening for object creation which sends the object to a channel")
out := make(chan interface{})
addFunc := func(obj interface{}) {
out <- obj
}
sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc})

By("adding a pair of objects")
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
podFoo := &kcorev1.Pod{
ObjectMeta: kmetav1.ObjectMeta{
Name: "foo",
Namespace: "default",
},
Spec: kcorev1.PodSpec{
Containers: []kcorev1.Container{
{
Name: "nginx",
Image: "nginx",
},
},
},
}
Expect(cl.Create(context.Background(), podFoo)).To(Succeed())
defer deletePod(podFoo)

podBar := &kcorev1.Pod{
ObjectMeta: kmetav1.ObjectMeta{
Name: "bar",
Namespace: "default",
},
Spec: kcorev1.PodSpec{
Containers: []kcorev1.Container{
{
Name: "nginx",
Image: "nginx",
},
},
},
}
Expect(cl.Create(context.Background(), podBar)).To(Succeed())
defer deletePod(podBar)

By("verifying the filter out object is not received on the channel")
var obtainedObj interface{}
Expect(out).Should(Receive(&obtainedObj), "should receive something")
Expect(obtainedObj).Should(Equal(podFoo), "should receive the pod 'foo'")
Consistently(out).ShouldNot(Receive(), "should not receive anything else")
})
It("should be able to index an object field then retrieve objects by that field", func() {
By("creating the cache")
informer, err := cache.New(cfg, cache.Options{})
26 changes: 16 additions & 10 deletions pkg/cache/internal/deleg_map.go
@@ -23,6 +23,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
@@ -49,12 +50,14 @@ func NewInformersMap(config *rest.Config,
scheme *runtime.Scheme,
mapper meta.RESTMapper,
resync time.Duration,
namespace string) *InformersMap {
namespace string,
fieldSelectorByResource map[schema.GroupResource]fields.Selector,
) *InformersMap {

return &InformersMap{
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace),
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace),
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace, fieldSelectorByResource),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, fieldSelectorByResource),
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace, fieldSelectorByResource),

Scheme: scheme,
}
@@ -105,16 +108,19 @@ func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj
}

// newStructuredInformersMap creates a new InformersMap for structured objects.
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createStructuredListWatch)
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
namespace string, fieldSelectorByResource map[schema.GroupResource]fields.Selector) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, fieldSelectorByResource, createStructuredListWatch)
}

// newUnstructuredInformersMap creates a new InformersMap for unstructured objects.
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createUnstructuredListWatch)
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
namespace string, fieldSelectorByResource map[schema.GroupResource]fields.Selector) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, fieldSelectorByResource, createUnstructuredListWatch)
}

// newMetadataInformersMap creates a new InformersMap for metadata-only objects.
func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, namespace string) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, createMetadataListWatch)
func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
namespace string, fieldSelectorByResource map[schema.GroupResource]fields.Selector) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, fieldSelectorByResource, createMetadataListWatch)
}
40 changes: 30 additions & 10 deletions pkg/cache/internal/informers_map.go
@@ -26,6 +26,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
@@ -48,18 +49,20 @@ func newSpecificInformersMap(config *rest.Config,
mapper meta.RESTMapper,
resync time.Duration,
namespace string,
fieldSelectorByResource map[schema.GroupResource]fields.Selector,
createListWatcher createListWatcherFunc) *specificInformersMap {
ip := &specificInformersMap{
config: config,
Scheme: scheme,
mapper: mapper,
informersByGVK: make(map[schema.GroupVersionKind]*MapEntry),
codecs: serializer.NewCodecFactory(scheme),
paramCodec: runtime.NewParameterCodec(scheme),
resync: resync,
startWait: make(chan struct{}),
createListWatcher: createListWatcher,
namespace: namespace,
config: config,
Scheme: scheme,
mapper: mapper,
informersByGVK: make(map[schema.GroupVersionKind]*MapEntry),
codecs: serializer.NewCodecFactory(scheme),
paramCodec: runtime.NewParameterCodec(scheme),
resync: resync,
startWait: make(chan struct{}),
createListWatcher: createListWatcher,
namespace: namespace,
fieldSelectorByResource: fieldSelectorByResource,
}
return ip
}
@@ -120,6 +123,8 @@ type specificInformersMap struct {
// namespace is the namespace that all ListWatches are restricted to
// default or empty string means all namespaces
namespace string

fieldSelectorByResource map[schema.GroupResource]fields.Selector
}

// Start calls Run on each of the informers and sets started to true. Blocks on the context.
@@ -231,6 +236,15 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob
return i, ip.started, nil
}

// findFieldSelectorByGVR returns the string form of the field selector
// configured for the given resource, or "" when none is configured
// (meaning no filtering). The version is dropped: selectors are keyed by
// GroupResource, so one entry applies to every version of a resource.
func (ip *specificInformersMap) findFieldSelectorByGVR(gvr schema.GroupVersionResource) string {
gr := schema.GroupResource{Group: gvr.Group, Resource: gvr.Resource}
selector := ip.fieldSelectorByResource[gr]
if selector == nil {
return ""
}
return selector.String()
}

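To make the ListOptions wiring below concrete, a standalone sketch (not part of the diff) of the string a Set-based selector serializes to:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/fields"
)

func main() {
	// SelectorFromSet ANDs its key/value pairs; String() renders the
	// canonical form that ends up in ListOptions.FieldSelector.
	sel := fields.SelectorFromSet(fields.Set{"metadata.name": "foo"})
	fmt.Println(sel.String()) // metadata.name=foo
}
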
// newListWatch returns a new ListWatch object that can be used to create a SharedIndexInformer.
func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
@@ -256,13 +270,15 @@ func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformer
// Create a new ListWatch for the obj
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
opts.FieldSelector = ip.findFieldSelectorByGVR(mapping.Resource)
res := listObj.DeepCopyObject()
isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
err := client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do(ctx).Into(res)
return res, err
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
opts.FieldSelector = ip.findFieldSelectorByGVR(mapping.Resource)
// Watch needs to be set to true separately
opts.Watch = true
isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
Expand All @@ -289,13 +305,15 @@ func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInform
// Create a new ListWatch for the obj
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
opts.FieldSelector = ip.findFieldSelectorByGVR(mapping.Resource)
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).List(ctx, opts)
}
return dynamicClient.Resource(mapping.Resource).List(ctx, opts)
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
opts.FieldSelector = ip.findFieldSelectorByGVR(mapping.Resource)
// Watch needs to be set to true separately
opts.Watch = true
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
@@ -327,13 +345,15 @@ func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersM
// create the relevant listwatch
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
opts.FieldSelector = ip.findFieldSelectorByGVR(mapping.Resource)
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return client.Resource(mapping.Resource).Namespace(ip.namespace).List(ctx, opts)
}
return client.Resource(mapping.Resource).List(ctx, opts)
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
opts.FieldSelector = ip.findFieldSelectorByGVR(mapping.Resource)
// Watch needs to be set to true separately
opts.Watch = true
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
Expand Down
