diff --git a/storage/acl.go b/storage/acl.go index 9ee0a3a0b89..e0ab60073c2 100644 --- a/storage/acl.go +++ b/storage/acl.go @@ -21,7 +21,6 @@ import ( "cloud.google.com/go/internal/trace" storagepb "cloud.google.com/go/storage/internal/apiv2/stubs" - "google.golang.org/api/googleapi" raw "google.golang.org/api/storage/v1" ) @@ -121,111 +120,46 @@ func (a *ACLHandle) List(ctx context.Context) (rules []ACLRule, err error) { } func (a *ACLHandle) bucketDefaultList(ctx context.Context) ([]ACLRule, error) { - var acls *raw.ObjectAccessControls - var err error - req := a.c.raw.DefaultObjectAccessControls.List(a.bucket) - a.configureCall(ctx, req) - err = run(ctx, func() error { - acls, err = req.Do() - return err - }, a.retry, true, setRetryHeaderHTTP(req)) - if err != nil { - return nil, err - } - return toObjectACLRules(acls.Items), nil + opts := makeStorageOpts(true, a.retry, a.userProject) + return a.c.tc.ListDefaultObjectACLs(ctx, a.bucket, opts...) } func (a *ACLHandle) bucketDefaultDelete(ctx context.Context, entity ACLEntity) error { - req := a.c.raw.DefaultObjectAccessControls.Delete(a.bucket, string(entity)) - a.configureCall(ctx, req) - - return run(ctx, func() error { - return req.Do() - }, a.retry, false, setRetryHeaderHTTP(req)) + opts := makeStorageOpts(false, a.retry, a.userProject) + return a.c.tc.DeleteDefaultObjectACL(ctx, a.bucket, entity, opts...) } func (a *ACLHandle) bucketList(ctx context.Context) ([]ACLRule, error) { - var acls *raw.BucketAccessControls - var err error - req := a.c.raw.BucketAccessControls.List(a.bucket) - a.configureCall(ctx, req) - err = run(ctx, func() error { - acls, err = req.Do() - return err - }, a.retry, true, setRetryHeaderHTTP(req)) - if err != nil { - return nil, err - } - return toBucketACLRules(acls.Items), nil + opts := makeStorageOpts(true, a.retry, a.userProject) + return a.c.tc.ListBucketACLs(ctx, a.bucket, opts...) 
} func (a *ACLHandle) bucketSet(ctx context.Context, entity ACLEntity, role ACLRole) error { - acl := &raw.BucketAccessControl{ - Bucket: a.bucket, - Entity: string(entity), - Role: string(role), - } - req := a.c.raw.BucketAccessControls.Update(a.bucket, string(entity), acl) - a.configureCall(ctx, req) - return run(ctx, func() error { - _, err := req.Do() - return err - }, a.retry, false, setRetryHeaderHTTP(req)) + opts := makeStorageOpts(false, a.retry, a.userProject) + return a.c.tc.UpdateBucketACL(ctx, a.bucket, entity, role, opts...) } func (a *ACLHandle) bucketDelete(ctx context.Context, entity ACLEntity) error { - req := a.c.raw.BucketAccessControls.Delete(a.bucket, string(entity)) - a.configureCall(ctx, req) - return run(ctx, func() error { - return req.Do() - }, a.retry, false, setRetryHeaderHTTP(req)) + opts := makeStorageOpts(false, a.retry, a.userProject) + return a.c.tc.DeleteBucketACL(ctx, a.bucket, entity, opts...) } func (a *ACLHandle) objectList(ctx context.Context) ([]ACLRule, error) { - var acls *raw.ObjectAccessControls - var err error - req := a.c.raw.ObjectAccessControls.List(a.bucket, a.object) - a.configureCall(ctx, req) - err = run(ctx, func() error { - acls, err = req.Do() - return err - }, a.retry, true, setRetryHeaderHTTP(req)) - if err != nil { - return nil, err - } - return toObjectACLRules(acls.Items), nil + opts := makeStorageOpts(true, a.retry, a.userProject) + return a.c.tc.ListObjectACLs(ctx, a.bucket, a.object, opts...) 
} func (a *ACLHandle) objectSet(ctx context.Context, entity ACLEntity, role ACLRole, isBucketDefault bool) error { - type setRequest interface { - Do(opts ...googleapi.CallOption) (*raw.ObjectAccessControl, error) - Header() http.Header - } - - acl := &raw.ObjectAccessControl{ - Bucket: a.bucket, - Entity: string(entity), - Role: string(role), - } - var req setRequest + opts := makeStorageOpts(false, a.retry, a.userProject) if isBucketDefault { - req = a.c.raw.DefaultObjectAccessControls.Update(a.bucket, string(entity), acl) - } else { - req = a.c.raw.ObjectAccessControls.Update(a.bucket, a.object, string(entity), acl) + return a.c.tc.UpdateDefaultObjectACL(ctx, a.bucket, entity, role, opts...) } - a.configureCall(ctx, req) - return run(ctx, func() error { - _, err := req.Do() - return err - }, a.retry, false, setRetryHeaderHTTP(req)) + return a.c.tc.UpdateObjectACL(ctx, a.bucket, a.object, entity, role, opts...) } func (a *ACLHandle) objectDelete(ctx context.Context, entity ACLEntity) error { - req := a.c.raw.ObjectAccessControls.Delete(a.bucket, a.object, string(entity)) - a.configureCall(ctx, req) - return run(ctx, func() error { - return req.Do() - }, a.retry, false, setRetryHeaderHTTP(req)) + opts := makeStorageOpts(false, a.retry, a.userProject) + return a.c.tc.DeleteObjectACL(ctx, a.bucket, a.object, entity, opts...) 
} func (a *ACLHandle) configureCall(ctx context.Context, call interface{ Header() http.Header }) { diff --git a/storage/bucket.go b/storage/bucket.go index 84582bb8773..87789f3bc29 100644 --- a/storage/bucket.go +++ b/storage/bucket.go @@ -20,7 +20,6 @@ import ( "encoding/json" "errors" "fmt" - "net/http" "reflect" "time" @@ -83,27 +82,11 @@ func (b *BucketHandle) Create(ctx context.Context, projectID string, attrs *Buck ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Bucket.Create") defer func() { trace.EndSpan(ctx, err) }() - var bkt *raw.Bucket - if attrs != nil { - bkt = attrs.toRawBucket() - } else { - bkt = &raw.Bucket{} - } - bkt.Name = b.name - // If there is lifecycle information but no location, explicitly set - // the location. This is a GCS quirk/bug. - if bkt.Location == "" && bkt.Lifecycle != nil { - bkt.Location = "US" - } - req := b.c.raw.Buckets.Insert(projectID, bkt) - setClientHeader(req.Header()) - if attrs != nil && attrs.PredefinedACL != "" { - req.PredefinedAcl(attrs.PredefinedACL) - } - if attrs != nil && attrs.PredefinedDefaultObjectACL != "" { - req.PredefinedDefaultObjectAcl(attrs.PredefinedDefaultObjectACL) + o := makeStorageOpts(true, b.retry, b.userProject) + if _, err := b.c.tc.CreateBucket(ctx, projectID, b.name, attrs, o...); err != nil { + return err } - return run(ctx, func() error { _, err := req.Context(ctx).Do(); return err }, b.retry, true, setRetryHeaderHTTP(req)) + return nil } // Delete deletes the Bucket. 
@@ -111,24 +94,8 @@ func (b *BucketHandle) Delete(ctx context.Context) (err error) { ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Bucket.Delete") defer func() { trace.EndSpan(ctx, err) }() - req, err := b.newDeleteCall() - if err != nil { - return err - } - - return run(ctx, func() error { return req.Context(ctx).Do() }, b.retry, true, setRetryHeaderHTTP(req)) -} - -func (b *BucketHandle) newDeleteCall() (*raw.BucketsDeleteCall, error) { - req := b.c.raw.Buckets.Delete(b.name) - setClientHeader(req.Header()) - if err := applyBucketConds("BucketHandle.Delete", b.conds, req); err != nil { - return nil, err - } - if b.userProject != "" { - req.UserProject(b.userProject) - } - return req, nil + o := makeStorageOpts(true, b.retry, b.userProject) + return b.c.tc.DeleteBucket(ctx, b.name, b.conds, o...) } // ACL returns an ACLHandle, which provides access to the bucket's access control list. @@ -177,35 +144,8 @@ func (b *BucketHandle) Attrs(ctx context.Context) (attrs *BucketAttrs, err error ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Bucket.Attrs") defer func() { trace.EndSpan(ctx, err) }() - req, err := b.newGetCall() - if err != nil { - return nil, err - } - var resp *raw.Bucket - err = run(ctx, func() error { - resp, err = req.Context(ctx).Do() - return err - }, b.retry, true, setRetryHeaderHTTP(req)) - var e *googleapi.Error - if ok := errors.As(err, &e); ok && e.Code == http.StatusNotFound { - return nil, ErrBucketNotExist - } - if err != nil { - return nil, err - } - return newBucket(resp) -} - -func (b *BucketHandle) newGetCall() (*raw.BucketsGetCall, error) { - req := b.c.raw.Buckets.Get(b.name).Projection("full") - setClientHeader(req.Header()) - if err := applyBucketConds("BucketHandle.Attrs", b.conds, req); err != nil { - return nil, err - } - if b.userProject != "" { - req.UserProject(b.userProject) - } - return req, nil + o := makeStorageOpts(true, b.retry, b.userProject) + return b.c.tc.GetBucket(ctx, b.name, b.conds, o...) 
} // Update updates a bucket's attributes. @@ -213,43 +153,9 @@ func (b *BucketHandle) Update(ctx context.Context, uattrs BucketAttrsToUpdate) ( ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Bucket.Create") defer func() { trace.EndSpan(ctx, err) }() - req, err := b.newPatchCall(&uattrs) - if err != nil { - return nil, err - } - if uattrs.PredefinedACL != "" { - req.PredefinedAcl(uattrs.PredefinedACL) - } - if uattrs.PredefinedDefaultObjectACL != "" { - req.PredefinedDefaultObjectAcl(uattrs.PredefinedDefaultObjectACL) - } - isIdempotent := b.conds != nil && b.conds.MetagenerationMatch != 0 - - var rawBucket *raw.Bucket - call := func() error { - rb, err := req.Context(ctx).Do() - rawBucket = rb - return err - } - - if err := run(ctx, call, b.retry, isIdempotent, setRetryHeaderHTTP(req)); err != nil { - return nil, err - } - return newBucket(rawBucket) -} - -func (b *BucketHandle) newPatchCall(uattrs *BucketAttrsToUpdate) (*raw.BucketsPatchCall, error) { - rb := uattrs.toRawBucket() - req := b.c.raw.Buckets.Patch(b.name, rb).Projection("full") - setClientHeader(req.Header()) - if err := applyBucketConds("BucketHandle.Update", b.conds, req); err != nil { - return nil, err - } - if b.userProject != "" { - req.UserProject(b.userProject) - } - return req, nil + o := makeStorageOpts(isIdempotent, b.retry, b.userProject) + return b.c.tc.UpdateBucket(ctx, b.name, &uattrs, b.conds, o...) } // SignedURL returns a URL for the specified object. Signed URLs allow anyone @@ -1366,15 +1272,8 @@ func (b *BucketHandle) UserProject(projectID string) *BucketHandle { // most customers. It might be changed in backwards-incompatible ways and is not // subject to any SLA or deprecation policy. 
func (b *BucketHandle) LockRetentionPolicy(ctx context.Context) error { - var metageneration int64 - if b.conds != nil { - metageneration = b.conds.MetagenerationMatch - } - req := b.c.raw.Buckets.LockRetentionPolicy(b.name, metageneration) - return run(ctx, func() error { - _, err := req.Context(ctx).Do() - return err - }, b.retry, true, setRetryHeaderHTTP(req)) + o := makeStorageOpts(true, b.retry, b.userProject) + return b.c.tc.LockBucketRetentionPolicy(ctx, b.name, b.conds, o...) } // applyBucketConds modifies the provided call using the conditions in conds. @@ -1990,18 +1889,8 @@ func customPlacementFromProto(c *storagepb.Bucket_CustomPlacementConfig) *Custom // // Note: The returned iterator is not safe for concurrent operations without explicit synchronization. func (b *BucketHandle) Objects(ctx context.Context, q *Query) *ObjectIterator { - it := &ObjectIterator{ - ctx: ctx, - bucket: b, - } - it.pageInfo, it.nextFunc = iterator.NewPageInfo( - it.fetch, - func() int { return len(it.items) }, - func() interface{} { b := it.items; it.items = nil; return b }) - if q != nil { - it.query = *q - } - return it + o := makeStorageOpts(true, b.retry, b.userProject) + return b.c.tc.ListObjects(ctx, b.name, q, o...) } // Retryer returns a bucket handle that is configured with custom retry @@ -2036,7 +1925,6 @@ func (b *BucketHandle) Retryer(opts ...RetryOption) *BucketHandle { // Note: This iterator is not safe for concurrent operations without explicit synchronization. 
type ObjectIterator struct { ctx context.Context - bucket *BucketHandle query Query pageInfo *iterator.PageInfo nextFunc func() error @@ -2073,52 +1961,6 @@ func (it *ObjectIterator) Next() (*ObjectAttrs, error) { return item, nil } -func (it *ObjectIterator) fetch(pageSize int, pageToken string) (string, error) { - req := it.bucket.c.raw.Objects.List(it.bucket.name) - setClientHeader(req.Header()) - projection := it.query.Projection - if projection == ProjectionDefault { - projection = ProjectionFull - } - req.Projection(projection.String()) - req.Delimiter(it.query.Delimiter) - req.Prefix(it.query.Prefix) - req.StartOffset(it.query.StartOffset) - req.EndOffset(it.query.EndOffset) - req.Versions(it.query.Versions) - req.IncludeTrailingDelimiter(it.query.IncludeTrailingDelimiter) - if len(it.query.fieldSelection) > 0 { - req.Fields("nextPageToken", googleapi.Field(it.query.fieldSelection)) - } - req.PageToken(pageToken) - if it.bucket.userProject != "" { - req.UserProject(it.bucket.userProject) - } - if pageSize > 0 { - req.MaxResults(int64(pageSize)) - } - var resp *raw.Objects - var err error - err = run(it.ctx, func() error { - resp, err = req.Context(it.ctx).Do() - return err - }, it.bucket.retry, true, setRetryHeaderHTTP(req)) - if err != nil { - var e *googleapi.Error - if ok := errors.As(err, &e); ok && e.Code == http.StatusNotFound { - err = ErrBucketNotExist - } - return "", err - } - for _, item := range resp.Items { - it.items = append(it.items, newObject(item)) - } - for _, prefix := range resp.Prefixes { - it.items = append(it.items, &ObjectAttrs{Prefix: prefix}) - } - return resp.NextPageToken, nil -} - // Buckets returns an iterator over the buckets in the project. You may // optionally set the iterator's Prefix field to restrict the list to buckets // whose names begin with the prefix. 
By default, all buckets in the project @@ -2126,17 +1968,8 @@ func (it *ObjectIterator) fetch(pageSize int, pageToken string) (string, error) // // Note: The returned iterator is not safe for concurrent operations without explicit synchronization. func (c *Client) Buckets(ctx context.Context, projectID string) *BucketIterator { - it := &BucketIterator{ - ctx: ctx, - client: c, - projectID: projectID, - } - it.pageInfo, it.nextFunc = iterator.NewPageInfo( - it.fetch, - func() int { return len(it.buckets) }, - func() interface{} { b := it.buckets; it.buckets = nil; return b }) - - return it + o := makeStorageOpts(true, c.retry, "") + return c.tc.ListBuckets(ctx, projectID, o...) } // A BucketIterator is an iterator over BucketAttrs. @@ -2147,7 +1980,6 @@ type BucketIterator struct { Prefix string ctx context.Context - client *Client projectID string buckets []*BucketAttrs pageInfo *iterator.PageInfo @@ -2173,36 +2005,6 @@ func (it *BucketIterator) Next() (*BucketAttrs, error) { // Note: This method is not safe for concurrent operations without explicit synchronization. func (it *BucketIterator) PageInfo() *iterator.PageInfo { return it.pageInfo } -// TODO: When the transport-agnostic client interface is integrated into the Veneer, -// this method should be removed, and the iterator should be initialized by the -// transport-specific client implementations. 
-func (it *BucketIterator) fetch(pageSize int, pageToken string) (token string, err error) { - req := it.client.raw.Buckets.List(it.projectID) - setClientHeader(req.Header()) - req.Projection("full") - req.Prefix(it.Prefix) - req.PageToken(pageToken) - if pageSize > 0 { - req.MaxResults(int64(pageSize)) - } - var resp *raw.Buckets - err = run(it.ctx, func() error { - resp, err = req.Context(it.ctx).Do() - return err - }, it.client.retry, true, setRetryHeaderHTTP(req)) - if err != nil { - return "", err - } - for _, item := range resp.Items { - b, err := newBucket(item) - if err != nil { - return "", err - } - it.buckets = append(it.buckets, b) - } - return resp.NextPageToken, nil -} - // RPO (Recovery Point Objective) configures the turbo replication feature. See // https://cloud.google.com/storage/docs/managing-turbo-replication for more information. type RPO int diff --git a/storage/bucket_test.go b/storage/bucket_test.go index 1eaa2f5ecf6..90bb1b25e4c 100644 --- a/storage/bucket_test.go +++ b/storage/bucket_test.go @@ -15,9 +15,6 @@ package storage import ( - "context" - "net/http" - "reflect" "testing" "time" @@ -578,98 +575,6 @@ func TestAgeConditionBackwardCompat(t *testing.T) { } -func TestCallBuilders(t *testing.T) { - rc, err := raw.NewService(context.Background()) - if err != nil { - t.Fatal(err) - } - c := &Client{raw: rc} - const metagen = 17 - - b := c.Bucket("name") - bm := b.If(BucketConditions{MetagenerationMatch: metagen}).UserProject("p") - - equal := func(x, y interface{}) bool { - return testutil.Equal(x, y, - cmp.AllowUnexported( - raw.BucketsGetCall{}, - raw.BucketsDeleteCall{}, - raw.BucketsPatchCall{}, - ), - cmp.FilterPath(func(p cmp.Path) bool { - return p[len(p)-1].Type() == reflect.TypeOf(&raw.Service{}) - }, cmp.Ignore()), - ) - } - - for i, test := range []struct { - callFunc func(*BucketHandle) (interface{}, error) - want interface { - Header() http.Header - } - metagenFunc func(interface{}) - }{ - { - func(b *BucketHandle) 
(interface{}, error) { return b.newGetCall() }, - rc.Buckets.Get("name").Projection("full"), - func(req interface{}) { req.(*raw.BucketsGetCall).IfMetagenerationMatch(metagen).UserProject("p") }, - }, - { - func(b *BucketHandle) (interface{}, error) { return b.newDeleteCall() }, - rc.Buckets.Delete("name"), - func(req interface{}) { req.(*raw.BucketsDeleteCall).IfMetagenerationMatch(metagen).UserProject("p") }, - }, - { - func(b *BucketHandle) (interface{}, error) { - return b.newPatchCall(&BucketAttrsToUpdate{ - VersioningEnabled: false, - RequesterPays: false, - }) - }, - rc.Buckets.Patch("name", &raw.Bucket{ - Versioning: &raw.BucketVersioning{ - Enabled: false, - ForceSendFields: []string{"Enabled"}, - }, - Billing: &raw.BucketBilling{ - RequesterPays: false, - ForceSendFields: []string{"RequesterPays"}, - }, - }).Projection("full"), - func(req interface{}) { req.(*raw.BucketsPatchCall).IfMetagenerationMatch(metagen).UserProject("p") }, - }, - } { - got, err := test.callFunc(b) - if err != nil { - t.Fatal(err) - } - setClientHeader(test.want.Header()) - if !equal(got, test.want) { - t.Errorf("#%d: got %#v, want %#v", i, got, test.want) - } - got, err = test.callFunc(bm) - if err != nil { - t.Fatal(err) - } - test.metagenFunc(test.want) - if !equal(got, test.want) { - t.Errorf("#%d:\ngot %#v\nwant %#v", i, got, test.want) - } - } - - // Error. 
- bm = b.If(BucketConditions{MetagenerationMatch: 1, MetagenerationNotMatch: 2}) - if _, err := bm.newGetCall(); err == nil { - t.Errorf("got nil, want error") - } - if _, err := bm.newDeleteCall(); err == nil { - t.Errorf("got nil, want error") - } - if _, err := bm.newPatchCall(&BucketAttrsToUpdate{}); err == nil { - t.Errorf("got nil, want error") - } -} - func TestNewBucket(t *testing.T) { labels := map[string]string{"a": "b"} matchClasses := []string{"STANDARD"} diff --git a/storage/client.go b/storage/client.go index ef443b75585..40eb576ce67 100644 --- a/storage/client.go +++ b/storage/client.go @@ -44,7 +44,7 @@ type storageClient interface { // Top-level methods. GetServiceAccount(ctx context.Context, project string, opts ...storageOption) (string, error) - CreateBucket(ctx context.Context, project string, attrs *BucketAttrs, opts ...storageOption) (*BucketAttrs, error) + CreateBucket(ctx context.Context, project, bucket string, attrs *BucketAttrs, opts ...storageOption) (*BucketAttrs, error) ListBuckets(ctx context.Context, project string, opts ...storageOption) *BucketIterator Close() error @@ -162,6 +162,20 @@ func callSettings(defaults *settings, opts ...storageOption) *settings { return &cs } +// makeStorageOpts is a helper for generating a set of storageOption based on +// idempotency, retryConfig, and userProject. All top-level client operations +// will generally have to pass these options through the interface. +func makeStorageOpts(isIdempotent bool, retry *retryConfig, userProject string) []storageOption { + opts := []storageOption{idempotent(isIdempotent)} + if retry != nil { + opts = append(opts, withRetryConfig(retry)) + } + if userProject != "" { + opts = append(opts, withUserProject(userProject)) + } + return opts +} + // storageOption is the transport-agnostic call option for the storageClient // interface. 
type storageOption interface { @@ -267,13 +281,14 @@ type openWriterParams struct { } type newRangeReaderParams struct { - bucket string - conds *Conditions - encryptionKey []byte - gen int64 - length int64 - object string - offset int64 + bucket string + conds *Conditions + encryptionKey []byte + gen int64 + length int64 + object string + offset int64 + readCompressed bool // Use accept-encoding: gzip. Only works for HTTP currently. } type composeObjectRequest struct { @@ -281,7 +296,6 @@ type composeObjectRequest struct { dstObject destinationObject srcs []sourceObject predefinedACL string - encryptionKey []byte sendCRC32C bool } @@ -313,5 +327,6 @@ type rewriteObjectResponse struct { resource *ObjectAttrs done bool written int64 + size int64 token string } diff --git a/storage/client_test.go b/storage/client_test.go index a32d5b9ef00..052ae0bea6b 100644 --- a/storage/client_test.go +++ b/storage/client_test.go @@ -39,7 +39,7 @@ func TestCreateBucketEmulated(t *testing.T) { LogBucket: bucket, }, } - got, err := client.CreateBucket(context.Background(), project, want) + got, err := client.CreateBucket(context.Background(), project, want.Name, want) if err != nil { t.Fatal(err) } @@ -62,7 +62,7 @@ func TestDeleteBucketEmulated(t *testing.T) { Name: bucket, } // Create the bucket that will be deleted. - _, err := client.CreateBucket(context.Background(), project, b) + _, err := client.CreateBucket(context.Background(), project, b.Name, b) if err != nil { t.Fatalf("client.CreateBucket: %v", err) } @@ -80,7 +80,7 @@ func TestGetBucketEmulated(t *testing.T) { Name: bucket, } // Create the bucket that will be retrieved. - _, err := client.CreateBucket(context.Background(), project, want) + _, err := client.CreateBucket(context.Background(), project, want.Name, want) if err != nil { t.Fatalf("client.CreateBucket: %v", err) } @@ -100,7 +100,7 @@ func TestUpdateBucketEmulated(t *testing.T) { Name: bucket, } // Create the bucket that will be updated. 
- _, err := client.CreateBucket(context.Background(), project, bkt) + _, err := client.CreateBucket(context.Background(), project, bkt.Name, bkt) if err != nil { t.Fatalf("client.CreateBucket: %v", err) } @@ -191,7 +191,7 @@ func TestGetServiceAccountEmulated(t *testing.T) { func TestGetSetTestIamPolicyEmulated(t *testing.T) { transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) { - battrs, err := client.CreateBucket(context.Background(), project, &BucketAttrs{ + battrs, err := client.CreateBucket(context.Background(), project, bucket, &BucketAttrs{ Name: bucket, }) if err != nil { @@ -222,7 +222,7 @@ func TestGetSetTestIamPolicyEmulated(t *testing.T) { func TestDeleteObjectEmulated(t *testing.T) { transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) { // Populate test object that will be deleted. - _, err := client.CreateBucket(context.Background(), project, &BucketAttrs{ + _, err := client.CreateBucket(context.Background(), project, bucket, &BucketAttrs{ Name: bucket, }) if err != nil { @@ -249,7 +249,7 @@ func TestDeleteObjectEmulated(t *testing.T) { func TestGetObjectEmulated(t *testing.T) { transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) { // Populate test object. - _, err := client.CreateBucket(context.Background(), project, &BucketAttrs{ + _, err := client.CreateBucket(context.Background(), project, bucket, &BucketAttrs{ Name: bucket, }) if err != nil { @@ -279,7 +279,7 @@ func TestGetObjectEmulated(t *testing.T) { func TestRewriteObjectEmulated(t *testing.T) { transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) { // Populate test object. 
- _, err := client.CreateBucket(context.Background(), project, &BucketAttrs{ + _, err := client.CreateBucket(context.Background(), project, bucket, &BucketAttrs{ Name: bucket, }) if err != nil { @@ -324,7 +324,7 @@ func TestRewriteObjectEmulated(t *testing.T) { func TestUpdateObjectEmulated(t *testing.T) { transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) { // Populate test object. - _, err := client.CreateBucket(context.Background(), project, &BucketAttrs{ + _, err := client.CreateBucket(context.Background(), project, bucket, &BucketAttrs{ Name: bucket, }) if err != nil { @@ -391,7 +391,7 @@ func TestUpdateObjectEmulated(t *testing.T) { func TestListObjectsEmulated(t *testing.T) { transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) { // Populate test data. - _, err := client.CreateBucket(context.Background(), project, &BucketAttrs{ + _, err := client.CreateBucket(context.Background(), project, bucket, &BucketAttrs{ Name: bucket, }) if err != nil { @@ -449,7 +449,7 @@ func TestListObjectsEmulated(t *testing.T) { func TestListObjectsWithPrefixEmulated(t *testing.T) { transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) { // Populate test data. - _, err := client.CreateBucket(context.Background(), project, &BucketAttrs{ + _, err := client.CreateBucket(context.Background(), project, bucket, &BucketAttrs{ Name: bucket, }) if err != nil { @@ -515,7 +515,7 @@ func TestListBucketsEmulated(t *testing.T) { } // Create the buckets that will be listed. for _, b := range want { - _, err := client.CreateBucket(context.Background(), project, b) + _, err := client.CreateBucket(context.Background(), project, b.Name, b) if err != nil { t.Fatalf("client.CreateBucket: %v", err) } @@ -556,7 +556,7 @@ func TestListBucketACLsEmulated(t *testing.T) { PredefinedACL: "publicRead", } // Create the bucket that will be retrieved. 
- if _, err := client.CreateBucket(ctx, project, attrs); err != nil { + if _, err := client.CreateBucket(ctx, project, attrs.Name, attrs); err != nil { t.Fatalf("client.CreateBucket: %v", err) } @@ -578,7 +578,7 @@ func TestUpdateBucketACLEmulated(t *testing.T) { PredefinedACL: "authenticatedRead", } // Create the bucket that will be retrieved. - if _, err := client.CreateBucket(ctx, project, attrs); err != nil { + if _, err := client.CreateBucket(ctx, project, attrs.Name, attrs); err != nil { t.Fatalf("client.CreateBucket: %v", err) } var listAcls []ACLRule @@ -614,7 +614,7 @@ func TestDeleteBucketACLEmulated(t *testing.T) { PredefinedACL: "publicRead", } // Create the bucket that will be retrieved. - if _, err := client.CreateBucket(ctx, project, attrs); err != nil { + if _, err := client.CreateBucket(ctx, project, attrs.Name, attrs); err != nil { t.Fatalf("client.CreateBucket: %v", err) } // Assert bucket has two BucketACL entities, including project owner and predefinedACL. @@ -648,7 +648,7 @@ func TestDefaultObjectACLCRUDEmulated(t *testing.T) { PredefinedDefaultObjectACL: "publicRead", } // Create the bucket that will be retrieved. - if _, err := client.CreateBucket(ctx, project, attrs); err != nil { + if _, err := client.CreateBucket(ctx, project, attrs.Name, attrs); err != nil { t.Fatalf("client.CreateBucket: %v", err) } // Assert bucket has 2 DefaultObjectACL entities, including project owner and PredefinedDefaultObjectACL. @@ -692,7 +692,7 @@ func TestObjectACLCRUDEmulated(t *testing.T) { transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) { // Populate test object. 
ctx := context.Background() - _, err := client.CreateBucket(ctx, project, &BucketAttrs{ + _, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{ Name: bucket, }) if err != nil { @@ -746,7 +746,7 @@ func TestObjectACLCRUDEmulated(t *testing.T) { func TestOpenReaderEmulated(t *testing.T) { transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) { // Populate test data. - _, err := client.CreateBucket(context.Background(), project, &BucketAttrs{ + _, err := client.CreateBucket(context.Background(), project, bucket, &BucketAttrs{ Name: bucket, }) if err != nil { @@ -791,7 +791,7 @@ func TestOpenReaderEmulated(t *testing.T) { func TestOpenWriterEmulated(t *testing.T) { transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) { // Populate test data. - _, err := client.CreateBucket(context.Background(), project, &BucketAttrs{ + _, err := client.CreateBucket(context.Background(), project, bucket, &BucketAttrs{ Name: bucket, }) if err != nil { @@ -854,7 +854,7 @@ func TestListNotificationsEmulated(t *testing.T) { transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) { // Populate test object. ctx := context.Background() - _, err := client.CreateBucket(ctx, project, &BucketAttrs{ + _, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{ Name: bucket, }) if err != nil { @@ -882,7 +882,7 @@ func TestCreateNotificationEmulated(t *testing.T) { transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) { // Populate test object. ctx := context.Background() - _, err := client.CreateBucket(ctx, project, &BucketAttrs{ + _, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{ Name: bucket, }) if err != nil { @@ -908,7 +908,7 @@ func TestDeleteNotificationEmulated(t *testing.T) { transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) { // Populate test object. 
ctx := context.Background() - _, err := client.CreateBucket(ctx, project, &BucketAttrs{ + _, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{ Name: bucket, }) if err != nil { @@ -982,7 +982,7 @@ func TestLockBucketRetentionPolicyEmulated(t *testing.T) { }, } // Create the bucket that will be locked. - _, err := client.CreateBucket(context.Background(), project, b) + _, err := client.CreateBucket(context.Background(), project, b.Name, b) if err != nil { t.Fatalf("client.CreateBucket: %v", err) } @@ -1006,7 +1006,7 @@ func TestComposeEmulated(t *testing.T) { ctx := context.Background() // Populate test data. - _, err := client.CreateBucket(context.Background(), project, &BucketAttrs{ + _, err := client.CreateBucket(context.Background(), project, bucket, &BucketAttrs{ Name: bucket, }) if err != nil { diff --git a/storage/copy.go b/storage/copy.go index 88e1daefd02..60ed81391e6 100644 --- a/storage/copy.go +++ b/storage/copy.go @@ -20,7 +20,6 @@ import ( "fmt" "cloud.google.com/go/internal/trace" - raw "google.golang.org/api/storage/v1" ) // CopierFrom creates a Copier that can copy src to dst. @@ -86,69 +85,57 @@ func (c *Copier) Run(ctx context.Context) (attrs *ObjectAttrs, err error) { if c.DestinationKMSKeyName != "" && c.dst.encryptionKey != nil { return nil, errors.New("storage: cannot use DestinationKMSKeyName with a customer-supplied encryption key") } + if c.dst.gen != defaultGen { + return nil, fmt.Errorf("storage: generation cannot be specified on copy destination, got %v", c.dst.gen) + } // Convert destination attributes to raw form, omitting the bucket. // If the bucket is included but name or content-type aren't, the service // returns a 400 with "Required" as the only message. Omitting the bucket // does not cause any problems. 
- rawObject := c.ObjectAttrs.toRawObject("") + req := &rewriteObjectRequest{ + srcObject: sourceObject{ + name: c.src.object, + bucket: c.src.bucket, + gen: c.src.gen, + conds: c.src.conds, + encryptionKey: c.src.encryptionKey, + }, + dstObject: destinationObject{ + name: c.dst.object, + bucket: c.dst.bucket, + conds: c.dst.conds, + attrs: &c.ObjectAttrs, + encryptionKey: c.dst.encryptionKey, + keyName: c.DestinationKMSKeyName, + }, + predefinedACL: c.PredefinedACL, + token: c.RewriteToken, + } + + isIdempotent := c.dst.conds != nil && (c.dst.conds.GenerationMatch != 0 || c.dst.conds.DoesNotExist) + var userProject string + if c.dst.userProject != "" { + userProject = c.dst.userProject + } else if c.src.userProject != "" { + userProject = c.src.userProject + } + opts := makeStorageOpts(isIdempotent, c.dst.retry, userProject) + for { - res, err := c.callRewrite(ctx, rawObject) + res, err := c.dst.c.tc.RewriteObject(ctx, req, opts...) if err != nil { return nil, err } + c.RewriteToken = res.token if c.ProgressFunc != nil { - c.ProgressFunc(uint64(res.TotalBytesRewritten), uint64(res.ObjectSize)) + c.ProgressFunc(uint64(res.written), uint64(res.size)) } - if res.Done { // Finished successfully. - return newObject(res.Resource), nil + if res.done { // Finished successfully. 
+ return res.resource, nil } } } -func (c *Copier) callRewrite(ctx context.Context, rawObj *raw.Object) (*raw.RewriteResponse, error) { - call := c.dst.c.raw.Objects.Rewrite(c.src.bucket, c.src.object, c.dst.bucket, c.dst.object, rawObj) - - call.Context(ctx).Projection("full") - if c.RewriteToken != "" { - call.RewriteToken(c.RewriteToken) - } - if c.DestinationKMSKeyName != "" { - call.DestinationKmsKeyName(c.DestinationKMSKeyName) - } - if c.PredefinedACL != "" { - call.DestinationPredefinedAcl(c.PredefinedACL) - } - if err := applyConds("Copy destination", c.dst.gen, c.dst.conds, call); err != nil { - return nil, err - } - if c.dst.userProject != "" { - call.UserProject(c.dst.userProject) - } else if c.src.userProject != "" { - call.UserProject(c.src.userProject) - } - if err := applySourceConds(c.src.gen, c.src.conds, call); err != nil { - return nil, err - } - if err := setEncryptionHeaders(call.Header(), c.dst.encryptionKey, false); err != nil { - return nil, err - } - if err := setEncryptionHeaders(call.Header(), c.src.encryptionKey, true); err != nil { - return nil, err - } - var res *raw.RewriteResponse - var err error - setClientHeader(call.Header()) - - retryCall := func() error { res, err = call.Do(); return err } - isIdempotent := c.dst.conds != nil && (c.dst.conds.GenerationMatch != 0 || c.dst.conds.DoesNotExist) - - if err := run(ctx, retryCall, c.dst.retry, isIdempotent, setRetryHeaderHTTP(call)); err != nil { - return nil, err - } - c.RewriteToken = res.RewriteToken - return res, nil -} - // ComposerFrom creates a Composer that can compose srcs into dst. // You can immediately call Run on the returned Composer, or you can // configure it first. 
@@ -188,6 +175,9 @@ func (c *Composer) Run(ctx context.Context) (attrs *ObjectAttrs, err error) { if err := c.dst.validate(); err != nil { return nil, err } + if c.dst.gen != defaultGen { + return nil, fmt.Errorf("storage: generation cannot be specified on compose destination, got %v", c.dst.gen) + } if len(c.srcs) == 0 { return nil, errors.New("storage: at least one source object must be specified") } @@ -204,45 +194,29 @@ func (c *Composer) Run(ctx context.Context) (attrs *ObjectAttrs, err error) { } } - // TODO: transport agnostic interface starts here. - req := &raw.ComposeRequest{} - // Compose requires a non-empty Destination, so we always set it, - // even if the caller-provided ObjectAttrs is the zero value. - req.Destination = c.ObjectAttrs.toRawObject(c.dst.bucket) - if c.SendCRC32C { - req.Destination.Crc32c = encodeUint32(c.ObjectAttrs.CRC32C) + req := &composeObjectRequest{ + dstBucket: c.dst.bucket, + predefinedACL: c.PredefinedACL, + sendCRC32C: c.SendCRC32C, + } + req.dstObject = destinationObject{ + name: c.dst.object, + bucket: c.dst.bucket, + conds: c.dst.conds, + attrs: &c.ObjectAttrs, + encryptionKey: c.dst.encryptionKey, } for _, src := range c.srcs { - srcObj := &raw.ComposeRequestSourceObjects{ - Name: src.object, - } - if err := applyConds("ComposeFrom source", src.gen, src.conds, composeSourceObj{srcObj}); err != nil { - return nil, err + s := sourceObject{ + name: src.object, + bucket: src.bucket, + gen: src.gen, + conds: src.conds, } - req.SourceObjects = append(req.SourceObjects, srcObj) + req.srcs = append(req.srcs, s) } - call := c.dst.c.raw.Objects.Compose(c.dst.bucket, c.dst.object, req).Context(ctx) - if err := applyConds("ComposeFrom destination", c.dst.gen, c.dst.conds, call); err != nil { - return nil, err - } - if c.dst.userProject != "" { - call.UserProject(c.dst.userProject) - } - if c.PredefinedACL != "" { - call.DestinationPredefinedAcl(c.PredefinedACL) - } - if err := setEncryptionHeaders(call.Header(), 
c.dst.encryptionKey, false); err != nil { - return nil, err - } - var obj *raw.Object - setClientHeader(call.Header()) - - retryCall := func() error { obj, err = call.Do(); return err } isIdempotent := c.dst.conds != nil && (c.dst.conds.GenerationMatch != 0 || c.dst.conds.DoesNotExist) - - if err := run(ctx, retryCall, c.dst.retry, isIdempotent, setRetryHeaderHTTP(call)); err != nil { - return nil, err - } - return newObject(obj), nil + opts := makeStorageOpts(isIdempotent, c.dst.retry, c.dst.userProject) + return c.dst.c.tc.ComposeObject(ctx, req, opts...) } diff --git a/storage/grpc_client.go b/storage/grpc_client.go index 110baebb86f..8a08e35ec2b 100644 --- a/storage/grpc_client.go +++ b/storage/grpc_client.go @@ -19,6 +19,7 @@ import ( "encoding/base64" "fmt" "io" + "net/url" "os" "cloud.google.com/go/internal/trace" @@ -44,6 +45,12 @@ const ( // This is only used for the gRPC client. defaultConnPoolSize = 4 + // maxPerMessageWriteSize is the maximum amount of content that can be sent + // per WriteObjectRequest message. A buffer reaching this amount will + // precipitate a flush of the buffer. It is only used by the gRPC Writer + // implementation. + maxPerMessageWriteSize int = int(storagepb.ServiceConstants_MAX_WRITE_CHUNK_BYTES) + // globalProjectAlias is the project ID alias used for global buckets. // // This is only used for the gRPC API. @@ -133,10 +140,10 @@ func (c *grpcStorageClient) GetServiceAccount(ctx context.Context, project strin return resp.EmailAddress, err } -func (c *grpcStorageClient) CreateBucket(ctx context.Context, project string, attrs *BucketAttrs, opts ...storageOption) (*BucketAttrs, error) { +func (c *grpcStorageClient) CreateBucket(ctx context.Context, project, bucket string, attrs *BucketAttrs, opts ...storageOption) (*BucketAttrs, error) { s := callSettings(c.settings, opts...) b := attrs.toProtoBucket() - + b.Name = bucket // If there is lifecycle information but no location, explicitly set // the location. 
This is a GCS quirk/bug. if b.GetLocation() == "" && b.GetLifecycle() != nil { @@ -727,7 +734,7 @@ func (c *grpcStorageClient) ComposeObject(ctx context.Context, req *composeObjec dstObjPb := req.dstObject.attrs.toProtoObject(req.dstBucket) dstObjPb.Name = req.dstObject.name - if err := applyCondsProto("ComposeObject destination", -1, req.dstObject.conds, dstObjPb); err != nil { + if err := applyCondsProto("ComposeObject destination", defaultGen, req.dstObject.conds, dstObjPb); err != nil { return nil, err } if req.sendCRC32C { @@ -750,8 +757,8 @@ func (c *grpcStorageClient) ComposeObject(ctx context.Context, req *composeObjec if req.predefinedACL != "" { rawReq.DestinationPredefinedAcl = req.predefinedACL } - if req.encryptionKey != nil { - rawReq.CommonObjectRequestParams = toProtoCommonObjectRequestParams(req.encryptionKey) + if req.dstObject.encryptionKey != nil { + rawReq.CommonObjectRequestParams = toProtoCommonObjectRequestParams(req.dstObject.encryptionKey) } var obj *storagepb.Object @@ -811,6 +818,7 @@ func (c *grpcStorageClient) RewriteObject(ctx context.Context, req *rewriteObjec r := &rewriteObjectResponse{ done: res.GetDone(), written: res.GetTotalBytesRewritten(), + size: res.GetObjectSize(), token: res.GetRewriteToken(), resource: newObjectFromProto(res.GetResource()), } @@ -822,14 +830,12 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.NewRangeReader") defer func() { trace.EndSpan(ctx, err) }() - if params.conds != nil { - if err := params.conds.validate("grpcStorageClient.NewRangeReader"); err != nil { - return nil, err - } - } - s := callSettings(c.settings, opts...) + if s.userProject != "" { + ctx = setUserProjectMetadata(ctx, s.userProject) + } + // A negative length means "read to the end of the object", but the // read_limit field it corresponds to uses zero to mean the same thing. 
Thus // we coerce the length to 0 to read to the end of the object. @@ -1513,7 +1519,8 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st // The first message on the WriteObject stream must either be the // Object or the Resumable Upload ID. if first { - w.stream, err = w.c.raw.WriteObject(w.ctx) + ctx := gapic.InsertMetadata(w.ctx, metadata.Pairs("x-goog-request-params", "bucket="+url.QueryEscape(w.bucket))) + w.stream, err = w.c.raw.WriteObject(ctx) if err != nil { return nil, 0, false, err } @@ -1646,8 +1653,8 @@ func (w *gRPCWriter) writeObjectSpec() (*storagepb.WriteObjectSpec, error) { spec := &storagepb.WriteObjectSpec{ Resource: attrs.toProtoObject(w.bucket), } - // WriteObject doesn't support the generation condition, so use -1. - if err := applyCondsProto("WriteObject", -1, w.conds, spec); err != nil { + // WriteObject doesn't support the generation condition, so use default. + if err := applyCondsProto("WriteObject", defaultGen, w.conds, spec); err != nil { return nil, err } return spec, nil diff --git a/storage/hmac.go b/storage/hmac.go index ca80d716f01..d21fba141c0 100644 --- a/storage/hmac.go +++ b/storage/hmac.go @@ -91,7 +91,7 @@ type HMACKeyHandle struct { projectID string accessID string retry *retryConfig - raw *raw.ProjectsHmacKeysService + tc storageClient } // HMACKeyHandle creates a handle that will be used for HMACKey operations. @@ -102,7 +102,7 @@ func (c *Client) HMACKeyHandle(projectID, accessID string) *HMACKeyHandle { projectID: projectID, accessID: accessID, retry: c.retry, - raw: raw.NewProjectsHmacKeysService(c.raw), + tc: c.tc, } } @@ -114,32 +114,15 @@ func (c *Client) HMACKeyHandle(projectID, accessID string) *HMACKeyHandle { // // This method is EXPERIMENTAL and subject to change or removal without notice. 
func (hkh *HMACKeyHandle) Get(ctx context.Context, opts ...HMACKeyOption) (*HMACKey, error) { - call := hkh.raw.Get(hkh.projectID, hkh.accessID) - desc := new(hmacKeyDesc) for _, opt := range opts { opt.withHMACKeyDesc(desc) } - if desc.userProjectID != "" { - call = call.UserProject(desc.userProjectID) - } - - setClientHeader(call.Header()) - var metadata *raw.HmacKeyMetadata - var err error - err = run(ctx, func() error { - metadata, err = call.Context(ctx).Do() - return err - }, hkh.retry, true, setRetryHeaderHTTP(call)) - if err != nil { - return nil, err - } + o := makeStorageOpts(true, hkh.retry, desc.userProjectID) + hk, err := hkh.tc.GetHMACKey(ctx, hkh.projectID, hkh.accessID, o...) - hk := &raw.HmacKey{ - Metadata: metadata, - } - return toHMACKeyFromRaw(hk, false) + return hk, err } // Delete invokes an RPC to delete the key referenced by accessID, on Google Cloud Storage. @@ -148,19 +131,13 @@ func (hkh *HMACKeyHandle) Get(ctx context.Context, opts ...HMACKeyOption) (*HMAC // // This method is EXPERIMENTAL and subject to change or removal without notice. func (hkh *HMACKeyHandle) Delete(ctx context.Context, opts ...HMACKeyOption) error { - delCall := hkh.raw.Delete(hkh.projectID, hkh.accessID) desc := new(hmacKeyDesc) for _, opt := range opts { opt.withHMACKeyDesc(desc) } - if desc.userProjectID != "" { - delCall = delCall.UserProject(desc.userProjectID) - } - setClientHeader(delCall.Header()) - return run(ctx, func() error { - return delCall.Context(ctx).Do() - }, hkh.retry, true, setRetryHeaderHTTP(delCall)) + o := makeStorageOpts(true, hkh.retry, desc.userProjectID) + return hkh.tc.DeleteHMACKey(ctx, hkh.projectID, hkh.accessID, o...) 
} func toHMACKeyFromRaw(hk *raw.HmacKey, updatedTimeCanBeNil bool) (*HMACKey, error) { @@ -220,29 +197,14 @@ func (c *Client) CreateHMACKey(ctx context.Context, projectID, serviceAccountEma return nil, errors.New("storage: expecting a non-blank service account email") } - svc := raw.NewProjectsHmacKeysService(c.raw) - call := svc.Create(projectID, serviceAccountEmail) desc := new(hmacKeyDesc) for _, opt := range opts { opt.withHMACKeyDesc(desc) } - if desc.userProjectID != "" { - call = call.UserProject(desc.userProjectID) - } - - setClientHeader(call.Header()) - var hk *raw.HmacKey - - if err := run(ctx, func() error { - h, err := call.Context(ctx).Do() - hk = h - return err - }, c.retry, false, setRetryHeaderHTTP(call)); err != nil { - return nil, err - } - - return toHMACKeyFromRaw(hk, true) + o := makeStorageOpts(false, c.retry, desc.userProjectID) + hk, err := c.tc.CreateHMACKey(ctx, projectID, serviceAccountEmail, o...) + return hk, err } // HMACKeyAttrsToUpdate defines the attributes of an HMACKey that will be updated. 
@@ -264,35 +226,15 @@ func (h *HMACKeyHandle) Update(ctx context.Context, au HMACKeyAttrsToUpdate, opt return nil, fmt.Errorf("storage: invalid state %q for update, must be either %q or %q", au.State, Active, Inactive) } - call := h.raw.Update(h.projectID, h.accessID, &raw.HmacKeyMetadata{ - Etag: au.Etag, - State: string(au.State), - }) - desc := new(hmacKeyDesc) for _, opt := range opts { opt.withHMACKeyDesc(desc) } - if desc.userProjectID != "" { - call = call.UserProject(desc.userProjectID) - } - setClientHeader(call.Header()) - var metadata *raw.HmacKeyMetadata - var err error isIdempotent := len(au.Etag) > 0 - err = run(ctx, func() error { - metadata, err = call.Context(ctx).Do() - return err - }, h.retry, isIdempotent, setRetryHeaderHTTP(call)) - - if err != nil { - return nil, err - } - hk := &raw.HmacKey{ - Metadata: metadata, - } - return toHMACKeyFromRaw(hk, false) + o := makeStorageOpts(isIdempotent, h.retry, desc.userProjectID) + hk, err := h.tc.UpdateHMACKey(ctx, h.projectID, desc.forServiceAccountEmail, h.accessID, &au, o...) + return hk, err } // An HMACKeysIterator is an iterator over HMACKeys. @@ -318,27 +260,13 @@ type HMACKeysIterator struct { // // This method is EXPERIMENTAL and subject to change or removal without notice. 
func (c *Client) ListHMACKeys(ctx context.Context, projectID string, opts ...HMACKeyOption) *HMACKeysIterator { - it := &HMACKeysIterator{ - ctx: ctx, - raw: raw.NewProjectsHmacKeysService(c.raw), - projectID: projectID, - retry: c.retry, - } - + desc := new(hmacKeyDesc) for _, opt := range opts { - opt.withHMACKeyDesc(&it.desc) + opt.withHMACKeyDesc(desc) } - it.pageInfo, it.nextFunc = iterator.NewPageInfo( - it.fetch, - func() int { return len(it.hmacKeys) - it.index }, - func() interface{} { - prev := it.hmacKeys - it.hmacKeys = it.hmacKeys[:0] - it.index = 0 - return prev - }) - return it + o := makeStorageOpts(true, c.retry, desc.userProjectID) + return c.tc.ListHMACKeys(ctx, projectID, desc.forServiceAccountEmail, desc.showDeletedKeys, o...) } // Next returns the next result. Its second return value is iterator.Done if diff --git a/storage/http_client.go b/storage/http_client.go index ffa2e369943..a589d3d0fba 100644 --- a/storage/http_client.go +++ b/storage/http_client.go @@ -159,7 +159,7 @@ func (c *httpStorageClient) GetServiceAccount(ctx context.Context, project strin return res.EmailAddress, nil } -func (c *httpStorageClient) CreateBucket(ctx context.Context, project string, attrs *BucketAttrs, opts ...storageOption) (*BucketAttrs, error) { +func (c *httpStorageClient) CreateBucket(ctx context.Context, project, bucket string, attrs *BucketAttrs, opts ...storageOption) (*BucketAttrs, error) { s := callSettings(c.settings, opts...) var bkt *raw.Bucket if attrs != nil { @@ -167,7 +167,7 @@ func (c *httpStorageClient) CreateBucket(ctx context.Context, project string, at } else { bkt = &raw.Bucket{} } - + bkt.Name = bucket // If there is lifecycle information but no location, explicitly set // the location. This is a GCS quirk/bug. 
if bkt.Location == "" && bkt.Lifecycle != nil { @@ -692,7 +692,7 @@ func (c *httpStorageClient) ComposeObject(ctx context.Context, req *composeObjec } call := c.raw.Objects.Compose(req.dstBucket, req.dstObject.name, rawReq).Context(ctx) - if err := applyConds("ComposeFrom destination", -1, req.dstObject.conds, call); err != nil { + if err := applyConds("ComposeFrom destination", defaultGen, req.dstObject.conds, call); err != nil { return nil, err } if s.userProject != "" { @@ -701,7 +701,7 @@ func (c *httpStorageClient) ComposeObject(ctx context.Context, req *composeObjec if req.predefinedACL != "" { call.DestinationPredefinedAcl(req.predefinedACL) } - if err := setEncryptionHeaders(call.Header(), req.encryptionKey, false); err != nil { + if err := setEncryptionHeaders(call.Header(), req.dstObject.encryptionKey, false); err != nil { return nil, err } var obj *raw.Object @@ -760,6 +760,7 @@ func (c *httpStorageClient) RewriteObject(ctx context.Context, req *rewriteObjec r := &rewriteObjectResponse{ done: res.Done, written: res.TotalBytesRewritten, + size: res.ObjectSize, token: res.RewriteToken, resource: newObject(res.Resource), } @@ -773,14 +774,6 @@ func (c *httpStorageClient) NewRangeReader(ctx context.Context, params *newRange s := callSettings(c.settings, opts...) - if params.offset < 0 && params.length >= 0 { - return nil, fmt.Errorf("storage: invalid offset %d < 0 requires negative length", params.offset) - } - if params.conds != nil { - if err := params.conds.validate("NewRangeReader"); err != nil { - return nil, err - } - } u := &url.URL{ Scheme: c.scheme, Host: c.readHost, @@ -798,10 +791,9 @@ func (c *httpStorageClient) NewRangeReader(ctx context.Context, params *newRange if s.userProject != "" { req.Header.Set("X-Goog-User-Project", s.userProject) } - // TODO(noahdietz): add option for readCompressed. 
- // if o.readCompressed { - // req.Header.Set("Accept-Encoding", "gzip") - // } + if params.readCompressed { + req.Header.Set("Accept-Encoding", "gzip") + } if err := setEncryptionHeaders(req.Header, params.encryptionKey, false); err != nil { return nil, err } @@ -1026,7 +1018,7 @@ func (c *httpStorageClient) OpenWriter(params *openWriterParams, opts ...storage return } var resp *raw.Object - err := applyConds("NewWriter", params.attrs.Generation, params.conds, call) + err := applyConds("NewWriter", defaultGen, params.conds, call) if err == nil { if s.userProject != "" { call.UserProject(s.userProject) diff --git a/storage/iam.go b/storage/iam.go index cf9f899a487..408661718fc 100644 --- a/storage/iam.go +++ b/storage/iam.go @@ -27,17 +27,17 @@ import ( // IAM provides access to IAM access control for the bucket. func (b *BucketHandle) IAM() *iam.Handle { return iam.InternalNewHandleClient(&iamClient{ - raw: b.c.raw, userProject: b.userProject, retry: b.retry, + client: b.c, }, b.name) } // iamClient implements the iam.client interface. 
type iamClient struct { - raw *raw.Service userProject string retry *retryConfig + client *Client } func (c *iamClient) Get(ctx context.Context, resource string) (p *iampb.Policy, err error) { @@ -48,57 +48,25 @@ func (c *iamClient) GetWithVersion(ctx context.Context, resource string, request ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.IAM.Get") defer func() { trace.EndSpan(ctx, err) }() - call := c.raw.Buckets.GetIamPolicy(resource).OptionsRequestedPolicyVersion(int64(requestedPolicyVersion)) - setClientHeader(call.Header()) - if c.userProject != "" { - call.UserProject(c.userProject) - } - var rp *raw.Policy - err = run(ctx, func() error { - rp, err = call.Context(ctx).Do() - return err - }, c.retry, true, setRetryHeaderHTTP(call)) - if err != nil { - return nil, err - } - return iamFromStoragePolicy(rp), nil + o := makeStorageOpts(true, c.retry, c.userProject) + return c.client.tc.GetIamPolicy(ctx, resource, requestedPolicyVersion, o...) } func (c *iamClient) Set(ctx context.Context, resource string, p *iampb.Policy) (err error) { ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.IAM.Set") defer func() { trace.EndSpan(ctx, err) }() - rp := iamToStoragePolicy(p) - call := c.raw.Buckets.SetIamPolicy(resource, rp) - setClientHeader(call.Header()) - if c.userProject != "" { - call.UserProject(c.userProject) - } isIdempotent := len(p.Etag) > 0 - return run(ctx, func() error { - _, err := call.Context(ctx).Do() - return err - }, c.retry, isIdempotent, setRetryHeaderHTTP(call)) + o := makeStorageOpts(isIdempotent, c.retry, c.userProject) + return c.client.tc.SetIamPolicy(ctx, resource, p, o...) 
} func (c *iamClient) Test(ctx context.Context, resource string, perms []string) (permissions []string, err error) { ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.IAM.Test") defer func() { trace.EndSpan(ctx, err) }() - call := c.raw.Buckets.TestIamPermissions(resource, perms) - setClientHeader(call.Header()) - if c.userProject != "" { - call.UserProject(c.userProject) - } - var res *raw.TestIamPermissionsResponse - err = run(ctx, func() error { - res, err = call.Context(ctx).Do() - return err - }, c.retry, true, setRetryHeaderHTTP(call)) - if err != nil { - return nil, err - } - return res.Permissions, nil + o := makeStorageOpts(true, c.retry, c.userProject) + return c.client.tc.TestIamPermissions(ctx, resource, perms, o...) } func iamToStoragePolicy(ip *iampb.Policy) *raw.Policy { diff --git a/storage/integration_test.go b/storage/integration_test.go index 2f46a981365..924eea65edb 100644 --- a/storage/integration_test.go +++ b/storage/integration_test.go @@ -2735,26 +2735,6 @@ func TestIntegration_PerObjectStorageClass(t *testing.T) { } } -func TestIntegration_BucketInCopyAttrs(t *testing.T) { - // Confirm that if bucket is included in the object attributes of a rewrite - // call, but object name and content-type aren't, then we get an error. See - // the comment in Copier.Run. 
- ctx := context.Background() - client := testConfig(ctx, t) - defer client.Close() - h := testHelper{t} - - bkt := client.Bucket(bucketName) - obj := bkt.Object("bucketInCopyAttrs") - h.mustWrite(obj.NewWriter(ctx), []byte("foo")) - copier := obj.CopierFrom(obj) - rawObject := copier.ObjectAttrs.toRawObject(bucketName) - _, err := copier.callRewrite(ctx, rawObject) - if err == nil { - t.Errorf("got nil, want error") - } -} - func TestIntegration_NoUnicodeNormalization(t *testing.T) { t.Parallel() ctx := context.Background() @@ -3164,6 +3144,9 @@ func TestIntegration_Notifications(t *testing.T) { if err != nil { t.Fatal(err) } + if n.ID == "" { + t.Fatal("expected created Notification to have non-empty ID") + } nArg.ID = n.ID if !testutil.Equal(n, nArg) { t.Errorf("got %+v, want %+v", n, nArg) diff --git a/storage/internal/apiv2/metadata.go b/storage/internal/apiv2/metadata.go new file mode 100644 index 00000000000..6ff86c4fb49 --- /dev/null +++ b/storage/internal/apiv2/metadata.go @@ -0,0 +1,26 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage + +import ( + "context" + + "google.golang.org/grpc/metadata" +) + +// InsertMetadata inserts the given gRPC metadata into the outgoing context. +func InsertMetadata(ctx context.Context, mds ...metadata.MD) context.Context { + return insertMetadata(ctx, mds...) 
+} diff --git a/storage/notifications.go b/storage/notifications.go index 587a7b15b0a..614feb7b6da 100644 --- a/storage/notifications.go +++ b/storage/notifications.go @@ -157,21 +157,10 @@ func (b *BucketHandle) AddNotification(ctx context.Context, n *Notification) (re if n.TopicID == "" { return nil, errors.New("storage: AddNotification: missing TopicID") } - call := b.c.raw.Notifications.Insert(b.name, toRawNotification(n)) - setClientHeader(call.Header()) - if b.userProject != "" { - call.UserProject(b.userProject) - } - var rn *raw.Notification - err = run(ctx, func() error { - rn, err = call.Context(ctx).Do() - return err - }, b.retry, false, setRetryHeaderHTTP(call)) - if err != nil { - return nil, err - } - return toNotification(rn), nil + opts := makeStorageOpts(false, b.retry, b.userProject) + ret, err = b.c.tc.CreateNotification(ctx, b.name, n, opts...) + return ret, err } // Notifications returns all the Notifications configured for this bucket, as a map @@ -180,20 +169,9 @@ func (b *BucketHandle) Notifications(ctx context.Context) (n map[string]*Notific ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Bucket.Notifications") defer func() { trace.EndSpan(ctx, err) }() - call := b.c.raw.Notifications.List(b.name) - setClientHeader(call.Header()) - if b.userProject != "" { - call.UserProject(b.userProject) - } - var res *raw.Notifications - err = run(ctx, func() error { - res, err = call.Context(ctx).Do() - return err - }, b.retry, true, setRetryHeaderHTTP(call)) - if err != nil { - return nil, err - } - return notificationsToMap(res.Items), nil + opts := makeStorageOpts(true, b.retry, b.userProject) + n, err = b.c.tc.ListNotifications(ctx, b.name, opts...) 
+ return n, err } func notificationsToMap(rns []*raw.Notification) map[string]*Notification { @@ -217,12 +195,6 @@ func (b *BucketHandle) DeleteNotification(ctx context.Context, id string) (err e ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Bucket.DeleteNotification") defer func() { trace.EndSpan(ctx, err) }() - call := b.c.raw.Notifications.Delete(b.name, id) - setClientHeader(call.Header()) - if b.userProject != "" { - call.UserProject(b.userProject) - } - return run(ctx, func() error { - return call.Context(ctx).Do() - }, b.retry, true, setRetryHeaderHTTP(call)) + opts := makeStorageOpts(true, b.retry, b.userProject) + return b.c.tc.DeleteNotification(ctx, b.name, id, opts...) } diff --git a/storage/reader.go b/storage/reader.go index d895a4390db..46487d2b77d 100644 --- a/storage/reader.go +++ b/storage/reader.go @@ -16,19 +16,15 @@ package storage import ( "context" - "errors" "fmt" "hash/crc32" "io" "io/ioutil" "net/http" - "net/url" - "strconv" "strings" "time" "cloud.google.com/go/internal/trace" - "google.golang.org/api/googleapi" ) var crc32cTable = crc32.MakeTable(crc32.Castagnoli) @@ -94,10 +90,6 @@ func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64) ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Object.NewRangeReader") defer func() { trace.EndSpan(ctx, err) }() - if o.c.tc != nil { - return o.newRangeReaderWithGRPC(ctx, offset, length) - } - if err := o.validate(); err != nil { return nil, err } @@ -109,200 +101,23 @@ func (o *ObjectHandle) NewRangeReader(ctx context.Context, offset, length int64) return nil, err } } - u := &url.URL{ - Scheme: o.c.scheme, - Host: o.c.readHost, - Path: fmt.Sprintf("/%s/%s", o.bucket, o.object), - } - verb := "GET" - if length == 0 { - verb = "HEAD" - } - req, err := http.NewRequest(verb, u.String(), nil) - if err != nil { - return nil, err - } - req = req.WithContext(ctx) - if o.userProject != "" { - req.Header.Set("X-Goog-User-Project", o.userProject) - } - if 
o.readCompressed { - req.Header.Set("Accept-Encoding", "gzip") - } - if err := setEncryptionHeaders(req.Header, o.encryptionKey, false); err != nil { - return nil, err - } - gen := o.gen + opts := makeStorageOpts(true, o.retry, o.userProject) - // Define a function that initiates a Read with offset and length, assuming we - // have already read seen bytes. - reopen := func(seen int64) (*http.Response, error) { - // If the context has already expired, return immediately without making a - // call. - if err := ctx.Err(); err != nil { - return nil, err - } - start := offset + seen - if length < 0 && start < 0 { - req.Header.Set("Range", fmt.Sprintf("bytes=%d", start)) - } else if length < 0 && start > 0 { - req.Header.Set("Range", fmt.Sprintf("bytes=%d-", start)) - } else if length > 0 { - // The end character isn't affected by how many bytes we've seen. - req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", start, offset+length-1)) - } - // We wait to assign conditions here because the generation number can change in between reopen() runs. - if err := setConditionsHeaders(req.Header, o.conds); err != nil { - return nil, err - } - // If an object generation is specified, include generation as query string parameters. 
- if gen >= 0 { - req.URL.RawQuery = fmt.Sprintf("generation=%d", gen) - } - - var res *http.Response - err = run(ctx, func() error { - res, err = o.c.hc.Do(req) - if err != nil { - return err - } - if res.StatusCode == http.StatusNotFound { - res.Body.Close() - return ErrObjectNotExist - } - if res.StatusCode < 200 || res.StatusCode > 299 { - body, _ := ioutil.ReadAll(res.Body) - res.Body.Close() - return &googleapi.Error{ - Code: res.StatusCode, - Header: res.Header, - Body: string(body), - } - } - - partialContentNotSatisfied := - !decompressiveTranscoding(res) && - start > 0 && length != 0 && - res.StatusCode != http.StatusPartialContent - - if partialContentNotSatisfied { - res.Body.Close() - return errors.New("storage: partial request not satisfied") - } - - // With "Content-Encoding": "gzip" aka decompressive transcoding, GCS serves - // back the whole file regardless of the range count passed in as per: - // https://cloud.google.com/storage/docs/transcoding#range, - // thus we have to manually move the body forward by seen bytes. - if decompressiveTranscoding(res) && seen > 0 { - _, _ = io.CopyN(ioutil.Discard, res.Body, seen) - } - - // If a generation hasn't been specified, and this is the first response we get, let's record the - // generation. In future requests we'll use this generation as a precondition to avoid data races. - if gen < 0 && res.Header.Get("X-Goog-Generation") != "" { - gen64, err := strconv.ParseInt(res.Header.Get("X-Goog-Generation"), 10, 64) - if err != nil { - return err - } - gen = gen64 - } - return nil - }, o.retry, true, setRetryHeaderHTTP(&readerRequestWrapper{req})) - if err != nil { - return nil, err - } - return res, nil - } - - res, err := reopen(0) - if err != nil { - return nil, err - } - var ( - size int64 // total size of object, even if a range was requested. - checkCRC bool - crc uint32 - startOffset int64 // non-zero if range request. 
- ) - if res.StatusCode == http.StatusPartialContent { - cr := strings.TrimSpace(res.Header.Get("Content-Range")) - if !strings.HasPrefix(cr, "bytes ") || !strings.Contains(cr, "/") { - return nil, fmt.Errorf("storage: invalid Content-Range %q", cr) - } - // Content range is formatted -/. We take - // the total size. - size, err = strconv.ParseInt(cr[strings.LastIndex(cr, "/")+1:], 10, 64) - if err != nil { - return nil, fmt.Errorf("storage: invalid Content-Range %q", cr) - } - - dashIndex := strings.Index(cr, "-") - if dashIndex >= 0 { - startOffset, err = strconv.ParseInt(cr[len("bytes="):dashIndex], 10, 64) - if err != nil { - return nil, fmt.Errorf("storage: invalid Content-Range %q: %v", cr, err) - } - } - } else { - size = res.ContentLength - // Check the CRC iff all of the following hold: - // - We asked for content (length != 0). - // - We got all the content (status != PartialContent). - // - The server sent a CRC header. - // - The Go http stack did not uncompress the file. - // - We were not served compressed data that was uncompressed on download. - // The problem with the last two cases is that the CRC will not match -- GCS - // computes it on the compressed contents, but we compute it on the - // uncompressed contents. 
- if length != 0 && !res.Uncompressed && !uncompressedByServer(res) { - crc, checkCRC = parseCRC32c(res) - } - } - - remain := res.ContentLength - body := res.Body - if length == 0 { - remain = 0 - body.Close() - body = emptyBody - } - var metaGen int64 - if res.Header.Get("X-Goog-Metageneration") != "" { - metaGen, err = strconv.ParseInt(res.Header.Get("X-Goog-Metageneration"), 10, 64) - if err != nil { - return nil, err - } + params := &newRangeReaderParams{ + bucket: o.bucket, + object: o.object, + gen: o.gen, + offset: offset, + length: length, + encryptionKey: o.encryptionKey, + conds: o.conds, + readCompressed: o.readCompressed, } - var lm time.Time - if res.Header.Get("Last-Modified") != "" { - lm, err = http.ParseTime(res.Header.Get("Last-Modified")) - if err != nil { - return nil, err - } - } + r, err = o.c.tc.NewRangeReader(ctx, params, opts...) - attrs := ReaderObjectAttrs{ - Size: size, - ContentType: res.Header.Get("Content-Type"), - ContentEncoding: res.Header.Get("Content-Encoding"), - CacheControl: res.Header.Get("Cache-Control"), - LastModified: lm, - StartOffset: startOffset, - Generation: gen, - Metageneration: metaGen, - } - return &Reader{ - Attrs: attrs, - body: body, - size: size, - remain: remain, - wantCRC: crc, - checkCRC: checkCRC, - reopen: reopen, - }, nil + return r, err } // decompressiveTranscoding returns true if the request was served decompressed @@ -375,37 +190,21 @@ var emptyBody = ioutil.NopCloser(strings.NewReader("")) // is skipped if transcoding occurs. See https://cloud.google.com/storage/docs/transcoding. type Reader struct { Attrs ReaderObjectAttrs - body io.ReadCloser seen, remain, size int64 checkCRC bool // should we check the CRC? wantCRC uint32 // the CRC32c value the server sent in the header gotCRC uint32 // running crc - reopen func(seen int64) (*http.Response, error) reader io.ReadCloser } // Close closes the Reader. It must be called when done reading. 
func (r *Reader) Close() error { - if r.body != nil { - return r.body.Close() - } - - // TODO(noahdietz): Complete integration means returning this call's return - // value, which for gRPC will always be nil. - if r.reader != nil { - return r.reader.Close() - } - return nil + return r.reader.Close() } func (r *Reader) Read(p []byte) (int, error) { - read := r.readWithRetry - if r.reader != nil { - read = r.reader.Read - } - - n, err := read(p) + n, err := r.reader.Read(p) if r.remain != -1 { r.remain -= int64(n) } @@ -424,56 +223,6 @@ func (r *Reader) Read(p []byte) (int, error) { return n, err } -// newRangeReaderWithGRPC creates a new Reader with the given range that uses -// gRPC to read Object content. -// -// This is an experimental API and not intended for public use. -func (o *ObjectHandle) newRangeReaderWithGRPC(ctx context.Context, offset, length int64) (r *Reader, err error) { - ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Object.newRangeReaderWithGRPC") - defer func() { trace.EndSpan(ctx, err) }() - - if err = o.validate(); err != nil { - return - } - - params := &newRangeReaderParams{ - bucket: o.bucket, - object: o.object, - gen: o.gen, - offset: offset, - length: length, - encryptionKey: o.encryptionKey, - conds: o.conds, - } - - r, err = o.c.tc.NewRangeReader(ctx, params) - - return r, err -} - -func (r *Reader) readWithRetry(p []byte) (int, error) { - n := 0 - for len(p[n:]) > 0 { - m, err := r.body.Read(p[n:]) - n += m - r.seen += int64(m) - if err == nil || err == io.EOF { - return n, err - } - // Read failed (likely due to connection issues), but we will try to reopen - // the pipe and continue. Send a ranged read request that takes into account - // the number of bytes we've already seen. - res, err := r.reopen(r.seen) - if err != nil { - // reopen already retries - return n, err - } - r.body.Close() - r.body = res.Body - } - return n, nil -} - // Size returns the size of the object in bytes. 
// The returned value is always the same and is not affected by
// calls to Read or Close.
diff --git a/storage/reader_test.go b/storage/reader_test.go
index 9d19af873da..563e55cfe7d 100644
--- a/storage/reader_test.go
+++ b/storage/reader_test.go
@@ -134,6 +134,10 @@ func (h http2Error) Error() string {
 	return string(h)
 }
 
+// TestRangeReaderRetry tests Reader resumption logic. It ensures that offset
+// and seen bytes are handled correctly so that data is not corrupted.
+// This test only works for the HTTP Reader.
+// TODO: Design a similar test for gRPC.
 func TestRangeReaderRetry(t *testing.T) {
 	internalErr := http2Error("blah blah INTERNAL_ERROR")
 	goawayErr := http2Error("http2: server sent GOAWAY and closed the connection; LastStreamID=15, ErrCode=NO_ERROR, debug=\"load_shed\"")
@@ -240,11 +244,13 @@ func TestRangeReaderRetry(t *testing.T) {
 			t.Errorf("#%d: %v", i, err)
 			continue
 		}
-		r.body = &test.bodies[0]
 		b := 0
-		r.reopen = func(int64) (*http.Response, error) {
-			b++
-			return &http.Response{Body: &test.bodies[b]}, nil
+		r.reader = &httpReader{
+			body: &test.bodies[0],
+			reopen: func(int64) (*http.Response, error) {
+				b++
+				return &http.Response{Body: &test.bodies[b]}, nil
+			},
 		}
 		buf := make([]byte, len(readData)/2)
 		var gotb []byte
diff --git a/storage/storage.go b/storage/storage.go
index 95fe8fc62c8..3d42caa4289 100644
--- a/storage/storage.go
+++ b/storage/storage.go
@@ -115,6 +115,10 @@ type Client struct {
 
 	// tc is the transport-agnostic client implemented with either gRPC or HTTP.
 	tc storageClient
+	// useGRPC flags whether the client uses gRPC. This is needed while the
+	// integration piece is only partially complete.
+	// TODO: remove before merging to main.
+	useGRPC bool
 }
 
 // NewClient creates a new Google Cloud Storage client.
@@ -195,12 +199,18 @@ func NewClient(ctx context.Context, opts ...option.ClientOption) (*Client, error return nil, fmt.Errorf("supplied endpoint %q is not valid: %v", ep, err) } + tc, err := newHTTPStorageClient(ctx, withClientOptions(opts...)) + if err != nil { + return nil, fmt.Errorf("storage: %v", err) + } + return &Client{ hc: hc, raw: rawService, scheme: u.Scheme, readHost: u.Host, creds: creds, + tc: tc, }, nil } @@ -215,7 +225,7 @@ func newGRPCClient(ctx context.Context, opts ...option.ClientOption) (*Client, e return nil, err } - return &Client{tc: tc}, nil + return &Client{tc: tc, useGRPC: true}, nil } // Close closes the Client. @@ -907,27 +917,8 @@ func (o *ObjectHandle) Attrs(ctx context.Context) (attrs *ObjectAttrs, err error if err := o.validate(); err != nil { return nil, err } - call := o.c.raw.Objects.Get(o.bucket, o.object).Projection("full").Context(ctx) - if err := applyConds("Attrs", o.gen, o.conds, call); err != nil { - return nil, err - } - if o.userProject != "" { - call.UserProject(o.userProject) - } - if err := setEncryptionHeaders(call.Header(), o.encryptionKey, false); err != nil { - return nil, err - } - var obj *raw.Object - setClientHeader(call.Header()) - err = run(ctx, func() error { obj, err = call.Do(); return err }, o.retry, true, setRetryHeaderHTTP(call)) - var e *googleapi.Error - if errors.As(err, &e) && e.Code == http.StatusNotFound { - return nil, ErrObjectNotExist - } - if err != nil { - return nil, err - } - return newObject(obj), nil + opts := makeStorageOpts(true, o.retry, o.userProject) + return o.c.tc.GetObject(ctx, o.bucket, o.object, o.gen, o.encryptionKey, o.conds, opts...) } // Update updates an object with the provided attributes. See @@ -940,99 +931,9 @@ func (o *ObjectHandle) Update(ctx context.Context, uattrs ObjectAttrsToUpdate) ( if err := o.validate(); err != nil { return nil, err } - var attrs ObjectAttrs - // Lists of fields to send, and set to null, in the JSON. 
- var forceSendFields, nullFields []string - if uattrs.ContentType != nil { - attrs.ContentType = optional.ToString(uattrs.ContentType) - // For ContentType, sending the empty string is a no-op. - // Instead we send a null. - if attrs.ContentType == "" { - nullFields = append(nullFields, "ContentType") - } else { - forceSendFields = append(forceSendFields, "ContentType") - } - } - if uattrs.ContentLanguage != nil { - attrs.ContentLanguage = optional.ToString(uattrs.ContentLanguage) - // For ContentLanguage it's an error to send the empty string. - // Instead we send a null. - if attrs.ContentLanguage == "" { - nullFields = append(nullFields, "ContentLanguage") - } else { - forceSendFields = append(forceSendFields, "ContentLanguage") - } - } - if uattrs.ContentEncoding != nil { - attrs.ContentEncoding = optional.ToString(uattrs.ContentEncoding) - forceSendFields = append(forceSendFields, "ContentEncoding") - } - if uattrs.ContentDisposition != nil { - attrs.ContentDisposition = optional.ToString(uattrs.ContentDisposition) - forceSendFields = append(forceSendFields, "ContentDisposition") - } - if uattrs.CacheControl != nil { - attrs.CacheControl = optional.ToString(uattrs.CacheControl) - forceSendFields = append(forceSendFields, "CacheControl") - } - if uattrs.EventBasedHold != nil { - attrs.EventBasedHold = optional.ToBool(uattrs.EventBasedHold) - forceSendFields = append(forceSendFields, "EventBasedHold") - } - if uattrs.TemporaryHold != nil { - attrs.TemporaryHold = optional.ToBool(uattrs.TemporaryHold) - forceSendFields = append(forceSendFields, "TemporaryHold") - } - if !uattrs.CustomTime.IsZero() { - attrs.CustomTime = uattrs.CustomTime - forceSendFields = append(forceSendFields, "CustomTime") - } - if uattrs.Metadata != nil { - attrs.Metadata = uattrs.Metadata - if len(attrs.Metadata) == 0 { - // Sending the empty map is a no-op. We send null instead. 
- nullFields = append(nullFields, "Metadata") - } else { - forceSendFields = append(forceSendFields, "Metadata") - } - } - if uattrs.ACL != nil { - attrs.ACL = uattrs.ACL - // It's an error to attempt to delete the ACL, so - // we don't append to nullFields here. - forceSendFields = append(forceSendFields, "Acl") - } - rawObj := attrs.toRawObject(o.bucket) - rawObj.ForceSendFields = forceSendFields - rawObj.NullFields = nullFields - call := o.c.raw.Objects.Patch(o.bucket, o.object, rawObj).Projection("full").Context(ctx) - if err := applyConds("Update", o.gen, o.conds, call); err != nil { - return nil, err - } - if o.userProject != "" { - call.UserProject(o.userProject) - } - if uattrs.PredefinedACL != "" { - call.PredefinedAcl(uattrs.PredefinedACL) - } - if err := setEncryptionHeaders(call.Header(), o.encryptionKey, false); err != nil { - return nil, err - } - var obj *raw.Object - setClientHeader(call.Header()) - var isIdempotent bool - if o.conds != nil && o.conds.MetagenerationMatch != 0 { - isIdempotent = true - } - err = run(ctx, func() error { obj, err = call.Do(); return err }, o.retry, isIdempotent, setRetryHeaderHTTP(call)) - var e *googleapi.Error - if errors.As(err, &e) && e.Code == http.StatusNotFound { - return nil, ErrObjectNotExist - } - if err != nil { - return nil, err - } - return newObject(obj), nil + isIdempotent := o.conds != nil && o.conds.MetagenerationMatch != 0 + opts := makeStorageOpts(isIdempotent, o.retry, o.userProject) + return o.c.tc.UpdateObject(ctx, o.bucket, o.object, &uattrs, o.gen, o.encryptionKey, o.conds, opts...) } // BucketName returns the name of the bucket. 
@@ -1080,27 +981,11 @@ func (o *ObjectHandle) Delete(ctx context.Context) error { if err := o.validate(); err != nil { return err } - call := o.c.raw.Objects.Delete(o.bucket, o.object).Context(ctx) - if err := applyConds("Delete", o.gen, o.conds, call); err != nil { - return err - } - if o.userProject != "" { - call.UserProject(o.userProject) - } - // Encryption doesn't apply to Delete. - setClientHeader(call.Header()) - var isIdempotent bool // Delete is idempotent if GenerationMatch or Generation have been passed in. // The default generation is negative to get the latest version of the object. - if (o.conds != nil && o.conds.GenerationMatch != 0) || o.gen >= 0 { - isIdempotent = true - } - err := run(ctx, func() error { return call.Do() }, o.retry, isIdempotent, setRetryHeaderHTTP(call)) - var e *googleapi.Error - if errors.As(err, &e) && e.Code == http.StatusNotFound { - return ErrObjectNotExist - } - return err + isIdempotent := (o.conds != nil && o.conds.GenerationMatch != 0) || o.gen >= 0 + opts := makeStorageOpts(isIdempotent, o.retry, o.userProject) + return o.c.tc.DeleteObject(ctx, o.bucket, o.object, o.gen, o.conds, opts...) } // ReadCompressed when true causes the read to happen without decompressing. @@ -2097,17 +1982,9 @@ func toProtoCommonObjectRequestParams(key []byte) *storagepb.CommonObjectRequest // ServiceAccount fetches the email address of the given project's Google Cloud Storage service account. func (c *Client) ServiceAccount(ctx context.Context, projectID string) (string, error) { - r := c.raw.Projects.ServiceAccount.Get(projectID) - var res *raw.ServiceAccount - var err error - err = run(ctx, func() error { - res, err = r.Context(ctx).Do() - return err - }, c.retry, true, setRetryHeaderHTTP(r)) - if err != nil { - return "", err - } - return res.EmailAddress, nil + o := makeStorageOpts(true, c.retry, "") + return c.tc.GetServiceAccount(ctx, projectID, o...) 
+ } // bucketResourceName formats the given project ID and bucketResourceName ID diff --git a/storage/storage_test.go b/storage/storage_test.go index c24a41b80d9..c1bce60c070 100644 --- a/storage/storage_test.go +++ b/storage/storage_test.go @@ -922,7 +922,7 @@ func TestCondition(t *testing.T) { // Test an error, too: err = obj.Generation(1234).NewWriter(ctx).Close() - if err == nil || !strings.Contains(err.Error(), "NewWriter: generation not supported") { + if err == nil || !strings.Contains(err.Error(), "storage: generation not supported") { t.Errorf("want error about unsupported generation; got %v", err) } } @@ -1354,16 +1354,6 @@ func TestRetryer(t *testing.T) { r: c.HMACKeyHandle("pID", "accessID").retry, want: c.retry, }, - { - name: "client.Buckets()", - r: c.Buckets(ctx, "pID").client.retry, - want: c.retry, - }, - { - name: "bucket.Objects()", - r: b.Objects(ctx, nil).bucket.retry, - want: b.retry, - }, } for _, ac := range configHandleCases { s.Run(ac.name, func(ss *testing.T) { diff --git a/storage/writer.go b/storage/writer.go index 0a9bc2bcee4..91229f148a6 100644 --- a/storage/writer.go +++ b/storage/writer.go @@ -16,25 +16,12 @@ package storage import ( "context" - "encoding/base64" "errors" "fmt" "io" "sync" "time" "unicode/utf8" - - storagepb "cloud.google.com/go/storage/internal/apiv2/stubs" - "google.golang.org/api/googleapi" - raw "google.golang.org/api/storage/v1" -) - -const ( - // Maximum amount of content that can be sent per WriteObjectRequest message. - // A buffer reaching this amount will precipitate a flush of the buffer. - // - // This is only used for the gRPC-based Writer. - maxPerMessageWriteSize int = int(storagepb.ServiceConstants_MAX_WRITE_CHUNK_BYTES) ) // A Writer writes a Cloud Storage object. 
@@ -122,102 +109,6 @@ type Writer struct { err error } -func (w *Writer) open() error { - if err := w.validateWriteAttrs(); err != nil { - return err - } - - pr, pw := io.Pipe() - w.pw = pw - w.opened = true - - go w.monitorCancel() - - attrs := w.ObjectAttrs - mediaOpts := []googleapi.MediaOption{ - googleapi.ChunkSize(w.ChunkSize), - } - if c := attrs.ContentType; c != "" { - mediaOpts = append(mediaOpts, googleapi.ContentType(c)) - } - if w.ChunkRetryDeadline != 0 { - mediaOpts = append(mediaOpts, googleapi.ChunkRetryDeadline(w.ChunkRetryDeadline)) - } - - go func() { - defer close(w.donec) - - rawObj := attrs.toRawObject(w.o.bucket) - if w.SendCRC32C { - rawObj.Crc32c = encodeUint32(attrs.CRC32C) - } - if w.MD5 != nil { - rawObj.Md5Hash = base64.StdEncoding.EncodeToString(w.MD5) - } - call := w.o.c.raw.Objects.Insert(w.o.bucket, rawObj). - Media(pr, mediaOpts...). - Projection("full"). - Context(w.ctx). - Name(w.o.object) - - if w.ProgressFunc != nil { - call.ProgressUpdater(func(n, _ int64) { w.ProgressFunc(n) }) - } - if attrs.KMSKeyName != "" { - call.KmsKeyName(attrs.KMSKeyName) - } - if attrs.PredefinedACL != "" { - call.PredefinedAcl(attrs.PredefinedACL) - } - if err := setEncryptionHeaders(call.Header(), w.o.encryptionKey, false); err != nil { - w.mu.Lock() - w.err = err - w.mu.Unlock() - pr.CloseWithError(err) - return - } - var resp *raw.Object - err := applyConds("NewWriter", w.o.gen, w.o.conds, call) - if err == nil { - if w.o.userProject != "" { - call.UserProject(w.o.userProject) - } - setClientHeader(call.Header()) - - // The internals that perform call.Do automatically retry both the initial - // call to set up the upload as well as calls to upload individual chunks - // for a resumable upload (as long as the chunk size is non-zero). Hence - // there is no need to add retries here. - - // Retry only when the operation is idempotent or the retry policy is RetryAlways. 
- isIdempotent := w.o.conds != nil && (w.o.conds.GenerationMatch >= 0 || w.o.conds.DoesNotExist == true) - var useRetry bool - if (w.o.retry == nil || w.o.retry.policy == RetryIdempotent) && isIdempotent { - useRetry = true - } else if w.o.retry != nil && w.o.retry.policy == RetryAlways { - useRetry = true - } - if useRetry { - if w.o.retry != nil { - call.WithRetry(w.o.retry.backoff, w.o.retry.shouldRetry) - } else { - call.WithRetry(nil, nil) - } - } - resp, err = call.Do() - } - if err != nil { - w.mu.Lock() - w.err = err - w.mu.Unlock() - pr.CloseWithError(err) - return - } - w.obj = newObject(resp) - }() - return nil -} - // Write appends to w. It implements the io.Writer interface. // // Since writes happen asynchronously, Write may return a nil @@ -235,12 +126,7 @@ func (w *Writer) Write(p []byte) (n int, err error) { return 0, werr } if !w.opened { - // gRPC client has been initialized - use gRPC to upload. - if w.o.c.tc != nil { - if err := w.openWriter(); err != nil { - return 0, err - } - } else if err := w.open(); err != nil { + if err := w.openWriter(); err != nil { return 0, err } } @@ -264,11 +150,7 @@ func (w *Writer) Write(p []byte) (n int, err error) { // can be retrieved by calling Attrs. 
func (w *Writer) Close() error { if !w.opened { - if w.o.c.tc != nil { - if err := w.openWriter(); err != nil { - return err - } - } else if err := w.open(); err != nil { + if err := w.openWriter(); err != nil { return err } } @@ -288,7 +170,12 @@ func (w *Writer) openWriter() (err error) { if err := w.validateWriteAttrs(); err != nil { return err } + if w.o.gen != defaultGen { + return fmt.Errorf("storage: generation not supported on Writer, got %v", w.o.gen) + } + isIdempotent := w.o.conds != nil && (w.o.conds.GenerationMatch >= 0 || w.o.conds.DoesNotExist == true) + opts := makeStorageOpts(isIdempotent, w.o.retry, w.o.userProject) go w.monitorCancel() params := &openWriterParams{ ctx: w.ctx, @@ -304,7 +191,7 @@ func (w *Writer) openWriter() (err error) { progress: w.progress, setObj: func(o *ObjectAttrs) { w.obj = o }, } - w.pw, err = w.o.c.tc.OpenWriter(params) + w.pw, err = w.o.c.tc.OpenWriter(params, opts...) if err != nil { return err }