Skip to content

Commit

Permalink
fix: set partialmetadata gvk in list/watch funcs to avoid data race i…
Browse files Browse the repository at this point in the history
…n cache

Signed-off-by: Joe Lanford <joe.lanford@gmail.com>
  • Loading branch information
joelanford committed Sep 2, 2021
1 parent 1730628 commit 74ba5d7
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 95 deletions.
40 changes: 26 additions & 14 deletions pkg/builder/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ var _ = Describe("application", func() {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
doReconcileTest(ctx, "3", bldr, m, false)
doReconcileTest(ctx, "3", m, false, bldr)
}, 10)

It("should Reconcile Watches objects", func() {
Expand All @@ -322,7 +322,7 @@ var _ = Describe("application", func() {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
doReconcileTest(ctx, "4", bldr, m, true)
doReconcileTest(ctx, "4", m, true, bldr)
}, 10)
})

Expand Down Expand Up @@ -378,7 +378,7 @@ var _ = Describe("application", func() {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
doReconcileTest(ctx, "5", bldr, m, true)
doReconcileTest(ctx, "5", m, true, bldr)

Expect(deployPrctExecuted).To(BeTrue(), "Deploy predicated should be called at least once")
Expect(replicaSetPrctExecuted).To(BeTrue(), "ReplicaSet predicated should be called at least once")
Expand All @@ -396,6 +396,16 @@ var _ = Describe("application", func() {
Expect(err).NotTo(HaveOccurred())
})

It("should support multiple controllers watching the same metadata kind", func() {
bldr1 := ControllerManagedBy(mgr).For(&appsv1.Deployment{}, OnlyMetadata)
bldr2 := ControllerManagedBy(mgr).For(&appsv1.Deployment{}, OnlyMetadata)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

doReconcileTest(ctx, "6", mgr, true, bldr1, bldr2)
})

It("should support watching For, Owns, and Watch as metadata", func() {
statefulSetMaps := make(chan *metav1.PartialObjectMetadata)

Expand All @@ -421,7 +431,7 @@ var _ = Describe("application", func() {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
doReconcileTest(ctx, "8", bldr, mgr, true)
doReconcileTest(ctx, "8", mgr, true, bldr)

By("Creating a new stateful set")
set := &appsv1.StatefulSet{
Expand Down Expand Up @@ -496,7 +506,7 @@ func (c *nonTypedOnlyCache) GetInformerForKind(ctx context.Context, gvk schema.G

// TODO(directxman12): this function has too many arguments, and the whole
// "nameSuffix" think is a bit of a hack It should be cleaned up significantly by someone with a bit of time.
func doReconcileTest(ctx context.Context, nameSuffix string, blder *Builder, mgr manager.Manager, complete bool) {
func doReconcileTest(ctx context.Context, nameSuffix string, mgr manager.Manager, complete bool, blders ...*Builder) {
deployName := "deploy-name-" + nameSuffix
rsName := "rs-name-" + nameSuffix

Expand All @@ -512,15 +522,17 @@ func doReconcileTest(ctx context.Context, nameSuffix string, blder *Builder, mgr
return reconcile.Result{}, nil
})

if complete {
err := blder.Complete(fn)
Expect(err).NotTo(HaveOccurred())
} else {
var err error
var c controller.Controller
c, err = blder.Build(fn)
Expect(err).NotTo(HaveOccurred())
Expect(c).NotTo(BeNil())
for _, blder := range blders {
if complete {
err := blder.Complete(fn)
Expect(err).NotTo(HaveOccurred())
} else {
var err error
var c controller.Controller
c, err = blder.Build(fn)
Expect(err).NotTo(HaveOccurred())
Expect(c).NotTo(BeNil())
}
}

By("Starting the application")
Expand Down
74 changes: 64 additions & 10 deletions pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"k8s.io/client-go/metadata"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"

"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

Expand Down Expand Up @@ -231,12 +232,6 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob
return nil, false, err
}

switch obj.(type) {
case *metav1.PartialObjectMetadata, *metav1.PartialObjectMetadataList:
ni = metadataSharedIndexInformerPreserveGVK(gvk, ni)
default:
}

i := &MapEntry{
Informer: ni,
Reader: CacheReader{
Expand Down Expand Up @@ -372,26 +367,85 @@ func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersM
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
ip.selectors[gvk].ApplyToList(&opts)

var (
list *metav1.PartialObjectMetadataList
err error
)
namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors[gvk])
if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return client.Resource(mapping.Resource).Namespace(namespace).List(ctx, opts)
list, err = client.Resource(mapping.Resource).Namespace(namespace).List(ctx, opts)
} else {
list, err = client.Resource(mapping.Resource).List(ctx, opts)
}
if list != nil {
for i := range list.Items {
list.Items[i].SetGroupVersionKind(gvk)
}
}
return client.Resource(mapping.Resource).List(ctx, opts)
return list, err
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
ip.selectors[gvk].ApplyToList(&opts)
// Watch needs to be set to true separately
opts.Watch = true

var (
watcher watch.Interface
err error
)
namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors[gvk])
if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return client.Resource(mapping.Resource).Namespace(namespace).Watch(ctx, opts)
watcher, err = client.Resource(mapping.Resource).Namespace(namespace).Watch(ctx, opts)
} else {
watcher, err = client.Resource(mapping.Resource).Watch(ctx, opts)
}
if watcher != nil {
watcher = newGVKFixupWatcher(gvk, watcher)
}
return client.Resource(mapping.Resource).Watch(ctx, opts)
return watcher, err
},
}, nil
}

type gvkFixupWatcher struct {
watcher watch.Interface
ch chan watch.Event
gvk schema.GroupVersionKind
wg sync.WaitGroup
}

func newGVKFixupWatcher(gvk schema.GroupVersionKind, watcher watch.Interface) watch.Interface {
ch := make(chan watch.Event)
w := &gvkFixupWatcher{
gvk: gvk,
watcher: watcher,
ch: ch,
}
w.wg.Add(1)
go w.run()
return w
}

func (w *gvkFixupWatcher) run() {
for e := range w.watcher.ResultChan() {
e.Object.GetObjectKind().SetGroupVersionKind(w.gvk)
w.ch <- e
}
w.wg.Done()
}

func (w *gvkFixupWatcher) Stop() {
w.watcher.Stop()
w.wg.Wait()
close(w.ch)
}

func (w *gvkFixupWatcher) ResultChan() <-chan watch.Event {
return w.ch
}

// resyncPeriod returns a function which generates a duration each time it is
// invoked; this is so that multiple controllers don't get into lock-step and all
// hammer the apiserver with list requests simultaneously.
Expand Down
71 changes: 0 additions & 71 deletions pkg/cache/internal/metadata_infomer_wrapper.go

This file was deleted.

0 comments on commit 74ba5d7

Please sign in to comment.