From 422bf120e2dc6b9b0994107fe1595f93c78af8d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miloslav=20Trma=C4=8D?= Date: Mon, 29 May 2023 23:14:03 +0200 Subject: [PATCH] WIP: Only obtain a bearer token once at a time MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Currently, on pushes, we can start several concurrent layer pushes; each one will check for a bearer token in tokenCache, find none, and ask the server for one, and then write it into the cache. So, we can hammer the server with 6 basically-concurrent token requests. That's unnecessary, slower than just asking once, and might impact rate-limiting heuristics. Instead, serialize writes to a bearerToken so that we only have one request in flight at a time. This does not apply to pulls, where the first request is for a manifest; that obtains a token, so subsequent concurrent layer pulls will not request a token again. WIP: Clean up the debugging log entries. Signed-off-by: Miloslav Trmač --- docker/docker_client.go | 95 ++++++++++++++++++++++++++++++++--------- 1 file changed, 75 insertions(+), 20 deletions(-) diff --git a/docker/docker_client.go b/docker/docker_client.go index 5f740d80a..75ce9e17f 100644 --- a/docker/docker_client.go +++ b/docker/docker_client.go @@ -32,6 +32,7 @@ import ( digest "github.com/opencontainers/go-digest" imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1" "github.com/sirupsen/logrus" + "golang.org/x/sync/semaphore" ) const ( @@ -84,8 +85,19 @@ type extensionSignatureList struct { Signatures []extensionSignature `json:"signatures"` } -// bearerToken records a cached token we can use to authenticate. +// bearerToken records a cached token we can use to authenticate, or a pending process to obtain one. +// +// The goroutine obtaining the token holds lock to block concurrent token requests, and fills the structure (err and possibly the other fields) +// before releasing the lock. 
+// Other goroutines obtain lock to block on the token request, if any; and then inspect err to see if the token is usable. +// If it is not, they try to get a new one. type bearerToken struct { + // lock is held while obtaining the token. Potentially nested inside dockerClient.tokenCacheLock. + // This is a counting semaphore only because we need a cancellable lock operation. + lock *semaphore.Weighted + + // The following fields can only be accessed with lock held. + err error // nil if the token was successfully obtained (but may be expired); an error if the next lock holder _must_ obtain a new token. token string expirationTime time.Time } @@ -115,7 +127,7 @@ type dockerClient struct { supportsSignatures bool // Private state for setupRequestAuth (key: string, value: bearerToken) - tokenCacheLock sync.Mutex // Protects tokenCache + tokenCacheLock sync.Mutex // Protects tokenCache. tokenCache map[string]*bearerToken // Private state for detectProperties: detectPropertiesOnce sync.Once // detectPropertiesOnce is used to execute detectProperties() at most once. 
@@ -741,31 +753,74 @@ func (c *dockerClient) obtainBearerToken(ctx context.Context, challenge challeng scopes = append(scopes, *extraScope) } - var token *bearerToken - var inCache bool - func() { // A scope for defer + logrus.Debugf("REMOVE: Checking token cache for key %q", cacheKey) + token, newEntry, err := func() (*bearerToken, bool, error) { // A scope for defer c.tokenCacheLock.Lock() defer c.tokenCacheLock.Unlock() - token, inCache = c.tokenCache[cacheKey] - }() - if !inCache || time.Now().After(token.expirationTime) { - token = &bearerToken{} - - var err error - if c.auth.IdentityToken != "" { - err = c.getBearerTokenOAuth2(ctx, token, challenge, scopes) + token, ok := c.tokenCache[cacheKey] + if ok { + return token, false, nil } else { - err = c.getBearerToken(ctx, token, challenge, scopes) + logrus.Debugf("REMOVE: No token cache for key %q, allocating one…", cacheKey) + token = &bearerToken{ + lock: semaphore.NewWeighted(1), + } + // If this is a new *bearerToken, lock the entry before adding it to the cache, so that any other goroutine that finds + // this entry blocks until we obtain the token for the first time, and does not see an empty object + // (and does not try to obtain the token itself when we are going to do so). + if err := token.lock.Acquire(ctx, 1); err != nil { + // We do not block on this Acquire, so we don’t really expect to fail here — but if ctx is canceled, + // there is no point in trying to continue anyway. + return nil, false, err + } + c.tokenCache[cacheKey] = token + return token, true, nil } - if err != nil { + }() + if err != nil { + return "", err + } + if !newEntry { + // If this is an existing *bearerToken, obtain the lock only after releasing c.tokenCacheLock, + // so that users of other cacheKey values are not blocked for the whole duration of our HTTP roundtrip. 
+ logrus.Debugf("REMOVE: Found existing token cache for key %q, getting lock", cacheKey) + if err := token.lock.Acquire(ctx, 1); err != nil { return "", err } + logrus.Debugf("REMOVE: Locked existing token cache for key %q", cacheKey) + } - func() { // A scope for defer - c.tokenCacheLock.Lock() - defer c.tokenCacheLock.Unlock() - c.tokenCache[cacheKey] = token - }() + defer token.lock.Release(1) + + // Determine if the bearerToken is usable: if it is not, log the cause and fall through, otherwise return early. + switch { + case newEntry: + logrus.Debugf("REMOVE: New token cache entry for key %q, getting first token", cacheKey) + case token.err != nil: + // If obtaining a token fails for any reason, the request that triggered that will fail; + // other requests will see token.err and try obtaining their own token, one goroutine at a time. + // (Consider that a request can fail because a very short timeout was provided to _that one operation_ using a context.Context; + // that clearly shouldn’t prevent other operations from trying with a longer timeout.) + // + // If we got here while holding token.lock, we are the goroutine responsible for trying again; others are blocked + // on token.lock. + logrus.Debugf("REMOVE: Token cache for key %q records failure %v, getting new token", cacheKey, token.err) + case time.Now().After(token.expirationTime): + logrus.Debugf("REMOVE: Token cache for key %q is expired, getting new token", cacheKey) + + default: + return token.token, nil + } + + if c.auth.IdentityToken != "" { + err = c.getBearerTokenOAuth2(ctx, token, challenge, scopes) + } else { + err = c.getBearerToken(ctx, token, challenge, scopes) + } + logrus.Debugf("REMOVE: Obtaining a token for key %q, error %v", cacheKey, err) + token.err = err + if token.err != nil { + return "", token.err } return token.token, nil }