Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

⚠ Support registration and removal of event handler #2046

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 10 additions & 2 deletions pkg/cache/cache.go
Expand Up @@ -78,11 +78,19 @@ type Informer interface {
// AddEventHandler adds an event handler to the shared informer using the shared informer's resync
// period. Events to a single handler are delivered sequentially, but there is no coordination
// between different handlers.
AddEventHandler(handler toolscache.ResourceEventHandler)
// It returns a registration handle for the handler that can be used to remove
// the handler again.
AddEventHandler(handler toolscache.ResourceEventHandler) (toolscache.ResourceEventHandlerRegistration, error)
// AddEventHandlerWithResyncPeriod adds an event handler to the shared informer using the
// specified resync period. Events to a single handler are delivered sequentially, but there is
// no coordination between different handlers.
AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration)
// It returns a registration handle for the handler that can be used to remove
// the handler again and an error if the handler cannot be added.
AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration) (toolscache.ResourceEventHandlerRegistration, error)
// RemoveEventHandler removes a formerly added event handler given by
// its registration handle.
// This function is guaranteed to be idempotent, and thread-safe.
RemoveEventHandler(handle toolscache.ResourceEventHandlerRegistration) error
// AddIndexers adds more indexers to this store. If you call this after you already have data
// in the store, the results are undefined.
AddIndexers(indexers toolscache.Indexers) error
Expand Down
8 changes: 4 additions & 4 deletions pkg/cache/cache_test.go
Expand Up @@ -1345,7 +1345,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
addFunc := func(obj interface{}) {
out <- obj
}
sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc})
_, _ = sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc})

By("adding an object")
cl, err := client.New(cfg, client.Options{})
Expand All @@ -1369,7 +1369,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
addFunc := func(obj interface{}) {
out <- obj
}
sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc})
_, _ = sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc})

By("adding an object")
cl, err := client.New(cfg, client.Options{})
Expand Down Expand Up @@ -1528,7 +1528,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
addFunc := func(obj interface{}) {
out <- obj
}
sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc})
_, _ = sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc})

By("adding an object")
cl, err := client.New(cfg, client.Options{})
Expand Down Expand Up @@ -1646,7 +1646,7 @@ func CacheTest(createCacheFunc func(config *rest.Config, opts cache.Options) (ca
addFunc := func(obj interface{}) {
out <- obj
}
sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc})
_, _ = sii.AddEventHandler(kcache.ResourceEventHandlerFuncs{AddFunc: addFunc})

By("adding an object")
cl, err := client.New(cfg, client.Options{})
Expand Down
22 changes: 2 additions & 20 deletions pkg/cache/informer_cache.go
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"reflect"
"strings"
"time"

apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -142,7 +141,7 @@ func (ip *informerCache) GetInformerForKind(ctx context.Context, gvk schema.Grou
if err != nil {
return nil, err
}
return WrapInformer(i.Informer), err
return i.Informer, err
}

// GetInformer returns the informer for the obj.
Expand All @@ -156,7 +155,7 @@ func (ip *informerCache) GetInformer(ctx context.Context, obj client.Object) (In
if err != nil {
return nil, err
}
return WrapInformer(i.Informer), err
return i.Informer, err
}

// NeedLeaderElection implements the LeaderElectionRunnable interface
Expand Down Expand Up @@ -216,20 +215,3 @@ func indexByField(indexer Informer, field string, extractor client.IndexerFunc)

return indexer.AddIndexers(cache.Indexers{internal.FieldIndexName(field): indexFunc})
}

type informerWrapper struct {
cache.SharedIndexInformer
}

func (iw *informerWrapper) AddEventHandler(handler cache.ResourceEventHandler) {
_, _ = iw.SharedIndexInformer.AddEventHandler(handler)
}

func (iw *informerWrapper) AddEventHandlerWithResyncPeriod(handler cache.ResourceEventHandler, resyncPeriod time.Duration) {
_, _ = iw.SharedIndexInformer.AddEventHandlerWithResyncPeriod(handler, resyncPeriod)
}

// WrapInformer is a temporary wrapper to make Informer compatible with the SharedIndexInformer in client-go v0.26.0
func WrapInformer(i cache.SharedIndexInformer) Informer {
return &informerWrapper{SharedIndexInformer: i}
}
12 changes: 2 additions & 10 deletions pkg/cache/informertest/fake_cache.go
Expand Up @@ -47,11 +47,7 @@ func (c *FakeInformers) GetInformerForKind(ctx context.Context, gvk schema.Group
if err != nil {
return nil, err
}
i, err := c.informerFor(gvk, obj)
if err != nil {
return nil, err
}
return cache.WrapInformer(i), nil
return c.informerFor(gvk, obj)
}

