sidecar/compact/store/receiver - Add the prefix option to buckets (#5337)

* Create prefixed bucket

Signed-off-by: jademcosta <jade.costa@nubank.com.br>

* started PrefixedBucket tests

Signed-off-by: Maria Eduarda Duarte <dudammduarte@yahoo.com.br>

* finish objstore tests

Signed-off-by: Maria Eduarda Duarte <dudammduarte@yahoo.com.br>

* Simplify string removal logic

Signed-off-by: jademcosta <jade.costa@nubank.com.br>

* Test more prefix cases on PrefixedBucket

Signed-off-by: jademcosta <jade.costa@nubank.com.br>

* Only use a prefixedbucket if we have a valid prefix

Signed-off-by: jademcosta <jade.costa@nubank.com.br>

* Add single unit test for prefixedBucket prefix

Signed-off-by: jademcosta <jade.costa@nubank.com.br>

* test other prefixes on UsesPrefixTest

Signed-off-by: Maria Eduarda Duarte <dudammduarte@yahoo.com.br>

* add remaining methods to UsesPrefixTest

Signed-off-by: Maria Eduarda Duarte <dudammduarte@yahoo.com.br>

* add prefix to docs examples

Signed-off-by: Maria Eduarda Duarte <dudammduarte@yahoo.com.br>

* Simplify Iter method

Signed-off-by: jademcosta <jade.costa@nubank.com.br>

* add prefix explanation to S3 docs

Signed-off-by: Maria Eduarda Duarte <dudammduarte@yahoo.com.br>

* Conclusion of prefix sentence on docs

Signed-off-by: jademcosta <jade.costa@nubank.com.br>

* Use DirDelim instead of magic string

Signed-off-by: jademcosta <jade.costa@nubank.com.br>

* Add log when using prefixed bucket

Signed-off-by: jademcosta <jade.costa@nubank.com.br>

* Remove "@" from test strings to make them simpler

Signed-off-by: jademcosta <jade.costa@nubank.com.br>

* fix BucketConfig Config type - back to interface

Signed-off-by: Maria Eduarda Duarte <dudammduarte@yahoo.com.br>

* add changelog

Signed-off-by: Maria Eduarda Duarte <dudammduarte@yahoo.com.br>

* add missing checks in UsesPrefixTest

Signed-off-by: Maria Eduarda Duarte <dudammduarte@yahoo.com.br>

* fix linter and test errors

Signed-off-by: Maria Eduarda Duarte <dudammduarte@yahoo.com.br>

* Add license to new files

Signed-off-by: jademcosta <jade.costa@nubank.com.br>

* Remove autogenerated docs

Signed-off-by: jademcosta <jade.costa@nubank.com.br>

* Remove duplicated transformation of string->[]byte

Signed-off-by: jademcosta <jade.costa@nubank.com.br>

* Add prefixed bucket on all e2e tests for S3

The idea is that if it works, we can add it for all other providers.

Signed-off-by: jademcosta <jade.costa@nubank.com.br>

* Add e2e tests using prefixed bucket to all providers

Signed-off-by: jademcosta <jade.costa@nubank.com.br>

* refactor: move validPrefix to prefixed_bucket logic

Signed-off-by: Maria Eduarda Duarte <dudammduarte@yahoo.com.br>

* Enhance the documentation about prefix.

Signed-off-by: jademcosta <jademcosta@gmail.com>

* Fix format
Signed-off-by: jademcosta <jademcosta@gmail.com>

* Add prefix entry on bucket config example

Signed-off-by: jademcosta <jade.costa@nubank.com.br>

* Removing redundancies on prefix checks and tests

We already check that the prefix is not empty when creating the bucket.

Signed-off-by: jademcosta <jade.costa@nubank.com.br>

* Remove redundant YAML unmarshal
Signed-off-by: jademcosta <jade.costa@nubank.com.br>

* Remove unused parameter
Signed-off-by: jademcosta <jade.costa@nubank.com.br>

* Remove docs that should be auto-generated
Signed-off-by: jademcosta <jade.costa@nubank.com.br>

* refactor: move prefix to config root level

Signed-off-by: Maria Eduarda Duarte <dudammduarte@yahoo.com.br>

* add auto generated docs

Signed-off-by: Maria Eduarda Duarte <dudammduarte@yahoo.com.br>

* fix changelog

Signed-off-by: Maria Eduarda Duarte <dudammduarte@yahoo.com.br>

Co-authored-by: Maria Eduarda Duarte <dudammduarte@yahoo.com.br>
jademcosta and dudaduarte committed Jun 9, 2022
1 parent 296c4ab commit 5da60e0
Showing 10 changed files with 217 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -16,6 +16,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Added

- [#5337](https://github.com/thanos-io/thanos/pull/5337) Thanos Object Store: Add the `prefix` option to buckets.
- [#5352](https://github.com/thanos-io/thanos/pull/5352) Cache: Add cache metrics to groupcache.
- [#5391](https://github.com/thanos-io/thanos/pull/5391) Receive: Add relabeling support.

1 change: 1 addition & 0 deletions docs/components/receive.md
@@ -44,6 +44,7 @@ type: GCS
config:
  bucket: ""
  service_account: ""
prefix: ""
```

The example content of `hashring.json`:
1 change: 1 addition & 0 deletions docs/components/sidecar.md
@@ -56,6 +56,7 @@ type: GCS
config:
  bucket: ""
  service_account: ""
prefix: ""
```

## Upload compacted blocks
1 change: 1 addition & 0 deletions docs/components/store.md
@@ -15,6 +15,7 @@ type: GCS
config:
  bucket: ""
  service_account: ""
prefix: ""
```

In general, an average of 6 MB of local disk space is required per TSDB block stored in the object storage bucket, but for high cardinality blocks with large label set it can even go up to 30MB and more. It is for the pre-computed index, which includes symbols and postings offsets as well as metadata JSON.
3 changes: 3 additions & 0 deletions docs/components/tools.md
@@ -101,6 +101,7 @@ type: GCS
config:
  bucket: ""
  service_account: ""
prefix: ""
```

Bucket can be extended to add more subcommands that will be helpful when working with object storage buckets by adding a new command within [`/cmd/thanos/tools_bucket.go`](../../cmd/thanos/tools_bucket.go) .
@@ -601,6 +602,7 @@ type: GCS
config:
  bucket: ""
  service_account: ""
prefix: ""
```

```$ mdox-exec="thanos tools bucket downsample --help"
@@ -675,6 +677,7 @@ type: GCS
config:
  bucket: ""
  service_account: ""
prefix: ""
```

```$ mdox-exec="thanos tools bucket mark --help"
10 changes: 10 additions & 0 deletions docs/storage.md
@@ -96,12 +96,15 @@ config:
    kms_encryption_context: {}
    encryption_key: ""
  sts_endpoint: ""
prefix: ""
```

At a minimum, you will need to provide a value for the `bucket`, `endpoint`, `access_key`, and `secret_key` keys. The rest of the keys are optional.

However, if you set `aws_sdk_auth: true`, Thanos will use the default authentication methods of the AWS SDK for Go based on [known environment variables](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html) (`AWS_PROFILE`, `AWS_WEB_IDENTITY_TOKEN_FILE`, etc.) and known AWS config files (`~/.aws/config`). If you turn this on, only the `bucket` and `endpoint` config keys are required.

The `prefix` field can be used to transparently use prefixes in your S3 bucket. This allows you to separate blocks coming from different sources into paths with different prefixes, making it easier to understand what's going on (i.e., you don't need Thanos tooling to know where each block came from).
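The path mapping is easy to picture: the prefixed bucket joins the configured prefix and the object name with `/`, trimming any surrounding slashes from the prefix first. A minimal, self-contained sketch (the `withPrefix` helper and the example paths are illustrative stand-ins, not the Thanos API):

```go
package main

import (
	"fmt"
	"strings"
)

// withPrefix sketches the mapping a prefixed bucket applies: every
// object name is stored under "<prefix>/<name>" in the real bucket,
// with surrounding slashes trimmed from the configured prefix.
func withPrefix(prefix, name string) string {
	return strings.Trim(prefix, "/") + "/" + name
}

func main() {
	// With prefix "thanos/prod", a block's files land under that path.
	fmt.Println(withPrefix("thanos/prod/", "01ABCDEF/meta.json"))
	// → thanos/prod/01ABCDEF/meta.json
}
```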

The AWS region to endpoint mapping can be found in this [link](https://docs.aws.amazon.com/general/latest/gr/s3.html).

Make sure you use a correct signature version. Currently AWS requires signature v4, so it needs `signature_version2: false`. If you don't specify it, you will get an `Access Denied` error. On the other hand, several S3 compatible APIs use `signature_version2: true`.
@@ -255,6 +258,7 @@ type: GCS
config:
  bucket: ""
  service_account: ""
prefix: ""
```

##### Using GOOGLE_APPLICATION_CREDENTIALS
@@ -356,6 +360,7 @@ config:
      key_file: ""
      server_name: ""
      insecure_skip_verify: false
prefix: ""
```

If `msi_resource` is used, authentication is done via system-assigned managed identity. The value for Azure should be `https://<storage-account-name>.blob.core.windows.net`.
@@ -396,6 +401,7 @@ config:
  connect_timeout: 10s
  timeout: 5m
  use_dynamic_large_objects: false
prefix: ""
```

#### Tencent COS
@@ -421,6 +427,7 @@ config:
    max_idle_conns: 100
    max_idle_conns_per_host: 100
    max_conns_per_host: 0
prefix: ""
```

The `secret_key` and `secret_id` fields are required. The optional `http_config` field can be used to tune HTTP transport settings. There are two ways to configure the required bucket information:
@@ -442,6 +449,7 @@ config:
  bucket: ""
  access_key_id: ""
  access_key_secret: ""
prefix: ""
```

Use `--objstore.config-file` to reference this configuration file.
@@ -457,6 +465,7 @@ config:
  endpoint: ""
  access_key: ""
  secret_key: ""
prefix: ""
```

#### Filesystem
@@ -469,6 +478,7 @@ NOTE: This storage type is experimental and might be inefficient. It is NOT advi
type: FILESYSTEM
config:
  directory: ""
prefix: ""
```

### How to add a new client to Thanos?
4 changes: 3 additions & 1 deletion pkg/objstore/client/factory.go
@@ -41,6 +41,7 @@ const (
type BucketConfig struct {
	Type   ObjProvider `yaml:"type"`
	Config interface{} `yaml:"config"`
	Prefix string      `yaml:"prefix" default:""`
}

// NewBucket initializes and returns new object storage clients.
@@ -81,5 +82,6 @@ func NewBucket(logger log.Logger, confContentYaml []byte, reg prometheus.Registe
	if err != nil {
		return nil, errors.Wrap(err, fmt.Sprintf("create %s client", bucketConf.Type))
	}
-	return objstore.NewTracingBucket(objstore.BucketWithMetrics(bucket.Name(), bucket, reg)), nil
+	return objstore.NewTracingBucket(objstore.BucketWithMetrics(bucket.Name(), objstore.NewPrefixedBucket(bucket, bucketConf.Prefix), reg)), nil
}
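The wrapping order above can be sketched with pared-down stand-ins: the prefixed-bucket constructor returns the original bucket untouched when the prefix is empty, and the wrapper delegates `Name()` to the inner bucket, so metric labels keep the real bucket name. All names below (`Bucket`, `memBucket`, `newPrefixedBucket`) are hypothetical simplifications, not the real `objstore` types:

```go
package main

import "fmt"

// Bucket is a pared-down stand-in for objstore.Bucket, just enough to
// show the wrapping behavior.
type Bucket interface{ Name() string }

type memBucket struct{}

func (memBucket) Name() string { return "my-bucket" }

// prefixedBucket embeds the inner bucket, so Name() is delegated to it.
type prefixedBucket struct {
	Bucket
	prefix string
}

// newPrefixedBucket mirrors the real constructor's contract: an empty
// prefix returns the bucket unchanged.
func newPrefixedBucket(b Bucket, prefix string) Bucket {
	if prefix == "" {
		return b
	}
	return prefixedBucket{Bucket: b, prefix: prefix}
}

func main() {
	b := newPrefixedBucket(memBucket{}, "someprefix")
	// Name comes from the wrapped bucket, so metrics keep the
	// original bucket name even when a prefix is in use.
	fmt.Println(b.Name()) // → my-bucket
}
```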
8 changes: 8 additions & 0 deletions pkg/objstore/objtesting/foreach.go
@@ -64,6 +64,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket))
		b, err := filesystem.NewBucket(dir)
		testutil.Ok(t, err)
		testFn(t, b)
		testFn(t, objstore.NewPrefixedBucket(b, "some_prefix"))
	})

// Optional GCS.
@@ -77,6 +78,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket))

			// TODO(bwplotka): Add goleak when https://github.com/GoogleCloudPlatform/google-cloud-go/issues/1025 is resolved.
			testFn(t, bkt)
			testFn(t, objstore.NewPrefixedBucket(bkt, "some_prefix"))
		})
	}

@@ -95,6 +97,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket))
			// This needs to be investigated more.

			testFn(t, bkt)
			testFn(t, objstore.NewPrefixedBucket(bkt, "some_prefix"))
		})
	}

@@ -108,6 +111,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket))
			defer closeFn()

			testFn(t, bkt)
			testFn(t, objstore.NewPrefixedBucket(bkt, "some_prefix"))
		})
	}

@@ -121,6 +125,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket))
			defer closeFn()

			testFn(t, container)
			testFn(t, objstore.NewPrefixedBucket(container, "some_prefix"))
		})
	}

@@ -134,6 +139,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket))
			defer closeFn()

			testFn(t, bkt)
			testFn(t, objstore.NewPrefixedBucket(bkt, "some_prefix"))
		})
	}

@@ -147,6 +153,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket))
			defer closeFn()

			testFn(t, bkt)
			testFn(t, objstore.NewPrefixedBucket(bkt, "some_prefix"))
		})
	}

@@ -160,6 +167,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket))
			defer closeFn()

			testFn(t, bkt)
			testFn(t, objstore.NewPrefixedBucket(bkt, "some_prefix"))
		})
	}
}
97 changes: 97 additions & 0 deletions pkg/objstore/prefixed_bucket.go
@@ -0,0 +1,97 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package objstore

import (
	"context"
	"io"
	"strings"
)

type PrefixedBucket struct {
	bkt    Bucket
	prefix string
}

func NewPrefixedBucket(bkt Bucket, prefix string) Bucket {
	if validPrefix(prefix) {
		return &PrefixedBucket{bkt: bkt, prefix: strings.Trim(prefix, DirDelim)}
	}

	return bkt
}

func validPrefix(prefix string) bool {
	prefix = strings.Replace(prefix, "/", "", -1)
	return len(prefix) > 0
}

func conditionalPrefix(prefix, name string) string {
	if len(name) > 0 {
		return withPrefix(prefix, name)
	}

	return name
}

func withPrefix(prefix, name string) string {
	return prefix + DirDelim + name
}

func (p *PrefixedBucket) Close() error {
	return p.bkt.Close()
}

// Iter calls f for each entry in the given directory (not recursive). The argument to f is the full
// object name including the prefix of the inspected directory.
// Entries are passed to the function in sorted order.
func (p *PrefixedBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error {
	pdir := withPrefix(p.prefix, dir)

	return p.bkt.Iter(ctx, pdir, func(s string) error {
		return f(strings.TrimPrefix(s, p.prefix+DirDelim))
	}, options...)
}

// Get returns a reader for the given object name.
func (p *PrefixedBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
	return p.bkt.Get(ctx, conditionalPrefix(p.prefix, name))
}

// GetRange returns a new range reader for the given object name and range.
func (p *PrefixedBucket) GetRange(ctx context.Context, name string, off int64, length int64) (io.ReadCloser, error) {
	return p.bkt.GetRange(ctx, conditionalPrefix(p.prefix, name), off, length)
}

// Exists checks if the given object exists in the bucket.
func (p *PrefixedBucket) Exists(ctx context.Context, name string) (bool, error) {
	return p.bkt.Exists(ctx, conditionalPrefix(p.prefix, name))
}

// IsObjNotFoundErr returns true if the error means that the object is not found. Relevant to Get operations.
func (p *PrefixedBucket) IsObjNotFoundErr(err error) bool {
	return p.bkt.IsObjNotFoundErr(err)
}

// Attributes returns information about the specified object.
func (p *PrefixedBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) {
	return p.bkt.Attributes(ctx, conditionalPrefix(p.prefix, name))
}

// Upload the contents of the reader as an object into the bucket.
// Upload should be idempotent.
func (p *PrefixedBucket) Upload(ctx context.Context, name string, r io.Reader) error {
	return p.bkt.Upload(ctx, conditionalPrefix(p.prefix, name), r)
}

// Delete removes the object with the given name.
// If the object does not exist at the moment of deletion, Delete should return an error.
func (p *PrefixedBucket) Delete(ctx context.Context, name string) error {
	return p.bkt.Delete(ctx, conditionalPrefix(p.prefix, name))
}

// Name returns the bucket name for the provider.
func (p *PrefixedBucket) Name() string {
	return p.bkt.Name()
}
92 changes: 92 additions & 0 deletions pkg/objstore/prefixed_bucket_test.go
@@ -0,0 +1,92 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package objstore

import (
	"context"
	"io/ioutil"
	"sort"
	"strings"
	"testing"

	"github.com/thanos-io/thanos/pkg/testutil"
)

func TestPrefixedBucket_Acceptance(t *testing.T) {
	prefixes := []string{
		"/someprefix/anotherprefix/",
		"someprefix/anotherprefix/",
		"someprefix/anotherprefix",
		"someprefix/",
		"someprefix"}

	for _, prefix := range prefixes {
		AcceptanceTest(t, NewPrefixedBucket(NewInMemBucket(), prefix))
		UsesPrefixTest(t, NewInMemBucket(), prefix)
	}
}

func UsesPrefixTest(t *testing.T, bkt Bucket, prefix string) {
	testutil.Ok(t, bkt.Upload(context.Background(), strings.Trim(prefix, "/")+"/file1.jpg", strings.NewReader("test-data1")))

	pBkt := NewPrefixedBucket(bkt, prefix)
	rc1, err := pBkt.Get(context.Background(), "file1.jpg")
	testutil.Ok(t, err)
	defer func() { testutil.Ok(t, rc1.Close()) }()
	content, err := ioutil.ReadAll(rc1)
	testutil.Ok(t, err)
	testutil.Equals(t, "test-data1", string(content))

	testutil.Ok(t, pBkt.Upload(context.Background(), "file2.jpg", strings.NewReader("test-data2")))
	rc2, err := bkt.Get(context.Background(), strings.Trim(prefix, "/")+"/file2.jpg")
	testutil.Ok(t, err)
	defer func() { testutil.Ok(t, rc2.Close()) }()
	contentUpload, err := ioutil.ReadAll(rc2)
	testutil.Ok(t, err)
	testutil.Equals(t, "test-data2", string(contentUpload))

	testutil.Ok(t, pBkt.Delete(context.Background(), "file2.jpg"))
	_, err = bkt.Get(context.Background(), strings.Trim(prefix, "/")+"/file2.jpg")
	testutil.NotOk(t, err)
	testutil.Assert(t, pBkt.IsObjNotFoundErr(err), "expected not found error got %s", err)

	rc3, err := pBkt.GetRange(context.Background(), "file1.jpg", 1, 3)
	testutil.Ok(t, err)
	defer func() { testutil.Ok(t, rc3.Close()) }()
	content, err = ioutil.ReadAll(rc3)
	testutil.Ok(t, err)
	testutil.Equals(t, "est", string(content))

	ok, err := pBkt.Exists(context.Background(), "file1.jpg")
	testutil.Ok(t, err)
	testutil.Assert(t, ok, "expected exists")

	attrs, err := pBkt.Attributes(context.Background(), "file1.jpg")
	testutil.Ok(t, err)
	testutil.Assert(t, attrs.Size == 10, "expected size to be equal to 10")

	testutil.Ok(t, bkt.Upload(context.Background(), strings.Trim(prefix, "/")+"/dir/file1.jpg", strings.NewReader("test-data1")))
	seen := []string{}
	testutil.Ok(t, pBkt.Iter(context.Background(), "", func(fn string) error {
		seen = append(seen, fn)
		return nil
	}, WithRecursiveIter))
	expected := []string{"dir/file1.jpg", "file1.jpg"}
	sort.Strings(expected)
	sort.Strings(seen)
	testutil.Equals(t, expected, seen)

	seen = []string{}
	testutil.Ok(t, pBkt.Iter(context.Background(), "", func(fn string) error {
		seen = append(seen, fn)
		return nil
	}))
	expected = []string{"dir/", "file1.jpg"}
	sort.Strings(expected)
	sort.Strings(seen)
	testutil.Equals(t, expected, seen)
}
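The `Iter` expectations in the test above rely on the wrapper stripping the prefix before object names reach the caller's callback. A self-contained sketch of that trimming step (`trimIterName` is an illustrative helper, not part of the package):

```go
package main

import (
	"fmt"
	"strings"
)

// trimIterName sketches what the prefixed bucket's Iter wrapper does:
// names returned by the underlying bucket carry the prefix, which is
// stripped before the caller's callback sees them.
func trimIterName(prefix, stored string) string {
	return strings.TrimPrefix(stored, prefix+"/")
}

func main() {
	// The caller iterating a prefixed bucket sees prefix-free names.
	fmt.Println(trimIterName("someprefix", "someprefix/dir/file1.jpg"))
	// → dir/file1.jpg
}
```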
