diff --git a/go.mod b/go.mod index db6fcdfa298..b500cd1be2a 100644 --- a/go.mod +++ b/go.mod @@ -45,6 +45,7 @@ require ( k8s.io/pod-security-admission v0.0.0 k8s.io/utils v0.0.0-20230726121419-3b25d923346b sigs.k8s.io/controller-runtime v0.17.2 + golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 ) require ( diff --git a/go.sum b/go.sum index 675123945f4..4d281a542f7 100644 --- a/go.sum +++ b/go.sum @@ -1935,11 +1935,13 @@ golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.2.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/internal/kms/secretskms.go b/internal/kms/secretskms.go index 4b4866c79f1..e103190bdf8 100644 --- a/internal/kms/secretskms.go +++ b/internal/kms/secretskms.go @@ -29,6 +29,7 @@ import ( "github.com/ceph/ceph-csi/internal/util/k8s" "golang.org/x/crypto/scrypt" + "golang.org/x/sync/semaphore" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -48,6 +49,8 @@ const ( metadataSecretNamespaceKey = "secretNamespace" ) +var scryptSem = semaphore.NewWeighted(int64(1)) + // secretsKMS is default KMS implementation that means no KMS is in use. type secretsKMS struct { integratedDEK @@ -271,6 +274,13 @@ func (kms secretsMetadataKMS) GetSecret(volumeID string) (string, error) { // generateCipher returns a AEAD cipher based on a passphrase and salt // (volumeID). The cipher can then be used to encrypt/decrypt the DEK. func generateCipher(passphrase, salt string) (cipher.AEAD, error) { + // Note: This is memory heavy! + // Acquire blocks concurrent access so that only 1 worker can call scrypt.Key at a time. + if err := scryptSem.Acquire(context.TODO(), 1); err != nil { + return nil, err + } + defer scryptSem.Release(1) + key, err := scrypt.Key([]byte(passphrase), []byte(salt), 32768, 8, 1, 32) if err != nil { return nil, err diff --git a/internal/kms/secretskms_test.go b/internal/kms/secretskms_test.go index 835d00db4b3..947a74c575a 100644 --- a/internal/kms/secretskms_test.go +++ b/internal/kms/secretskms_test.go @@ -62,6 +62,27 @@ func TestGenerateCipher(t *testing.T) { assert.NotNil(t, aead) } +func TestGenerateCipherConcurrent(t *testing.T) { + t.Parallel() + // nolint:gosec // this passphrase is intentionally hardcoded + passphrase := "my-cool-luks-passphrase" + salt := "unique-id-for-the-volume" + + runGenerateCipher := func(passphrase string, salt string) { + aead, err := generateCipher(passphrase, salt) + assert.NoError(t, err) + assert.NotNil(t, aead) + } + + for i := 0; i < 5; i++ { + go runGenerateCipher(passphrase, salt) + } + + for i := 0; i < 5; i++ { + runGenerateCipher(passphrase, salt) + } +} + func TestInitSecretsMetadataKMS(t *testing.T) { t.Parallel() args := ProviderInitArgs{ diff --git a/vendor/golang.org/x/sync/semaphore/semaphore.go b/vendor/golang.org/x/sync/semaphore/semaphore.go new file mode 100644 index 00000000000..30f632c577b --- /dev/null +++ b/vendor/golang.org/x/sync/semaphore/semaphore.go @@ -0,0 +1,136 @@ +// Copyright 2017 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package semaphore provides a weighted semaphore implementation. +package semaphore // import "golang.org/x/sync/semaphore" + +import ( + "container/list" + "context" + "sync" +) + +type waiter struct { + n int64 + ready chan<- struct{} // Closed when semaphore acquired. +} + +// NewWeighted creates a new weighted semaphore with the given +// maximum combined weight for concurrent access. +func NewWeighted(n int64) *Weighted { + w := &Weighted{size: n} + return w +} + +// Weighted provides a way to bound concurrent access to a resource. +// The callers can request access with a given weight. +type Weighted struct { + size int64 + cur int64 + mu sync.Mutex + waiters list.List +} + +// Acquire acquires the semaphore with a weight of n, blocking until resources +// are available or ctx is done. On success, returns nil. On failure, returns +// ctx.Err() and leaves the semaphore unchanged. +// +// If ctx is already done, Acquire may still succeed without blocking. +func (s *Weighted) Acquire(ctx context.Context, n int64) error { + s.mu.Lock() + if s.size-s.cur >= n && s.waiters.Len() == 0 { + s.cur += n + s.mu.Unlock() + return nil + } + + if n > s.size { + // Don't make other Acquire calls block on one that's doomed to fail. + s.mu.Unlock() + <-ctx.Done() + return ctx.Err() + } + + ready := make(chan struct{}) + w := waiter{n: n, ready: ready} + elem := s.waiters.PushBack(w) + s.mu.Unlock() + + select { + case <-ctx.Done(): + err := ctx.Err() + s.mu.Lock() + select { + case <-ready: + // Acquired the semaphore after we were canceled. Rather than trying to + // fix up the queue, just pretend we didn't notice the cancelation. + err = nil + default: + isFront := s.waiters.Front() == elem + s.waiters.Remove(elem) + // If we're at the front and there're extra tokens left, notify other waiters. + if isFront && s.size > s.cur { + s.notifyWaiters() + } + } + s.mu.Unlock() + return err + + case <-ready: + return nil + } +} + +// TryAcquire acquires the semaphore with a weight of n without blocking. +// On success, returns true. On failure, returns false and leaves the semaphore unchanged. +func (s *Weighted) TryAcquire(n int64) bool { + s.mu.Lock() + success := s.size-s.cur >= n && s.waiters.Len() == 0 + if success { + s.cur += n + } + s.mu.Unlock() + return success +} + +// Release releases the semaphore with a weight of n. +func (s *Weighted) Release(n int64) { + s.mu.Lock() + s.cur -= n + if s.cur < 0 { + s.mu.Unlock() + panic("semaphore: released more than held") + } + s.notifyWaiters() + s.mu.Unlock() +} + +func (s *Weighted) notifyWaiters() { + for { + next := s.waiters.Front() + if next == nil { + break // No more waiters blocked. + } + + w := next.Value.(waiter) + if s.size-s.cur < w.n { + // Not enough tokens for the next waiter. We could keep going (to try to + // find a waiter with a smaller request), but under load that could cause + // starvation for large requests; instead, we leave all remaining waiters + // blocked. + // + // Consider a semaphore used as a read-write lock, with N tokens, N + // readers, and one writer. Each reader can Acquire(1) to obtain a read + // lock. The writer can Acquire(N) to obtain a write lock, excluding all + // of the readers. If we allow the readers to jump ahead in the queue, + // the writer will starve — there is always one token available for every + // reader. + break + } + + s.cur += w.n + s.waiters.Remove(next) + close(w.ready) + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 647f8c99810..1b4ffac15e9 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -681,6 +681,9 @@ golang.org/x/net/websocket ## explicit; go 1.18 golang.org/x/oauth2 golang.org/x/oauth2/internal +# golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 +## explicit +golang.org/x/sync/semaphore # golang.org/x/sync v0.5.0 ## explicit; go 1.18 golang.org/x/sync/singleflight