Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pkg/objstore/s3): support for prefixes #1392

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ config:
trace:
enable: false
part_size: 0
prefix: ""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would be curious and actually I think it's must-have to actually make this change on bucket level, not each bucket client level.. WDYT? (:

```

At a minimum, you will need to provide a value for the `bucket`, `endpoint`, `access_key`, and `secret_key` keys. The rest of the keys are optional.
Expand Down
36 changes: 28 additions & 8 deletions pkg/objstore/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type Config struct {
HTTPConfig HTTPConfig `yaml:"http_config"`
TraceConfig TraceConfig `yaml:"trace"`
PartSize uint64 `yaml:"part_size"`
Prefix string `yaml:"prefix"`
}

type TraceConfig struct {
Expand All @@ -71,6 +72,7 @@ type Bucket struct {
sse encrypt.ServerSide
putUserMetadata map[string]string
partSize uint64
objectPrefix string
}

// parseConfig unmarshals a buffer into a Config with default HTTPConfig values.
Expand All @@ -92,6 +94,11 @@ func parseConfig(conf []byte) (Config, error) {
config.PartSize = defaultMinPartSize
}

// here for testing
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if config.Prefix != "" {
config.Prefix = strings.TrimSuffix(config.Prefix, DirDelim) + DirDelim
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for the configuration parsing test

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding normal code just for tests doesn't sound like a good idea, consider removing this.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright.

}

return config, nil
}

Expand All @@ -105,7 +112,7 @@ func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error
return NewBucketWithConfig(logger, config, component)
}

// NewBucket returns a new Bucket using the provided s3 config values.
// NewBucketWithConfig returns a new Bucket using the provided s3 config values.
func NewBucketWithConfig(logger log.Logger, config Config, component string) (*Bucket, error) {
var chain []credentials.Provider

Expand Down Expand Up @@ -186,6 +193,7 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string) (*B
sse: sse,
putUserMetadata: config.PutUserMetadata,
partSize: config.PartSize,
objectPrefix: config.Prefix,
}
return bkt, nil
}
Expand All @@ -208,6 +216,17 @@ func validate(conf Config) error {
if conf.AccessKey != "" && conf.SecretKey == "" {
return errors.New("no s3 secret_key specified while access_key is present in config file; either both should be present in config or envvars/IAM should be used.")
}

// ensure that the prefix isn't too long
if conf.Prefix != "" && len(conf.Prefix) >= 1024 {
return errors.New("prefix is too long (limited by Amazon at 1024 bytes long)")
jaredallard marked this conversation as resolved.
Show resolved Hide resolved
}

// trim trailing slash and insert one
jaredallard marked this conversation as resolved.
Show resolved Hide resolved
if conf.Prefix != "" {
conf.Prefix = strings.TrimSuffix(conf.Prefix, DirDelim) + DirDelim
}

return nil
}

Expand All @@ -230,16 +249,17 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error) err
dir = strings.TrimSuffix(dir, DirDelim) + DirDelim
}

for object := range b.client.ListObjects(b.name, dir, false, ctx.Done()) {
for object := range b.client.ListObjects(b.name, b.objectPrefix+dir, false, ctx.Done()) {
keyName := strings.Replace(object.Key, b.objectPrefix, "", 1)
jaredallard marked this conversation as resolved.
Show resolved Hide resolved
// Catch the error when failed to list objects.
if object.Err != nil {
return object.Err
}
// This sometimes happens with empty buckets.
if object.Key == "" {
if keyName == "" {
continue
}
if err := f(object.Key); err != nil {
if err := f(keyName); err != nil {
return err
}
}
Expand All @@ -254,7 +274,7 @@ func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (
return nil, err
}
}
r, err := b.client.GetObjectWithContext(ctx, b.name, name, *opts)
r, err := b.client.GetObjectWithContext(ctx, b.name, b.objectPrefix+name, *opts)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -283,7 +303,7 @@ func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (

// Exists checks if the given object exists.
func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
_, err := b.client.StatObject(b.name, name, minio.StatObjectOptions{})
_, err := b.client.StatObject(b.name, b.objectPrefix+name, minio.StatObjectOptions{})
if err != nil {
if b.IsObjNotFoundErr(err) {
return false, nil
Expand Down Expand Up @@ -316,7 +336,7 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
if _, err := b.client.PutObjectWithContext(
ctx,
b.name,
name,
b.objectPrefix+name,
r,
fileSize,
minio.PutObjectOptions{
Expand All @@ -333,7 +353,7 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {

// Delete removes the object with the given name.
func (b *Bucket) Delete(ctx context.Context, name string) error {
return b.client.RemoveObject(b.name, name)
return b.client.RemoveObject(b.name, b.objectPrefix+name)
}

// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
Expand Down
22 changes: 22 additions & 0 deletions pkg/objstore/s3/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,28 @@ http_config:
}
}

func TestParseConfig_PrefixNoSlash(t *testing.T) {
input := []byte(`bucket: abcd
prefix: ssss`)
cfg, err := parseConfig(input)
testutil.Ok(t, err)

if cfg.Prefix != "ssss/" {
t.Errorf("parsing of prefix failed: got '%s', expected 'ssss/'", cfg.Prefix)
}
}

func TestParseConfig_PrefixSlash(t *testing.T) {
input := []byte(`bucket: abcd
prefix: ssss/`)
cfg, err := parseConfig(input)
testutil.Ok(t, err)

if cfg.Prefix != "ssss/" {
t.Errorf("parsing of prefix failed: got '%s', expected 'ssss/'", cfg.Prefix)
}
}

func TestValidate_OK(t *testing.T) {
input := []byte(`bucket: "bucket-name"
endpoint: "s3-endpoint"
Expand Down