Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pkg/objstore/s3): support for prefixes #1392

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/storage.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ config:
insecure_skip_verify: false
trace:
enable: false
prefix: ""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would be curious and actually I think it's must-have to actually make this change on bucket level, not each bucket client level.. WDYT? (:

part_size: 134217728
```

Expand Down
31 changes: 23 additions & 8 deletions pkg/objstore/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Config struct {
PutUserMetadata map[string]string `yaml:"put_user_metadata"`
HTTPConfig HTTPConfig `yaml:"http_config"`
TraceConfig TraceConfig `yaml:"trace"`
Prefix string `yaml:"prefix"`
// PartSize used for multipart upload. Only used if uploaded object size is known and larger than configured PartSize.
PartSize uint64 `yaml:"part_size"`
}
Expand All @@ -78,6 +79,7 @@ type Bucket struct {
sse encrypt.ServerSide
putUserMetadata map[string]string
partSize uint64
objectPrefix string
}

// parseConfig unmarshals a buffer into a Config with default HTTPConfig values.
Expand All @@ -87,6 +89,11 @@ func parseConfig(conf []byte) (Config, error) {
return Config{}, err
}

// here for testing
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if config.Prefix != "" {
config.Prefix = strings.TrimSuffix(config.Prefix, DirDelim) + DirDelim
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for the configuration parsing test

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding normal code just for tests doesn't sound like a good idea, consider removing this.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright.

}

return config, nil
}

Expand Down Expand Up @@ -180,6 +187,7 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string) (*B
sse: sse,
putUserMetadata: config.PutUserMetadata,
partSize: config.PartSize,
objectPrefix: config.Prefix,
}
return bkt, nil
}
Expand All @@ -202,6 +210,12 @@ func validate(conf Config) error {
if conf.AccessKey != "" && conf.SecretKey == "" {
return errors.New("no s3 secret_key specified while access_key is present in config file; either both should be present in config or envvars/IAM should be used.")
}

// trim trailing slash and insert one
jaredallard marked this conversation as resolved.
Show resolved Hide resolved
if conf.Prefix != "" {
conf.Prefix = strings.TrimSuffix(conf.Prefix, DirDelim) + DirDelim
}

return nil
}

Expand All @@ -224,20 +238,21 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error) err
dir = strings.TrimSuffix(dir, DirDelim) + DirDelim
}

for object := range b.client.ListObjects(b.name, dir, false, ctx.Done()) {
for object := range b.client.ListObjects(b.name, b.objectPrefix+dir, false, ctx.Done()) {
keyName := strings.TrimPrefix(object.Key, b.objectPrefix)
// Catch the error when failed to list objects.
if object.Err != nil {
return object.Err
}
// This sometimes happens with empty buckets.
if object.Key == "" {
if keyName == "" {
continue
}
// The s3 client can also return the directory itself in the ListObjects call above.
if object.Key == dir {
if keyName == dir {
continue
}
if err := f(object.Key); err != nil {
if err := f(keyName); err != nil {
return err
}
}
Expand All @@ -252,7 +267,7 @@ func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (
return nil, err
}
}
r, err := b.client.GetObjectWithContext(ctx, b.name, name, *opts)
r, err := b.client.GetObjectWithContext(ctx, b.name, b.objectPrefix+name, *opts)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -281,7 +296,7 @@ func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (

// Exists checks if the given object exists.
func (b *Bucket) Exists(ctx context.Context, name string) (bool, error) {
_, err := b.client.StatObject(b.name, name, minio.StatObjectOptions{})
_, err := b.client.StatObject(b.name, b.objectPrefix+name, minio.StatObjectOptions{})
if err != nil {
if b.IsObjNotFoundErr(err) {
return false, nil
Expand Down Expand Up @@ -319,7 +334,7 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {
if _, err := b.client.PutObjectWithContext(
ctx,
b.name,
name,
b.objectPrefix+name,
r,
size,
minio.PutObjectOptions{
Expand All @@ -336,7 +351,7 @@ func (b *Bucket) Upload(ctx context.Context, name string, r io.Reader) error {

// Delete removes the object with the given name.
func (b *Bucket) Delete(ctx context.Context, name string) error {
return b.client.RemoveObject(b.name, name)
return b.client.RemoveObject(b.name, b.objectPrefix+name)
}

// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
Expand Down
22 changes: 22 additions & 0 deletions pkg/objstore/s3/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,28 @@ http_config:
}
}

func TestParseConfig_PrefixNoSlash(t *testing.T) {
input := []byte(`bucket: abcd
prefix: ssss`)
cfg, err := parseConfig(input)
testutil.Ok(t, err)

if cfg.Prefix != "ssss/" {
t.Errorf("parsing of prefix failed: got '%s', expected 'ssss/'", cfg.Prefix)
}
}

func TestParseConfig_PrefixSlash(t *testing.T) {
input := []byte(`bucket: abcd
prefix: ssss/`)
cfg, err := parseConfig(input)
testutil.Ok(t, err)

if cfg.Prefix != "ssss/" {
t.Errorf("parsing of prefix failed: got '%s', expected 'ssss/'", cfg.Prefix)
}
}

func TestValidate_OK(t *testing.T) {
input := []byte(`bucket: "bucket-name"
endpoint: "s3-endpoint"
Expand Down