diff --git a/CHANGELOG.md b/CHANGELOG.md index d4b47b1d3a23..64d5010e9638 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ ##### Enhancements * [11003](https://github.com/grafana/loki/pull/11003) **MichelHollands**: Add the `metrics-namespace` flag to change the namespace of metrics currently using cortex as namespace. +* [10096](https://github.com/grafana/loki/pull/10096) **aschleck**: Storage: Allow setting a constant prefix for all created keys * [11038](https://github.com/grafana/loki/pull/11038) **kavirajk**: Remove already deprecated `store.max-look-back-period`. * [10906](https://github.com/grafana/loki/pull/10906) **kavirajk**: Support Loki ruler to notify WAL writes to remote storage. * [10613](https://github.com/grafana/loki/pull/10613) **ngc4579**: Helm: allow GrafanaAgent tolerations diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index 6dcfedb74746..27120c71b9b9 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -2100,6 +2100,11 @@ congestion_control: # CLI flag: -store.congestion-control.hedge.strategy [strategy: | default = ""] +# Experimental. Sets a constant prefix for all keys inserted into object +# storage. Example: loki/ +# CLI flag: -store.object-prefix +[object_prefix: | default = ""] + # The cache block configures the cache backend. # The CLI flags prefix for this block configuration is: store.index-cache-read [index_queries_cache_config: ] diff --git a/pkg/compactor/compactor.go b/pkg/compactor/compactor.go index 33c78f3e00a7..53a8c8c4c659 100644 --- a/pkg/compactor/compactor.go +++ b/pkg/compactor/compactor.go @@ -294,6 +294,7 @@ func (c *Compactor) init(objectStoreClients map[config.DayTime]client.ObjectClie if c.cfg.RetentionEnabled { var ( + raw client.ObjectClient encoder client.KeyEncoder name = fmt.Sprintf("%s_%s", period.ObjectType, period.From.String()) retentionWorkDir = filepath.Join(c.cfg.WorkingDirectory, "retention", name) @@ -313,7 +314,12 @@ func (c *Compactor) init(objectStoreClients map[config.DayTime]client.ObjectClie // remove markers from the store dir after copying them to period specific dirs. legacyMarkerDirs[period.ObjectType] = struct{}{} - if _, ok := objectClient.(*local.FSObjectClient); ok { + if casted, ok := objectClient.(client.PrefixedObjectClient); ok { + raw = casted.GetDownstream() + } else { + raw = objectClient + } + if _, ok := raw.(*local.FSObjectClient); ok { encoder = client.FSEncoder } chunkClient := client.NewClient(objectClient, encoder, schemaConfig) diff --git a/pkg/storage/chunk/client/prefixed_object_client.go b/pkg/storage/chunk/client/prefixed_object_client.go new file mode 100644 index 000000000000..aa792b21b9a7 --- /dev/null +++ b/pkg/storage/chunk/client/prefixed_object_client.go @@ -0,0 +1,69 @@ +package client + +import ( + "context" + "io" + "strings" +) + +type PrefixedObjectClient struct { + downstreamClient ObjectClient + prefix string +} + +func NewPrefixedObjectClient(downstreamClient ObjectClient, prefix string) ObjectClient { + return PrefixedObjectClient{downstreamClient: downstreamClient, prefix: prefix} +} + +func (p PrefixedObjectClient) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error { + return p.downstreamClient.PutObject(ctx, p.prefix+objectKey, object) +} + +func (p PrefixedObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) { + return p.downstreamClient.ObjectExists(ctx, p.prefix+objectKey) +} + +func (p PrefixedObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) { + return p.downstreamClient.GetObject(ctx, p.prefix+objectKey) +} + +func (p PrefixedObjectClient) List(ctx context.Context, prefix, delimiter string) ([]StorageObject, []StorageCommonPrefix, error) { + objects, commonPrefixes, err := p.downstreamClient.List(ctx, p.prefix+prefix, delimiter) + if err != nil { + return nil, nil, err + } + + for i := range objects { + objects[i].Key = strings.TrimPrefix(objects[i].Key, p.prefix) + } + + for i := range commonPrefixes { + commonPrefixes[i] = StorageCommonPrefix(strings.TrimPrefix(string(commonPrefixes[i]), p.prefix)) + } + + return objects, commonPrefixes, nil +} + +func (p PrefixedObjectClient) DeleteObject(ctx context.Context, objectKey string) error { + return p.downstreamClient.DeleteObject(ctx, p.prefix+objectKey) +} + +func (p PrefixedObjectClient) IsObjectNotFoundErr(err error) bool { + return p.downstreamClient.IsObjectNotFoundErr(err) +} + +func (p PrefixedObjectClient) IsRetryableErr(err error) bool { + return p.downstreamClient.IsRetryableErr(err) +} + +func (p PrefixedObjectClient) Stop() { + p.downstreamClient.Stop() +} + +func (p PrefixedObjectClient) GetDownstream() ObjectClient { + return p.downstreamClient +} + +func (p PrefixedObjectClient) GetPrefix() string { + return p.prefix +} diff --git a/pkg/storage/factory.go b/pkg/storage/factory.go index 4aa9f5ee674a..0eb1c74d89d8 100644 --- a/pkg/storage/factory.go +++ b/pkg/storage/factory.go @@ -328,6 +328,7 @@ type Config struct { COSConfig ibmcloud.COSConfig `yaml:"cos"` IndexCacheValidity time.Duration `yaml:"index_cache_validity"` CongestionControl congestion.Config `yaml:"congestion_control,omitempty"` + ObjectPrefix string `yaml:"object_prefix" doc:"description=Experimental. Sets a constant prefix for all keys inserted into object storage. Example: loki/"` IndexQueriesCacheConfig cache.Config `yaml:"index_queries_cache_config"` DisableBroadIndexQueries bool `yaml:"disable_broad_index_queries"` @@ -362,6 +363,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.IndexQueriesCacheConfig.RegisterFlagsWithPrefix("store.index-cache-read.", "", f) f.DurationVar(&cfg.IndexCacheValidity, "store.index-cache-validity", 5*time.Minute, "Cache validity for active index entries. Should be no higher than -ingester.max-chunk-idle.") + f.StringVar(&cfg.ObjectPrefix, "store.object-prefix", "", "The prefix to all keys inserted in object storage. Example: loki-instances/west/") f.BoolVar(&cfg.DisableBroadIndexQueries, "store.disable-broad-index-queries", false, "Disable broad index queries which results in reduced cache usage and faster query performance at the expense of somewhat higher QPS on the index store.") f.IntVar(&cfg.MaxParallelGetChunk, "store.max-parallel-get-chunk", 150, "Maximum number of parallel chunk reads.") cfg.BoltDBShipperConfig.RegisterFlags(f) @@ -634,8 +636,23 @@ func (c *ClientMetrics) Unregister() { c.AzureMetrics.Unregister() } -// NewObjectClient makes a new StorageClient of the desired types. +// NewObjectClient makes a new StorageClient with the prefix in the front. func NewObjectClient(name string, cfg Config, clientMetrics ClientMetrics) (client.ObjectClient, error) { + actual, err := internalNewObjectClient(name, cfg, clientMetrics) + if err != nil { + return nil, err + } + + if cfg.ObjectPrefix == "" { + return actual, nil + } else { + prefix := strings.Trim(cfg.ObjectPrefix, "/") + "/" + return client.NewPrefixedObjectClient(actual, prefix), nil + } +} + +// internalNewObjectClient makes the underlying StorageClient of the desired types. +func internalNewObjectClient(name string, cfg Config, clientMetrics ClientMetrics) (client.ObjectClient, error) { var ( namedStore string storeType = name diff --git a/pkg/storage/factory_test.go b/pkg/storage/factory_test.go index ea11f36fd309..2588c9dc69dd 100644 --- a/pkg/storage/factory_test.go +++ b/pkg/storage/factory_test.go @@ -12,6 +12,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/grafana/loki/pkg/storage/chunk/client" "github.com/grafana/loki/pkg/storage/chunk/client/cassandra" "github.com/grafana/loki/pkg/storage/chunk/client/local" "github.com/grafana/loki/pkg/storage/config" @@ -227,6 +228,58 @@ func TestNamedStores_populateStoreType(t *testing.T) { }) } +func TestNewObjectClient_prefixing(t *testing.T) { + t.Run("no prefix", func(t *testing.T) { + var cfg Config + flagext.DefaultValues(&cfg) + + objectClient, err := NewObjectClient("inmemory", cfg, cm) + require.NoError(t, err) + + _, ok := objectClient.(client.PrefixedObjectClient) + assert.False(t, ok) + }) + + t.Run("prefix with trailing /", func(t *testing.T) { + var cfg Config + flagext.DefaultValues(&cfg) + cfg.ObjectPrefix = "my/prefix/" + + objectClient, err := NewObjectClient("inmemory", cfg, cm) + require.NoError(t, err) + + prefixed, ok := objectClient.(client.PrefixedObjectClient) + assert.True(t, ok) + assert.Equal(t, "my/prefix/", prefixed.GetPrefix()) + }) + + t.Run("prefix without trailing /", func(t *testing.T) { + var cfg Config + flagext.DefaultValues(&cfg) + cfg.ObjectPrefix = "my/prefix" + + objectClient, err := NewObjectClient("inmemory", cfg, cm) + require.NoError(t, err) + + prefixed, ok := objectClient.(client.PrefixedObjectClient) + assert.True(t, ok) + assert.Equal(t, "my/prefix/", prefixed.GetPrefix()) + }) + + t.Run("prefix with starting and trailing /", func(t *testing.T) { + var cfg Config + flagext.DefaultValues(&cfg) + cfg.ObjectPrefix = "/my/prefix/" + + objectClient, err := NewObjectClient("inmemory", cfg, cm) + require.NoError(t, err) + + prefixed, ok := objectClient.(client.PrefixedObjectClient) + assert.True(t, ok) + assert.Equal(t, "my/prefix/", prefixed.GetPrefix()) + }) +} + // DefaultSchemaConfig creates a simple schema config for testing func DefaultSchemaConfig(store, schema string, from model.Time) config.SchemaConfig { s := config.SchemaConfig{ diff --git a/pkg/storage/stores/shipper/indexshipper/storage/cached_client_test.go b/pkg/storage/stores/shipper/indexshipper/storage/cached_client_test.go index 1f88d1b7347c..6e2c8a5def26 100644 --- a/pkg/storage/stores/shipper/indexshipper/storage/cached_client_test.go +++ b/pkg/storage/stores/shipper/indexshipper/storage/cached_client_test.go @@ -57,6 +57,14 @@ func (m *mockObjectClient) List(ctx context.Context, prefix, delimiter string) ( } func TestCachedObjectClient_List(t *testing.T) { + objectKeys := func(items []client.StorageObject) []string { + keys := make([]string, 0, len(items)) + for _, item := range items { + keys = append(keys, item.Key) + } + return keys + } + t.Run("refresh table name cache if requested table is not in cache", func(t *testing.T) { ctx := context.Background() @@ -81,14 +89,6 @@ func TestCachedObjectClient_List(t *testing.T) { // replace mock object client with one that returns more tables cachedObjectClient.ObjectClient = newMockObjectClient(t, newObjectsInStorage) - objectKeys := func(items []client.StorageObject) []string { - keys := make([]string, 0, len(items)) - for _, item := range items { - keys = append(keys, item.Key) - } - return keys - } - // list contents of a table that is in table name cache objects, _, err = cachedObjectClient.listTable(ctx, "table1") require.Nil(t, err) @@ -110,6 +110,24 @@ func TestCachedObjectClient_List(t *testing.T) { objectsFromListCall, _, _ = cachedObjectClient.List(ctx, "table3/", "/", false) require.Equal(t, objectsFromListCall, objects) }) + + t.Run("supports prefixed clients", func(t *testing.T) { + ctx := context.Background() + + prefix := "my/amazing/prefix/" + objectsInStorage := []string{ + prefix + "table1/db.gz", + prefix + "table2/db.gz", + prefix + "table2/db2.gz", + } + objectClient := newMockObjectClient(t, objectsInStorage) + prefixedClient := client.NewPrefixedObjectClient(objectClient, prefix) + cachedObjectClient := newCachedObjectClient(prefixedClient) + + objects, _, err := cachedObjectClient.List(ctx, "table2/", "/", false) + require.Nil(t, err) + require.Equal(t, []string{"table2/db.gz", "table2/db2.gz"}, objectKeys(objects)) + }) } func TestCachedObjectClient(t *testing.T) { diff --git a/pkg/storage/stores/shipper/indexshipper/storage/client.go b/pkg/storage/stores/shipper/indexshipper/storage/client.go index fc6feab38ac5..e8a3f30a2d82 100644 --- a/pkg/storage/stores/shipper/indexshipper/storage/client.go +++ b/pkg/storage/stores/shipper/indexshipper/storage/client.go @@ -50,7 +50,7 @@ type IndexFile struct { } func NewIndexStorageClient(origObjectClient client.ObjectClient, storagePrefix string) Client { - objectClient := newCachedObjectClient(newPrefixedObjectClient(origObjectClient, storagePrefix)) + objectClient := newCachedObjectClient(client.NewPrefixedObjectClient(origObjectClient, storagePrefix)) return &indexStorageClient{objectClient: objectClient} } diff --git a/pkg/storage/stores/shipper/indexshipper/storage/prefixed_object_client.go b/pkg/storage/stores/shipper/indexshipper/storage/prefixed_object_client.go deleted file mode 100644 index f47752a4b60f..000000000000 --- a/pkg/storage/stores/shipper/indexshipper/storage/prefixed_object_client.go +++ /dev/null @@ -1,63 +0,0 @@ -package storage - -import ( - "context" - "io" - "strings" - - "github.com/grafana/loki/pkg/storage/chunk/client" -) - -type prefixedObjectClient struct { - downstreamClient client.ObjectClient - prefix string -} - -func newPrefixedObjectClient(downstreamClient client.ObjectClient, prefix string) client.ObjectClient { - return prefixedObjectClient{downstreamClient: downstreamClient, prefix: prefix} -} - -func (p prefixedObjectClient) PutObject(ctx context.Context, objectKey string, object io.ReadSeeker) error { - return p.downstreamClient.PutObject(ctx, p.prefix+objectKey, object) -} - -func (p prefixedObjectClient) ObjectExists(ctx context.Context, objectKey string) (bool, error) { - return p.downstreamClient.ObjectExists(ctx, p.prefix+objectKey) -} - -func (p prefixedObjectClient) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) { - return p.downstreamClient.GetObject(ctx, p.prefix+objectKey) -} - -func (p prefixedObjectClient) List(ctx context.Context, prefix, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) { - objects, commonPrefixes, err := p.downstreamClient.List(ctx, p.prefix+prefix, delimiter) - if err != nil { - return nil, nil, err - } - - for i := range objects { - objects[i].Key = strings.TrimPrefix(objects[i].Key, p.prefix) - } - - for i := range commonPrefixes { - commonPrefixes[i] = client.StorageCommonPrefix(strings.TrimPrefix(string(commonPrefixes[i]), p.prefix)) - } - - return objects, commonPrefixes, nil -} - -func (p prefixedObjectClient) DeleteObject(ctx context.Context, objectKey string) error { - return p.downstreamClient.DeleteObject(ctx, p.prefix+objectKey) -} - -func (p prefixedObjectClient) IsObjectNotFoundErr(err error) bool { - return p.downstreamClient.IsObjectNotFoundErr(err) -} - -func (p prefixedObjectClient) IsRetryableErr(err error) bool { - return p.downstreamClient.IsRetryableErr(err) -} - -func (p prefixedObjectClient) Stop() { - p.downstreamClient.Stop() -}