Skip to content

Commit

Permalink
✨ Add ClientWithWatch for use in CLIs
Browse files Browse the repository at this point in the history
This change adds a new WithWatch interface and implementation to the
Client package. It is a superset of the existing Client interface that
additionally contains a Watch method. It is intended to be used by CLI
apps that need to wait for a condition to arise.

Because this is not intended to be used with controllers and might cause
confusion there, the existing interface and all references to it are
kept as-is.
  • Loading branch information
alvaroaleman committed Mar 28, 2021
1 parent df2c43d commit 45aa968
Show file tree
Hide file tree
Showing 4 changed files with 262 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pkg/client/client.go
Expand Up @@ -53,6 +53,10 @@ type Options struct {
// case of unstructured types, the group, version, and kind will be extracted
// from the corresponding fields on the object.
func New(config *rest.Config, options Options) (Client, error) {
return newClient(config, options)
}

func newClient(config *rest.Config, options Options) (*client, error) {
if config == nil {
return nil, fmt.Errorf("must provide non-nil rest.Config to client.New")
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/client/interfaces.go
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
)

// ObjectKey identifies a Kubernetes Object.
Expand Down Expand Up @@ -108,6 +109,14 @@ type Client interface {
RESTMapper() meta.RESTMapper
}

// WithWatch supports Watch on top of the CRUD operations supported by
// the normal Client. Its intended use-case are CLI apps that need to wait for
// events.
type WithWatch interface {
Client
Watch(ctx context.Context, obj ObjectList, opts ...ListOption) (watch.Interface, error)
}

// IndexerFunc knows how to take an object and turn it into a series
// of non-namespaced keys. Namespaced objects are automatically given
// namespaced and non-spaced variants, so keys do not need to include namespace.
Expand Down
118 changes: 118 additions & 0 deletions pkg/client/watch.go
@@ -0,0 +1,118 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package client

import (
"context"
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
)

// NewWithWatch returns a new WithWatch.
func NewWithWatch(config *rest.Config, options Options) (WithWatch, error) {
client, err := newClient(config, options)
if err != nil {
return nil, err
}
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
return nil, err
}
return &watchingClient{client: client, dynamic: dynamicClient}, nil
}

type watchingClient struct {
*client
dynamic dynamic.Interface
}

func (w *watchingClient) Watch(ctx context.Context, list ObjectList, opts ...ListOption) (watch.Interface, error) {
switch l := list.(type) {
case *unstructured.UnstructuredList:
return w.unstructuredWatch(ctx, l, opts...)
case *metav1.PartialObjectMetadataList:
return w.metadataWatch(ctx, l, opts...)
default:
return w.typedWatch(ctx, l, opts...)
}
}

func (w *watchingClient) listOpts(opts ...ListOption) ListOptions {
listOpts := ListOptions{}
listOpts.ApplyOptions(opts)
if listOpts.Raw == nil {
listOpts.Raw = &metav1.ListOptions{}
}
listOpts.Raw.Watch = true

return listOpts
}

func (w *watchingClient) metadataWatch(ctx context.Context, obj *metav1.PartialObjectMetadataList, opts ...ListOption) (watch.Interface, error) {
gvk := obj.GroupVersionKind()
if strings.HasSuffix(gvk.Kind, "List") {
gvk.Kind = gvk.Kind[:len(gvk.Kind)-4]
}

listOpts := w.listOpts(opts...)

resInt, err := w.client.metadataClient.getResourceInterface(gvk, listOpts.Namespace)
if err != nil {
return nil, err
}

return resInt.Watch(ctx, *listOpts.AsListOptions())
}

func (w *watchingClient) unstructuredWatch(ctx context.Context, obj *unstructured.UnstructuredList, opts ...ListOption) (watch.Interface, error) {
gvk := obj.GroupVersionKind()
if strings.HasSuffix(gvk.Kind, "List") {
gvk.Kind = gvk.Kind[:len(gvk.Kind)-4]
}

r, err := w.client.unstructuredClient.cache.getResource(obj)
if err != nil {
return nil, err
}

listOpts := w.listOpts(opts...)

if listOpts.Namespace != "" && r.isNamespaced() {
return w.dynamic.Resource(r.mapping.Resource).Namespace(listOpts.Namespace).Watch(ctx, *listOpts.AsListOptions())
}
return w.dynamic.Resource(r.mapping.Resource).Watch(ctx, *listOpts.AsListOptions())
}

func (w *watchingClient) typedWatch(ctx context.Context, obj ObjectList, opts ...ListOption) (watch.Interface, error) {
r, err := w.client.typedClient.cache.getResource(obj)
if err != nil {
return nil, err
}

listOpts := w.listOpts(opts...)

return r.Get().
NamespaceIfScoped(listOpts.Namespace, r.isNamespaced()).
Resource(r.resource()).
VersionedParams(listOpts.AsListOptions(), w.client.typedClient.paramCodec).
Watch(ctx)
}
131 changes: 131 additions & 0 deletions pkg/client/watch_test.go
@@ -0,0 +1,131 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package client_test

import (
"context"
"fmt"
"sync/atomic"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"sigs.k8s.io/controller-runtime/pkg/client"
)

var _ = Describe("ClientWithWatch", func() {
var dep *appsv1.Deployment
var count uint64 = 0
var replicaCount int32 = 2
var ns = "kube-public"
ctx := context.TODO()

BeforeEach(func(done Done) {
atomic.AddUint64(&count, 1)
dep = &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("watch-deployment-name-%v", count), Namespace: ns, Labels: map[string]string{"app": fmt.Sprintf("bar-%v", count)}},
Spec: appsv1.DeploymentSpec{
Replicas: &replicaCount,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"foo": "bar"},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}},
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "nginx", Image: "nginx"}}},
},
},
}

