Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

馃悰 pkg/client: optionally allow caching of unstructured objects #1332

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
164 changes: 108 additions & 56 deletions pkg/client/client_test.go
Expand Up @@ -22,8 +22,6 @@ import (
"sync/atomic"
"time"

"k8s.io/apimachinery/pkg/types"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
Expand All @@ -33,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
kscheme "k8s.io/client-go/kubernetes/scheme"

"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -3106,48 +3105,79 @@ var _ = Describe("DelegatingClient", func() {
Expect(1).To(Equal(cachedReader.Called))
})

It("should call client reader when unstructured object", func() {
cachedReader := &fakeReader{}
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
dReader, err := client.NewDelegatingClient(client.NewDelegatingClientInput{
CacheReader: cachedReader,
Client: cl,
})
Expect(err).NotTo(HaveOccurred())
dep := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "deployment1",
Labels: map[string]string{"app": "frontend"},
},
Spec: appsv1.DeploymentSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"app": "frontend"},
When("getting unstructured objects", func() {
var dep *appsv1.Deployment

BeforeEach(func() {
dep = &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: "deployment1",
Labels: map[string]string{"app": "frontend"},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"app": "frontend"}},
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "x", Image: "x"}}},
Spec: appsv1.DeploymentSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"app": "frontend"},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"app": "frontend"}},
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "x", Image: "x"}}},
},
},
},
}
dep, err = clientset.AppsV1().Deployments("default").Create(context.Background(), dep, metav1.CreateOptions{})
Expect(err).NotTo(HaveOccurred())
}
var err error
dep, err = clientset.AppsV1().Deployments("default").Create(context.Background(), dep, metav1.CreateOptions{})
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
Expect(clientset.AppsV1().Deployments("default").Delete(
context.Background(),
dep.Name,
metav1.DeleteOptions{},
)).To(Succeed())
})
It("should call client reader when not cached", func() {
cachedReader := &fakeReader{}
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
dReader, err := client.NewDelegatingClient(client.NewDelegatingClientInput{
CacheReader: cachedReader,
Client: cl,
})
Expect(err).NotTo(HaveOccurred())

actual := &unstructured.Unstructured{}
actual.SetGroupVersionKind(schema.GroupVersionKind{
Group: "apps",
Kind: "Deployment",
Version: "v1",
})
actual.SetName(dep.Name)
key := client.ObjectKey{Namespace: dep.Namespace, Name: dep.Name}
Expect(dReader.Get(context.TODO(), key, actual)).To(Succeed())
Expect(0).To(Equal(cachedReader.Called))
})
It("should call cache reader when cached", func() {
cachedReader := &fakeReader{}
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
dReader, err := client.NewDelegatingClient(client.NewDelegatingClientInput{
CacheReader: cachedReader,
Client: cl,
CacheUnstructured: true,
})
Expect(err).NotTo(HaveOccurred())

actual := &unstructured.Unstructured{}
actual.SetGroupVersionKind(schema.GroupVersionKind{
Group: "apps",
Kind: "Deployment",
Version: "v1",
})
actual.SetName(dep.Name)
key := client.ObjectKey{Namespace: dep.Namespace, Name: dep.Name}
Expect(dReader.Get(context.TODO(), key, actual)).To(Succeed())
Expect(0).To(Equal(cachedReader.Called))
Expect(clientset.AppsV1().Deployments("default").Delete(
context.Background(),
dep.Name,
metav1.DeleteOptions{},
)).To(Succeed())
actual := &unstructured.Unstructured{}
actual.SetGroupVersionKind(schema.GroupVersionKind{
Group: "apps",
Kind: "Deployment",
Version: "v1",
})
actual.SetName(dep.Name)
key := client.ObjectKey{Namespace: dep.Namespace, Name: dep.Name}
Expect(dReader.Get(context.TODO(), key, actual)).To(Succeed())
Expect(1).To(Equal(cachedReader.Called))
})
})
})
Describe("List", func() {
Expand All @@ -3165,24 +3195,46 @@ var _ = Describe("DelegatingClient", func() {
Expect(1).To(Equal(cachedReader.Called))
})

It("should call client reader when unstructured object", func() {
cachedReader := &fakeReader{}
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
dReader, err := client.NewDelegatingClient(client.NewDelegatingClientInput{
CacheReader: cachedReader,
Client: cl,
When("listing unstructured objects", func() {
It("should call client reader when not cached", func() {
cachedReader := &fakeReader{}
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
dReader, err := client.NewDelegatingClient(client.NewDelegatingClientInput{
CacheReader: cachedReader,
Client: cl,
})
Expect(err).NotTo(HaveOccurred())

actual := &unstructured.UnstructuredList{}
actual.SetGroupVersionKind(schema.GroupVersionKind{
Group: "apps",
Kind: "DeploymentList",
Version: "v1",
})
Expect(dReader.List(context.Background(), actual)).To(Succeed())
Expect(0).To(Equal(cachedReader.Called))
})
Expect(err).NotTo(HaveOccurred())
It("should call cache reader when cached", func() {
cachedReader := &fakeReader{}
cl, err := client.New(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
dReader, err := client.NewDelegatingClient(client.NewDelegatingClientInput{
CacheReader: cachedReader,
Client: cl,
CacheUnstructured: true,
})
Expect(err).NotTo(HaveOccurred())

actual := &unstructured.UnstructuredList{}
actual.SetGroupVersionKind(schema.GroupVersionKind{
Group: "apps",
Kind: "DeploymentList",
Version: "v1",
actual := &unstructured.UnstructuredList{}
actual.SetGroupVersionKind(schema.GroupVersionKind{
Group: "apps",
Kind: "DeploymentList",
Version: "v1",
})
Expect(dReader.List(context.Background(), actual)).To(Succeed())
Expect(1).To(Equal(cachedReader.Called))
})
Expect(dReader.List(context.Background(), actual)).To(Succeed())
Expect(0).To(Equal(cachedReader.Called))
})
})
})
Expand Down
35 changes: 22 additions & 13 deletions pkg/client/split.go
Expand Up @@ -24,14 +24,16 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"

"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

// NewDelegatingClientInput encapsulates the input parameters to create a new delegating client.
type NewDelegatingClientInput struct {
CacheReader Reader
Client Client
UncachedObjects []Object
CacheReader Reader
Client Client
UncachedObjects []Object
CacheUnstructured bool
}

// NewDelegatingClient creates a new delegating client.
Expand All @@ -53,10 +55,11 @@ func NewDelegatingClient(in NewDelegatingClientInput) (Client, error) {
scheme: in.Client.Scheme(),
mapper: in.Client.RESTMapper(),
Reader: &delegatingReader{
CacheReader: in.CacheReader,
ClientReader: in.Client,
scheme: in.Client.Scheme(),
uncachedGVKs: uncachedGVKs,
CacheReader: in.CacheReader,
ClientReader: in.Client,
scheme: in.Client.Scheme(),
uncachedGVKs: uncachedGVKs,
cacheUnstructured: in.CacheUnstructured,
},
Writer: in.Client,
StatusClient: in.Client,
Expand Down Expand Up @@ -91,8 +94,9 @@ type delegatingReader struct {
CacheReader Reader
ClientReader Reader

uncachedGVKs map[schema.GroupVersionKind]struct{}
scheme *runtime.Scheme
uncachedGVKs map[schema.GroupVersionKind]struct{}
scheme *runtime.Scheme
cacheUnstructured bool
}

func (d *delegatingReader) shouldBypassCache(obj runtime.Object) (bool, error) {
Expand All @@ -105,10 +109,15 @@ func (d *delegatingReader) shouldBypassCache(obj runtime.Object) (bool, error) {
if meta.IsListType(obj) {
gvk.Kind = strings.TrimSuffix(gvk.Kind, "List")
}
_, isUncached := d.uncachedGVKs[gvk]
_, isUnstructured := obj.(*unstructured.Unstructured)
_, isUnstructuredList := obj.(*unstructured.UnstructuredList)
return isUncached || isUnstructured || isUnstructuredList, nil
if _, isUncached := d.uncachedGVKs[gvk]; isUncached {
return true, nil
}
if !d.cacheUnstructured {
_, isUnstructured := obj.(*unstructured.Unstructured)
_, isUnstructuredList := obj.(*unstructured.UnstructuredList)
return isUnstructured || isUnstructuredList, nil
}
return false, nil
}

// Get retrieves an obj for a given object key from the Kubernetes Cluster.
Expand Down