Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Add ClientWithWatch for use in CLIs #1460

Merged
merged 1 commit into from Apr 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/client/client.go
Expand Up @@ -53,6 +53,10 @@ type Options struct {
// case of unstructured types, the group, version, and kind will be extracted
// from the corresponding fields on the object.
func New(config *rest.Config, options Options) (Client, error) {
return newClient(config, options)
}

func newClient(config *rest.Config, options Options) (*client, error) {
if config == nil {
return nil, fmt.Errorf("must provide non-nil rest.Config to client.New")
}
Expand Down
9 changes: 9 additions & 0 deletions pkg/client/interfaces.go
Expand Up @@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
)

// ObjectKey identifies a Kubernetes Object.
Expand Down Expand Up @@ -108,6 +109,14 @@ type Client interface {
RESTMapper() meta.RESTMapper
}

// WithWatch supports Watch on top of the CRUD operations supported by
// the normal Client. Its intended use-case are CLI apps that need to wait for
// events.
type WithWatch interface {
Client
Watch(ctx context.Context, obj ObjectList, opts ...ListOption) (watch.Interface, error)
}

// IndexerFunc knows how to take an object and turn it into a series
// of non-namespaced keys. Namespaced objects are automatically given
// namespaced and non-spaced variants, so keys do not need to include namespace.
Expand Down
118 changes: 118 additions & 0 deletions pkg/client/watch.go
@@ -0,0 +1,118 @@
/*
Copyright 2020 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package client

import (
"context"
"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
)

// NewWithWatch returns a new WithWatch.
func NewWithWatch(config *rest.Config, options Options) (WithWatch, error) {
client, err := newClient(config, options)
if err != nil {
return nil, err
}
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
return nil, err
}
return &watchingClient{client: client, dynamic: dynamicClient}, nil
}

type watchingClient struct {
*client
dynamic dynamic.Interface
}

func (w *watchingClient) Watch(ctx context.Context, list ObjectList, opts ...ListOption) (watch.Interface, error) {
switch l := list.(type) {
case *unstructured.UnstructuredList:
return w.unstructuredWatch(ctx, l, opts...)
case *metav1.PartialObjectMetadataList:
return w.metadataWatch(ctx, l, opts...)
default:
return w.typedWatch(ctx, l, opts...)
}
}

func (w *watchingClient) listOpts(opts ...ListOption) ListOptions {
listOpts := ListOptions{}
listOpts.ApplyOptions(opts)
if listOpts.Raw == nil {
listOpts.Raw = &metav1.ListOptions{}
}
listOpts.Raw.Watch = true

return listOpts
}

func (w *watchingClient) metadataWatch(ctx context.Context, obj *metav1.PartialObjectMetadataList, opts ...ListOption) (watch.Interface, error) {
gvk := obj.GroupVersionKind()
if strings.HasSuffix(gvk.Kind, "List") {
gvk.Kind = gvk.Kind[:len(gvk.Kind)-4]
}

listOpts := w.listOpts(opts...)

resInt, err := w.client.metadataClient.getResourceInterface(gvk, listOpts.Namespace)
if err != nil {
return nil, err
}

return resInt.Watch(ctx, *listOpts.AsListOptions())
}

func (w *watchingClient) unstructuredWatch(ctx context.Context, obj *unstructured.UnstructuredList, opts ...ListOption) (watch.Interface, error) {
gvk := obj.GroupVersionKind()
if strings.HasSuffix(gvk.Kind, "List") {
gvk.Kind = gvk.Kind[:len(gvk.Kind)-4]
}

r, err := w.client.unstructuredClient.cache.getResource(obj)
if err != nil {
return nil, err
}

listOpts := w.listOpts(opts...)

if listOpts.Namespace != "" && r.isNamespaced() {
return w.dynamic.Resource(r.mapping.Resource).Namespace(listOpts.Namespace).Watch(ctx, *listOpts.AsListOptions())
}
return w.dynamic.Resource(r.mapping.Resource).Watch(ctx, *listOpts.AsListOptions())
}

func (w *watchingClient) typedWatch(ctx context.Context, obj ObjectList, opts ...ListOption) (watch.Interface, error) {
r, err := w.client.typedClient.cache.getResource(obj)
if err != nil {
return nil, err
}

listOpts := w.listOpts(opts...)

return r.Get().
NamespaceIfScoped(listOpts.Namespace, r.isNamespaced()).
Resource(r.resource()).
VersionedParams(listOpts.AsListOptions(), w.client.typedClient.paramCodec).
Watch(ctx)
}
131 changes: 131 additions & 0 deletions pkg/client/watch_test.go
@@ -0,0 +1,131 @@
/*
Copyright 2020 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package client_test

import (
"context"
"fmt"
"sync/atomic"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"sigs.k8s.io/controller-runtime/pkg/client"
)

var _ = Describe("ClientWithWatch", func() {
var dep *appsv1.Deployment
var count uint64 = 0
var replicaCount int32 = 2
var ns = "kube-public"
ctx := context.TODO()

BeforeEach(func(done Done) {
atomic.AddUint64(&count, 1)
dep = &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("watch-deployment-name-%v", count), Namespace: ns, Labels: map[string]string{"app": fmt.Sprintf("bar-%v", count)}},
Spec: appsv1.DeploymentSpec{
Replicas: &replicaCount,
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"foo": "bar"},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}},
Spec: corev1.PodSpec{Containers: []corev1.Container{{Name: "nginx", Image: "nginx"}}},
},
},
}

var err error
dep, err = clientset.AppsV1().Deployments(ns).Create(ctx, dep, metav1.CreateOptions{})
Expect(err).NotTo(HaveOccurred())
close(done)
}, serverSideTimeoutSeconds)

AfterEach(func(done Done) {
deleteDeployment(ctx, dep, ns)
close(done)
}, serverSideTimeoutSeconds)

Describe("NewWithWatch", func() {
It("should return a new Client", func(done Done) {
cl, err := client.NewWithWatch(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
Expect(cl).NotTo(BeNil())

close(done)
})

watchSuite := func(through client.ObjectList, expectedType client.Object) {
cl, err := client.NewWithWatch(cfg, client.Options{})
Expect(err).NotTo(HaveOccurred())
Expect(cl).NotTo(BeNil())

watchInterface, err := cl.Watch(ctx, through, &client.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", dep.Name),
Namespace: dep.Namespace,
})
Expect(err).NotTo(HaveOccurred())
Expect(watchInterface).NotTo(BeNil())

defer watchInterface.Stop()

event, ok := <-watchInterface.ResultChan()
Expect(ok).To(BeTrue())
Expect(event.Type).To(BeIdenticalTo(watch.Added))
Expect(event.Object).To(BeAssignableToTypeOf(expectedType))

// The metadata client doesn't set GVK so we just use the
// name and UID as a proxy to confirm that we got the right
// object.
metaObject, ok := event.Object.(metav1.Object)
Expect(ok).To(BeTrue())
Expect(metaObject.GetName()).To(Equal(dep.Name))
Expect(metaObject.GetUID()).To(Equal(dep.UID))

}

It("should receive a create event when watching the typed object", func(done Done) {
watchSuite(&appsv1.DeploymentList{}, &appsv1.Deployment{})
close(done)
}, 15)

It("should receive a create event when watching the unstructured object", func(done Done) {
u := &unstructured.UnstructuredList{}
u.SetGroupVersionKind(schema.GroupVersionKind{
Group: "apps",
Kind: "Deployment",
Version: "v1",
})
watchSuite(u, &unstructured.Unstructured{})
close(done)
}, 15)

It("should receive a create event when watching the metadata object", func(done Done) {
m := &metav1.PartialObjectMetadataList{TypeMeta: metav1.TypeMeta{Kind: "Deployment", APIVersion: "apps/v1"}}
watchSuite(m, &metav1.PartialObjectMetadata{})
close(done)
}, 15)
})

})