From e1dff72a5b0a7265ad27df96abd0b4caa1b1885b Mon Sep 17 00:00:00 2001
From: Florian Gutmann
Date: Thu, 26 May 2022 00:15:00 +0000
Subject: [PATCH] cluster-level lock for workload cluster client initialization

Before this commit, workload cluster client initialization required a
global lock to be held. If initialization of a single workload cluster
client took time, all other reconcile loops that required a workload
cluster connection were blocked until initialization finished.

Initializing a workload cluster client can take a significant amount of
time, because it requires initializing the discovery client, which sends
multiple requests to the API server.

With this change, initializing a workload cluster client only requires
holding a lock for that specific cluster. Reconciliation for other
clusters is therefore not affected by a long-running workload cluster
client initialization.
---
 controllers/remote/cluster_cache.go   | 64 +++++++++++++++++------
 controllers/remote/keyedmutex.go      | 75 +++++++++++++++++++++++++++
 controllers/remote/keyedmutex_test.go | 75 +++++++++++++++++++++++++++
 3 files changed, 198 insertions(+), 16 deletions(-)
 create mode 100644 controllers/remote/keyedmutex.go
 create mode 100644 controllers/remote/keyedmutex_test.go

diff --git a/controllers/remote/cluster_cache.go b/controllers/remote/cluster_cache.go
index 92f141855e54..ff00cfe0f32f 100644
--- a/controllers/remote/cluster_cache.go
+++ b/controllers/remote/cluster_cache.go
@@ -49,6 +49,7 @@ const (
 	healthCheckPollInterval       = 10 * time.Second
 	healthCheckRequestTimeout     = 5 * time.Second
 	healthCheckUnhealthyThreshold = 10
+	initialCacheSyncTimeout       = 5 * time.Minute
 	clusterCacheControllerName    = "cluster-cache-tracker"
 )
 
@@ -61,6 +62,7 @@ type ClusterCacheTracker struct {
 
 	lock             sync.RWMutex
 	clusterAccessors map[client.ObjectKey]*clusterAccessor
+	clusterLock      *keyedMutex
 	indexes          []Index
 }
 
@@ -102,16 +104,14 @@ func NewClusterCacheTracker(manager ctrl.Manager, options ClusterCacheTrackerOpt
 		client:           manager.GetClient(),
 		scheme:           manager.GetScheme(),
 		clusterAccessors: make(map[client.ObjectKey]*clusterAccessor),
+		clusterLock:      newKeyedMutex(),
 		indexes:          options.Indexes,
 	}, nil
 }
 
 // GetClient returns a cached client for the given cluster.
 func (t *ClusterCacheTracker) GetClient(ctx context.Context, cluster client.ObjectKey) (client.Client, error) {
-	t.lock.Lock()
-	defer t.lock.Unlock()
-
-	accessor, err := t.getClusterAccessorLH(ctx, cluster, t.indexes...)
+	accessor, err := t.getClusterAccessor(ctx, cluster, t.indexes...)
 	if err != nil {
 		return nil, err
 	}
@@ -135,21 +135,48 @@ func (t *ClusterCacheTracker) clusterAccessorExists(cluster client.ObjectKey) bo
 	return exists
 }
 
-// getClusterAccessorLH first tries to return an already-created clusterAccessor for cluster, falling back to creating a
-// new clusterAccessor if needed. Note, this method requires t.lock to already be held (LH=lock held).
-func (t *ClusterCacheTracker) getClusterAccessorLH(ctx context.Context, cluster client.ObjectKey, indexes ...Index) (*clusterAccessor, error) {
-	a := t.clusterAccessors[cluster]
+// getClusterAccessor first tries to return an already-created clusterAccessor for cluster, falling back to creating a new clusterAccessor if needed.
+func (t *ClusterCacheTracker) getClusterAccessor(ctx context.Context, cluster client.ObjectKey, indexes ...Index) (*clusterAccessor, error) {
+	log := ctrl.LoggerFrom(ctx, "cluster", cluster.Name)
+
+	loadExistingAccessor := func() *clusterAccessor {
+		t.lock.RLock()
+		defer t.lock.RUnlock()
+		return t.clusterAccessors[cluster]
+	}
+	storeAccessor := func(a *clusterAccessor) {
+		t.lock.Lock()
+		defer t.lock.Unlock()
+		t.clusterAccessors[cluster] = a
+	}
+
+	// If the accessor already exists, return it early.
+	a := loadExistingAccessor()
 	if a != nil {
 		return a, nil
 	}
 
+	// No accessor exists yet, we might need to initialize one.
+	// Lock on the cluster to ensure only one accessor is initialized for the cluster.
+	unlockCluster := t.clusterLock.Lock(cluster)
+	defer unlockCluster()
+
+	// While we were waiting on the cluster lock, a different goroutine holding the lock might have
+	// initialized the accessor for this cluster successfully. If this is the case, return it.
+	a = loadExistingAccessor()
+	if a != nil {
+		return a, nil
+	}
+
+	// We are the one who needs to initialize it.
+	log.V(4).Info("creating new cluster accessor")
 	a, err := t.newClusterAccessor(ctx, cluster, indexes...)
 	if err != nil {
+		log.V(4).Info("error creating new cluster accessor")
 		return nil, errors.Wrap(err, "error creating client and cache for remote cluster")
 	}
-
-	t.clusterAccessors[cluster] = a
-
+	log.V(4).Info("storing new cluster accessor")
+	storeAccessor(a)
 	return a, nil
 }
 
@@ -199,7 +226,12 @@ func (t *ClusterCacheTracker) newClusterAccessor(ctx context.Context, cluster cl
 
 	// Start the cache!!!
 	go cache.Start(cacheCtx) //nolint:errcheck
-	if !cache.WaitForCacheSync(cacheCtx) {
+
+	// Wait until the cache is initially synced.
+	cacheSyncCtx, cacheSyncCtxCancel := context.WithTimeout(ctx, initialCacheSyncTimeout)
+	defer cacheSyncCtxCancel()
+	if !cache.WaitForCacheSync(cacheSyncCtx) {
+		cache.Stop()
 		return nil, fmt.Errorf("failed waiting for cache for remote cluster %v to sync: %w", cluster, err)
 	}
 
@@ -277,14 +309,14 @@ func (t *ClusterCacheTracker) Watch(ctx context.Context, input WatchInput) error
 		return errors.New("input.Name is required")
 	}
 
-	t.lock.Lock()
-	defer t.lock.Unlock()
-
-	a, err := t.getClusterAccessorLH(ctx, input.Cluster, t.indexes...)
+	a, err := t.getClusterAccessor(ctx, input.Cluster, t.indexes...)
 	if err != nil {
 		return err
 	}
 
+	unlock := t.clusterLock.Lock(input.Cluster)
+	defer unlock()
+
 	if a.watches.Has(input.Name) {
 		t.log.V(6).Info("Watch already exists", "namespace", input.Cluster.Namespace, "cluster", input.Cluster.Name, "name", input.Name)
 		return nil
diff --git a/controllers/remote/keyedmutex.go b/controllers/remote/keyedmutex.go
new file mode 100644
index 000000000000..b9b5ec005206
--- /dev/null
+++ b/controllers/remote/keyedmutex.go
@@ -0,0 +1,75 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package remote
+
+import "sync"
+
+// keyedMutex is a mutex locking on the key provided to the Lock function.
+// Only one caller can hold the lock for a specific key at a time.
+type keyedMutex struct {
+	locksMtx sync.Mutex
+	locks    map[interface{}]*keyLock
+}
+
+// newKeyedMutex creates a new keyed mutex ready for use.
+func newKeyedMutex() *keyedMutex {
+	return &keyedMutex{
+		locks: make(map[interface{}]*keyLock),
+	}
+}
+
+// keyLock is the lock for a single specific key.
+type keyLock struct {
+	sync.Mutex
+	// users is the number of callers attempting to acquire the mutex, including the one currently holding it.
+	users uint
+}
+
+// unlock unlocks a currently locked key.
+type unlock func()
+
+// Lock locks the passed in key, blocking if the key is locked.
+// Returns the unlock function to release the lock on the key.
+func (k *keyedMutex) Lock(key interface{}) unlock {
+	// Get an existing keyLock for the key or create a new one and increase the number of users.
+	l := func() *keyLock {
+		k.locksMtx.Lock()
+		defer k.locksMtx.Unlock()
+
+		l, ok := k.locks[key]
+		if !ok {
+			l = &keyLock{}
+			k.locks[key] = l
+		}
+		l.users++
+		return l
+	}()
+
+	l.Lock()
+
+	// Unlocks the keyLock for the key, decreases the counter and removes the keyLock from the map if there are no more users left.
+	return func() {
+		k.locksMtx.Lock()
+		defer k.locksMtx.Unlock()
+
+		l.Unlock()
+		l.users--
+		if l.users == 0 {
+			delete(k.locks, key)
+		}
+	}
+}
diff --git a/controllers/remote/keyedmutex_test.go b/controllers/remote/keyedmutex_test.go
new file mode 100644
index 000000000000..1e2caca84402
--- /dev/null
+++ b/controllers/remote/keyedmutex_test.go
@@ -0,0 +1,75 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package remote
+
+import (
+	"testing"
+
+	. "github.com/onsi/gomega"
+)
+
+func TestKeyedMutex(t *testing.T) {
+	t.Run("blocks on a locked key until unlocked", func(t *testing.T) {
+		t.Parallel()
+		g := NewWithT(t)
+
+		routineStarted := make(chan bool)
+		routineCompleted := make(chan bool)
+		key := "key1"
+
+		km := newKeyedMutex()
+		unlock := km.Lock(key)
+
+		// start a routine which tries to lock the same key
+		go func() {
+			routineStarted <- true
+			unlock := km.Lock(key)
+			unlock()
+			routineCompleted <- true
+		}()
+
+		<-routineStarted
+		g.Consistently(routineCompleted).ShouldNot(Receive())
+
+		// routine should be able to acquire the lock for the key after we unlock
+		unlock()
+		g.Eventually(routineCompleted).Should(Receive())
+
+		// ensure that the lock was cleaned up from the internal map
+		g.Expect(km.locks).To(HaveLen(0))
+	})
+
+	t.Run("can lock different keys without blocking", func(t *testing.T) {
+		g := NewWithT(t)
+		km := newKeyedMutex()
+		keys := []string{"a", "b", "c", "d"}
+		unlocks := make([]unlock, 0, len(keys))
+
+		// lock all keys
+		for _, key := range keys {
+			unlocks = append(unlocks, km.Lock(key))
+		}
+
+		// unlock all keys
+		for _, unlock := range unlocks {
+			unlock()
+		}
+
+		// ensure that the lock was cleaned up from the internal map
+		g.Expect(km.locks).To(HaveLen(0))
+	})
+}
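
Note: the following standalone sketch is not part of the patch above. It only distills the pattern the patch introduces, a per-key lock combined with a second lookup of the shared map after that lock is acquired, outside of the ClusterCacheTracker. All names in it (perKeyMutex, store, getOrInit) are hypothetical illustrations rather than code from this repository, and for brevity it omits the reference counting that lets keyedMutex delete unused map entries.

package main

import (
	"fmt"
	"sync"
	"time"
)

// perKeyMutex hands out one mutex per key, similar in spirit to keyedMutex
// but without the cleanup of unused entries.
type perKeyMutex struct {
	mu    sync.Mutex
	locks map[string]*sync.Mutex
}

// lock blocks until the mutex for key is held and returns its unlock function.
func (p *perKeyMutex) lock(key string) func() {
	p.mu.Lock()
	if p.locks == nil {
		p.locks = map[string]*sync.Mutex{}
	}
	l, ok := p.locks[key]
	if !ok {
		l = &sync.Mutex{}
		p.locks[key] = l
	}
	p.mu.Unlock()

	l.Lock()
	return l.Unlock
}

// store caches one expensive-to-build value per key, e.g. a client per cluster.
type store struct {
	mu      sync.RWMutex // guards values
	values  map[string]string
	keyLock perKeyMutex
}

// getOrInit returns the cached value for key, initializing it at most once.
// Callers for different keys never wait on each other.
func (s *store) getOrInit(key string, init func() string) string {
	// Fast path: the value is already initialized.
	s.mu.RLock()
	v, ok := s.values[key]
	s.mu.RUnlock()
	if ok {
		return v
	}

	// Slow path: serialize per key, then re-check, because another goroutine
	// may have finished initialization while we waited for the key lock.
	unlock := s.keyLock.lock(key)
	defer unlock()

	s.mu.RLock()
	v, ok = s.values[key]
	s.mu.RUnlock()
	if ok {
		return v
	}

	v = init() // potentially slow, e.g. building a discovery client

	s.mu.Lock()
	s.values[key] = v
	s.mu.Unlock()
	return v
}

func main() {
	s := &store{values: map[string]string{}}

	var wg sync.WaitGroup
	start := time.Now()
	for _, key := range []string{"cluster-a", "cluster-b"} {
		key := key
		wg.Add(1)
		go func() {
			defer wg.Done()
			s.getOrInit(key, func() string {
				time.Sleep(200 * time.Millisecond) // simulate slow client initialization
				return "client for " + key
			})
		}()
	}
	wg.Wait()
	fmt.Println("initialized both clusters in", time.Since(start).Round(10*time.Millisecond))
}

Run as a plain Go program, the two initializations overlap, so the elapsed time is roughly that of a single 200ms initialization; with one global mutex they would run back to back. The second lookup after acquiring the key lock is what keeps initialization at-most-once per key, mirroring how getClusterAccessor re-checks the accessor map after t.clusterLock.Lock(cluster).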