Skip to content

Commit

Permalink
Merge pull request #1650 from joelanford/fix-partialmetadata-data-race
Browse files Browse the repository at this point in the history
🐛 set partialmetadata gvk in list/watch funcs to avoid data race in cache
  • Loading branch information
k8s-ci-robot committed Sep 2, 2021
2 parents 1730628 + 74ba5d7 commit 498ee8a
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 95 deletions.
40 changes: 26 additions & 14 deletions pkg/builder/controller_test.go
Expand Up @@ -307,7 +307,7 @@ var _ = Describe("application", func() {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
doReconcileTest(ctx, "3", bldr, m, false)
doReconcileTest(ctx, "3", m, false, bldr)
}, 10)

It("should Reconcile Watches objects", func() {
Expand All @@ -322,7 +322,7 @@ var _ = Describe("application", func() {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
doReconcileTest(ctx, "4", bldr, m, true)
doReconcileTest(ctx, "4", m, true, bldr)
}, 10)
})

Expand Down Expand Up @@ -378,7 +378,7 @@ var _ = Describe("application", func() {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
doReconcileTest(ctx, "5", bldr, m, true)
doReconcileTest(ctx, "5", m, true, bldr)

Expect(deployPrctExecuted).To(BeTrue(), "Deploy predicated should be called at least once")
Expect(replicaSetPrctExecuted).To(BeTrue(), "ReplicaSet predicated should be called at least once")
Expand All @@ -396,6 +396,16 @@ var _ = Describe("application", func() {
Expect(err).NotTo(HaveOccurred())
})

It("should support multiple controllers watching the same metadata kind", func() {
bldr1 := ControllerManagedBy(mgr).For(&appsv1.Deployment{}, OnlyMetadata)
bldr2 := ControllerManagedBy(mgr).For(&appsv1.Deployment{}, OnlyMetadata)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

doReconcileTest(ctx, "6", mgr, true, bldr1, bldr2)
})

It("should support watching For, Owns, and Watch as metadata", func() {
statefulSetMaps := make(chan *metav1.PartialObjectMetadata)

Expand All @@ -421,7 +431,7 @@ var _ = Describe("application", func() {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
doReconcileTest(ctx, "8", bldr, mgr, true)
doReconcileTest(ctx, "8", mgr, true, bldr)

By("Creating a new stateful set")
set := &appsv1.StatefulSet{
Expand Down Expand Up @@ -496,7 +506,7 @@ func (c *nonTypedOnlyCache) GetInformerForKind(ctx context.Context, gvk schema.G

// TODO(directxman12): this function has too many arguments, and the whole
// "nameSuffix" think is a bit of a hack It should be cleaned up significantly by someone with a bit of time.
func doReconcileTest(ctx context.Context, nameSuffix string, blder *Builder, mgr manager.Manager, complete bool) {
func doReconcileTest(ctx context.Context, nameSuffix string, mgr manager.Manager, complete bool, blders ...*Builder) {
deployName := "deploy-name-" + nameSuffix
rsName := "rs-name-" + nameSuffix

Expand All @@ -512,15 +522,17 @@ func doReconcileTest(ctx context.Context, nameSuffix string, blder *Builder, mgr
return reconcile.Result{}, nil
})

if complete {
err := blder.Complete(fn)
Expect(err).NotTo(HaveOccurred())
} else {
var err error
var c controller.Controller
c, err = blder.Build(fn)
Expect(err).NotTo(HaveOccurred())
Expect(c).NotTo(BeNil())
for _, blder := range blders {
if complete {
err := blder.Complete(fn)
Expect(err).NotTo(HaveOccurred())
} else {
var err error
var c controller.Controller
c, err = blder.Build(fn)
Expect(err).NotTo(HaveOccurred())
Expect(c).NotTo(BeNil())
}
}

By("Starting the application")
Expand Down
74 changes: 64 additions & 10 deletions pkg/cache/internal/informers_map.go
Expand Up @@ -34,6 +34,7 @@ import (
"k8s.io/client-go/metadata"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"

"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

Expand Down Expand Up @@ -231,12 +232,6 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob
return nil, false, err
}

switch obj.(type) {
case *metav1.PartialObjectMetadata, *metav1.PartialObjectMetadataList:
ni = metadataSharedIndexInformerPreserveGVK(gvk, ni)
default:
}

i := &MapEntry{
Informer: ni,
Reader: CacheReader{
Expand Down Expand Up @@ -372,26 +367,85 @@ func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersM
return &cache.ListWatch{
ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
ip.selectors[gvk].ApplyToList(&opts)

var (
list *metav1.PartialObjectMetadataList
err error
)
namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors[gvk])
if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return client.Resource(mapping.Resource).Namespace(namespace).List(ctx, opts)
list, err = client.Resource(mapping.Resource).Namespace(namespace).List(ctx, opts)
} else {
list, err = client.Resource(mapping.Resource).List(ctx, opts)
}
if list != nil {
for i := range list.Items {
list.Items[i].SetGroupVersionKind(gvk)
}
}
return client.Resource(mapping.Resource).List(ctx, opts)
return list, err
},
// Setup the watch function
WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
ip.selectors[gvk].ApplyToList(&opts)
// Watch needs to be set to true separately
opts.Watch = true

var (
watcher watch.Interface
err error
)
namespace := restrictNamespaceBySelector(ip.namespace, ip.selectors[gvk])
if namespace != "" && mapping.Scope.Name() != meta.RESTScopeNameRoot {
return client.Resource(mapping.Resource).Namespace(namespace).Watch(ctx, opts)
watcher, err = client.Resource(mapping.Resource).Namespace(namespace).Watch(ctx, opts)
} else {
watcher, err = client.Resource(mapping.Resource).Watch(ctx, opts)
}
if watcher != nil {
watcher = newGVKFixupWatcher(gvk, watcher)
}
return client.Resource(mapping.Resource).Watch(ctx, opts)
return watcher, err
},
}, nil
}

type gvkFixupWatcher struct {
watcher watch.Interface
ch chan watch.Event
gvk schema.GroupVersionKind
wg sync.WaitGroup
}

func newGVKFixupWatcher(gvk schema.GroupVersionKind, watcher watch.Interface) watch.Interface {
ch := make(chan watch.Event)
w := &gvkFixupWatcher{
gvk: gvk,
watcher: watcher,
ch: ch,
}
w.wg.Add(1)
go w.run()
return w
}

func (w *gvkFixupWatcher) run() {
for e := range w.watcher.ResultChan() {
e.Object.GetObjectKind().SetGroupVersionKind(w.gvk)
w.ch <- e
}
w.wg.Done()
}

func (w *gvkFixupWatcher) Stop() {
w.watcher.Stop()
w.wg.Wait()
close(w.ch)
}

func (w *gvkFixupWatcher) ResultChan() <-chan watch.Event {
return w.ch
}

// resyncPeriod returns a function which generates a duration each time it is
// invoked; this is so that multiple controllers don't get into lock-step and all
// hammer the apiserver with list requests simultaneously.
Expand Down
71 changes: 0 additions & 71 deletions pkg/cache/internal/metadata_infomer_wrapper.go

This file was deleted.

0 comments on commit 498ee8a

Please sign in to comment.