From 8a71b0e79a6387edd836e239d0178171ba29f178 Mon Sep 17 00:00:00 2001 From: April Schleck Date: Fri, 28 Jul 2023 16:05:37 +0000 Subject: [PATCH] S3: Allow setting a constant prefix for all created keys Adds a new option under the aws stanza named key_prefix. --- CHANGELOG.md | 1 + docs/sources/configure/_index.md | 8 +++ .../chunk/client/aws/s3_storage_client.go | 10 ++-- .../client/aws/s3_storage_client_test.go | 52 +++++++++++++++++++ 4 files changed, 67 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a7b302e794d0..32e758f77ad8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ ##### Enhancements +* [10096](https://github.com/grafana/loki/pull/10096) **aschleck**: S3: Allow setting a constant prefix for all created keys * [10010](https://github.com/grafana/loki/pull/10010) **rasta-rocket**: feat(promtail): retrieve BotTags field from cloudflare * [9995](https://github.com/grafana/loki/pull/9995) **chaudum**: Add jitter to the flush interval to prevent multiple ingesters to flush at the same time. * [9797](https://github.com/grafana/loki/pull/9797) **chaudum**: Add new `loki_index_gateway_requests_total` counter metric to observe per-tenant RPS diff --git a/docs/sources/configure/_index.md b/docs/sources/configure/_index.md index fc78f4de82db..00950b0d984a 100644 --- a/docs/sources/configure/_index.md +++ b/docs/sources/configure/_index.md @@ -4141,6 +4141,10 @@ dynamodb: # CLI flag: -s3.buckets [bucketnames: | default = ""] +# Sets a constant prefix for all keys inserted in S3. Example: loki/ +# CLI flag: -s3.key-prefix +[key_prefix: | default = ""] + # S3 Endpoint to connect to. # CLI flag: -s3.endpoint [endpoint: | default = ""] @@ -4419,6 +4423,10 @@ The `s3_storage_config` block configures the connection to Amazon S3 object stor # CLI flag: -.storage.s3.buckets [bucketnames: | default = ""] +# Sets a constant prefix for all keys inserted in S3. Example: loki/ +# CLI flag: -.storage.s3.key-prefix +[key_prefix: | default = ""] + # S3 Endpoint to connect to. # CLI flag: -.storage.s3.endpoint [endpoint: | default = ""] diff --git a/pkg/storage/chunk/client/aws/s3_storage_client.go b/pkg/storage/chunk/client/aws/s3_storage_client.go index 70b19dc67d51..8516eec755f3 100644 --- a/pkg/storage/chunk/client/aws/s3_storage_client.go +++ b/pkg/storage/chunk/client/aws/s3_storage_client.go @@ -69,6 +69,7 @@ type S3Config struct { S3ForcePathStyle bool BucketNames string + KeyPrefix string `yaml:"key_prefix" doc:"description=Sets a constant prefix for all keys inserted in S3. Example: loki/"` Endpoint string `yaml:"endpoint"` Region string `yaml:"region"` AccessKeyID string `yaml:"access_key_id"` @@ -105,6 +106,7 @@ func (cfg *S3Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { "If only region is specified as a host, proper endpoint will be deduced. Use inmemory:/// to use a mock in-memory implementation.") f.BoolVar(&cfg.S3ForcePathStyle, prefix+"s3.force-path-style", false, "Set this to `true` to force the request to use path-style addressing.") f.StringVar(&cfg.BucketNames, prefix+"s3.buckets", "", "Comma separated list of bucket names to evenly distribute chunks over. Overrides any buckets specified in s3.url flag") + f.StringVar(&cfg.KeyPrefix, prefix+"s3.key-prefix", "", "The prefix to all keys inserted in s3. Example: loki-instances/west/") f.StringVar(&cfg.Endpoint, prefix+"s3.endpoint", "", "S3 Endpoint to connect to.") f.StringVar(&cfg.Region, prefix+"s3.region", "", "AWS region to use.") @@ -345,7 +347,7 @@ func (a *S3ObjectClient) DeleteObject(ctx context.Context, objectKey string) err return instrument.CollectedRequest(ctx, "S3.DeleteObject", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error { deleteObjectInput := &s3.DeleteObjectInput{ Bucket: aws.String(a.bucketFromKey(objectKey)), - Key: aws.String(objectKey), + Key: aws.String(a.cfg.KeyPrefix + objectKey), } _, err := a.S3.DeleteObjectWithContext(ctx, deleteObjectInput) @@ -385,7 +387,7 @@ func (a *S3ObjectClient) GetObject(ctx context.Context, objectKey string) (io.Re var requestErr error resp, requestErr = a.hedgedS3.GetObjectWithContext(ctx, &s3.GetObjectInput{ Bucket: aws.String(bucket), - Key: aws.String(objectKey), + Key: aws.String(a.cfg.KeyPrefix + objectKey), }) return requestErr }) @@ -409,7 +411,7 @@ func (a *S3ObjectClient) PutObject(ctx context.Context, objectKey string, object putObjectInput := &s3.PutObjectInput{ Body: object, Bucket: aws.String(a.bucketFromKey(objectKey)), - Key: aws.String(objectKey), + Key: aws.String(a.cfg.KeyPrefix + objectKey), StorageClass: aws.String(a.cfg.StorageClass), } @@ -433,7 +435,7 @@ func (a *S3ObjectClient) List(ctx context.Context, prefix, delimiter string) ([] err := loki_instrument.TimeRequest(ctx, "S3.List", s3RequestDuration, instrument.ErrorCode, func(ctx context.Context) error { input := s3.ListObjectsV2Input{ Bucket: aws.String(a.bucketNames[i]), - Prefix: aws.String(prefix), + Prefix: aws.String(a.cfg.KeyPrefix + prefix), Delimiter: aws.String(delimiter), } diff --git a/pkg/storage/chunk/client/aws/s3_storage_client_test.go b/pkg/storage/chunk/client/aws/s3_storage_client_test.go index 00ec9eba4072..e77c2c4f63fb 100644 --- a/pkg/storage/chunk/client/aws/s3_storage_client_test.go +++ b/pkg/storage/chunk/client/aws/s3_storage_client_test.go @@ -29,6 +29,58 @@ func (f RoundTripperFunc) RoundTrip(req *http.Request) (*http.Response, error) { return f(req) } +func TestRequestPrefix(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, r.URL.Path) + })) + defer ts.Close() + + cfg := S3Config{ + Endpoint: ts.URL, + BucketNames: "buck-o", + KeyPrefix: "some/prefix/", + S3ForcePathStyle: true, + Insecure: true, + AccessKeyID: "key", + SecretAccessKey: flagext.SecretWithValue("secret"), + } + + tests := []struct { + name string + key string + expected string + }{ + { + name: "Single", + key: "key", + expected: "/buck-o/some/prefix/key", + }, + { + name: "Multi", + key: "some/random/key", + expected: "/buck-o/some/prefix/some/random/key", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + client, err := NewS3ObjectClient(cfg, hedging.Config{}) + require.NoError(t, err) + + readCloser, _, err := client.GetObject(context.Background(), tt.key) + require.NoError(t, err) + + buffer := make([]byte, 100) + _, err = readCloser.Read(buffer) + if err != io.EOF { + require.NoError(t, err) + } + + assert.Equal(t, tt.expected, strings.Trim(string(buffer), "\n\x00")) + }) + } +} + func TestRequestMiddleware(t *testing.T) { ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { fmt.Fprintln(w, r.Header.Get("echo-me"))