diff --git a/pkg/builder/controller_test.go b/pkg/builder/controller_test.go index a07c8ce73b..62c5044e99 100644 --- a/pkg/builder/controller_test.go +++ b/pkg/builder/controller_test.go @@ -307,7 +307,7 @@ var _ = Describe("application", func() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - doReconcileTest(ctx, "3", bldr, m, false) + doReconcileTest(ctx, "3", m, false, bldr) }, 10) It("should Reconcile Watches objects", func() { @@ -322,7 +322,7 @@ var _ = Describe("application", func() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - doReconcileTest(ctx, "4", bldr, m, true) + doReconcileTest(ctx, "4", m, true, bldr) }, 10) }) @@ -378,7 +378,7 @@ var _ = Describe("application", func() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - doReconcileTest(ctx, "5", bldr, m, true) + doReconcileTest(ctx, "5", m, true, bldr) Expect(deployPrctExecuted).To(BeTrue(), "Deploy predicated should be called at least once") Expect(replicaSetPrctExecuted).To(BeTrue(), "ReplicaSet predicated should be called at least once") @@ -396,6 +396,16 @@ var _ = Describe("application", func() { Expect(err).NotTo(HaveOccurred()) }) + It("should support multiple controllers watching the same metadata kind", func() { + bldr1 := ControllerManagedBy(mgr).For(&appsv1.Deployment{}, OnlyMetadata) + bldr2 := ControllerManagedBy(mgr).For(&appsv1.Deployment{}, OnlyMetadata) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + doReconcileTest(ctx, "6", mgr, true, bldr1, bldr2) + }) + It("should support watching For, Owns, and Watch as metadata", func() { statefulSetMaps := make(chan *metav1.PartialObjectMetadata) @@ -421,7 +431,7 @@ var _ = Describe("application", func() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - doReconcileTest(ctx, "8", bldr, mgr, true) + doReconcileTest(ctx, "8", mgr, true, bldr) By("Creating a new stateful set") set := &appsv1.StatefulSet{ @@ -496,7 +506,7 @@ func (c *nonTypedOnlyCache) GetInformerForKind(ctx context.Context, gvk schema.G // TODO(directxman12): this function has too many arguments, and the whole // "nameSuffix" think is a bit of a hack It should be cleaned up significantly by someone with a bit of time. -func doReconcileTest(ctx context.Context, nameSuffix string, blder *Builder, mgr manager.Manager, complete bool) { +func doReconcileTest(ctx context.Context, nameSuffix string, mgr manager.Manager, complete bool, blders ...*Builder) { deployName := "deploy-name-" + nameSuffix rsName := "rs-name-" + nameSuffix @@ -512,15 +522,17 @@ func doReconcileTest(ctx context.Context, nameSuffix string, blder *Builder, mgr return reconcile.Result{}, nil }) - if complete { - err := blder.Complete(fn) - Expect(err).NotTo(HaveOccurred()) - } else { - var err error - var c controller.Controller - c, err = blder.Build(fn) - Expect(err).NotTo(HaveOccurred()) - Expect(c).NotTo(BeNil()) + for _, blder := range blders { + if complete { + err := blder.Complete(fn) + Expect(err).NotTo(HaveOccurred()) + } else { + var err error + var c controller.Controller + c, err = blder.Build(fn) + Expect(err).NotTo(HaveOccurred()) + Expect(c).NotTo(BeNil()) + } } By("Starting the application") diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index 413b048f0c..f8e957343f 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -34,6 +34,7 @@ import ( "k8s.io/client-go/metadata" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" ) @@ -231,12 +232,6 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob return nil, false, err } - switch obj.(type) { - case *metav1.PartialObjectMetadata, *metav1.PartialObjectMetadataList: - ni = metadataSharedIndexInformerPreserveGVK(gvk, ni) - default: - } - i := &MapEntry{ Informer: ni, Reader: CacheReader{ @@ -372,26 +367,85 @@ func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersM return &cache.ListWatch{ ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) { ip.selectors[gvk].ApplyToList(&opts) + + var ( + list *metav1.PartialObjectMetadataList + err error + ) namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors[gvk]) if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot { - return client.Resource(mapping.Resource).Namespace(namespace).List(ctx, opts) + list, err = client.Resource(mapping.Resource).Namespace(namespace).List(ctx, opts) + } else { + list, err = client.Resource(mapping.Resource).List(ctx, opts) + } + if list != nil { + for i := range list.Items { + list.Items[i].SetGroupVersionKind(gvk) + } } - return client.Resource(mapping.Resource).List(ctx, opts) + return list, err }, // Setup the watch function WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) { ip.selectors[gvk].ApplyToList(&opts) // Watch needs to be set to true separately opts.Watch = true + + var ( + watcher watch.Interface + err error + ) namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors[gvk]) if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot { - return client.Resource(mapping.Resource).Namespace(namespace).Watch(ctx, opts) + watcher, err = client.Resource(mapping.Resource).Namespace(namespace).Watch(ctx, opts) + } else { + watcher, err = client.Resource(mapping.Resource).Watch(ctx, opts) + } + if watcher != nil { + watcher = newGVKFixupWatcher(gvk, watcher) } - return client.Resource(mapping.Resource).Watch(ctx, opts) + return watcher, err }, }, nil } +type gvkFixupWatcher struct { + watcher watch.Interface + ch chan watch.Event + gvk schema.GroupVersionKind + wg sync.WaitGroup +} + +func newGVKFixupWatcher(gvk schema.GroupVersionKind, watcher watch.Interface) watch.Interface { + ch := make(chan watch.Event) + w := &gvkFixupWatcher{ + gvk: gvk, + watcher: watcher, + ch: ch, + } + w.wg.Add(1) + go w.run() + return w +} + +func (w *gvkFixupWatcher) run() { + for e := range w.watcher.ResultChan() { + e.Object.GetObjectKind().SetGroupVersionKind(w.gvk) + w.ch <- e + } + w.wg.Done() +} + +func (w *gvkFixupWatcher) Stop() { + w.watcher.Stop() + w.wg.Wait() + close(w.ch) +} + +func (w *gvkFixupWatcher) ResultChan() <-chan watch.Event { + return w.ch +} + // resyncPeriod returns a function which generates a duration each time it is // invoked; this is so that multiple controllers don't get into lock-step and all // hammer the apiserver with list requests simultaneously. diff --git a/pkg/cache/internal/metadata_infomer_wrapper.go b/pkg/cache/internal/metadata_infomer_wrapper.go deleted file mode 100644 index c0fa24a5c1..0000000000 --- a/pkg/cache/internal/metadata_infomer_wrapper.go +++ /dev/null @@ -1,71 +0,0 @@ -/* -Copyright 2021 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package internal - -import ( - "time" - - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/tools/cache" -) - -func metadataSharedIndexInformerPreserveGVK(gvk schema.GroupVersionKind, si cache.SharedIndexInformer) cache.SharedIndexInformer { - return &sharedInformerWrapper{ - gvk: gvk, - SharedIndexInformer: si, - } -} - -type sharedInformerWrapper struct { - gvk schema.GroupVersionKind - cache.SharedIndexInformer -} - -func (s *sharedInformerWrapper) AddEventHandler(handler cache.ResourceEventHandler) { - s.SharedIndexInformer.AddEventHandler(&handlerPreserveGVK{s.gvk, handler}) -} - -func (s *sharedInformerWrapper) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, resyncPeriod time.Duration) { - s.SharedIndexInformer.AddEventHandlerWithResyncPeriod(&handlerPreserveGVK{s.gvk, handler}, resyncPeriod) -} - -type handlerPreserveGVK struct { - gvk schema.GroupVersionKind - cache.ResourceEventHandler -} - -func (h *handlerPreserveGVK) resetGroupVersionKind(obj interface{}) { - if v, ok := obj.(schema.ObjectKind); ok { - v.SetGroupVersionKind(h.gvk) - } -} - -func (h *handlerPreserveGVK) OnAdd(obj interface{}) { - h.resetGroupVersionKind(obj) - h.ResourceEventHandler.OnAdd(obj) -} - -func (h *handlerPreserveGVK) OnUpdate(oldObj, newObj interface{}) { - h.resetGroupVersionKind(oldObj) - h.resetGroupVersionKind(newObj) - h.ResourceEventHandler.OnUpdate(oldObj, newObj) -} - -func (h *handlerPreserveGVK) OnDelete(obj interface{}) { - h.resetGroupVersionKind(obj) - h.ResourceEventHandler.OnDelete(obj) -}