Add field/label selectors to cache ListWatch options
All instances of a given resource are cached by
controller-runtime; for some use cases this consumes a lot of memory and
CPU. This change adds an option to the cache so resources can be selected
by field and label.

Signed-off-by: Quique Llorente <ellorent@redhat.com>
qinqon committed Apr 5, 2021
1 parent ff46f6f commit 551ced4
Showing 5 changed files with 75 additions and 49 deletions.
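
For context, a minimal sketch of how a controller could opt in to the new filtering through the manager. The manager wiring, the node name, and the field value are illustrative assumptions, not part of this change:

package main

import (
	"k8s.io/apimachinery/pkg/fields"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/cache"
	"sigs.k8s.io/controller-runtime/pkg/cache/selector"
)

func main() {
	// Cache only the Pods scheduled to one node; resources without an entry
	// keep the default, unfiltered cache.
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
		NewCache: cache.BuilderWithSelectors(selector.SelectorsByGroupResource{
			{Group: "", Resource: "pods"}: {
				Field: fields.OneTermEqualSelector("spec.nodeName", "node-1"),
			},
		}),
	})
	if err != nil {
		panic(err)
	}
	_ = mgr // register controllers here, then run mgr.Start(ctrl.SetupSignalHandler())
}
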
12 changes: 6 additions & 6 deletions pkg/cache/cache.go
@@ -22,13 +22,13 @@ import (
"time"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
toolscache "k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/cache/internal"
"sigs.k8s.io/controller-runtime/pkg/cache/selector"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
logf "sigs.k8s.io/controller-runtime/pkg/internal/log"
@@ -110,7 +110,7 @@ type Options struct {
// for example a Set [2]
// [1] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Selector
// [2] https://pkg.go.dev/k8s.io/apimachinery/pkg/fields#Set
FieldSelectorByResource map[schema.GroupResource]fields.Selector
SelectorByResource selector.SelectorsByGroupResource
}

var defaultResyncTime = 10 * time.Hour
@@ -121,16 +121,16 @@ func New(config *rest.Config, opts Options) (Cache, error) {
if err != nil {
return nil, err
}
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace, opts.FieldSelectorByResource)
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace, opts.SelectorByResource)
return &informerCache{InformersMap: im}, nil
}

// BuilderWithFieldSelectors returns a Cache constructor that will filter
// BuilderWithSelectors returns a Cache constructor that will filter
// contents using the given SelectorsByGroupResource
// WARNING: filtered out resources are not returned.
func BuilderWithFieldSelectors(fieldSelectorByResource map[schema.GroupResource]fields.Selector) NewCacheFunc {
func BuilderWithSelectors(selectors selector.SelectorsByGroupResource) NewCacheFunc {
return func(config *rest.Config, opts Options) (Cache, error) {
opts.FieldSelectorByResource = fieldSelectorByResource
opts.SelectorByResource = selectors
return New(config, opts)
}
}
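The same filtering can also be set directly on cache.Options for callers that construct a cache themselves rather than going through the builder. A small sketch, with the ReplicaSet label purely illustrative:

package main

import (
	"k8s.io/apimachinery/pkg/labels"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/cache"
	"sigs.k8s.io/controller-runtime/pkg/cache/selector"
)

func main() {
	// Only ReplicaSets carrying the label end up in the cache; the remaining
	// Options fields fall back to the package defaults.
	c, err := cache.New(ctrl.GetConfigOrDie(), cache.Options{
		SelectorByResource: selector.SelectorsByGroupResource{
			{Group: "apps", Resource: "replicasets"}: {
				Label: labels.SelectorFromSet(labels.Set{"managed": "true"}),
			},
		},
	})
	if err != nil {
		panic(err)
	}
	_ = c // typically driven by a manager or started with c.Start(ctx)
}
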
9 changes: 6 additions & 3 deletions pkg/cache/cache_test.go
@@ -33,6 +33,7 @@ import (
kcache "k8s.io/client-go/tools/cache"

"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/cache/selector"
"sigs.k8s.io/controller-runtime/pkg/client"
)

@@ -792,9 +793,11 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
})
It("should be able to filter informers at list watch level by field", func() {
By("creating the cache")
builder := cache.BuilderWithFieldSelectors(
map[schema.GroupResource]fields.Selector{
{Group: "", Resource: "pods"}: fields.SelectorFromSet(fields.Set{"metadata.name": "foo"}),
builder := cache.BuilderWithSelectors(
selector.SelectorsByGroupResource{
{Group: "", Resource: "pods"}: {
Field: fields.SelectorFromSet(fields.Set{"metadata.name": "foo"}),
},
},
)
informer, err := builder(cfg, cache.Options{})
22 changes: 11 additions & 11 deletions pkg/cache/internal/deleg_map.go
@@ -23,11 +23,11 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"sigs.k8s.io/controller-runtime/pkg/cache/selector"
)

// InformersMap create and caches Informers for (runtime.Object, schema.GroupVersionKind) pairs.
@@ -51,13 +51,13 @@ func NewInformersMap(config *rest.Config,
mapper meta.RESTMapper,
resync time.Duration,
namespace string,
fieldSelectorByResource map[schema.GroupResource]fields.Selector,
selectors selector.SelectorsByGroupResource,
) *InformersMap {

return &InformersMap{
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace, fieldSelectorByResource),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, fieldSelectorByResource),
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace, fieldSelectorByResource),
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors),
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors),

Scheme: scheme,
}
@@ -109,18 +109,18 @@ func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj

// newStructuredInformersMap creates a new InformersMap for structured objects.
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
namespace string, fieldSelectorByResource map[schema.GroupResource]fields.Selector) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, fieldSelectorByResource, createStructuredListWatch)
namespace string, selectors selector.SelectorsByGroupResource) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createStructuredListWatch)
}

// newUnstructuredInformersMap creates a new InformersMap for unstructured objects.
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
namespace string, fieldSelectorByResource map[schema.GroupResource]fields.Selector) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, fieldSelectorByResource, createUnstructuredListWatch)
namespace string, selectors selector.SelectorsByGroupResource) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createUnstructuredListWatch)
}

// newMetadataInformersMap creates a new InformersMap for metadata-only objects.
func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
namespace string, fieldSelectorByResource map[schema.GroupResource]fields.Selector) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, fieldSelectorByResource, createMetadataListWatch)
namespace string, selectors selector.SelectorsByGroupResource) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, createMetadataListWatch)
}
49 changes: 20 additions & 29 deletions pkg/cache/internal/informers_map.go
@@ -26,7 +26,6 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
@@ -36,6 +35,7 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"

"sigs.k8s.io/controller-runtime/pkg/cache/selector"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

@@ -49,20 +49,20 @@ func newSpecificInformersMap(config *rest.Config,
mapper meta.RESTMapper,
resync time.Duration,
namespace string,
fieldSelectorByResource map[schema.GroupResource]fields.Selector,
selectors selector.SelectorsByGroupResource,
createListWatcher createListWatcherFunc) *specificInformersMap {
ip := &specificInformersMap{
config: config,
Scheme: scheme,
mapper: mapper,
informersByGVK: make(map[schema.GroupVersionKind]*MapEntry),
codecs: serializer.NewCodecFactory(scheme),
paramCodec: runtime.NewParameterCodec(scheme),
resync: resync,
startWait: make(chan struct{}),
createListWatcher: createListWatcher,
namespace: namespace,
fieldSelectorByResource: fieldSelectorByResource,
config: config,
Scheme: scheme,
mapper: mapper,
informersByGVK: make(map[schema.GroupVersionKind]*MapEntry),
codecs: serializer.NewCodecFactory(scheme),
paramCodec: runtime.NewParameterCodec(scheme),
resync: resync,
startWait: make(chan struct{}),
createListWatcher: createListWatcher,
namespace: namespace,
selectors: selectors,
}
return ip
}
@@ -124,7 +124,7 @@ type specificInformersMap struct {
// default or empty string means all namespaces
namespace string

fieldSelectorByResource map[schema.GroupResource]fields.Selector
selectors selector.SelectorsByGroupResource
}

// Start calls Run on each of the informers and sets started to true. Blocks on the context.
@@ -236,15 +236,6 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob
return i, ip.started, nil
}

func (ip *specificInformersMap) findFieldSelectorByGVR(gvr schema.GroupVersionResource) string {
gr := schema.GroupResource{Group: gvr.Group, Resource: gvr.Resource}
selctr := ip.fieldSelectorByResource[gr]
if selctr == nil {
return ""
}
return selctr.String()
}

// newListWatch returns a new ListWatch object that can be used to create a SharedIndexInformer.
func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformersMap) (*cache.ListWatch, error) {
// Kubernetes APIs work against Resources, not GroupVersionKinds. Map the
@@ -270,15 +261,15 @@ func createStructuredListWatch(gvk schema.GroupVersionKind, ip *specificInformer
// Create a new ListWatch for the obj
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
opts.FieldSelector = ip.findFieldSelectorByGVR(mapping.Resource)
ip.selectors.FindByGR(mapping.Resource).FillInListOpts(&opts)
res := listObj.DeepCopyObject()
isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
err := client.Get().NamespaceIfScoped(ip.namespace, isNamespaceScoped).Resource(mapping.Resource.Resource).VersionedParams(&opts, ip.paramCodec).Do(ctx).Into(res)
return res, err
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
opts.FieldSelector = ip.findFieldSelectorByGVR(mapping.Resource)
ip.selectors.FindByGR(mapping.Resource).FillInListOpts(&opts)
// Watch needs to be set to true separately
opts.Watch = true
isNamespaceScoped := ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot
@@ -305,15 +296,15 @@ func createUnstructuredListWatch(gvk schema.GroupVersionKind, ip *specificInform
// Create a new ListWatch for the obj
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
opts.FieldSelector = ip.findFieldSelectorByGVR(mapping.Resource)
ip.selectors.FindByGR(mapping.Resource).FillInListOpts(&opts)
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return dynamicClient.Resource(mapping.Resource).Namespace(ip.namespace).List(ctx, opts)
}
return dynamicClient.Resource(mapping.Resource).List(ctx, opts)
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
opts.FieldSelector = ip.findFieldSelectorByGVR(mapping.Resource)
ip.selectors.FindByGR(mapping.Resource).FillInListOpts(&opts)
// Watch needs to be set to true separately
opts.Watch = true
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
Expand Down Expand Up @@ -345,15 +336,15 @@ func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersM
// create the relevant listwatch
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
opts.FieldSelector = ip.findFieldSelectorByGVR(mapping.Resource)
ip.selectors.FindByGR(mapping.Resource).FillInListOpts(&opts)
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return client.Resource(mapping.Resource).Namespace(ip.namespace).List(ctx, opts)
}
return client.Resource(mapping.Resource).List(ctx, opts)
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
opts.FieldSelector = ip.findFieldSelectorByGVR(mapping.Resource)
ip.selectors.FindByGR(mapping.Resource).FillInListOpts(&opts)
// Watch needs to be set to true separately
opts.Watch = true
if ip.namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
32 changes: 32 additions & 0 deletions pkg/cache/selector/selector.go
@@ -0,0 +1,32 @@
package selector

import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
)

// SelectorsByGroupResource associates a GroupResource with a field/label selector
type SelectorsByGroupResource map[schema.GroupResource]Selector

// Selector specifies the label/field selector to fill in ListOptions
type Selector struct {
Label labels.Selector
Field fields.Selector
}

// FillInListOpts fills in the ListOptions LabelSelector and FieldSelector if they are set
func (s Selector) FillInListOpts(listOpts *metav1.ListOptions) {
if s.Label != nil {
listOpts.LabelSelector = s.Label.String()
}
if s.Field != nil {
listOpts.FieldSelector = s.Field.String()
}
}

// FindByGR uses the GVR group and resource to look up the field/label selector
func (s SelectorsByGroupResource) FindByGR(gvr schema.GroupVersionResource) Selector {
return s[schema.GroupResource{Group: gvr.Group, Resource: gvr.Resource}]
}
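
One design point worth spelling out: FindByGR returns the zero-value Selector when a resource has no entry, so FillInListOpts leaves those ListOptions untouched. A short sketch of that behaviour; the deployment label is an arbitrary example:

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"sigs.k8s.io/controller-runtime/pkg/cache/selector"
)

func main() {
	selectors := selector.SelectorsByGroupResource{
		{Group: "apps", Resource: "deployments"}: {
			Label: labels.SelectorFromSet(labels.Set{"app": "frontend"}),
		},
	}

	// Deployments pick up the label selector.
	depOpts := metav1.ListOptions{}
	selectors.FindByGR(schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}).FillInListOpts(&depOpts)
	fmt.Println(depOpts.LabelSelector) // app=frontend

	// Pods have no entry, so the zero-value Selector is a no-op and the list stays unfiltered.
	podOpts := metav1.ListOptions{}
	selectors.FindByGR(schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}).FillInListOpts(&podOpts)
	fmt.Println(podOpts.LabelSelector == "" && podOpts.FieldSelector == "") // true
}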
