diff --git a/cmd/registry/config-cache.yml b/cmd/registry/config-cache.yml index d648303d9c..72da9d6029 100644 --- a/cmd/registry/config-cache.yml +++ b/cmd/registry/config-cache.yml @@ -12,6 +12,8 @@ storage: maintenance: uploadpurging: enabled: false + tag: + concurrencylimit: 8 http: addr: :5000 secret: asecretforlocaldevelopment diff --git a/cmd/registry/config-dev.yml b/cmd/registry/config-dev.yml index 9bf36583ea..68fd83c8f7 100644 --- a/cmd/registry/config-dev.yml +++ b/cmd/registry/config-dev.yml @@ -14,6 +14,8 @@ storage: maintenance: uploadpurging: enabled: false + tag: + concurrencylimit: 8 http: addr: :5000 debug: diff --git a/cmd/registry/config-example.yml b/cmd/registry/config-example.yml index c760cd567f..2cb402062f 100644 --- a/cmd/registry/config-example.yml +++ b/cmd/registry/config-example.yml @@ -7,6 +7,8 @@ storage: blobdescriptor: inmemory filesystem: rootdirectory: /var/lib/registry + tag: + concurrencylimit: 8 http: addr: :5000 headers: diff --git a/configuration/configuration.go b/configuration/configuration.go index 427081977c..aa74e3cb4e 100644 --- a/configuration/configuration.go +++ b/configuration/configuration.go @@ -441,6 +441,8 @@ func (storage Storage) Type() string { // allow configuration of delete case "redirect": // allow configuration of redirect + case "tag": + // allow configuration of tag default: storageType = append(storageType, k) } @@ -454,6 +456,19 @@ func (storage Storage) Type() string { return "" } +// TagParameters returns the Parameters map for a Storage tag configuration +func (storage Storage) TagParameters() Parameters { + return storage["tag"] +} + +// setTagParameter changes the parameter at the provided key to the new value +func (storage Storage) setTagParameter(key string, value interface{}) { + if _, ok := storage["tag"]; !ok { + storage["tag"] = make(Parameters) + } + storage["tag"][key] = value +} + // Parameters returns the Parameters map for a Storage configuration func (storage Storage) Parameters() Parameters { return storage[storage.Type()] @@ -482,6 +497,8 @@ func (storage *Storage) UnmarshalYAML(unmarshal func(interface{}) error) error { // allow configuration of delete case "redirect": // allow configuration of redirect + case "tag": + // allow configuration of tag default: types = append(types, k) } diff --git a/configuration/configuration_test.go b/configuration/configuration_test.go index 2139f8f1a9..e3bf1bf1c9 100644 --- a/configuration/configuration_test.go +++ b/configuration/configuration_test.go @@ -39,6 +39,9 @@ var configStruct = Configuration{ "url1": "https://foo.example.com", "path1": "/some-path", }, + "tag": Parameters{ + "concurrencylimit": 10, + }, }, Auth: Auth{ "silly": Parameters{ @@ -167,6 +170,8 @@ storage: int1: 42 url1: "https://foo.example.com" path1: "/some-path" + tag: + concurrencylimit: 10 auth: silly: realm: silly @@ -542,6 +547,9 @@ func copyConfig(config Configuration) *Configuration { for k, v := range config.Storage.Parameters() { configCopy.Storage.setParameter(k, v) } + for k, v := range config.Storage.TagParameters() { + configCopy.Storage.setTagParameter(k, v) + } configCopy.Auth = Auth{config.Auth.Type(): Parameters{}} for k, v := range config.Auth.Parameters() { diff --git a/docs/content/about/configuration.md b/docs/content/about/configuration.md index 482a40ca74..645332b8e7 100644 --- a/docs/content/about/configuration.md +++ b/docs/content/about/configuration.md @@ -141,6 +141,8 @@ storage: usedualstack: false loglevel: debug inmemory: # This driver takes no parameters + tag: + concurrencylimit: 8 delete: enabled: false redirect: @@ -521,6 +523,25 @@ parameter sets a limit on the number of descriptors to store in the cache. The default value is 10000. If this parameter is set to 0, the cache is allowed to grow with no size limit. +### `tag` + +The `tag` subsection provides configuration to set concurrency limit for tag lookup. +When user calls into the registry to delete the manifest, which in turn then does a +lookup for all tags that reference the deleted manifest. To find the tag references, +the registry will iterate every tag in the repository and read it's link file to check +if it matches the deleted manifest (i.e. to see if uses the same sha256 digest). +So, the more tags in repository, the worse the performance will be (as there will +be more S3 API calls occurring for the tag directory lookups and tag file reads if +using S3 storage driver). + +Therefore, add a single flag `concurrencylimit` to set concurrency limit to optimize tag +lookup performance under the `tag` section. When a value is not provided, `GOMAXPROCS` will be used. + +```yaml +tag: + concurrencylimit: 8 +``` + ### `redirect` The `redirect` subsection provides configuration for managing redirects from diff --git a/registry/handlers/app.go b/registry/handlers/app.go index 5a9620829d..a8956372c6 100644 --- a/registry/handlers/app.go +++ b/registry/handlers/app.go @@ -188,6 +188,21 @@ func NewApp(ctx context.Context, config *configuration.Configuration) *App { } } + // configure tag lookup concurrency limit + if p := config.Storage.TagParameters(); p != nil { + l, ok := p["concurrencylimit"] + if ok { + limit, ok := l.(int) + if !ok { + panic("tag lookup concurrency limit config key must have a integer value") + } + if limit <= 0 { + panic("tag lookup concurrency limit should be a positive integer value") + } + options = append(options, storage.TagLookupConcurrencyLimit(limit)) + } + } + // configure redirects var redirectDisabled bool if redirectConfig, ok := config.Storage["redirect"]; ok { diff --git a/registry/handlers/manifests.go b/registry/handlers/manifests.go index 704a8ab7f2..144c04482e 100644 --- a/registry/handlers/manifests.go +++ b/registry/handlers/manifests.go @@ -6,6 +6,7 @@ import ( "mime" "net/http" "strings" + "sync" "github.com/distribution/distribution/v3" "github.com/distribution/distribution/v3/internal/dcontext" @@ -13,11 +14,13 @@ import ( "github.com/distribution/distribution/v3/manifest/ocischema" "github.com/distribution/distribution/v3/manifest/schema2" "github.com/distribution/distribution/v3/registry/api/errcode" + "github.com/distribution/distribution/v3/registry/storage" "github.com/distribution/distribution/v3/registry/storage/driver" "github.com/distribution/reference" "github.com/gorilla/handlers" "github.com/opencontainers/go-digest" v1 "github.com/opencontainers/image-spec/specs-go/v1" + "golang.org/x/sync/errgroup" ) const ( @@ -481,12 +484,26 @@ func (imh *manifestHandler) DeleteManifest(w http.ResponseWriter, r *http.Reques return } + var ( + errs []error + mu sync.Mutex + ) + g := errgroup.Group{} + g.SetLimit(storage.DefaultConcurrencyLimit) for _, tag := range referencedTags { - if err := tagService.Untag(imh, tag); err != nil { - imh.Errors = append(imh.Errors, err) - return - } + tag := tag + + g.Go(func() error { + if err := tagService.Untag(imh, tag); err != nil { + mu.Lock() + errs = append(errs, err) + mu.Unlock() + } + return nil + }) } + _ = g.Wait() // imh will record all errors, so ignore the error of Wait() + imh.Errors = errs w.WriteHeader(http.StatusAccepted) } diff --git a/registry/storage/registry.go b/registry/storage/registry.go index ecf483bf9c..5bc5295c9d 100644 --- a/registry/storage/registry.go +++ b/registry/storage/registry.go @@ -3,6 +3,7 @@ package storage import ( "context" "regexp" + "runtime" "github.com/distribution/distribution/v3" "github.com/distribution/distribution/v3/registry/storage/cache" @@ -10,6 +11,10 @@ import ( "github.com/distribution/reference" ) +var ( + DefaultConcurrencyLimit = runtime.GOMAXPROCS(0) +) + // registry is the top-level implementation of Registry for use in the storage // package. All instances should descend from this object. type registry struct { @@ -18,6 +23,7 @@ type registry struct { statter *blobStatter // global statter service. blobDescriptorCacheProvider cache.BlobDescriptorCacheProvider deleteEnabled bool + tagLookupConcurrencyLimit int resumableDigestEnabled bool blobDescriptorServiceFactory distribution.BlobDescriptorServiceFactory manifestURLs manifestURLs @@ -40,6 +46,13 @@ func EnableRedirect(registry *registry) error { return nil } +func TagLookupConcurrencyLimit(concurrencyLimit int) RegistryOption { + return func(registry *registry) error { + registry.tagLookupConcurrencyLimit = concurrencyLimit + return nil + } +} + // EnableDelete is a functional option for NewRegistry. It enables deletion on // the registry. func EnableDelete(registry *registry) error { @@ -184,9 +197,14 @@ func (repo *repository) Named() reference.Named { } func (repo *repository) Tags(ctx context.Context) distribution.TagService { + limit := DefaultConcurrencyLimit + if repo.tagLookupConcurrencyLimit > 0 { + limit = repo.tagLookupConcurrencyLimit + } tags := &tagStore{ - repository: repo, - blobStore: repo.registry.blobStore, + repository: repo, + blobStore: repo.registry.blobStore, + concurrencyLimit: limit, } return tags diff --git a/registry/storage/tagstore.go b/registry/storage/tagstore.go index 29dcf4e3f1..a481df94df 100644 --- a/registry/storage/tagstore.go +++ b/registry/storage/tagstore.go @@ -4,10 +4,13 @@ import ( "context" "path" "sort" + "sync" + + "github.com/opencontainers/go-digest" + "golang.org/x/sync/errgroup" "github.com/distribution/distribution/v3" storagedriver "github.com/distribution/distribution/v3/registry/storage/driver" - "github.com/opencontainers/go-digest" ) var _ distribution.TagService = &tagStore{} @@ -18,8 +21,9 @@ var _ distribution.TagService = &tagStore{} // which only makes use of the Digest field of the returned distribution.Descriptor // but does not enable full roundtripping of Descriptor objects type tagStore struct { - repository *repository - blobStore *blobStore + repository *repository + blobStore *blobStore + concurrencyLimit int } // All returns all tags @@ -145,26 +149,48 @@ func (ts *tagStore) Lookup(ctx context.Context, desc distribution.Descriptor) ([ return nil, err } - var tags []string + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(ts.concurrencyLimit) + + var ( + tags []string + mu sync.Mutex + ) for _, tag := range allTags { - tagLinkPathSpec := manifestTagCurrentPathSpec{ - name: ts.repository.Named().Name(), - tag: tag, + if ctx.Err() != nil { + break } + tag := tag - tagLinkPath, _ := pathFor(tagLinkPathSpec) - tagDigest, err := ts.blobStore.readlink(ctx, tagLinkPath) - if err != nil { - switch err.(type) { - case storagedriver.PathNotFoundError: - continue + g.Go(func() error { + tagLinkPathSpec := manifestTagCurrentPathSpec{ + name: ts.repository.Named().Name(), + tag: tag, } - return nil, err - } - if tagDigest == desc.Digest { - tags = append(tags, tag) - } + tagLinkPath, _ := pathFor(tagLinkPathSpec) + tagDigest, err := ts.blobStore.readlink(ctx, tagLinkPath) + if err != nil { + switch err.(type) { + case storagedriver.PathNotFoundError: + return nil + } + return err + } + + if tagDigest == desc.Digest { + mu.Lock() + tags = append(tags, tag) + mu.Unlock() + } + + return nil + }) + } + + err = g.Wait() + if err != nil { + return nil, err } return tags, nil