var err error
dep, err = clientset.AppsV1().Deployments(ns).Create(ctx, dep, metav1.CreateOptions{})
Expect(err).NotTo(HaveOccurred())
close(done)
}, serverSideTimeoutSeconds)

AfterEach(func(done Done) {
deleteDeployment(ctx, dep, ns)
close(done)
}, serverSideTimeoutSeconds)

Describe("NewWithWatch", func() {
It("should return a new Client", func(done Done) {
cl, err := client.NewWithWatch(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
Expect(cl).NotTo(BeNil())

close(done)
})

watchSuite := func(through client.ObjectList, expectedType client.Object) {
cl, err := client.NewWithWatch(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
Expect(cl).NotTo(BeNil())

watchInterface, err := cl.Watch(ctx, through, &client.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", dep.Name),
Namespace: dep.Namespace,
})
Expect(err).NotTo(HaveOccurred())
Expect(watchInterface).NotTo(BeNil())

defer watchInterface.Stop()

event, ok := <-watchInterface.ResultChan()
Expect(ok).To(BeTrue())
Expect(event.Type).To(BeIdenticalTo(watch.Added))
Expect(event.Object).To(BeAssignableToTypeOf(expectedType))

// The metadata client doesn't set GVK so we just use the
// name and UID as a proxy to confirm that we got the right
// object.
metaObject, ok := event.Object.(metav1.Object)
Expect(ok).To(BeTrue())
Expect(metaObject.GetName()).To(Equal(dep.Name))
Expect(metaObject.GetUID()).To(Equal(dep.UID))

}

It("should receive a create event when watching the typed object", func(done Done) {
watchSuite(&appsv1.DeploymentList{}, &appsv1.Deployment{})
close(done)
}, 15)

It("should receive a create event when watching the unstructured object", func(done Done) {
u := &unstructured.UnstructuredList{}
u.SetGroupVersionKind(schema.GroupVersionKind{
Group: "apps",
Kind: "Deployment",
Version: "v1",
})
watchSuite(u, &unstructured.Unstructured{})
close(done)
}, 15)

It("should receive a create event when watching the metadata object", func(done Done) {
m := &metav1.PartialObjectMetadataList{TypeMeta: metav1.TypeMeta{Kind: "Deployment", APIVersion: "apps/v1"}}
watchSuite(m, &metav1.PartialObjectMetadata{})
close(done)
}, 15)
})

})

0 comments on commit 45aa968

Please sign in to comment.