From d38096a3f936125f2f59db79d4535f4949104bcb Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Thu, 23 Nov 2023 06:58:27 +0000 Subject: [PATCH 1/6] feat: add cache provider implementation currently the only two implementation of cache are redis and inmemory. This commit introduces cache providers that let you implement your own blbodescriptor cache of choice and plug it in. Co-authored-by: Gladkov Alexey Signed-off-by: Milos Gajdos --- docs/content/about/configuration.md | 1 + registry/handlers/app.go | 16 +++++++- .../storage/cache/provider/cacheprovider.go | 39 +++++++++++++++++++ 3 files changed, 55 insertions(+), 1 deletion(-) create mode 100644 registry/storage/cache/provider/cacheprovider.go diff --git a/docs/content/about/configuration.md b/docs/content/about/configuration.md index 4107c0b3ae..762883ea7a 100644 --- a/docs/content/about/configuration.md +++ b/docs/content/about/configuration.md @@ -507,6 +507,7 @@ Use the `cache` structure to enable caching of data accessed in the storage backend. Currently, the only available cache provides fast access to layer metadata, which uses the `blobdescriptor` field if configured. +Currently the only two available cache implementations are `redis` and `inmemory`. You can set `blobdescriptor` field to `redis` or `inmemory`. If set to `redis`,a Redis pool caches layer metadata. If set to `inmemory`, an in-memory map caches layer metadata. diff --git a/registry/handlers/app.go b/registry/handlers/app.go index fb8e9dd298..8ca1e0da49 100644 --- a/registry/handlers/app.go +++ b/registry/handlers/app.go @@ -32,6 +32,7 @@ import ( "github.com/distribution/distribution/v3/registry/proxy" "github.com/distribution/distribution/v3/registry/storage" memorycache "github.com/distribution/distribution/v3/registry/storage/cache/memory" + cacheprovider "github.com/distribution/distribution/v3/registry/storage/cache/provider" rediscache "github.com/distribution/distribution/v3/registry/storage/cache/redis" storagedriver "github.com/distribution/distribution/v3/registry/storage/driver" "github.com/distribution/distribution/v3/registry/storage/driver/factory" @@ -279,7 +280,20 @@ func NewApp(ctx context.Context, config *configuration.Configuration) *App { dcontext.GetLogger(app).Infof("using inmemory blob descriptor cache") default: if v != "" { - dcontext.GetLogger(app).Warnf("unknown cache type %q, caching disabled", config.Storage["cache"]) + name, ok := v.(string) + if !ok { + panic(fmt.Sprintf("unexpected type of value %T (string expected)", v)) + } + cacheProvider, err := cacheprovider.Get(app, name, cc) + if err != nil { + panic("unable to initialize cache provider: " + err.Error()) + } + localOptions := append(options, storage.BlobDescriptorCacheProvider(cacheProvider)) + app.registry, err = storage.NewRegistry(app, app.driver, localOptions...) + if err != nil { + panic("could not create registry: " + err.Error()) + } + dcontext.GetLogger(app).Infof("using %s blob descriptor cache", name) } } } diff --git a/registry/storage/cache/provider/cacheprovider.go b/registry/storage/cache/provider/cacheprovider.go new file mode 100644 index 0000000000..49408b9cce --- /dev/null +++ b/registry/storage/cache/provider/cacheprovider.go @@ -0,0 +1,39 @@ +package cacheprovider + +import ( + "context" + "fmt" + + "github.com/distribution/distribution/v3/registry/storage/cache" +) + +// InitFunc is the type of a CacheProvider factory function and is +// used to register the constructor for different CacheProvider backends. +type InitFunc func(ctx context.Context, options map[string]interface{}) (cache.BlobDescriptorCacheProvider, error) + +var cacheProviders map[string]InitFunc + +// Register is used to register an InitFunc for +// a CacheProvider backend with the given name. +func Register(name string, initFunc InitFunc) error { + if cacheProviders == nil { + cacheProviders = make(map[string]InitFunc) + } + if _, exists := cacheProviders[name]; exists { + return fmt.Errorf("name already registered: %s", name) + } + + cacheProviders[name] = initFunc + + return nil +} + +// Get constructs a CacheProvider with the given options using the named backend. +func Get(ctx context.Context, name string, options map[string]interface{}) (cache.BlobDescriptorCacheProvider, error) { + if cacheProviders != nil { + if initFunc, exists := cacheProviders[name]; exists { + return initFunc(ctx, options) + } + } + return nil, fmt.Errorf("no cache Provider registered with name: %s", name) +} From 30f8cf74b69f51054825642e22b68ed70be41690 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Thu, 7 Dec 2023 23:17:17 +0000 Subject: [PATCH 2/6] Update registry/storage/cache/provider/cacheprovider.go Co-authored-by: Cory Snider Signed-off-by: Milos Gajdos --- registry/storage/cache/provider/cacheprovider.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/registry/storage/cache/provider/cacheprovider.go b/registry/storage/cache/provider/cacheprovider.go index 49408b9cce..94ec29e3ec 100644 --- a/registry/storage/cache/provider/cacheprovider.go +++ b/registry/storage/cache/provider/cacheprovider.go @@ -30,10 +30,8 @@ func Register(name string, initFunc InitFunc) error { // Get constructs a CacheProvider with the given options using the named backend. func Get(ctx context.Context, name string, options map[string]interface{}) (cache.BlobDescriptorCacheProvider, error) { - if cacheProviders != nil { - if initFunc, exists := cacheProviders[name]; exists { - return initFunc(ctx, options) - } + if initFunc, exists := cacheProviders[name]; exists { + return initFunc(ctx, options) } return nil, fmt.Errorf("no cache Provider registered with name: %s", name) } From 90665beea672d180c909426a8af7906ad02a57a7 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Thu, 7 Dec 2023 23:35:39 +0000 Subject: [PATCH 3/6] update: make cacheprovider.Register panic on error This func is meant to be used from init funcs of specific cacheprovider implementations. If they fail to register we should panic. Signed-off-by: Milos Gajdos --- registry/storage/cache/provider/cacheprovider.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/registry/storage/cache/provider/cacheprovider.go b/registry/storage/cache/provider/cacheprovider.go index 94ec29e3ec..1beb5c6575 100644 --- a/registry/storage/cache/provider/cacheprovider.go +++ b/registry/storage/cache/provider/cacheprovider.go @@ -15,17 +15,17 @@ var cacheProviders map[string]InitFunc // Register is used to register an InitFunc for // a CacheProvider backend with the given name. -func Register(name string, initFunc InitFunc) error { +// It's meant to be called from init() function +// of the cache provider. +func Register(name string, initFunc InitFunc) { if cacheProviders == nil { cacheProviders = make(map[string]InitFunc) } if _, exists := cacheProviders[name]; exists { - return fmt.Errorf("name already registered: %s", name) + panic(fmt.Sprintf("name already registered: %s", name)) } cacheProviders[name] = initFunc - - return nil } // Get constructs a CacheProvider with the given options using the named backend. From 15bdb51221136390bacc40aff1392fe410d21d2a Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Fri, 8 Dec 2023 19:08:32 +0000 Subject: [PATCH 4/6] refactor: complete refactoring of cache providers and cache config This commit reworks cache provider setup and configuration. Cache now has a dedicated configuration structure instead of being part of the storage configuration. This simplifies the code quite a bit at the expense of sad configuration decoding. This commit now allows for adding of new cache providers should there be such need in the future. Currently we continue to provide only redis and inmemory cache. In line with this the existing cache implementation: inmemory and redis have now been "turned" into cache providers which also simplifies the code. Cache configuration is now expected to be set the following options: provider - name of the cache provider params - arbitrary set of config params for the given cache provider. Each cache provider must register itself via init func so it's loaded on startup and available if the config requires it. Redis configuration has been removed from the main configuration because its only ever been used for configuring caches and nothing else so it makes no sense to keep it in the global config. Documentation has been updated accordingly. Signed-off-by: Milos Gajdos --- cmd/registry/config-cache.yml | 23 +- cmd/registry/config-dev.yml | 5 +- cmd/registry/config-example.yml | 5 +- cmd/registry/main.go | 2 + configuration/configuration.go | 69 +++++- configuration/configuration_test.go | 89 ++++--- docs/content/about/configuration.md | 223 +++++++++--------- internal/client/repository.go | 7 +- notifications/listener_test.go | 8 +- registry/handlers/app.go | 125 ++-------- registry/handlers/app_test.go | 8 +- registry/proxy/proxyblobstore_test.go | 14 +- registry/proxy/proxymanifeststore_test.go | 16 +- registry/storage/blob_test.go | 43 +++- registry/storage/cache/cache.go | 29 +++ registry/storage/cache/memory/memory.go | 37 ++- registry/storage/cache/memory/memory_test.go | 8 +- .../storage/cache/provider/cacheprovider.go | 37 --- registry/storage/cache/redis/redis.go | 130 +++++++++- registry/storage/cache/redis/redis_test.go | 31 +-- registry/storage/catalog_test.go | 16 +- registry/storage/manifeststore_test.go | 26 +- tests/conf-e2e-cloud-storage.yml | 25 +- tests/conf-local-cloud.yml | 5 +- 24 files changed, 622 insertions(+), 359 deletions(-) delete mode 100644 registry/storage/cache/provider/cacheprovider.go diff --git a/cmd/registry/config-cache.yml b/cmd/registry/config-cache.yml index d648303d9c..c2da495219 100644 --- a/cmd/registry/config-cache.yml +++ b/cmd/registry/config-cache.yml @@ -5,13 +5,23 @@ log: service: registry environment: development storage: - cache: - blobdescriptor: redis filesystem: rootdirectory: /var/lib/registry-cache maintenance: uploadpurging: enabled: false +cache: + blobdescriptor: + provider: redis + params: + addr: localhost:6379 + pool: + maxidle: 16 + maxactive: 64 + idletimeout: 300s + dialtimeout: 10ms + readtimeout: 10ms + writetimeout: 10ms http: addr: :5000 secret: asecretforlocaldevelopment @@ -19,15 +29,6 @@ http: addr: localhost:5001 headers: X-Content-Type-Options: [nosniff] -redis: - addr: localhost:6379 - pool: - maxidle: 16 - maxactive: 64 - idletimeout: 300s - dialtimeout: 10ms - readtimeout: 10ms - writetimeout: 10ms notifications: events: includereferences: true diff --git a/cmd/registry/config-dev.yml b/cmd/registry/config-dev.yml index 9bf36583ea..01c8164059 100644 --- a/cmd/registry/config-dev.yml +++ b/cmd/registry/config-dev.yml @@ -7,13 +7,14 @@ log: storage: delete: enabled: true - cache: - blobdescriptor: inmemory filesystem: rootdirectory: /var/lib/registry maintenance: uploadpurging: enabled: false +cache: + blobdescriptor: + provider: inmemory http: addr: :5000 debug: diff --git a/cmd/registry/config-example.yml b/cmd/registry/config-example.yml index c760cd567f..dd3db5db90 100644 --- a/cmd/registry/config-example.yml +++ b/cmd/registry/config-example.yml @@ -3,10 +3,11 @@ log: fields: service: registry storage: - cache: - blobdescriptor: inmemory filesystem: rootdirectory: /var/lib/registry +cache: + blobdescriptor: + provider: inmemory http: addr: :5000 headers: diff --git a/cmd/registry/main.go b/cmd/registry/main.go index de160f3014..b2f9d486f7 100644 --- a/cmd/registry/main.go +++ b/cmd/registry/main.go @@ -8,6 +8,8 @@ import ( _ "github.com/distribution/distribution/v3/registry/auth/silly" _ "github.com/distribution/distribution/v3/registry/auth/token" _ "github.com/distribution/distribution/v3/registry/proxy" + _ "github.com/distribution/distribution/v3/registry/storage/cache/memory" + _ "github.com/distribution/distribution/v3/registry/storage/cache/redis" _ "github.com/distribution/distribution/v3/registry/storage/driver/azure" _ "github.com/distribution/distribution/v3/registry/storage/driver/filesystem" _ "github.com/distribution/distribution/v3/registry/storage/driver/gcs" diff --git a/configuration/configuration.go b/configuration/configuration.go index e4d4311d5f..18f9d4fea4 100644 --- a/configuration/configuration.go +++ b/configuration/configuration.go @@ -55,6 +55,9 @@ type Configuration struct { // Storage is the configuration for the registry's storage driver Storage Storage `yaml:"storage"` + // Cache is the configuration for the registry's storage cache + Cache Cache `yaml:"cache"` + // Auth allows configuration of various authorization methods that may be // used to gate requests. Auth Auth `yaml:"auth,omitempty"` @@ -166,9 +169,6 @@ type Configuration struct { // registry events are dispatched. Notifications Notifications `yaml:"notifications,omitempty"` - // Redis configures the redis pool available to the registry webapp. - Redis Redis `yaml:"redis,omitempty"` - Health Health `yaml:"health,omitempty"` Catalog Catalog `yaml:"catalog,omitempty"` @@ -429,8 +429,6 @@ func (storage Storage) Type() string { switch k { case "maintenance": // allow configuration of maintenance - case "cache": - // allow configuration of caching case "delete": // allow configuration of delete case "redirect": @@ -507,6 +505,67 @@ func (storage Storage) MarshalYAML() (interface{}, error) { return map[string]Parameters(storage), nil } +// Cache defines the configuration for registry cache. +type Cache map[string]Parameters + +// Type returns the cache type, such as memory or redis. +func (cache Cache) Type() string { + // Return only key in this map + for k := range cache { + return k + } + return "" +} + +// Parameters returns the Parameters map for an cache configuration +func (cache Cache) Parameters() Parameters { + return cache[cache.Type()] +} + +// setParameter changes the parameter at the provided key to the new value +func (cache Cache) setParameter(key string, value interface{}) { + cache[cache.Type()][key] = value +} + +// UnmarshalYAML implements the yaml.Unmarshaler interface +// Unmarshals a single item map into a Storage or a string into a Storage type with no parameters +func (cache *Cache) UnmarshalYAML(unmarshal func(interface{}) error) error { + var m map[string]Parameters + err := unmarshal(&m) + if err == nil { + if len(m) > 1 { + types := make([]string, 0, len(m)) + for k := range m { + types = append(types, k) + } + + // TODO(stevvooe): May want to change this slightly for + // authorization to allow multiple challenges. + return fmt.Errorf("must provide exactly one type. Provided: %v", types) + + } + *cache = m + return nil + } + + var cacheType string + err = unmarshal(&cacheType) + if err == nil { + *cache = Cache{cacheType: Parameters{}} + return nil + } + + return err +} + +// MarshalYAML implements the yaml.Marshaler interface +func (cache Cache) MarshalYAML() (interface{}, error) { + if cache.Parameters() == nil { + return cache.Type(), nil + } + return map[string]Parameters(cache), nil +} + // Auth defines the configuration for registry authorization. type Auth map[string]Parameters diff --git a/configuration/configuration_test.go b/configuration/configuration_test.go index 0ac8dbdcac..48308b7f1d 100644 --- a/configuration/configuration_test.go +++ b/configuration/configuration_test.go @@ -44,6 +44,25 @@ var configStruct = Configuration{ "path1": "/some-path", }, }, + Cache: Cache{ + "blobdescriptor": Parameters{ + "provider": "redis", + "params": map[interface{}]interface{}{ + "addr": "localhost:6379", + "username": "alice", + "password": "123456", + "db": 1, + "pool": map[interface{}]interface{}{ + "maxidle": 16, + "maxactive": 64, + "idletimeout": "300s", + }, + "dialtimeout": "10ms", + "readtimeout": "10ms", + "writetimeout": "10ms", + }, + }, + }, Auth: Auth{ "silly": Parameters{ "realm": "silly", @@ -126,24 +145,6 @@ var configStruct = Configuration{ Disabled: false, }, }, - Redis: Redis{ - Addr: "localhost:6379", - Username: "alice", - Password: "123456", - DB: 1, - Pool: struct { - MaxIdle int `yaml:"maxidle,omitempty"` - MaxActive int `yaml:"maxactive,omitempty"` - IdleTimeout time.Duration `yaml:"idletimeout,omitempty"` - }{ - MaxIdle: 16, - MaxActive: 64, - IdleTimeout: time.Second * 300, - }, - DialTimeout: time.Millisecond * 10, - ReadTimeout: time.Millisecond * 10, - WriteTimeout: time.Millisecond * 10, - }, } // configYamlV0_1 is a Version 0.1 yaml document representing configStruct @@ -163,6 +164,21 @@ storage: int1: 42 url1: "https://foo.example.com" path1: "/some-path" +cache: + blobdescriptor: + provider: redis + params: + addr: localhost:6379 + username: alice + password: "123456" + db: 1 + pool: + maxidle: 16 + maxactive: 64 + idletimeout: 300s + dialtimeout: 10ms + readtimeout: 10ms + writetimeout: 10ms auth: silly: realm: silly @@ -185,18 +201,6 @@ http: - /path/to/ca.pem headers: X-Content-Type-Options: [nosniff] -redis: - addr: localhost:6379 - username: alice - password: 123456 - db: 1 - pool: - maxidle: 16 - maxactive: 64 - idletimeout: 300s - dialtimeout: 10ms - readtimeout: 10ms - writetimeout: 10ms ` // inmemoryConfigYamlV0_1 is a Version 0.1 yaml document specifying an inmemory @@ -206,6 +210,21 @@ version: 0.1 log: level: info storage: inmemory +cache: + blobdescriptor: + provider: redis + params: + addr: localhost:6379 + username: alice + password: "123456" + db: 1 + pool: + maxidle: 16 + maxactive: 64 + idletimeout: 300s + dialtimeout: 10ms + readtimeout: 10ms + writetimeout: 10ms auth: silly: realm: silly @@ -263,7 +282,6 @@ func (suite *ConfigSuite) TestParseSimple(c *check.C) { func (suite *ConfigSuite) TestParseInmemory(c *check.C) { suite.expectedConfig.Storage = Storage{"inmemory": Parameters{}} suite.expectedConfig.Log.Fields = nil - suite.expectedConfig.Redis = Redis{} config, err := Parse(bytes.NewReader([]byte(inmemoryConfigYamlV0_1))) c.Assert(err, check.IsNil) @@ -283,7 +301,7 @@ func (suite *ConfigSuite) TestParseIncomplete(c *check.C) { suite.expectedConfig.Auth = Auth{"silly": Parameters{"realm": "silly"}} suite.expectedConfig.Notifications = Notifications{} suite.expectedConfig.HTTP.Headers = nil - suite.expectedConfig.Redis = Redis{} + suite.expectedConfig.Cache = nil // Note: this also tests that REGISTRY_STORAGE and // REGISTRY_STORAGE_FILESYSTEM_ROOTDIRECTORY can be used together @@ -536,6 +554,11 @@ func copyConfig(config Configuration) *Configuration { configCopy.Storage.setParameter(k, v) } + configCopy.Cache = Cache{config.Cache.Type(): Parameters{}} + for k, v := range config.Cache.Parameters() { + configCopy.Cache.setParameter(k, v) + } + configCopy.Auth = Auth{config.Auth.Type(): Parameters{}} for k, v := range config.Auth.Parameters() { configCopy.Auth.setParameter(k, v) @@ -549,7 +572,5 @@ func copyConfig(config Configuration) *Configuration { configCopy.HTTP.Headers[k] = v } - configCopy.Redis = config.Redis - return configCopy } diff --git a/docs/content/about/configuration.md b/docs/content/about/configuration.md index 762883ea7a..bcd64d2ac8 100644 --- a/docs/content/about/configuration.md +++ b/docs/content/about/configuration.md @@ -145,9 +145,6 @@ storage: enabled: false redirect: disable: false - cache: - blobdescriptor: redis - blobdescriptorsize: 10000 maintenance: uploadpurging: enabled: true @@ -156,6 +153,22 @@ storage: dryrun: false readonly: enabled: false +cache: + blobdescriptor: + provider: redis + params: + addr: localhost:6379 + password: asecret + db: 0 + dialtimeout: 10ms + readtimeout: 10ms + writetimeout: 10ms + pool: + maxidle: 16 + maxactive: 64 + idletimeout: 300s + tls: + enabled: false auth: silly: realm: silly-realm @@ -238,19 +251,6 @@ notifications: - application/octet-stream actions: - pull -redis: - addr: localhost:6379 - password: asecret - db: 0 - dialtimeout: 10ms - readtimeout: 10ms - writetimeout: 10ms - pool: - maxidle: 16 - maxactive: 64 - idletimeout: 300s - tls: - enabled: false health: storagedriver: enabled: true @@ -415,9 +415,6 @@ storage: inmemory: delete: enabled: false - cache: - blobdescriptor: inmemory - blobdescriptorsize: 10000 maintenance: uploadpurging: enabled: true @@ -428,6 +425,11 @@ storage: enabled: false redirect: disable: false +cache: + blobdescriptor: + provider: inmemory + params: + size: 10000 ``` The `storage` option is **required** and defines which storage backend is in @@ -501,25 +503,6 @@ delete: enabled: true ``` -### `cache` - -Use the `cache` structure to enable caching of data accessed in the storage -backend. Currently, the only available cache provides fast access to layer -metadata, which uses the `blobdescriptor` field if configured. - -Currently the only two available cache implementations are `redis` and `inmemory`. -You can set `blobdescriptor` field to `redis` or `inmemory`. If set to `redis`,a -Redis pool caches layer metadata. If set to `inmemory`, an in-memory map caches -layer metadata. - -> **NOTE**: Formerly, `blobdescriptor` was known as `layerinfo`. While these -> are equivalent, `layerinfo` has been deprecated. - -If `blobdescriptor` is set to `inmemory`, the optional `blobdescriptorsize` -parameter sets a limit on the number of descriptors to store in the cache. -The default value is 10000. If this parameter is set to 0, the cache is allowed -to grow with no size limit. - ### `redirect` The `redirect` subsection provides configuration for managing redirects from @@ -537,6 +520,102 @@ redirect: disable: true ``` +## `cache` + +Use the `cache` structure to enable caching of data accessed in the storage +backend. Currently, the only avalable cache provides fast access to layer +metadata, which uses the `blobdescriptor`field if configured. + +> **NOTE**: Formerly, `blobdescriptor` was known as `layerinfo`. While these +> are equivalent, `layerinfo` has been deprecated. + +There are curretly two available cache provider implementations: `redis` and `inmemory`. +You can specify the name of the cache provider via the `provider` field and configure +it via the `params` field. + +### `memory` + +If `provider` is set to `inmemory`, the optional `size` parameter sets a limit + on the number of descriptors to store in the cache. The default value is 10000. +If this parameter is set to 0, the cache is allowed to grow with no size limit. + +```yaml +cache: + provider: inmemory + params: + size: 1000 +``` + +### `redis` + +If `provider` is set to `redis`, a Redis pool caches layer metadata. +layer metadata. See the redis configuration options below for all available options. + +```yaml +redis: + addr: localhost:6379 + password: asecret + db: 0 + dialtimeout: 10ms + readtimeout: 10ms + writetimeout: 10ms + pool: + maxidle: 16 + maxactive: 64 + idletimeout: 300s + tls: + enabled: false +``` + +Declare parameters for constructing the `redis` connections. Registry instances +may use the Redis instance for several applications. Currently, it caches +information about immutable blobs. Most of the `redis` options control +how the registry connects to the `redis` instance. You can control the pool's +behavior with the [pool](#pool) subsection. Additionally, you can control +TLS connection settings with the [tls](#tls) subsection (in-transit encryption). + +You should configure Redis with the **allkeys-lru** eviction policy, because the +registry does not set an expiration value on keys. + +| Parameter | Required | Description | +|-----------|----------|-------------------------------------------------------| +| `addr` | yes | The address (host and port) of the Redis instance. | +| `password`| no | A password used to authenticate to the Redis instance.| +| `db` | no | The name of the database to use for each connection. | +| `dialtimeout` | no | The timeout for connecting to the Redis instance. | +| `readtimeout` | no | The timeout for reading from the Redis instance. | +| `writetimeout` | no | The timeout for writing to the Redis instance. | + +#### `pool` + +```yaml +pool: + maxidle: 16 + maxactive: 64 + idletimeout: 300s +``` + +Use these settings to configure the behavior of the Redis connection pool. + +| Parameter | Required | Description | +|-----------|----------|-------------------------------------------------------| +| `maxidle` | no | The maximum number of idle connections in the pool. | +| `maxactive`| no | The maximum number of connections which can be open before blocking a connection request. | +| `idletimeout`| no | How long to wait before closing inactive connections. | + +#### `tls` + +```yaml +tls: + enabled: false +``` + +Use these settings to configure Redis TLS. + +| Parameter | Required | Description | +|-----------|----------|-------------------------------------- | +| `enabled` | no | Whether or not to use TLS in-transit. | + ## `auth` ```yaml @@ -936,74 +1015,6 @@ The `events` structure configures the information provided in event notification |-----------|----------|-------------------------------------------------------| | `includereferences` | no | If `true`, include reference information in manifest events. | -## `redis` - -```yaml -redis: - addr: localhost:6379 - password: asecret - db: 0 - dialtimeout: 10ms - readtimeout: 10ms - writetimeout: 10ms - pool: - maxidle: 16 - maxactive: 64 - idletimeout: 300s - tls: - enabled: false -``` - -Declare parameters for constructing the `redis` connections. Registry instances -may use the Redis instance for several applications. Currently, it caches -information about immutable blobs. Most of the `redis` options control -how the registry connects to the `redis` instance. You can control the pool's -behavior with the [pool](#pool) subsection. Additionally, you can control -TLS connection settings with the [tls](#tls) subsection (in-transit encryption). - -You should configure Redis with the **allkeys-lru** eviction policy, because the -registry does not set an expiration value on keys. - -| Parameter | Required | Description | -|-----------|----------|-------------------------------------------------------| -| `addr` | yes | The address (host and port) of the Redis instance. | -| `password`| no | A password used to authenticate to the Redis instance.| -| `db` | no | The name of the database to use for each connection. | -| `dialtimeout` | no | The timeout for connecting to the Redis instance. | -| `readtimeout` | no | The timeout for reading from the Redis instance. | -| `writetimeout` | no | The timeout for writing to the Redis instance. | - -### `pool` - -```yaml -pool: - maxidle: 16 - maxactive: 64 - idletimeout: 300s -``` - -Use these settings to configure the behavior of the Redis connection pool. - -| Parameter | Required | Description | -|-----------|----------|-------------------------------------------------------| -| `maxidle` | no | The maximum number of idle connections in the pool. | -| `maxactive`| no | The maximum number of connections which can be open before blocking a connection request. | -| `idletimeout`| no | How long to wait before closing inactive connections. | - -### `tls` - -```yaml -tls: - enabled: false -``` - -Use these settings to configure Redis TLS. - -| Parameter | Required | Description | -|-----------|----------|-------------------------------------- | -| `enabled` | no | Whether or not to use TLS in-transit. | - - ## `health` ```yaml diff --git a/internal/client/repository.go b/internal/client/repository.go index 4080ec1348..44785db0a8 100644 --- a/internal/client/repository.go +++ b/internal/client/repository.go @@ -158,11 +158,16 @@ func (r *repository) Named() reference.Named { } func (r *repository) Blobs(ctx context.Context) distribution.BlobStore { + cacheOpts := memory.NewCacheOptions(memory.UnlimitedSize) + cacheProvider, err := memory.NewBlobDescriptorCacheProvider(ctx, cacheOpts) + if err != nil { + panic(err) + } return &blobs{ name: r.name, ub: r.ub, client: r.client, - statter: cache.NewCachedBlobStatter(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize), &blobStatter{ + statter: cache.NewCachedBlobStatter(cacheProvider, &blobStatter{ name: r.name, ub: r.ub, client: r.client, diff --git a/notifications/listener_test.go b/notifications/listener_test.go index 5781d45374..81f0bb514d 100644 --- a/notifications/listener_test.go +++ b/notifications/listener_test.go @@ -19,8 +19,14 @@ import ( func TestListener(t *testing.T) { ctx := dcontext.Background() + cacheOpts := memory.NewCacheOptions(memory.UnlimitedSize) + cache, err := memory.NewBlobDescriptorCacheProvider(ctx, cacheOpts) + if err != nil { + t.Fatalf("memory cache: %v", err) + } + registry, err := storage.NewRegistry(ctx, inmemory.New(), - storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)), + storage.BlobDescriptorCacheProvider(cache), storage.EnableDelete, storage.EnableRedirect) if err != nil { t.Fatalf("error creating registry: %v", err) diff --git a/registry/handlers/app.go b/registry/handlers/app.go index 8ca1e0da49..27739e2f1f 100644 --- a/registry/handlers/app.go +++ b/registry/handlers/app.go @@ -3,7 +3,6 @@ package handlers import ( "context" "crypto/rand" - "expvar" "fmt" "math" "math/big" @@ -13,7 +12,6 @@ import ( "os" "regexp" "runtime" - "strconv" "strings" "time" @@ -31,9 +29,7 @@ import ( repositorymiddleware "github.com/distribution/distribution/v3/registry/middleware/repository" "github.com/distribution/distribution/v3/registry/proxy" "github.com/distribution/distribution/v3/registry/storage" - memorycache "github.com/distribution/distribution/v3/registry/storage/cache/memory" - cacheprovider "github.com/distribution/distribution/v3/registry/storage/cache/provider" - rediscache "github.com/distribution/distribution/v3/registry/storage/cache/redis" + "github.com/distribution/distribution/v3/registry/storage/cache" storagedriver "github.com/distribution/distribution/v3/registry/storage/driver" "github.com/distribution/distribution/v3/registry/storage/driver/factory" storagemiddleware "github.com/distribution/distribution/v3/registry/storage/driver/middleware" @@ -42,8 +38,6 @@ import ( events "github.com/docker/go-events" "github.com/docker/go-metrics" "github.com/gorilla/mux" - "github.com/redis/go-redis/extra/redisotel/v9" - "github.com/redis/go-redis/v9" "github.com/sirupsen/logrus" ) @@ -78,8 +72,6 @@ type App struct { source notifications.SourceRecord } - redis *redis.Client - // isCache is true if this registry is configured as a pull through cache isCache bool @@ -156,7 +148,6 @@ func NewApp(ctx context.Context, config *configuration.Configuration) *App { app.configureSecret(config) app.configureEvents(config) - app.configureRedis(config) app.configureLogHook(config) options := registrymiddleware.GetRegistryOptions() @@ -238,63 +229,34 @@ func NewApp(ctx context.Context, config *configuration.Configuration) *App { } // configure storage caches - if cc, ok := config.Storage["cache"]; ok { - v, ok := cc["blobdescriptor"] - if !ok { - // Backwards compatible: "layerinfo" == "blobdescriptor" - v = cc["layerinfo"] - } - - switch v { - case "redis": - if app.redis == nil { - panic("redis configuration required to use for layerinfo cache") + // NOTE(milosgajdos): we only provide blobdescriptor cache + // Any other cache configuration is ignored + if bd, ok := config.Cache["blobdescriptor"]; ok { + var ( + err error + cacheProvider cache.BlobDescriptorCacheProvider + ) + if p, ok := bd["provider"]; ok { + name, ok := p.(string) + if !ok { + panic(fmt.Sprintf("unexpected type of value %T for cache provider name, expected string", p)) } - if _, ok := cc["blobdescriptorsize"]; ok { - dcontext.GetLogger(app).Warnf("blobdescriptorsize parameter is not supported with redis cache") + cacheParams := config.Cache.Parameters() + if cacheParams == nil { + cacheParams = make(configuration.Parameters) } - cacheProvider := rediscache.NewRedisBlobDescriptorCacheProvider(app.redis) - localOptions := append(options, storage.BlobDescriptorCacheProvider(cacheProvider)) - app.registry, err = storage.NewRegistry(app, app.driver, localOptions...) + cacheProvider, err = cache.Get(ctx, name, cacheParams) if err != nil { - panic("could not create registry: " + err.Error()) - } - dcontext.GetLogger(app).Infof("using redis blob descriptor cache") - case "inmemory": - blobDescriptorSize := memorycache.DefaultSize - configuredSize, ok := cc["blobdescriptorsize"] - if ok { - // Since Parameters is not strongly typed, render to a string and convert back - blobDescriptorSize, err = strconv.Atoi(fmt.Sprint(configuredSize)) - if err != nil { - panic(fmt.Sprintf("invalid blobdescriptorsize value %s: %s", configuredSize, err)) - } + dcontext.GetLogger(app).Infof("failed to initialize %s cache: %v", name, err) } + } - cacheProvider := memorycache.NewInMemoryBlobDescriptorCacheProvider(blobDescriptorSize) + if cacheProvider != nil { localOptions := append(options, storage.BlobDescriptorCacheProvider(cacheProvider)) app.registry, err = storage.NewRegistry(app, app.driver, localOptions...) if err != nil { panic("could not create registry: " + err.Error()) } - dcontext.GetLogger(app).Infof("using inmemory blob descriptor cache") - default: - if v != "" { - name, ok := v.(string) - if !ok { - panic(fmt.Sprintf("unexpected type of value %T (string expected)", v)) - } - cacheProvider, err := cacheprovider.Get(app, name, cc) - if err != nil { - panic("unable to initialize cache provider: " + err.Error()) - } - localOptions := append(options, storage.BlobDescriptorCacheProvider(cacheProvider)) - app.registry, err = storage.NewRegistry(app, app.driver, localOptions...) - if err != nil { - panic("could not create registry: " + err.Error()) - } - dcontext.GetLogger(app).Infof("using %s blob descriptor cache", name) - } } } @@ -504,55 +466,6 @@ func (app *App) configureEvents(configuration *configuration.Configuration) { } } -func (app *App) configureRedis(cfg *configuration.Configuration) { - if cfg.Redis.Addr == "" { - dcontext.GetLogger(app).Infof("redis not configured") - return - } - - app.redis = app.createPool(cfg.Redis) - - // Enable metrics instrumentation. - if err := redisotel.InstrumentMetrics(app.redis); err != nil { - dcontext.GetLogger(app).Errorf("failed to instrument metrics on redis: %v", err) - } - - // setup expvar - registry := expvar.Get("registry") - if registry == nil { - registry = expvar.NewMap("registry") - } - - registry.(*expvar.Map).Set("redis", expvar.Func(func() interface{} { - stats := app.redis.PoolStats() - return map[string]interface{}{ - "Config": cfg, - "Active": stats.TotalConns - stats.IdleConns, - } - })) -} - -func (app *App) createPool(cfg configuration.Redis) *redis.Client { - return redis.NewClient(&redis.Options{ - Addr: cfg.Addr, - OnConnect: func(ctx context.Context, cn *redis.Conn) error { - res := cn.Ping(ctx) - return res.Err() - }, - Username: cfg.Username, - Password: cfg.Password, - DB: cfg.DB, - MaxRetries: 3, - DialTimeout: cfg.DialTimeout, - ReadTimeout: cfg.ReadTimeout, - WriteTimeout: cfg.WriteTimeout, - PoolFIFO: false, - MaxIdleConns: cfg.Pool.MaxIdle, - PoolSize: cfg.Pool.MaxActive, - ConnMaxIdleTime: cfg.Pool.IdleTimeout, - }) -} - // configureLogHook prepares logging hook parameters. func (app *App) configureLogHook(configuration *configuration.Configuration) { entry, ok := dcontext.GetLogger(app).(*logrus.Entry) diff --git a/registry/handlers/app_test.go b/registry/handlers/app_test.go index d18625926e..bc92a32413 100644 --- a/registry/handlers/app_test.go +++ b/registry/handlers/app_test.go @@ -26,7 +26,13 @@ import ( func TestAppDispatcher(t *testing.T) { driver := inmemory.New() ctx := dcontext.Background() - registry, err := storage.NewRegistry(ctx, driver, storage.BlobDescriptorCacheProvider(memorycache.NewInMemoryBlobDescriptorCacheProvider(0)), storage.EnableDelete, storage.EnableRedirect) + cacheOpts := memorycache.NewCacheOptions(0) + cache, err := memorycache.NewBlobDescriptorCacheProvider(ctx, cacheOpts) + if err != nil { + t.Fatalf("memory cache: %v", err) + } + + registry, err := storage.NewRegistry(ctx, driver, storage.BlobDescriptorCacheProvider(cache), storage.EnableDelete, storage.EnableRedirect) if err != nil { t.Fatalf("error creating registry: %v", err) } diff --git a/registry/proxy/proxyblobstore_test.go b/registry/proxy/proxyblobstore_test.go index dfcbfaa383..24d83f1434 100644 --- a/registry/proxy/proxyblobstore_test.go +++ b/registry/proxy/proxyblobstore_test.go @@ -123,6 +123,7 @@ func makeTestEnv(t *testing.T, name string) *testEnv { } ctx := context.Background() + cacheOpts := memory.NewCacheOptions(memory.UnlimitedSize) truthDir := t.TempDir() cacheDir := t.TempDir() @@ -134,8 +135,12 @@ func makeTestEnv(t *testing.T, name string) *testEnv { t.Fatalf("unable to create filesystem driver: %s", err) } + localCache, err := memory.NewBlobDescriptorCacheProvider(ctx, cacheOpts) + if err != nil { + t.Fatalf("memory cache: %v", err) + } // todo: create a tempfile area here - localRegistry, err := storage.NewRegistry(ctx, localDriver, storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)), storage.EnableRedirect, storage.DisableDigestResumption) + localRegistry, err := storage.NewRegistry(ctx, localDriver, storage.BlobDescriptorCacheProvider(localCache), storage.EnableRedirect, storage.DisableDigestResumption) if err != nil { t.Fatalf("error creating registry: %v", err) } @@ -151,7 +156,12 @@ func makeTestEnv(t *testing.T, name string) *testEnv { t.Fatalf("unable to create filesystem driver: %s", err) } - truthRegistry, err := storage.NewRegistry(ctx, cacheDriver, storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize))) + truthCache, err := memory.NewBlobDescriptorCacheProvider(ctx, cacheOpts) + if err != nil { + t.Fatalf("memory cache: %v", err) + } + + truthRegistry, err := storage.NewRegistry(ctx, cacheDriver, storage.BlobDescriptorCacheProvider(truthCache)) if err != nil { t.Fatalf("error creating registry: %v", err) } diff --git a/registry/proxy/proxymanifeststore_test.go b/registry/proxy/proxymanifeststore_test.go index 67313f66ce..a54cf753c3 100644 --- a/registry/proxy/proxymanifeststore_test.go +++ b/registry/proxy/proxymanifeststore_test.go @@ -87,8 +87,15 @@ func newManifestStoreTestEnv(t *testing.T, name, tag string) *manifestStoreTestE } ctx := context.Background() + cacheOpts := memory.NewCacheOptions(memory.UnlimitedSize) + + truthCache, err := memory.NewBlobDescriptorCacheProvider(ctx, cacheOpts) + if err != nil { + t.Fatalf("memory cache: %v", err) + } + truthRegistry, err := storage.NewRegistry(ctx, inmemory.New(), - storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize))) + storage.BlobDescriptorCacheProvider(truthCache)) if err != nil { t.Fatalf("error creating registry: %v", err) } @@ -110,8 +117,13 @@ func newManifestStoreTestEnv(t *testing.T, name, tag string) *manifestStoreTestE t.Fatalf(err.Error()) } + localCache, err := memory.NewBlobDescriptorCacheProvider(ctx, cacheOpts) + if err != nil { + t.Fatalf("memory cache: %v", err) + } + localRegistry, err := storage.NewRegistry(ctx, inmemory.New(), - storage.BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)), storage.EnableRedirect, storage.DisableDigestResumption) + storage.BlobDescriptorCacheProvider(localCache), storage.EnableRedirect, storage.DisableDigestResumption) if err != nil { t.Fatalf("error creating registry: %v", err) } diff --git a/registry/storage/blob_test.go b/registry/storage/blob_test.go index 768cd65c32..bbfeedc02b 100644 --- a/registry/storage/blob_test.go +++ b/registry/storage/blob_test.go @@ -24,7 +24,13 @@ func TestWriteSeek(t *testing.T) { ctx := context.Background() imageName, _ := reference.WithName("foo/bar") driver := inmemory.New() - registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)), EnableDelete, EnableRedirect) + cacheOpts := memory.NewCacheOptions(memory.UnlimitedSize) + cache, err := memory.NewBlobDescriptorCacheProvider(ctx, cacheOpts) + if err != nil { + t.Fatalf("memory cache: %v", err) + } + + registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(cache), EnableDelete, EnableRedirect) if err != nil { t.Fatalf("error creating registry: %v", err) } @@ -60,7 +66,12 @@ func TestSimpleBlobUpload(t *testing.T) { ctx := context.Background() imageName, _ := reference.WithName("foo/bar") driver := inmemory.New() - registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)), EnableDelete, EnableRedirect) + cacheOpts := memory.NewCacheOptions(memory.UnlimitedSize) + cache, err := memory.NewBlobDescriptorCacheProvider(ctx, cacheOpts) + if err != nil { + t.Fatalf("memory cache: %v", err) + } + registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(cache), EnableDelete, EnableRedirect) if err != nil { t.Fatalf("error creating registry: %v", err) } @@ -232,7 +243,12 @@ func TestSimpleBlobUpload(t *testing.T) { } // Reuse state to test delete with a delete-disabled registry - registry, err = NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)), EnableRedirect) + delDisabledCache, err := memory.NewBlobDescriptorCacheProvider(ctx, cacheOpts) + if err != nil { + t.Fatalf("memory cache: %v", err) + } + + registry, err = NewRegistry(ctx, driver, BlobDescriptorCacheProvider(delDisabledCache), EnableRedirect) if err != nil { t.Fatalf("error creating registry: %v", err) } @@ -254,7 +270,12 @@ func TestSimpleBlobRead(t *testing.T) { ctx := context.Background() imageName, _ := reference.WithName("foo/bar") driver := inmemory.New() - registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)), EnableDelete, EnableRedirect) + cacheOpts := memory.NewCacheOptions(memory.UnlimitedSize) + cache, err := memory.NewBlobDescriptorCacheProvider(ctx, cacheOpts) + if err != nil { + t.Fatalf("memory cache: %v", err) + } + registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(cache), EnableDelete, EnableRedirect) if err != nil { t.Fatalf("error creating registry: %v", err) } @@ -366,7 +387,12 @@ func TestBlobMount(t *testing.T) { imageName, _ := reference.WithName("foo/bar") sourceImageName, _ := reference.WithName("foo/source") driver := inmemory.New() - registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)), EnableDelete, EnableRedirect) + cacheOpts := memory.NewCacheOptions(memory.UnlimitedSize) + cache, err := memory.NewBlobDescriptorCacheProvider(ctx, cacheOpts) + if err != nil { + t.Fatalf("memory cache: %v", err) + } + registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(cache), EnableDelete, EnableRedirect) if err != nil { t.Fatalf("error creating registry: %v", err) } @@ -516,7 +542,12 @@ func TestLayerUploadZeroLength(t *testing.T) { ctx := context.Background() imageName, _ := reference.WithName("foo/bar") driver := inmemory.New() - registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)), EnableDelete, EnableRedirect) + cacheOpts := memory.NewCacheOptions(memory.UnlimitedSize) + cache, err := memory.NewBlobDescriptorCacheProvider(ctx, cacheOpts) + if err != nil { + t.Fatalf("memory cache: %v", err) + } + registry, err := NewRegistry(ctx, driver, BlobDescriptorCacheProvider(cache), EnableDelete, EnableRedirect) if err != nil { t.Fatalf("error creating registry: %v", err) } diff --git a/registry/storage/cache/cache.go b/registry/storage/cache/cache.go index 0aa9dd4a80..a8eae887b1 100644 --- a/registry/storage/cache/cache.go +++ b/registry/storage/cache/cache.go @@ -3,11 +3,18 @@ package cache import ( + "context" "fmt" "github.com/distribution/distribution/v3" ) +// InitFunc is the type of a CacheProvider factory function and is +// used to register the constructor for different CacheProvider backends. +type InitFunc func(ctx context.Context, options map[string]interface{}) (BlobDescriptorCacheProvider, error) + +var cacheProviders map[string]InitFunc + // BlobDescriptorCacheProvider provides repository scoped // BlobDescriptorService cache instances and a global descriptor cache. type BlobDescriptorCacheProvider interface { @@ -33,3 +40,25 @@ func ValidateDescriptor(desc distribution.Descriptor) error { return nil } + +// Register is used to register an InitFunc for +// a BlobDescriptorCacheProvider with the given name. +// It's meant to be called from init() function. +func Register(name string, initFunc InitFunc) { + if cacheProviders == nil { + cacheProviders = make(map[string]InitFunc) + } + if _, exists := cacheProviders[name]; exists { + panic(fmt.Sprintf("name already registered: %s", name)) + } + + cacheProviders[name] = initFunc +} + +// Get constructs a CacheProvider with the given options using the named backend. +func Get(ctx context.Context, name string, options map[string]interface{}) (BlobDescriptorCacheProvider, error) { + if initFunc, exists := cacheProviders[name]; exists { + return initFunc(ctx, options) + } + return nil, fmt.Errorf("no cache provider registered with name: %s", name) +} diff --git a/registry/storage/cache/memory/memory.go b/registry/storage/cache/memory/memory.go index 8097939f70..b418294ad8 100644 --- a/registry/storage/cache/memory/memory.go +++ b/registry/storage/cache/memory/memory.go @@ -8,9 +8,15 @@ import ( "github.com/distribution/distribution/v3/registry/storage/cache" "github.com/distribution/reference" "github.com/hashicorp/golang-lru/arc/v2" + "github.com/mitchellh/mapstructure" "github.com/opencontainers/go-digest" ) +// init registers the inmemory cacheprovider. +func init() { + cache.Register("inmemory", NewBlobDescriptorCacheProvider) +} + const ( // DefaultSize is the default cache size to use if no size is explicitly // configured. @@ -29,20 +35,27 @@ type inMemoryBlobDescriptorCacheProvider struct { lru *arc.ARCCache[descriptorCacheKey, distribution.Descriptor] } -// NewInMemoryBlobDescriptorCacheProvider returns a new mapped-based cache for +// NewBlobDescriptorCacheProvider returns a new mapped-based cache for // storing blob descriptor data. -func NewInMemoryBlobDescriptorCacheProvider(size int) cache.BlobDescriptorCacheProvider { - if size <= 0 { +func NewBlobDescriptorCacheProvider(ctx context.Context, options map[string]interface{}) (cache.BlobDescriptorCacheProvider, error) { + var c Memory + if err := mapstructure.Decode(options["params"], &c); err != nil { + return nil, err + } + + size := DefaultSize + if c.Size <= 0 { size = math.MaxInt } + lruCache, err := arc.NewARC[descriptorCacheKey, distribution.Descriptor](size) if err != nil { // NewARC can only fail if size is <= 0, so this unreachable - panic(err) + return nil, err } return &inMemoryBlobDescriptorCacheProvider{ lru: lruCache, - } + }, nil } func (imbdcp *inMemoryBlobDescriptorCacheProvider) RepositoryScoped(repo string) (distribution.BlobDescriptorService, error) { @@ -156,3 +169,17 @@ func (rsimbdcp *repositoryScopedInMemoryBlobDescriptorCache) SetDescriptor(ctx c rsimbdcp.parent.lru.Add(key, desc) return rsimbdcp.parent.SetDescriptor(ctx, dgst, desc) } + +// Memory configures inmemory cache +type Memory struct { + Size int `yaml:"size,omitempty"` +} + +// NewCacheOptions returns new memory cache options. +func NewCacheOptions(size int) map[string]interface{} { + return map[string]interface{}{ + "params": map[interface{}]interface{}{ + "size": size, + }, + } +} diff --git a/registry/storage/cache/memory/memory_test.go b/registry/storage/cache/memory/memory_test.go index 3abbdfeae4..a8e2bcf2ca 100644 --- a/registry/storage/cache/memory/memory_test.go +++ b/registry/storage/cache/memory/memory_test.go @@ -1,6 +1,7 @@ package memory import ( + "context" "testing" "github.com/distribution/distribution/v3/registry/storage/cache/cachecheck" @@ -9,5 +10,10 @@ import ( // TestInMemoryBlobInfoCache checks the in memory implementation is working // correctly. func TestInMemoryBlobInfoCache(t *testing.T) { - cachecheck.CheckBlobDescriptorCache(t, NewInMemoryBlobDescriptorCacheProvider(UnlimitedSize)) + opts := NewCacheOptions(UnlimitedSize) + cache, err := NewBlobDescriptorCacheProvider(context.Background(), opts) + if err != nil { + t.Fatalf("init cache: %v", err) + } + cachecheck.CheckBlobDescriptorCache(t, cache) } diff --git a/registry/storage/cache/provider/cacheprovider.go b/registry/storage/cache/provider/cacheprovider.go deleted file mode 100644 index 1beb5c6575..0000000000 --- a/registry/storage/cache/provider/cacheprovider.go +++ /dev/null @@ -1,37 +0,0 @@ -package cacheprovider - -import ( - "context" - "fmt" - - "github.com/distribution/distribution/v3/registry/storage/cache" -) - -// InitFunc is the type of a CacheProvider factory function and is -// used to register the constructor for different CacheProvider backends. -type InitFunc func(ctx context.Context, options map[string]interface{}) (cache.BlobDescriptorCacheProvider, error) - -var cacheProviders map[string]InitFunc - -// Register is used to register an InitFunc for -// a CacheProvider backend with the given name. -// It's meant to be called from init() function -// of the cache provider. -func Register(name string, initFunc InitFunc) { - if cacheProviders == nil { - cacheProviders = make(map[string]InitFunc) - } - if _, exists := cacheProviders[name]; exists { - panic(fmt.Sprintf("name already registered: %s", name)) - } - - cacheProviders[name] = initFunc -} - -// Get constructs a CacheProvider with the given options using the named backend. -func Get(ctx context.Context, name string, options map[string]interface{}) (cache.BlobDescriptorCacheProvider, error) { - if initFunc, exists := cacheProviders[name]; exists { - return initFunc(ctx, options) - } - return nil, fmt.Errorf("no cache Provider registered with name: %s", name) -} diff --git a/registry/storage/cache/redis/redis.go b/registry/storage/cache/redis/redis.go index 87ec43e3fd..b0ddfd7a44 100644 --- a/registry/storage/cache/redis/redis.go +++ b/registry/storage/cache/redis/redis.go @@ -2,17 +2,35 @@ package redis import ( "context" + "errors" + "expvar" "fmt" "strconv" + "time" "github.com/distribution/distribution/v3" + "github.com/distribution/distribution/v3/internal/dcontext" "github.com/distribution/distribution/v3/registry/storage/cache" "github.com/distribution/distribution/v3/registry/storage/cache/metrics" "github.com/distribution/reference" + "github.com/mitchellh/mapstructure" "github.com/opencontainers/go-digest" + "github.com/redis/go-redis/extra/redisotel/v9" "github.com/redis/go-redis/v9" ) +// init registers the redis cacheprovider. +func init() { + cache.Register("redis", NewBlobDescriptorCacheProvider) +} + +var ( + // ErrMissingConfig is returned when redis config is missing. + ErrMissingConfig = errors.New("missing configuration") + // ErrMissingAddr is returned when redis congig misses address. + ErrMissingAddr = errors.New("missing address") +) + // redisBlobDescriptorService provides an implementation of // BlobDescriptorCacheProvider based on redis. Blob descriptors are stored in // two parts. The first provide fast access to repository membership through a @@ -35,16 +53,63 @@ type redisBlobDescriptorService struct { var _ distribution.BlobDescriptorService = &redisBlobDescriptorService{} -// NewRedisBlobDescriptorCacheProvider returns a new redis-based +// NewBlobDescriptorCacheProvider returns a new redis-based // BlobDescriptorCacheProvider using the provided redis connection pool. -func NewRedisBlobDescriptorCacheProvider(pool *redis.Client) cache.BlobDescriptorCacheProvider { +func NewBlobDescriptorCacheProvider(ctx context.Context, options map[string]interface{}) (cache.BlobDescriptorCacheProvider, error) { + params, ok := options["params"] + if !ok { + return nil, ErrMissingConfig + } + + var c Redis + + // NOTE(milosgajdos): mapstructure does not decode time types such as duration + // which we need in the timeout configuration values out of the box + // but it provides a way to do this via DecodeHook functions. + dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + DecodeHook: mapstructure.StringToTimeDurationHookFunc(), + WeaklyTypedInput: true, + Result: &c, + }) + if err != nil { + return nil, err + } + if err := dec.Decode(params); err != nil { + return nil, err + } + + if c.Addr == "" { + return nil, ErrMissingAddr + } + + pool := createPool(c) + + // Enable metrics instrumentation. + if err := redisotel.InstrumentMetrics(pool); err != nil { + dcontext.GetLogger(ctx).Errorf("failed to instrument metrics on redis: %v", err) + } + + // setup expvar + registry := expvar.Get("registry") + if registry == nil { + registry = expvar.NewMap("registry") + } + + registry.(*expvar.Map).Set("redis", expvar.Func(func() interface{} { + stats := pool.PoolStats() + return map[string]interface{}{ + "Config": c, + "Active": stats.TotalConns - stats.IdleConns, + } + })) + return metrics.NewPrometheusCacheProvider( &redisBlobDescriptorService{ pool: pool, }, "cache_redis", "Number of seconds taken by redis", - ) + ), nil } // RepositoryScoped returns the scoped cache. @@ -272,3 +337,62 @@ func (rsrbds *repositoryScopedRedisBlobDescriptorService) blobDescriptorHashKey( func (rsrbds *repositoryScopedRedisBlobDescriptorService) repositoryBlobSetKey(repo string) string { return "repository::" + rsrbds.repo + "::blobs" } + +// Redis configures the redis pool available to the registry. +type Redis struct { + // Addr specifies the the redis instance available to the application. + Addr string `yaml:"addr,omitempty"` + + // Usernames can be used as a finer-grained permission control since the introduction of the redis 6.0. + Username string `yaml:"username,omitempty"` + + // Password string to use when making a connection. + Password string `yaml:"password,omitempty"` + + // DB specifies the database to connect to on the redis instance. + DB int `yaml:"db,omitempty"` + + // TLS configures settings for redis in-transit encryption + TLS struct { + Enabled bool `yaml:"enabled,omitempty"` + } `yaml:"tls,omitempty"` + + DialTimeout time.Duration `yaml:"dialtimeout,omitempty"` // timeout for connect + ReadTimeout time.Duration `yaml:"readtimeout,omitempty"` // timeout for reads of data + WriteTimeout time.Duration `yaml:"writetimeout,omitempty"` // timeout for writes of data + + // Pool configures the behavior of the redis connection pool. + Pool struct { + // MaxIdle sets the maximum number of idle connections. + MaxIdle int `yaml:"maxidle,omitempty"` + + // MaxActive sets the maximum number of connections that should be + // opened before blocking a connection request. + MaxActive int `yaml:"maxactive,omitempty"` + + // IdleTimeout sets the amount time to wait before closing + // inactive connections. + IdleTimeout time.Duration `yaml:"idletimeout,omitempty"` + } `yaml:"pool,omitempty"` +} + +func createPool(cfg Redis) *redis.Client { + return redis.NewClient(&redis.Options{ + Addr: cfg.Addr, + OnConnect: func(ctx context.Context, cn *redis.Conn) error { + res := cn.Ping(ctx) + return res.Err() + }, + Username: cfg.Username, + Password: cfg.Password, + DB: cfg.DB, + MaxRetries: 3, + DialTimeout: cfg.DialTimeout, + ReadTimeout: cfg.ReadTimeout, + WriteTimeout: cfg.WriteTimeout, + PoolFIFO: false, + MaxIdleConns: cfg.Pool.MaxIdle, + PoolSize: cfg.Pool.MaxActive, + ConnMaxIdleTime: cfg.Pool.IdleTimeout, + }) +} diff --git a/registry/storage/cache/redis/redis_test.go b/registry/storage/cache/redis/redis_test.go index 328a2ec93a..b1166521ca 100644 --- a/registry/storage/cache/redis/redis_test.go +++ b/registry/storage/cache/redis/redis_test.go @@ -7,7 +7,6 @@ import ( "testing" "github.com/distribution/distribution/v3/registry/storage/cache/cachecheck" - "github.com/redis/go-redis/v9" ) var redisAddr string @@ -16,6 +15,17 @@ func init() { flag.StringVar(&redisAddr, "test.registry.storage.cache.redis.addr", "", "configure the address of a test instance of redis") } +func makeOptions(addr string) map[string]interface{} { + return map[string]interface{}{ + "params": map[interface{}]interface{}{ + "addr": addr, + "pool": map[interface{}]interface{}{ + "maxactive": 3, + }, + }, + } +} + // TestRedisLayerInfoCache exercises a live redis instance using the cache // implementation. func TestRedisBlobDescriptorCacheProvider(t *testing.T) { @@ -29,22 +39,15 @@ func TestRedisBlobDescriptorCacheProvider(t *testing.T) { t.Skip("please set -test.registry.storage.cache.redis.addr to test layer info cache against redis") } - pool := redis.NewClient(&redis.Options{ - Addr: redisAddr, - OnConnect: func(ctx context.Context, cn *redis.Conn) error { - res := cn.Ping(ctx) - return res.Err() - }, - MaxRetries: 3, - PoolSize: 2, - }) - // Clear the database ctx := context.Background() - err := pool.FlushDB(ctx).Err() + + opts := makeOptions(redisAddr) + cache, err := NewBlobDescriptorCacheProvider(ctx, opts) if err != nil { - t.Fatalf("unexpected error flushing redis db: %v", err) + t.Fatalf("init redis cache: %v", err) } - cachecheck.CheckBlobDescriptorCache(t, NewRedisBlobDescriptorCacheProvider(pool)) + // TODO(milosgajdos): figure out how to flush redis DB before test + cachecheck.CheckBlobDescriptorCache(t, cache) } diff --git a/registry/storage/catalog_test.go b/registry/storage/catalog_test.go index 26491bc70d..df0531cca3 100644 --- a/registry/storage/catalog_test.go +++ b/registry/storage/catalog_test.go @@ -26,7 +26,13 @@ type setupEnv struct { func setupFS(t *testing.T) *setupEnv { d := inmemory.New() ctx := context.Background() - registry, err := NewRegistry(ctx, d, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)), EnableRedirect) + cacheOpts := memory.NewCacheOptions(memory.UnlimitedSize) + cache, err := memory.NewBlobDescriptorCacheProvider(ctx, cacheOpts) + if err != nil { + t.Fatalf("memory cache: %v", err) + } + + registry, err := NewRegistry(ctx, d, BlobDescriptorCacheProvider(cache), EnableDelete, EnableRedirect) if err != nil { t.Fatalf("error creating registry: %v", err) } @@ -209,7 +215,13 @@ func testEq(a, b []string, size int) bool { func setupBadWalkEnv(t *testing.T) *setupEnv { d := newBadListDriver() ctx := context.Background() - registry, err := NewRegistry(ctx, d, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)), EnableRedirect) + cacheOpts := memory.NewCacheOptions(memory.UnlimitedSize) + cache, err := memory.NewBlobDescriptorCacheProvider(ctx, cacheOpts) + if err != nil { + t.Fatalf("memory cache: %v", err) + } + + registry, err := NewRegistry(ctx, d, BlobDescriptorCacheProvider(cache), EnableRedirect) if err != nil { t.Fatalf("error creating registry: %v", err) } diff --git a/registry/storage/manifeststore_test.go b/registry/storage/manifeststore_test.go index 3c1cb9ddef..d66305ae57 100644 --- a/registry/storage/manifeststore_test.go +++ b/registry/storage/manifeststore_test.go @@ -54,7 +54,13 @@ func newManifestStoreTestEnv(t *testing.T, name reference.Named, tag string, opt } func TestManifestStorage(t *testing.T) { - testManifestStorage(t, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)), EnableDelete, EnableRedirect) + cacheOpts := memory.NewCacheOptions(memory.UnlimitedSize) + cache, err := memory.NewBlobDescriptorCacheProvider(context.Background(), cacheOpts) + if err != nil { + t.Fatalf("memory cache: %v", err) + } + + testManifestStorage(t, BlobDescriptorCacheProvider(cache), EnableDelete, EnableRedirect) } func testManifestStorage(t *testing.T, options ...RegistryOption) { @@ -277,7 +283,13 @@ func testManifestStorage(t *testing.T, options ...RegistryOption) { t.Errorf("Deleted manifest get returned non-nil") } - r, err := NewRegistry(ctx, env.driver, BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)), EnableRedirect) + cacheOpts := memory.NewCacheOptions(memory.UnlimitedSize) + cache, err := memory.NewBlobDescriptorCacheProvider(ctx, cacheOpts) + if err != nil { + t.Fatalf("memory cache: %v", err) + } + + r, err := NewRegistry(ctx, env.driver, BlobDescriptorCacheProvider(cache), EnableRedirect) if err != nil { t.Fatalf("error creating registry: %v", err) } @@ -311,12 +323,18 @@ func testOCIManifestStorage(t *testing.T, testname string, includeMediaTypes boo indexMediaType = "" } + ctx := context.Background() + cacheOpts := memory.NewCacheOptions(memory.UnlimitedSize) + cache, err := memory.NewBlobDescriptorCacheProvider(ctx, cacheOpts) + if err != nil { + t.Fatalf("memory cache: %v", err) + } + repoName, _ := reference.WithName("foo/bar") env := newManifestStoreTestEnv(t, repoName, "thetag", - BlobDescriptorCacheProvider(memory.NewInMemoryBlobDescriptorCacheProvider(memory.UnlimitedSize)), + BlobDescriptorCacheProvider(cache), EnableDelete, EnableRedirect) - ctx := context.Background() ms, err := env.repository.Manifests(ctx) if err != nil { t.Fatal(err) diff --git a/tests/conf-e2e-cloud-storage.yml b/tests/conf-e2e-cloud-storage.yml index 63a8778c70..8f90746ae2 100644 --- a/tests/conf-e2e-cloud-storage.yml +++ b/tests/conf-e2e-cloud-storage.yml @@ -16,21 +16,9 @@ log: service: registry formatter: text level: debug -redis: - addr: redis:6379 - db: 0 - dialtimeout: 5s - readtimeout: 10ms - writetimeout: 10ms - pool: - idletimeout: 60s - maxactive: 64 - maxidle: 16 storage: redirect: disable: true - cache: - blobdescriptor: redis maintenance: uploadpurging: enabled: false @@ -46,3 +34,16 @@ storage: chunksize: 33554432 secure: true v4auth: true +cache: + blobdescriptor: + provider: redis + params: + addr: redis:6379 + db: 0 + dialtimeout: 5s + readtimeout: 10ms + writetimeout: 10ms + pool: + idletimeout: 60s + maxactive: 64 + maxidle: 16 diff --git a/tests/conf-local-cloud.yml b/tests/conf-local-cloud.yml index 5e82f8cf32..d422e5a8e4 100644 --- a/tests/conf-local-cloud.yml +++ b/tests/conf-local-cloud.yml @@ -19,8 +19,6 @@ log: storage: delete: enabled: true - cache: - blobdescriptor: inmemory maintenance: uploadpurging: enabled: false @@ -36,3 +34,6 @@ storage: chunksize: 33554432 secure: true v4auth: true +cache: + blobdescriptor: + provider: inmemory From 6e079b877d8e000b0c5891f2d7e0d2dfb480636a Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Fri, 8 Dec 2023 21:14:47 +0000 Subject: [PATCH 5/6] Update cache.go Co-authored-by: Cory Snider Signed-off-by: Milos Gajdos --- registry/storage/cache/cache.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/registry/storage/cache/cache.go b/registry/storage/cache/cache.go index a8eae887b1..6eca7155cf 100644 --- a/registry/storage/cache/cache.go +++ b/registry/storage/cache/cache.go @@ -49,7 +49,7 @@ func Register(name string, initFunc InitFunc) { cacheProviders = make(map[string]InitFunc) } if _, exists := cacheProviders[name]; exists { - panic(fmt.Sprintf("name already registered: %s", name)) + panic(fmt.Sprintf("cache provider already registered with the name %q", name)) } cacheProviders[name] = initFunc From 7bf9aaeb2a42a164c739e823bb8e2b83a856e736 Mon Sep 17 00:00:00 2001 From: Milos Gajdos Date: Fri, 8 Dec 2023 21:14:55 +0000 Subject: [PATCH 6/6] Update redis.go Co-authored-by: Cory Snider Signed-off-by: Milos Gajdos --- registry/storage/cache/redis/redis.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/registry/storage/cache/redis/redis.go b/registry/storage/cache/redis/redis.go index b0ddfd7a44..0693779833 100644 --- a/registry/storage/cache/redis/redis.go +++ b/registry/storage/cache/redis/redis.go @@ -26,9 +26,9 @@ func init() { var ( // ErrMissingConfig is returned when redis config is missing. - ErrMissingConfig = errors.New("missing configuration") - // ErrMissingAddr is returned when redis congig misses address. - ErrMissingAddr = errors.New("missing address") + ErrMissingConfig = errors.New("redis: missing configuration") + // ErrMissingAddr is returned when address is missing from redis config. + ErrMissingAddr = errors.New("redis: missing address") ) // redisBlobDescriptorService provides an implementation of