From 6d2d247cb6f3a26e6b5597c2aa4a943a90c988bb Mon Sep 17 00:00:00 2001 From: FillZpp Date: Tue, 15 Nov 2022 19:32:17 +0800 Subject: [PATCH] Support registration and removal for event handler Signed-off-by: FillZpp --- pkg/cache/cache.go | 12 ++++++-- pkg/cache/cache_test.go | 8 ++--- pkg/cache/informer_cache.go | 22 ++------------ pkg/cache/informertest/fake_cache.go | 12 ++------ pkg/cache/multi_namespace_cache.go | 42 +++++++++++++++++++++++---- pkg/source/source.go | 11 +++++-- pkg/source/source_integration_test.go | 7 ++--- 7 files changed, 66 insertions(+), 48 deletions(-) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 8e8d889dd7..9827ea0297 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -78,11 +78,19 @@ type Informer interface { // AddEventHandler adds an event handler to the shared informer using the shared informer's resync // period. Events to a single handler are delivered sequentially, but there is no coordination // between different handlers. - AddEventHandler(handler toolscache.ResourceEventHandler) + // It returns a registration handle for the handler that can be used to remove + // the handler again. + AddEventHandler(handler toolscache.ResourceEventHandler) (toolscache.ResourceEventHandlerRegistration, error) // AddEventHandlerWithResyncPeriod adds an event handler to the shared informer using the // specified resync period. Events to a single handler are delivered sequentially, but there is // no coordination between different handlers. - AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration) + // It returns a registration handle for the handler that can be used to remove + // the handler again and an error if the handler cannot be added. + AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration) (toolscache.ResourceEventHandlerRegistration, error) + // RemoveEventHandler removes a formerly added event handler given by + // its registration handle. + // This function is guaranteed to be idempotent, and thread-safe. + RemoveEventHandler(handle toolscache.ResourceEventHandlerRegistration) error // AddIndexers adds more indexers to this store. If you call this after you already have data // in the store, the results are undefined. AddIndexers(indexers toolscache.Indexers) error diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index e89e3a72de..9f04a943ba 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -1345,7 +1345,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca addFunc := func(obj interface{}) { out <- obj } - sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc}) + _, _ = sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc}) By("adding an object") cl, err := client.New(cfg, client.Options{}) @@ -1369,7 +1369,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca addFunc := func(obj interface{}) { out <- obj } - sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc}) + _, _ = sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc}) By("adding an object") cl, err := client.New(cfg, client.Options{}) @@ -1528,7 +1528,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca addFunc := func(obj interface{}) { out <- obj } - sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc}) + _, _ = sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc}) By("adding an object") cl, err := client.New(cfg, client.Options{}) @@ -1646,7 +1646,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca addFunc := func(obj interface{}) { out <- obj } - sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc}) + _, _ = sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc}) By("adding an object") cl, err := client.New(cfg, client.Options{}) diff --git a/pkg/cache/informer_cache.go b/pkg/cache/informer_cache.go index c79221e419..08e4e6df59 100644 --- a/pkg/cache/informer_cache.go +++ b/pkg/cache/informer_cache.go @@ -21,7 +21,6 @@ import ( "fmt" "reflect" "strings" - "time" apimeta "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -142,7 +141,7 @@ func (ip *informerCache) GetInformerForKind(ctx context.Context, gvk schema.Grou if err != nil { return nil, err } - return WrapInformer(i.Informer), err + return i.Informer, err } // GetInformer returns the informer for the obj. @@ -156,7 +155,7 @@ func (ip *informerCache) GetInformer(ctx context.Context, obj client.Object) (In if err != nil { return nil, err } - return WrapInformer(i.Informer), err + return i.Informer, err } // NeedLeaderElection implements the LeaderElectionRunnable interface @@ -216,20 +215,3 @@ func indexByField(indexer Informer, field string, extractor client.IndexerFunc) return indexer.AddIndexers(cache.Indexers{internal.FieldIndexName(field): indexFunc}) } - -type informerWrapper struct { - cache.SharedIndexInformer -} - -func (iw *informerWrapper) AddEventHandler(handler cache.ResourceEventHandler) { - _, _ = iw.SharedIndexInformer.AddEventHandler(handler) -} - -func (iw *informerWrapper) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, resyncPeriod time.Duration) { - _, _ = iw.SharedIndexInformer.AddEventHandlerWithResyncPeriod(handler, resyncPeriod) -} - -// WrapInformer is a temporary wrapper to make Informer compatible with the SharedIndexInformer in client-go v0.26.0 -func WrapInformer(i cache.SharedIndexInformer) Informer { - return &informerWrapper{SharedIndexInformer: i} -} diff --git a/pkg/cache/informertest/fake_cache.go b/pkg/cache/informertest/fake_cache.go index 72ab5e6a10..da3bf8e0d4 100644 --- a/pkg/cache/informertest/fake_cache.go +++ b/pkg/cache/informertest/fake_cache.go @@ -47,11 +47,7 @@ func (c *FakeInformers) GetInformerForKind(ctx context.Context, gvk schema.Group if err != nil { return nil, err } - i, err := c.informerFor(gvk, obj) - if err != nil { - return nil, err - } - return cache.WrapInformer(i), nil + return c.informerFor(gvk, obj) } // FakeInformerForKind implements Informers. @@ -80,11 +76,7 @@ func (c *FakeInformers) GetInformer(ctx context.Context, obj client.Object) (cac return nil, err } gvk := gvks[0] - i, err := c.informerFor(gvk, obj) - if err != nil { - return nil, err - } - return cache.WrapInformer(i), nil + return c.informerFor(gvk, obj) } // WaitForCacheSync implements Informers. diff --git a/pkg/cache/multi_namespace_cache.go b/pkg/cache/multi_namespace_cache.go index 64514c0c55..fccb364710 100644 --- a/pkg/cache/multi_namespace_cache.go +++ b/pkg/cache/multi_namespace_cache.go @@ -296,17 +296,47 @@ type multiNamespaceInformer struct { var _ Informer = &multiNamespaceInformer{} // AddEventHandler adds the handler to each namespaced informer. -func (i *multiNamespaceInformer) AddEventHandler(handler toolscache.ResourceEventHandler) { - for _, informer := range i.namespaceToInformer { - informer.AddEventHandler(handler) +func (i *multiNamespaceInformer) AddEventHandler(handler toolscache.ResourceEventHandler) (toolscache.ResourceEventHandlerRegistration, error) { + handles := make(map[string]toolscache.ResourceEventHandlerRegistration, len(i.namespaceToInformer)) + for ns, informer := range i.namespaceToInformer { + registration, err := informer.AddEventHandler(handler) + if err != nil { + return nil, err + } + handles[ns] = registration } + return handles, nil } // AddEventHandlerWithResyncPeriod adds the handler with a resync period to each namespaced informer. -func (i *multiNamespaceInformer) AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration) { - for _, informer := range i.namespaceToInformer { - informer.AddEventHandlerWithResyncPeriod(handler, resyncPeriod) +func (i *multiNamespaceInformer) AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration) (toolscache.ResourceEventHandlerRegistration, error) { + handles := make(map[string]toolscache.ResourceEventHandlerRegistration, len(i.namespaceToInformer)) + for ns, informer := range i.namespaceToInformer { + registration, err := informer.AddEventHandlerWithResyncPeriod(handler, resyncPeriod) + if err != nil { + return nil, err + } + handles[ns] = registration + } + return handles, nil +} + +// RemoveEventHandler removes a formerly added event handler given by its registration handle. +func (i *multiNamespaceInformer) RemoveEventHandler(h toolscache.ResourceEventHandlerRegistration) error { + handles, ok := h.(map[string]toolscache.ResourceEventHandlerRegistration) + if !ok { + return fmt.Errorf("it is not the registration returned by multiNamespaceInformer") } + for ns, informer := range i.namespaceToInformer { + registration, ok := handles[ns] + if !ok { + continue + } + if err := informer.RemoveEventHandler(registration); err != nil { + return err + } + } + return nil } // AddIndexers adds the indexer for each namespaced informer. diff --git a/pkg/source/source.go b/pkg/source/source.go index 241c582eff..ae1b6fcfbf 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -155,7 +155,11 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w return } - i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct}) + _, err := i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct}) + if err != nil { + ks.started <- err + return + } if !ks.cache.WaitForCacheSync(ctx) { // Would be great to return something more informative here ks.started <- errors.New("cache did not sync") @@ -351,7 +355,10 @@ func (is *Informer) Start(ctx context.Context, handler handler.EventHandler, que return fmt.Errorf("must specify Informer.Informer") } - is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct}) + _, err := is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct}) + if err != nil { + return err + } return nil } diff --git a/pkg/source/source_integration_test.go b/pkg/source/source_integration_test.go index 5f327dd422..c7b3da39e2 100644 --- a/pkg/source/source_integration_test.go +++ b/pkg/source/source_integration_test.go @@ -20,7 +20,6 @@ import ( "fmt" "time" - "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" @@ -245,7 +244,7 @@ var _ = Describe("Source", func() { c := make(chan struct{}) q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := &source.Informer{Informer: cache.WrapInformer(depInformer)} + instance := &source.Informer{Informer: depInformer} err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) { defer GinkgoRecover() @@ -286,7 +285,7 @@ var _ = Describe("Source", func() { rs2.SetLabels(map[string]string{"biz": "baz"}) q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := &source.Informer{Informer: cache.WrapInformer(depInformer)} + instance := &source.Informer{Informer: depInformer} err = instance.Start(ctx, handler.Funcs{ CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) { }, @@ -323,7 +322,7 @@ var _ = Describe("Source", func() { c := make(chan struct{}) q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test") - instance := &source.Informer{Informer: cache.WrapInformer(depInformer)} + instance := &source.Informer{Informer: depInformer} err := instance.Start(ctx, handler.Funcs{ CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) { },