// FakeInformerForKind implements Informers.
Expand Down Expand Up @@ -80,11 +76,7 @@ func (c *FakeInformers) GetInformer(ctx context.Context, obj client.Object) (cac
return nil, err
}
gvk := gvks[0]
i, err := c.informerFor(gvk, obj)
if err != nil {
return nil, err
}
return cache.WrapInformer(i), nil
return c.informerFor(gvk, obj)
}

// WaitForCacheSync implements Informers.
Expand Down
42 changes: 36 additions & 6 deletions pkg/cache/multi_namespace_cache.go
Expand Up @@ -296,17 +296,47 @@ type multiNamespaceInformer struct {
var _ Informer = &multiNamespaceInformer{}

// AddEventHandler adds the handler to each namespaced informer.
func (i *multiNamespaceInformer) AddEventHandler(handler toolscache.ResourceEventHandler) {
for _, informer := range i.namespaceToInformer {
informer.AddEventHandler(handler)
func (i *multiNamespaceInformer) AddEventHandler(handler toolscache.ResourceEventHandler) (toolscache.ResourceEventHandlerRegistration, error) {
handles := make(map[string]toolscache.ResourceEventHandlerRegistration, len(i.namespaceToInformer))
for ns, informer := range i.namespaceToInformer {
registration, err := informer.AddEventHandler(handler)
if err != nil {
return nil, err
}
handles[ns] = registration
}
return handles, nil
}

// AddEventHandlerWithResyncPeriod adds the handler with a resync period to each namespaced informer.
func (i *multiNamespaceInformer) AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration) {
for _, informer := range i.namespaceToInformer {
informer.AddEventHandlerWithResyncPeriod(handler, resyncPeriod)
func (i *multiNamespaceInformer) AddEventHandlerWithResyncPeriod(handler toolscache.ResourceEventHandler, resyncPeriod time.Duration) (toolscache.ResourceEventHandlerRegistration, error) {
handles := make(map[string]toolscache.ResourceEventHandlerRegistration, len(i.namespaceToInformer))
for ns, informer := range i.namespaceToInformer {
registration, err := informer.AddEventHandlerWithResyncPeriod(handler, resyncPeriod)
if err != nil {
return nil, err
}
handles[ns] = registration
}
return handles, nil
}

// RemoveEventHandler removes a formerly added event handler given by its registration handle.
func (i *multiNamespaceInformer) RemoveEventHandler(h toolscache.ResourceEventHandlerRegistration) error {
handles, ok := h.(map[string]toolscache.ResourceEventHandlerRegistration)
if !ok {
return fmt.Errorf("it is not the registration returned by multiNamespaceInformer")
}
for ns, informer := range i.namespaceToInformer {
registration, ok := handles[ns]
if !ok {
continue
}
if err := informer.RemoveEventHandler(registration); err != nil {
return err
}
}
return nil
}

// AddIndexers adds the indexer for each namespaced informer.
Expand Down
11 changes: 9 additions & 2 deletions pkg/source/source.go
Expand Up @@ -155,7 +155,11 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w
return
}

i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
_, err := i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
if err != nil {
ks.started <- err
return
}
if !ks.cache.WaitForCacheSync(ctx) {
// Would be great to return something more informative here
ks.started <- errors.New("cache did not sync")
Expand Down Expand Up @@ -351,7 +355,10 @@ func (is *Informer) Start(ctx context.Context, handler handler.EventHandler, que
return fmt.Errorf("must specify Informer.Informer")
}

is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
_, err := is.Informer.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
if err != nil {
return err
}
return nil
}

Expand Down
7 changes: 3 additions & 4 deletions pkg/source/source_integration_test.go
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"time"

"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
Expand Down Expand Up @@ -245,7 +244,7 @@ var _ = Describe("Source", func() {
c := make(chan struct{})

q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
instance := &source.Informer{Informer: cache.WrapInformer(depInformer)}
instance := &source.Informer{Informer: depInformer}
err := instance.Start(ctx, handler.Funcs{
CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) {
defer GinkgoRecover()
Expand Down Expand Up @@ -286,7 +285,7 @@ var _ = Describe("Source", func() {
rs2.SetLabels(map[string]string{"biz": "baz"})

q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
instance := &source.Informer{Informer: cache.WrapInformer(depInformer)}
instance := &source.Informer{Informer: depInformer}
err = instance.Start(ctx, handler.Funcs{
CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) {
},
Expand Down Expand Up @@ -323,7 +322,7 @@ var _ = Describe("Source", func() {
c := make(chan struct{})

q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
instance := &source.Informer{Informer: cache.WrapInformer(depInformer)}
instance := &source.Informer{Informer: depInformer}
err := instance.Start(ctx, handler.Funcs{
CreateFunc: func(event.CreateEvent, workqueue.RateLimitingInterface) {
},
Expand Down