Skip to content

Commit

Permalink
refactor(storage): replace manual retries in tests (#6510)
Browse files Browse the repository at this point in the history
Fixes #5032

Also adds a retry to the HMAC key test that fixes #6544
  • Loading branch information
BrennaEpp committed Aug 28, 2022
1 parent c85704f commit c220ca2
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 153 deletions.
280 changes: 128 additions & 152 deletions storage/integration_test.go
Expand Up @@ -611,32 +611,32 @@ func TestIntegration_BucketPolicyOnly(t *testing.T) {
t.Fatal("got a zero time value, want a populated value")
}

// Confirm BucketAccessControl returns error.
err = retry(ctx, func() error {
_, err = bkt.ACL().List(ctx)
return nil
}, func() error {
if err == nil {
return fmt.Errorf("ACL.List: expected bucket ACL list to fail")
}
return nil
})
if err != nil {
t.Fatal(err)
// Confirm BucketAccessControl returns error, since we cannot get legacy ACL
// for a bucket that has uniform bucket-level access.

// Metadata updates may be delayed up to 10s. Since we expect an error from
// this call, we retry on a nil error until we get the non-retryable error
// that we are expecting.
idempotentOrNilRetry := func(err error) bool {
return err == nil || ShouldRetry(err)
}

// Confirm ObjectAccessControl returns error.
err = retry(ctx, func() error {
_, err = o.ACL().List(ctx)
return nil
}, func() error {
if err == nil {
return fmt.Errorf("ACL.List: expected object ACL list to fail")
}
return nil
})
if err != nil {
t.Fatal(err)
ctxWithTimeout, cancelCtx := context.WithTimeout(ctx, time.Second*10)

b := bkt.Retryer(WithErrorFunc(idempotentOrNilRetry))
_, err = b.ACL().List(ctxWithTimeout)
cancelCtx()
if err == nil {
t.Errorf("ACL.List: expected bucket ACL list to fail")
}

// Confirm ObjectAccessControl returns error, for same reason as above.
ctxWithTimeout, cancelCtx = context.WithTimeout(ctx, time.Second*10)

_, err = o.Retryer(WithErrorFunc(idempotentOrNilRetry)).ACL().List(ctxWithTimeout)
cancelCtx()
if err == nil {
t.Errorf("ACL.List: expected object ACL list to fail")
}

// Disable BucketPolicyOnly.
Expand All @@ -647,21 +647,15 @@ func TestIntegration_BucketPolicyOnly(t *testing.T) {
}

// Check that the object ACLs are the same.
var acls []ACLRule
err = retry(ctx, func() error {
acls, err = o.ACL().List(ctx)
if err != nil {
return fmt.Errorf("ACL.List: object ACL list failed: %v", err)
}
return nil
}, func() error {
if !containsACL(acls, aclEntity, RoleReader) {
return fmt.Errorf("containsACL: expected ACLs %v to include custom ACL entity %v", acls, aclEntity)
}
return nil
})
ctxWithTimeout, cancelCtx = context.WithTimeout(ctx, time.Second*10)
acls, err := o.Retryer(WithPolicy(RetryAlways)).ACL().List(ctxWithTimeout)
cancelCtx()
if err != nil {
t.Fatal(err)
t.Errorf("ACL.List: object ACL list failed: %v", err)
}

if !containsACL(acls, aclEntity, RoleReader) {
t.Errorf("containsACL: expected ACLs %v to include custom ACL entity %v", acls, aclEntity)
}
}

Expand Down Expand Up @@ -702,31 +696,27 @@ func TestIntegration_UniformBucketLevelAccess(t *testing.T) {
}

// Confirm BucketAccessControl returns error.
err = retry(ctx, func() error {
_, err = bkt.ACL().List(ctx)
return nil
}, func() error {
if err == nil {
return fmt.Errorf("ACL.List: expected bucket ACL list to fail")
}
return nil
})
if err != nil {
t.Fatal(err)
// We retry on nil to account for propagation delay in metadata update.
idempotentOrNilRetry := func(err error) bool {
return err == nil || ShouldRetry(err)
}

ctxWithTimeout, cancelCtx := context.WithTimeout(ctx, time.Second*10)

b := bkt.Retryer(WithErrorFunc(idempotentOrNilRetry))
_, err = b.ACL().List(ctxWithTimeout)
cancelCtx()
if err == nil {
t.Errorf("ACL.List: expected bucket ACL list to fail")
}

// Confirm ObjectAccessControl returns error.
err = retry(ctx, func() error {
_, err = o.ACL().List(ctx)
return nil
}, func() error {
if err == nil {
return fmt.Errorf("ACL.List: expected object ACL list to fail")
}
return nil
})
if err != nil {
t.Fatal(err)
ctxWithTimeout, cancelCtx = context.WithTimeout(ctx, time.Second*10)

_, err = o.Retryer(WithErrorFunc(idempotentOrNilRetry)).ACL().List(ctxWithTimeout)
cancelCtx()
if err == nil {
t.Errorf("ACL.List: expected object ACL list to fail")
}

// Disable UniformBucketLevelAccess.
Expand All @@ -737,21 +727,15 @@ func TestIntegration_UniformBucketLevelAccess(t *testing.T) {
}

// Check that the object ACLs are the same.
var acls []ACLRule
err = retry(ctx, func() error {
acls, err = o.ACL().List(ctx)
if err != nil {
return fmt.Errorf("ACL.List: object ACL list failed: %v", err)
}
return nil
}, func() error {
if !containsACL(acls, aclEntity, RoleReader) {
return fmt.Errorf("containsACL: expected ACLs %v to include custom ACL entity %v", acls, aclEntity)
}
return nil
})
ctxWithTimeout, cancelCtx = context.WithTimeout(ctx, time.Second*10)
acls, err := o.Retryer(WithPolicy(RetryAlways)).ACL().List(ctxWithTimeout)
cancelCtx()
if err != nil {
t.Fatal(err)
t.Errorf("ACL.List: object ACL list failed: %v", err)
}

if !containsACL(acls, aclEntity, RoleReader) {
t.Errorf("containsACL: expected ACLs %v to include custom ACL entity %v", acls, aclEntity)
}
}

Expand Down Expand Up @@ -808,14 +792,19 @@ func TestIntegration_PublicAccessPrevention(t *testing.T) {

// Now, making object public or making bucket public should succeed. Run with
// retry because ACL settings may take time to propagate.
if err := retry(ctx,
func() error {
a = o.ACL()
return a.Set(ctx, AllUsers, RoleReader)
},
nil); err != nil {
idempotentOrNilRetry := func(err error) bool {
return err == nil || ShouldRetry(err)
}

ctxWithTimeout, cancelCtx := context.WithTimeout(ctx, time.Second*10)

a = o.Retryer(WithErrorFunc(idempotentOrNilRetry)).ACL()
a.Set(ctxWithTimeout, AllUsers, RoleReader)
cancelCtx()
if err != nil {
t.Errorf("ACL.Set: making object public failed: %v", err)
}

policy, err = bkt.IAM().V3().Policy(ctx)
if err != nil {
t.Fatalf("fetching bucket IAM policy: %v", err)
Expand Down Expand Up @@ -880,17 +869,12 @@ func TestIntegration_ObjectsRangeReader(t *testing.T) {
obj := bkt.Object(objName)
contents := []byte("Hello, world this is a range request")

if err := retry(ctx, func() error {
w := obj.NewWriter(ctx)
if _, err := w.Write(contents); err != nil {
return fmt.Errorf("Failed to write contents: %v", err)
}
if err := w.Close(); err != nil {
return fmt.Errorf("Failed to close writer: %v", err)
}
return nil
}, nil); err != nil {
t.Fatal(err)
w := obj.If(Conditions{DoesNotExist: true}).NewWriter(ctx)
if _, err := w.Write(contents); err != nil {
t.Errorf("Failed to write contents: %v", err)
}
if err := w.Close(); err != nil {
t.Errorf("Failed to close writer: %v", err)
}

last5s := []struct {
Expand Down Expand Up @@ -2369,27 +2353,26 @@ func TestIntegration_ACL(t *testing.T) {
aclObjects := []string{"acl1", "acl2"}
name := aclObjects[0]
o := bkt.Object(name)
err = retry(ctx, func() error {
for _, obj := range aclObjects {
c := randomContents()
if err := writeObject(ctx, bkt.Object(obj), "", c); err != nil {
return fmt.Errorf("Write for %v failed with %v", obj, err)
}
}
acl, err = o.ACL().List(ctx)
if err != nil {
return fmt.Errorf("ACL.List: can't retrieve ACL of %v", name)
}
return nil
}, func() error {
if !hasRule(acl, rule) {
return fmt.Errorf("hasRule: object ACL missing %+v", rule)

for _, obj := range aclObjects {
c := randomContents()
if err := writeObject(ctx, bkt.Object(obj).If(Conditions{DoesNotExist: true}), "", c); err != nil {
t.Errorf("Write for %v failed with %v", obj, err)
}
return nil
})
}

retryAllErrors := func(err error) bool { return err != nil }

ctxWithTimeout, cancelCtx := context.WithTimeout(ctx, time.Second*10)
acl, err = o.Retryer(WithErrorFunc(retryAllErrors)).ACL().List(ctxWithTimeout)
cancelCtx()
if err != nil {
t.Fatal(err)
t.Errorf("ACL.List: can't retrieve ACL of %v", name)
}
if !hasRule(acl, rule) {
t.Errorf("hasRule: object ACL missing %+v", rule)
}

if err := o.ACL().Delete(ctx, entity); err != nil {
t.Errorf("object ACL: could not delete entity %s", entity)
}
Expand All @@ -2405,26 +2388,22 @@ func TestIntegration_ACL(t *testing.T) {
if err := bkt.ACL().Set(ctx, entity2, RoleReader); err != nil {
t.Errorf("Error while putting bucket ACL rule: %v", err)
}

var bACL []ACLRule
err = retry(ctx, func() error {
bACL, err = bkt.ACL().List(ctx)
if err != nil {
return fmt.Errorf("ACL.List: error while getting the ACL of the bucket: %v", err)
}
return nil
}, func() error {
if !hasRule(bACL, rule2) {
return fmt.Errorf("hasRule: bucket ACL missing %+v", rule2)
}
return nil
})
ctxWithTimeout, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()

bACL, err = bkt.Retryer(WithErrorFunc(retryAllErrors)).ACL().List(ctxWithTimeout)
if err != nil {
t.Error(err)
t.Errorf("ACL.List: error while getting the ACL of the bucket: %v", err)
}
if !hasRule(bACL, rule2) {
t.Errorf("hasRule: bucket ACL missing %+v", rule2)
}

if err := bkt.ACL().Delete(ctx, entity2); err != nil {
t.Errorf("Error while deleting bucket ACL rule: %v", err)
}

}

func TestIntegration_ValidObjectNames(t *testing.T) {
Expand Down Expand Up @@ -3713,24 +3692,27 @@ func TestIntegration_DeleteObjectInBucketWithRetentionPolicy(t *testing.T) {
bkt := client.Bucket(uidSpace.New())
h.mustCreate(bkt, testutil.ProjID(), &BucketAttrs{RetentionPolicy: &RetentionPolicy{RetentionPeriod: 25 * time.Hour}})

oh := bkt.Object("some-object")
if err := writeObject(ctx, oh, "text/plain", []byte("hello world")); err != nil {
o := bkt.Object("some-object")
if err := writeObject(ctx, o, "text/plain", []byte("hello world")); err != nil {
t.Fatal(err)
}

if err := oh.Delete(ctx); err == nil {
if err := o.Delete(ctx); err == nil {
t.Fatal("expected to err deleting an object in a bucket with retention period, but got nil")
}

// Remove the retention period
h.mustUpdateBucket(bkt, BucketAttrsToUpdate{RetentionPolicy: &RetentionPolicy{}}, h.mustBucketAttrs(bkt).MetaGeneration)
// Deleting with retry, as bucket metadata changes

// Delete with retry, as bucket metadata changes
// can take some time to propagate.
err := retry(ctx, func() error {
return oh.Delete(ctx)
}, nil)
if err != nil {
h.t.Fatalf("%s: object delete: %v", loc(), err)
retry := func(err error) bool { return err != nil }
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()

o = o.Retryer(WithErrorFunc(retry), WithPolicy(RetryAlways))
if err := o.Delete(ctx); err != nil {
t.Fatalf("object delete: %v", err)
}
h.mustDeleteBucket(bkt)
}
Expand Down Expand Up @@ -4087,26 +4069,19 @@ func TestIntegration_NewReaderWithContentEncodingGzip(t *testing.T) {
obj := bkt.Object("decompressive-transcoding")
original := bytes.Repeat([]byte("a"), 4<<10)

// Wrap the file upload in a retry.
// TODO: Investigate removing retry after resolving
// https://github.com/googleapis/google-api-go-client/issues/392.
err := retry(ctx, func() error {
// Firstly upload the gzip compressed file.
w := obj.NewWriter(ctx)
// Compress and upload the content.
gzw := gzip.NewWriter(w)
if _, err := gzw.Write(original); err != nil {
return fmt.Errorf("Failed to compress content: %v", err)
}
if err := gzw.Close(); err != nil {
return fmt.Errorf("Failed to compress content: %v", err)
}
if err := w.Close(); err != nil {
return fmt.Errorf("Failed to finish uploading the file: %v", err)
}
return nil
},
nil)
// Firstly upload the gzip compressed file.
w := obj.If(Conditions{DoesNotExist: true}).NewWriter(ctx)
// Compress and upload the content.
gzw := gzip.NewWriter(w)
if _, err := gzw.Write(original); err != nil {
t.Fatalf("Failed to compress content: %v", err)
}
if err := gzw.Close(); err != nil {
t.Errorf("Failed to compress content: %v", err)
}
if err := w.Close(); err != nil {
t.Errorf("Failed to finish uploading the file: %v", err)
}

defer h.mustDeleteObject(obj)

Expand Down Expand Up @@ -4160,6 +4135,7 @@ func TestIntegration_HMACKey(t *testing.T) {
ctx := context.Background()
client := testConfig(ctx, t)
defer client.Close()
client.SetRetry(WithPolicy(RetryAlways))

projectID := testutil.ProjID()

Expand Down
1 change: 0 additions & 1 deletion storage/retry_conformance_test.go
Expand Up @@ -224,7 +224,6 @@ var methods = map[string][]retryFunc{
},
},
// Conditionally idempotent operations
// (all conditionally idempotent operations currently fail)
"storage.buckets.patch": {
func(ctx context.Context, c *Client, fs *resources, preconditions bool) error {
uattrs := BucketAttrsToUpdate{StorageClass: "ARCHIVE"}
Expand Down

0 comments on commit c220ca2

Please sign in to comment.