From 45aa968acaa2e28279f7250131c706cc54d4a205 Mon Sep 17 00:00:00 2001 From: Alvaro Aleman Date: Sat, 27 Mar 2021 21:23:14 -0400 Subject: [PATCH] :sparkles: Add ClientWithWatch for use in CLIs This change adds a new WithWatch interface and implementation to the Client package. It is a superset of the existing Client interface that additionally contains a Watch method. It is intended to be used by CLI apps that need to wait for a condition to arise. Because this is not intended to be used with controllers and might cause confusion there, the existing interface and all references to it are kept as-is. --- pkg/client/client.go | 4 ++ pkg/client/interfaces.go | 9 +++ pkg/client/watch.go | 118 +++++++++++++++++++++++++++++++++++ pkg/client/watch_test.go | 131 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 262 insertions(+) create mode 100644 pkg/client/watch.go create mode 100644 pkg/client/watch_test.go diff --git a/pkg/client/client.go b/pkg/client/client.go index 0af814fdf9..818232f549 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -53,6 +53,10 @@ type Options struct { // case of unstructured types, the group, version, and kind will be extracted // from the corresponding fields on the object. func New(config *rest.Config, options Options) (Client, error) { + return newClient(config, options) +} + +func newClient(config *rest.Config, options Options) (*client, error) { if config == nil { return nil, fmt.Errorf("must provide non-nil rest.Config to client.New") } diff --git a/pkg/client/interfaces.go b/pkg/client/interfaces.go index 4dc6eb79a0..0dfea4d6c5 100644 --- a/pkg/client/interfaces.go +++ b/pkg/client/interfaces.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/watch" ) // ObjectKey identifies a Kubernetes Object. @@ -108,6 +109,14 @@ type Client interface { RESTMapper() meta.RESTMapper } +// WithWatch supports Watch on top of the CRUD operations supported by +// the normal Client. Its intended use-case are CLI apps that need to wait for +// events. +type WithWatch interface { + Client + Watch(ctx context.Context, obj ObjectList, opts ...ListOption) (watch.Interface, error) +} + // IndexerFunc knows how to take an object and turn it into a series // of non-namespaced keys. Namespaced objects are automatically given // namespaced and non-spaced variants, so keys do not need to include namespace. diff --git a/pkg/client/watch.go b/pkg/client/watch.go new file mode 100644 index 0000000000..765ca5daa6 --- /dev/null +++ b/pkg/client/watch.go @@ -0,0 +1,118 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package client + +import ( + "context" + "strings" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/rest" +) + +// NewWithWatch returns a new WithWatch. +func NewWithWatch(config *rest.Config, options Options) (WithWatch, error) { + client, err := newClient(config, options) + if err != nil { + return nil, err + } + dynamicClient, err := dynamic.NewForConfig(config) + if err != nil { + return nil, err + } + return &watchingClient{client: client, dynamic: dynamicClient}, nil +} + +type watchingClient struct { + *client + dynamic dynamic.Interface +} + +func (w *watchingClient) Watch(ctx context.Context, list ObjectList, opts ...ListOption) (watch.Interface, error) { + switch l := list.(type) { + case *unstructured.UnstructuredList: + return w.unstructuredWatch(ctx, l, opts...) + case *metav1.PartialObjectMetadataList: + return w.metadataWatch(ctx, l, opts...) + default: + return w.typedWatch(ctx, l, opts...) + } +} + +func (w *watchingClient) listOpts(opts ...ListOption) ListOptions { + listOpts := ListOptions{} + listOpts.ApplyOptions(opts) + if listOpts.Raw == nil { + listOpts.Raw = &metav1.ListOptions{} + } + listOpts.Raw.Watch = true + + return listOpts +} + +func (w *watchingClient) metadataWatch(ctx context.Context, obj *metav1.PartialObjectMetadataList, opts ...ListOption) (watch.Interface, error) { + gvk := obj.GroupVersionKind() + if strings.HasSuffix(gvk.Kind, "List") { + gvk.Kind = gvk.Kind[:len(gvk.Kind)-4] + } + + listOpts := w.listOpts(opts...) + + resInt, err := w.client.metadataClient.getResourceInterface(gvk, listOpts.Namespace) + if err != nil { + return nil, err + } + + return resInt.Watch(ctx, *listOpts.AsListOptions()) +} + +func (w *watchingClient) unstructuredWatch(ctx context.Context, obj *unstructured.UnstructuredList, opts ...ListOption) (watch.Interface, error) { + gvk := obj.GroupVersionKind() + if strings.HasSuffix(gvk.Kind, "List") { + gvk.Kind = gvk.Kind[:len(gvk.Kind)-4] + } + + r, err := w.client.unstructuredClient.cache.getResource(obj) + if err != nil { + return nil, err + } + + listOpts := w.listOpts(opts...) + + if listOpts.Namespace != "" && r.isNamespaced() { + return w.dynamic.Resource(r.mapping.Resource).Namespace(listOpts.Namespace).Watch(ctx, *listOpts.AsListOptions()) + } + return w.dynamic.Resource(r.mapping.Resource).Watch(ctx, *listOpts.AsListOptions()) +} + +func (w *watchingClient) typedWatch(ctx context.Context, obj ObjectList, opts ...ListOption) (watch.Interface, error) { + r, err := w.client.typedClient.cache.getResource(obj) + if err != nil { + return nil, err + } + + listOpts := w.listOpts(opts...) + + return r.Get(). + NamespaceIfScoped(listOpts.Namespace, r.isNamespaced()). + Resource(r.resource()). + VersionedParams(listOpts.AsListOptions(), w.client.typedClient.paramCodec). + Watch(ctx) +} diff --git a/pkg/client/watch_test.go b/pkg/client/watch_test.go new file mode 100644 index 0000000000..7d770cb09c --- /dev/null +++ b/pkg/client/watch_test.go @@ -0,0 +1,131 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package client_test + +import ( + "context" + "fmt" + "sync/atomic" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +var _ = Describe("ClientWithWatch", func() { + var dep *appsv1.Deployment + var count uint64 = 0 + var replicaCount int32 = 2 + var ns = "kube-public" + ctx := context.TODO() + + BeforeEach(func(done Done) { + atomic.AddUint64(&count, 1) + dep = &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("watch-deployment-name-%v", count), Namespace: ns, Labels: map[string]string{"app": fmt.Sprintf("bar-%v", count)}}, + Spec: appsv1.DeploymentSpec{ + Replicas: &replicaCount, + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"foo": "bar"}, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}}, + Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "nginx", Image: "nginx"}}}, + }, + }, + } + + var err error + dep, err = clientset.AppsV1().Deployments(ns).Create(ctx, dep, metav1.CreateOptions{}) + Expect(err).NotTo(HaveOccurred()) + close(done) + }, serverSideTimeoutSeconds) + + AfterEach(func(done Done) { + deleteDeployment(ctx, dep, ns) + close(done) + }, serverSideTimeoutSeconds) + + Describe("NewWithWatch", func() { + It("should return a new Client", func(done Done) { + cl, err := client.NewWithWatch(cfg, client.Options{}) + Expect(err).NotTo(HaveOccurred()) + Expect(cl).NotTo(BeNil()) + + close(done) + }) + + watchSuite := func(through client.ObjectList, expectedType client.Object) { + cl, err := client.NewWithWatch(cfg, client.Options{}) + Expect(err).NotTo(HaveOccurred()) + Expect(cl).NotTo(BeNil()) + + watchInterface, err := cl.Watch(ctx, through, &client.ListOptions{ + FieldSelector: fields.OneTermEqualSelector("metadata.name", dep.Name), + Namespace: dep.Namespace, + }) + Expect(err).NotTo(HaveOccurred()) + Expect(watchInterface).NotTo(BeNil()) + + defer watchInterface.Stop() + + event, ok := <-watchInterface.ResultChan() + Expect(ok).To(BeTrue()) + Expect(event.Type).To(BeIdenticalTo(watch.Added)) + Expect(event.Object).To(BeAssignableToTypeOf(expectedType)) + + // The metadata client doesn't set GVK so we just use the + // name and UID as a proxy to confirm that we got the right + // object. + metaObject, ok := event.Object.(metav1.Object) + Expect(ok).To(BeTrue()) + Expect(metaObject.GetName()).To(Equal(dep.Name)) + Expect(metaObject.GetUID()).To(Equal(dep.UID)) + + } + + It("should receive a create event when watching the typed object", func(done Done) { + watchSuite(&appsv1.DeploymentList{}, &appsv1.Deployment{}) + close(done) + }, 15) + + It("should receive a create event when watching the unstructured object", func(done Done) { + u := &unstructured.UnstructuredList{} + u.SetGroupVersionKind(schema.GroupVersionKind{ + Group: "apps", + Kind: "Deployment", + Version: "v1", + }) + watchSuite(u, &unstructured.Unstructured{}) + close(done) + }, 15) + + It("should receive a create event when watching the metadata object", func(done Done) { + m := &metav1.PartialObjectMetadataList{TypeMeta: metav1.TypeMeta{Kind: "Deployment", APIVersion: "apps/v1"}} + watchSuite(m, &metav1.PartialObjectMetadata{}) + close(done) + }, 15) + }) + +})