diff --git a/CHANGELOG.md b/CHANGELOG.md
index 199dfec426..b14a61df1f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
 
 ### Added
 
+- [#5337](https://github.com/thanos-io/thanos/pull/5337) Thanos Object Store: Add the `prefix` option to buckets
 - [#5352](https://github.com/thanos-io/thanos/pull/5352) Cache: Add cache metrics to groupcache.
 - [#5391](https://github.com/thanos-io/thanos/pull/5391) Receive: Add relabeling support.
 
diff --git a/docs/components/receive.md b/docs/components/receive.md
index 16dfd8a42f..542546b6e1 100644
--- a/docs/components/receive.md
+++ b/docs/components/receive.md
@@ -44,6 +44,7 @@ type: GCS
 config:
   bucket: ""
   service_account: ""
+prefix: ""
 ```
 
 The example content of `hashring.json`:
diff --git a/docs/components/sidecar.md b/docs/components/sidecar.md
index 57b69311b4..7a7deeb8e9 100644
--- a/docs/components/sidecar.md
+++ b/docs/components/sidecar.md
@@ -56,6 +56,7 @@ type: GCS
 config:
   bucket: ""
   service_account: ""
+prefix: ""
 ```
 
 ## Upload compacted blocks
diff --git a/docs/components/store.md b/docs/components/store.md
index 1162ad9946..3071e7b928 100644
--- a/docs/components/store.md
+++ b/docs/components/store.md
@@ -15,6 +15,7 @@ type: GCS
 config:
   bucket: ""
   service_account: ""
+prefix: ""
 ```
 
 In general, an average of 6 MB of local disk space is required per TSDB block stored in the object storage bucket, but for high cardinality blocks with large label set it can even go up to 30MB and more. It is for the pre-computed index, which includes symbols and postings offsets as well as metadata JSON.
diff --git a/docs/components/tools.md b/docs/components/tools.md
index 80ceed9585..776164aeb0 100644
--- a/docs/components/tools.md
+++ b/docs/components/tools.md
@@ -101,6 +101,7 @@ type: GCS
 config:
   bucket: ""
   service_account: ""
+prefix: ""
 ```
 
 Bucket can be extended to add more subcommands that will be helpful when working with object storage buckets by adding a new command within [`/cmd/thanos/tools_bucket.go`](../../cmd/thanos/tools_bucket.go) .
@@ -601,6 +602,7 @@ type: GCS
 config:
   bucket: ""
   service_account: ""
+prefix: ""
 ```
 
 ```$ mdox-exec="thanos tools bucket downsample --help"
@@ -675,6 +677,7 @@ type: GCS
 config:
   bucket: ""
   service_account: ""
+prefix: ""
 ```
 
 ```$ mdox-exec="thanos tools bucket mark --help"
diff --git a/docs/storage.md b/docs/storage.md
index 7e10f5c98c..47f2bd3f19 100644
--- a/docs/storage.md
+++ b/docs/storage.md
@@ -96,12 +96,15 @@ config:
     kms_encryption_context: {}
     encryption_key: ""
   sts_endpoint: ""
+prefix: ""
 ```
 
 At a minimum, you will need to provide a value for the `bucket`, `endpoint`, `access_key`, and `secret_key` keys. The rest of the keys are optional.
 
 However if you set `aws_sdk_auth: true` Thanos will use the default authentication methods of the AWS SDK for go based on [known environment variables](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html) (`AWS_PROFILE`, `AWS_WEB_IDENTITY_TOKEN_FILE` ... etc) and known AWS config files (~/.aws/config). If you turn this on, then the `bucket` and `endpoint` are the required config keys.
 
+The field `prefix` can be used to transparently use prefixes in your S3 bucket. This allows you to separate blocks coming from different sources into paths with different prefixes, making it easier to understand what's going on (i.e. you don't have to use Thanos tooling to know which blocks came from which source).
+
 The AWS region to endpoint mapping can be found in this [link](https://docs.aws.amazon.com/general/latest/gr/s3.html).
 
 Make sure you use a correct signature version. Currently AWS requires signature v4, so it needs `signature_version2: false`. If you don't specify it, you will get an `Access Denied` error. On the other hand, several S3 compatible APIs use `signature_version2: true`.
@@ -255,6 +258,7 @@ type: GCS
 config:
   bucket: ""
   service_account: ""
+prefix: ""
 ```
 
 ##### Using GOOGLE_APPLICATION_CREDENTIALS
@@ -356,6 +360,7 @@ config:
       key_file: ""
       server_name: ""
       insecure_skip_verify: false
+prefix: ""
 ```
 
 If `msi_resource` is used, authentication is done via system-assigned managed identity. The value for Azure should be `https://.blob.core.windows.net`.
@@ -396,6 +401,7 @@ config:
   connect_timeout: 10s
   timeout: 5m
   use_dynamic_large_objects: false
+prefix: ""
 ```
 
 #### Tencent COS
@@ -421,6 +427,7 @@ config:
     max_idle_conns: 100
     max_idle_conns_per_host: 100
     max_conns_per_host: 0
+prefix: ""
 ```
 
 The `secret_key` and `secret_id` field is required. The `http_config` field is optional for optimize HTTP transport settings. There are two ways to configure the required bucket information:
@@ -442,6 +449,7 @@ config:
   bucket: ""
   access_key_id: ""
   access_key_secret: ""
+prefix: ""
 ```
 
 Use --objstore.config-file to reference to this configuration file.
@@ -457,6 +465,7 @@ config:
   endpoint: ""
   access_key: ""
   secret_key: ""
+prefix: ""
 ```
 
 #### Filesystem
@@ -469,6 +478,7 @@ NOTE: This storage type is experimental and might be inefficient. It is NOT advi
 type: FILESYSTEM
 config:
   directory: ""
+prefix: ""
 ```
 
 ### How to add a new client to Thanos?
diff --git a/pkg/objstore/client/factory.go b/pkg/objstore/client/factory.go
index f066bde362..2061079014 100644
--- a/pkg/objstore/client/factory.go
+++ b/pkg/objstore/client/factory.go
@@ -41,6 +41,7 @@ const (
 type BucketConfig struct {
 	Type   ObjProvider `yaml:"type"`
 	Config interface{} `yaml:"config"`
+	Prefix string      `yaml:"prefix" default:""`
 }
 
 // NewBucket initializes and returns new object storage clients.
@@ -81,5 +82,6 @@ func NewBucket(logger log.Logger, confContentYaml []byte, reg prometheus.Registe
 	if err != nil {
 		return nil, errors.Wrap(err, fmt.Sprintf("create %s client", bucketConf.Type))
 	}
-	return objstore.NewTracingBucket(objstore.BucketWithMetrics(bucket.Name(), bucket, reg)), nil
+
+	return objstore.NewTracingBucket(objstore.BucketWithMetrics(bucket.Name(), objstore.NewPrefixedBucket(bucket, bucketConf.Prefix), reg)), nil
 }
diff --git a/pkg/objstore/objtesting/foreach.go b/pkg/objstore/objtesting/foreach.go
index 6d1cad859f..139e724271 100644
--- a/pkg/objstore/objtesting/foreach.go
+++ b/pkg/objstore/objtesting/foreach.go
@@ -64,6 +64,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket))
 		b, err := filesystem.NewBucket(dir)
 		testutil.Ok(t, err)
 		testFn(t, b)
+		testFn(t, objstore.NewPrefixedBucket(b, "some_prefix"))
 	})
 
 	// Optional GCS.
@@ -77,6 +78,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket))
 
 			// TODO(bwplotka): Add goleak when https://github.com/GoogleCloudPlatform/google-cloud-go/issues/1025 is resolved.
 			testFn(t, bkt)
+			testFn(t, objstore.NewPrefixedBucket(bkt, "some_prefix"))
 		})
 	}
 
@@ -95,6 +97,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket))
 			// This needs to be investigated more.
 
 			testFn(t, bkt)
+			testFn(t, objstore.NewPrefixedBucket(bkt, "some_prefix"))
 		})
 	}
 
@@ -108,6 +111,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket))
 			defer closeFn()
 
 			testFn(t, bkt)
+			testFn(t, objstore.NewPrefixedBucket(bkt, "some_prefix"))
 		})
 	}
 
@@ -121,6 +125,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket))
 			defer closeFn()
 
 			testFn(t, container)
+			testFn(t, objstore.NewPrefixedBucket(container, "some_prefix"))
 		})
 	}
 
@@ -134,6 +139,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket))
 			defer closeFn()
 
 			testFn(t, bkt)
+			testFn(t, objstore.NewPrefixedBucket(bkt, "some_prefix"))
 		})
 	}
 
@@ -147,6 +153,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket))
 			defer closeFn()
 
 			testFn(t, bkt)
+			testFn(t, objstore.NewPrefixedBucket(bkt, "some_prefix"))
 		})
 	}
 
@@ -160,6 +167,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket))
 			defer closeFn()
 
 			testFn(t, bkt)
+			testFn(t, objstore.NewPrefixedBucket(bkt, "some_prefix"))
 		})
 	}
 }
diff --git a/pkg/objstore/prefixed_bucket.go b/pkg/objstore/prefixed_bucket.go
new file mode 100644
index 0000000000..130f14d439
--- /dev/null
+++ b/pkg/objstore/prefixed_bucket.go
@@ -0,0 +1,97 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package objstore
+
+import (
+	"context"
+	"io"
+	"strings"
+)
+
+type PrefixedBucket struct {
+	bkt    Bucket
+	prefix string
+}
+
+func NewPrefixedBucket(bkt Bucket, prefix string) Bucket {
+	if validPrefix(prefix) {
+		return &PrefixedBucket{bkt: bkt, prefix: strings.Trim(prefix, DirDelim)}
+	}
+
+	return bkt
+}
+
+func validPrefix(prefix string) bool {
+	prefix = strings.Replace(prefix, "/", "", -1)
+	return len(prefix) > 0
+}
+
+func conditionalPrefix(prefix, name string) string {
+	if len(name) > 0 {
+		return withPrefix(prefix, name)
+	}
+
+	return name
+}
+
+func withPrefix(prefix, name string) string {
+	return prefix + DirDelim + name
+}
+
+func (p *PrefixedBucket) Close() error {
+	return p.bkt.Close()
+}
+
+// Iter calls f for each entry in the given directory (not recursive). The argument to f is the full
+// object name including the prefix of the inspected directory.
+// Entries are passed to function in sorted order.
+func (p *PrefixedBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error {
+	pdir := withPrefix(p.prefix, dir)
+
+	return p.bkt.Iter(ctx, pdir, func(s string) error {
+		return f(strings.TrimPrefix(s, p.prefix+DirDelim))
+	}, options...)
+}
+
+// Get returns a reader for the given object name.
+func (p *PrefixedBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
+	return p.bkt.Get(ctx, conditionalPrefix(p.prefix, name))
+}
+
+// GetRange returns a new range reader for the given object name and range.
+func (p *PrefixedBucket) GetRange(ctx context.Context, name string, off int64, length int64) (io.ReadCloser, error) {
+	return p.bkt.GetRange(ctx, conditionalPrefix(p.prefix, name), off, length)
+}
+
+// Exists checks if the given object exists in the bucket.
+func (p *PrefixedBucket) Exists(ctx context.Context, name string) (bool, error) {
+	return p.bkt.Exists(ctx, conditionalPrefix(p.prefix, name))
+}
+
+// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
+func (p *PrefixedBucket) IsObjNotFoundErr(err error) bool {
+	return p.bkt.IsObjNotFoundErr(err)
+}
+
+// Attributes returns information about the specified object.
+func (p *PrefixedBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) {
+	return p.bkt.Attributes(ctx, conditionalPrefix(p.prefix, name))
+}
+
+// Upload the contents of the reader as an object into the bucket.
+// Upload should be idempotent.
+func (p *PrefixedBucket) Upload(ctx context.Context, name string, r io.Reader) error {
+	return p.bkt.Upload(ctx, conditionalPrefix(p.prefix, name), r)
+}
+
+// Delete removes the object with the given name.
+// If the object does not exist at the moment of deletion, Delete should return an error.
+func (p *PrefixedBucket) Delete(ctx context.Context, name string) error {
+	return p.bkt.Delete(ctx, conditionalPrefix(p.prefix, name))
+}
+
+// Name returns the bucket name for the provider.
+func (p *PrefixedBucket) Name() string {
+	return p.bkt.Name()
+}
diff --git a/pkg/objstore/prefixed_bucket_test.go b/pkg/objstore/prefixed_bucket_test.go
new file mode 100644
index 0000000000..6e93583052
--- /dev/null
+++ b/pkg/objstore/prefixed_bucket_test.go
@@ -0,0 +1,92 @@
+// Copyright (c) The Thanos Authors.
+// Licensed under the Apache License 2.0.
+
+package objstore
+
+import (
+	"context"
+	"io/ioutil"
+	"sort"
+	"strings"
+	"testing"
+
+	"github.com/thanos-io/thanos/pkg/testutil"
+)
+
+func TestPrefixedBucket_Acceptance(t *testing.T) {
+
+	prefixes := []string{
+		"/someprefix/anotherprefix/",
+		"someprefix/anotherprefix/",
+		"someprefix/anotherprefix",
+		"someprefix/",
+		"someprefix"}
+
+	for _, prefix := range prefixes {
+		AcceptanceTest(t, NewPrefixedBucket(NewInMemBucket(), prefix))
+		UsesPrefixTest(t, NewInMemBucket(), prefix)
+	}
+}
+
+func UsesPrefixTest(t *testing.T, bkt Bucket, prefix string) {
+	testutil.Ok(t, bkt.Upload(context.Background(), strings.Trim(prefix, "/")+"/file1.jpg", strings.NewReader("test-data1")))
+
+	pBkt := NewPrefixedBucket(bkt, prefix)
+	rc1, err := pBkt.Get(context.Background(), "file1.jpg")
+	testutil.Ok(t, err)
+
+	testutil.Ok(t, err)
+	defer func() { testutil.Ok(t, rc1.Close()) }()
+	content, err := ioutil.ReadAll(rc1)
+	testutil.Ok(t, err)
+	testutil.Equals(t, "test-data1", string(content))
+
+	testutil.Ok(t, pBkt.Upload(context.Background(), "file2.jpg", strings.NewReader("test-data2")))
+	rc2, err := bkt.Get(context.Background(), strings.Trim(prefix, "/")+"/file2.jpg")
+	testutil.Ok(t, err)
+	defer func() { testutil.Ok(t, rc2.Close()) }()
+	contentUpload, err := ioutil.ReadAll(rc2)
+	testutil.Ok(t, err)
+	testutil.Equals(t, "test-data2", string(contentUpload))
+
+	testutil.Ok(t, pBkt.Delete(context.Background(), "file2.jpg"))
+	_, err = bkt.Get(context.Background(), strings.Trim(prefix, "/")+"/file2.jpg")
+	testutil.NotOk(t, err)
+	testutil.Assert(t, pBkt.IsObjNotFoundErr(err), "expected not found error got %s", err)
+
+	rc3, err := pBkt.GetRange(context.Background(), "file1.jpg", 1, 3)
+	testutil.Ok(t, err)
+	defer func() { testutil.Ok(t, rc3.Close()) }()
+	content, err = ioutil.ReadAll(rc3)
+	testutil.Ok(t, err)
+	testutil.Equals(t, "est", string(content))
+
+	ok, err := pBkt.Exists(context.Background(), "file1.jpg")
+	testutil.Ok(t, err)
+	testutil.Assert(t, ok, "expected exists")
+
+	attrs, err := pBkt.Attributes(context.Background(), "file1.jpg")
+	testutil.Ok(t, err)
+	testutil.Assert(t, attrs.Size == 10, "expected size to be equal to 10")
+
+	testutil.Ok(t, bkt.Upload(context.Background(), strings.Trim(prefix, "/")+"/dir/file1.jpg", strings.NewReader("test-data1")))
+	seen := []string{}
+	testutil.Ok(t, pBkt.Iter(context.Background(), "", func(fn string) error {
+		seen = append(seen, fn)
+		return nil
+	}, WithRecursiveIter))
+	expected := []string{"dir/file1.jpg", "file1.jpg"}
+	sort.Strings(expected)
+	sort.Strings(seen)
+	testutil.Equals(t, expected, seen)
+
+	seen = []string{}
+	testutil.Ok(t, pBkt.Iter(context.Background(), "", func(fn string) error {
+		seen = append(seen, fn)
+		return nil
+	}))
+	expected = []string{"dir/", "file1.jpg"}
+	sort.Strings(expected)
+	sort.Strings(seen)
+	testutil.Equals(t, expected, seen)
+}
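
Review note: the prefix handling in `NewPrefixedBucket` and `conditionalPrefix` can be hard to follow from the diff alone, so here is a minimal standalone sketch of how a configured prefix appears to map onto object keys. It is a re-implementation for illustration, not the Thanos code itself: `normalizePrefix` and `objectKey` are hypothetical names, and `dirDelim` is assumed to be `/`, which matches how the tests above trim prefixes.

```go
package main

import (
	"fmt"
	"strings"
)

// dirDelim is assumed to be "/" (an assumption based on the tests in the diff).
const dirDelim = "/"

// normalizePrefix is a hypothetical helper that combines validPrefix with the
// strings.Trim call in NewPrefixedBucket. It reports false when the prefix is
// empty or consists only of delimiters; in that case the diff returns the
// original bucket without wrapping it.
func normalizePrefix(prefix string) (string, bool) {
	if len(strings.ReplaceAll(prefix, dirDelim, "")) == 0 {
		return "", false
	}
	return strings.Trim(prefix, dirDelim), true
}

// objectKey mirrors conditionalPrefix: an empty name is passed through
// unchanged, any other name is placed under the prefix.
func objectKey(prefix, name string) string {
	if name == "" {
		return name
	}
	return prefix + dirDelim + name
}

func main() {
	for _, raw := range []string{"/someprefix/anotherprefix/", "someprefix", "///", ""} {
		p, ok := normalizePrefix(raw)
		if !ok {
			fmt.Printf("%q: no prefixing, bucket is used as-is\n", raw)
			continue
		}
		fmt.Printf("%q: file1.jpg is stored as %q\n", raw, objectKey(p, "file1.jpg"))
	}
}
```

Under these assumptions, leading and trailing slashes in the configured prefix do not matter, and a prefix made only of slashes behaves like no prefix at all. That would also explain why existing configurations without a `prefix` key keep their current object layout.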