From 43dbbd7d875bfca1b370c482000d325ef7a97102 Mon Sep 17 00:00:00 2001 From: Chris Cotter Date: Tue, 27 Sep 2022 22:40:16 -0400 Subject: [PATCH] chore(storage): retry configs for gRPC media ops (#6754) Adds retry config to grpcReader and grpcWriter types so that they can be propagated to operations as needed. Ensure that the configured values are used for all retry calls, and that UserProject is consistently passed through as well. --- storage/grpc_client.go | 45 +++++++++++++++++++++++++++++++----------- storage/http_client.go | 3 +-- 2 files changed, 35 insertions(+), 13 deletions(-) diff --git a/storage/grpc_client.go b/storage/grpc_client.go index 049fbae2f8e..a7c0d04f7fd 100644 --- a/storage/grpc_client.go +++ b/storage/grpc_client.go @@ -943,6 +943,7 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange // Store the content from the first Recv in the // client buffer for reading later. leftovers: msg.GetChecksummedData().GetContent(), + settings: s, }, } @@ -964,6 +965,8 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange } func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storageOption) (*io.PipeWriter, error) { + s := callSettings(c.settings, opts...) + var offset int64 errorf := params.setError progress := params.progress @@ -971,6 +974,10 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage pr, pw := io.Pipe() gw := newGRPCWriter(c, params, pr) + gw.settings = s + if s.userProject != "" { + gw.ctx = setUserProjectMetadata(gw.ctx, s.userProject) + } // This function reads the data sent to the pipe and sends sets of messages // on the gRPC client-stream as the buffer is filled. @@ -1315,6 +1322,7 @@ type gRPCReader struct { reopen func(seen int64) (*readStreamResponse, context.CancelFunc, error) leftovers []byte cancel context.CancelFunc + settings *settings } // Read reads bytes into the user's buffer from an open gRPC stream. @@ -1390,7 +1398,11 @@ func (r *gRPCReader) Close() error { // an attempt to reopen the stream. func (r *gRPCReader) recv() (*storagepb.ReadObjectResponse, error) { msg, err := r.stream.Recv() - if err != nil && ShouldRetry(err) { + var shouldRetry = ShouldRetry + if r.settings.retry != nil { + shouldRetry = r.settings.retry.shouldRetry + } + if err != nil && shouldRetry(err) { // This will "close" the existing stream and immediately attempt to // reopen the stream, but will backoff if further attempts are necessary. // Reopening the stream Recvs the first message, so if retrying is @@ -1454,6 +1466,7 @@ type gRPCWriter struct { attrs *ObjectAttrs conds *Conditions encryptionKey []byte + settings *settings sendCRC32C bool @@ -1471,21 +1484,27 @@ func (w *gRPCWriter) startResumableUpload() error { if err != nil { return err } - upres, err := w.c.raw.StartResumableWrite(w.ctx, &storagepb.StartResumableWriteRequest{ - WriteObjectSpec: spec, - }) - - w.upid = upres.GetUploadId() - return err + return run(w.ctx, func() error { + upres, err := w.c.raw.StartResumableWrite(w.ctx, &storagepb.StartResumableWriteRequest{ + WriteObjectSpec: spec, + }) + w.upid = upres.GetUploadId() + return err + }, w.settings.retry, w.settings.idempotent, setRetryHeaderGRPC(w.ctx)) } // queryProgress is a helper that queries the status of the resumable upload // associated with the given upload ID. func (w *gRPCWriter) queryProgress() (int64, error) { - q, err := w.c.raw.QueryWriteStatus(w.ctx, &storagepb.QueryWriteStatusRequest{UploadId: w.upid}) + var persistedSize int64 + err := run(w.ctx, func() error { + q, err := w.c.raw.QueryWriteStatus(w.ctx, &storagepb.QueryWriteStatusRequest{UploadId: w.upid}) + persistedSize = q.GetPersistedSize() + return err + }, w.settings.retry, true, setRetryHeaderGRPC(w.ctx)) // q.GetCommittedSize() will return 0 if q is nil. - return q.GetPersistedSize(), err + return persistedSize, err } // uploadBuffer opens a Write stream and uploads the buffer at the given offset (if @@ -1500,6 +1519,10 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st var err error var finishWrite bool var sent, limit int = 0, maxPerMessageWriteSize + var shouldRetry = ShouldRetry + if w.settings.retry != nil { + shouldRetry = w.settings.retry.shouldRetry + } offset := start toWrite := w.buf[:recvd] for { @@ -1570,7 +1593,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st // resend the entire buffer via a new stream. // If not retriable, falling through will return the error received // from closing the stream. - if ShouldRetry(err) { + if shouldRetry(err) { sent = 0 finishWrite = false // TODO: Add test case for failure modes of querying progress. @@ -1601,7 +1624,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st // resend the entire buffer via a new stream. // If not retriable, falling through will return the error received // from closing the stream. - if ShouldRetry(err) { + if shouldRetry(err) { sent = 0 finishWrite = false offset, err = w.determineOffset(start) diff --git a/storage/http_client.go b/storage/http_client.go index a589d3d0fba..3d1cf8d4fff 100644 --- a/storage/http_client.go +++ b/storage/http_client.go @@ -1033,9 +1033,8 @@ func (c *httpStorageClient) OpenWriter(params *openWriterParams, opts ...storage // there is no need to add retries here. // Retry only when the operation is idempotent or the retry policy is RetryAlways. - isIdempotent := params.conds != nil && (params.conds.GenerationMatch >= 0 || params.conds.DoesNotExist == true) var useRetry bool - if (s.retry == nil || s.retry.policy == RetryIdempotent) && isIdempotent { + if (s.retry == nil || s.retry.policy == RetryIdempotent) && s.idempotent { useRetry = true } else if s.retry != nil && s.retry.policy == RetryAlways { useRetry = true