From 7b848ac4a5e1c58d386296bb2abe9f6ad2a41688 Mon Sep 17 00:00:00 2001 From: Chris Cotter Date: Tue, 27 Dec 2022 16:54:22 -0500 Subject: [PATCH 1/2] chore(storage): fix checksums for gRPC uploads There was an API change so that checksums can now only be provided by the StartResumableUpload request rather than while uploading. Send the checksum at this stage instead. --- storage/grpc_client.go | 43 +++++++++++++++++++------------------ storage/integration_test.go | 11 ++++------ 2 files changed, 26 insertions(+), 28 deletions(-) diff --git a/storage/grpc_client.go b/storage/grpc_client.go index 4a44cee8b67..6a997befc06 100644 --- a/storage/grpc_client.go +++ b/storage/grpc_client.go @@ -1496,11 +1496,29 @@ func (w *gRPCWriter) startResumableUpload() error { if err != nil { return err } + req := &storagepb.StartResumableWriteRequest{ + WriteObjectSpec: spec, + CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey), + } + // TODO: Currently the checksums are only sent on the first message + // of the stream, but in the future, we must also support sending it + // on the *last* message of the stream (instead of the first). + if w.sendCRC32C { + req.ObjectChecksums = &storagepb.ObjectChecksums{ + Crc32C: proto.Uint32(w.attrs.CRC32C), + } + } + if len(w.attrs.MD5) != 0 { + if cs := req.GetObjectChecksums(); cs == nil { + req.ObjectChecksums = &storagepb.ObjectChecksums{ + Md5Hash: w.attrs.MD5, + } + } else { + cs.Md5Hash = w.attrs.MD5 + } + } return run(w.ctx, func() error { - upres, err := w.c.raw.StartResumableWrite(w.ctx, &storagepb.StartResumableWriteRequest{ - WriteObjectSpec: spec, - CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey), - }) + upres, err := w.c.raw.StartResumableWrite(w.ctx, req) w.upid = upres.GetUploadId() return err }, w.settings.retry, w.settings.idempotent, setRetryHeaderGRPC(w.ctx)) @@ -1587,23 +1605,6 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st req.CommonObjectRequestParams = toProtoCommonObjectRequestParams(w.encryptionKey) } - // TODO: Currently the checksums are only sent on the first message - // of the stream, but in the future, we must also support sending it - // on the *last* message of the stream (instead of the first). - if w.sendCRC32C { - req.ObjectChecksums = &storagepb.ObjectChecksums{ - Crc32C: proto.Uint32(w.attrs.CRC32C), - } - } - if len(w.attrs.MD5) != 0 { - if cs := req.GetObjectChecksums(); cs == nil { - req.ObjectChecksums = &storagepb.ObjectChecksums{ - Md5Hash: w.attrs.MD5, - } - } else { - cs.Md5Hash = w.attrs.MD5 - } - } } err = w.stream.Send(req) diff --git a/storage/integration_test.go b/storage/integration_test.go index 518976136c3..52749b24968 100644 --- a/storage/integration_test.go +++ b/storage/integration_test.go @@ -265,8 +265,8 @@ func initTransportClients(ctx context.Context, t *testing.T, opts ...option.Clie // of an existing bucket to use, a bucket name to use for bucket creation, and // the client to use. func multiTransportTest(ctx context.Context, t *testing.T, - test func(*testing.T, context.Context, string, string, *Client), - opts ...option.ClientOption) { + test func(*testing.T, context.Context, string, string, *Client), + opts ...option.ClientOption) { for transport, client := range initTransportClients(ctx, t, opts...) { t.Run(transport, func(t *testing.T) { defer client.Close() @@ -1082,9 +1082,6 @@ func TestIntegration_MultiMessageWriteGRPC(t *testing.T) { func TestIntegration_MultiChunkWrite(t *testing.T) { multiTransportTest(context.Background(), t, func(t *testing.T, ctx context.Context, bucket string, _ string, client *Client) { - if bucket == grpcBucketName { - t.Skip("https://github.com/googleapis/google-cloud-go/issues/7033") - } h := testHelper{t} obj := client.Bucket(bucket).Object(uidSpace.New()).Retryer(WithPolicy(RetryAlways)) defer h.mustDeleteObject(obj) @@ -5150,8 +5147,8 @@ func retryOnTransient400and403(err error) bool { var e *googleapi.Error var ae *apierror.APIError return ShouldRetry(err) || - /* http */ errors.As(err, &e) && (e.Code == 400 || e.Code == 403) || - /* grpc */ errors.As(err, &ae) && (ae.GRPCStatus().Code() == codes.InvalidArgument || ae.GRPCStatus().Code() == codes.PermissionDenied) + /* http */ errors.As(err, &e) && (e.Code == 400 || e.Code == 403) || + /* grpc */ errors.As(err, &ae) && (ae.GRPCStatus().Code() == codes.InvalidArgument || ae.GRPCStatus().Code() == codes.PermissionDenied) } func skipGRPC(reason string) context.Context { From 9745eb4d2edc63e63d7c3ae17a0baf2aede21b92 Mon Sep 17 00:00:00 2001 From: Chris Cotter Date: Tue, 27 Dec 2022 17:02:04 -0500 Subject: [PATCH 2/2] fmt --- storage/integration_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/storage/integration_test.go b/storage/integration_test.go index 52749b24968..9d9f3158975 100644 --- a/storage/integration_test.go +++ b/storage/integration_test.go @@ -265,8 +265,8 @@ func initTransportClients(ctx context.Context, t *testing.T, opts ...option.Clie // of an existing bucket to use, a bucket name to use for bucket creation, and // the client to use. func multiTransportTest(ctx context.Context, t *testing.T, - test func(*testing.T, context.Context, string, string, *Client), - opts ...option.ClientOption) { + test func(*testing.T, context.Context, string, string, *Client), + opts ...option.ClientOption) { for transport, client := range initTransportClients(ctx, t, opts...) { t.Run(transport, func(t *testing.T) { defer client.Close() @@ -5147,8 +5147,8 @@ func retryOnTransient400and403(err error) bool { var e *googleapi.Error var ae *apierror.APIError return ShouldRetry(err) || - /* http */ errors.As(err, &e) && (e.Code == 400 || e.Code == 403) || - /* grpc */ errors.As(err, &ae) && (ae.GRPCStatus().Code() == codes.InvalidArgument || ae.GRPCStatus().Code() == codes.PermissionDenied) + /* http */ errors.As(err, &e) && (e.Code == 400 || e.Code == 403) || + /* grpc */ errors.As(err, &ae) && (ae.GRPCStatus().Code() == codes.InvalidArgument || ae.GRPCStatus().Code() == codes.PermissionDenied) } func skipGRPC(reason string) context.Context {