chore(storage): retry configs for gRPC media ops (#6754)
Adds a retry config to the gRPCReader and gRPCWriter types so that it can be propagated to operations as needed. Ensures that the configured values are used for all retry calls, and that UserProject is consistently passed through as well.
tritone committed Sep 28, 2022
1 parent 6ae9c67 commit 43dbbd7
Showing 2 changed files with 35 additions and 13 deletions.
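
For context, a minimal caller-side sketch (not part of this diff; bucket and object names are placeholders) of the public retry configuration that the gRPC media operations below are now expected to honor:

package main

import (
	"context"
	"io"
	"log"
	"time"

	"cloud.google.com/go/storage"
	"github.com/googleapis/gax-go/v2"
)

func main() {
	ctx := context.Background()
	client, err := storage.NewClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Per-object retry policy; with this change the gRPC read/write paths
	// should consult it rather than the package-level ShouldRetry default.
	o := client.Bucket("my-bucket").Object("my-object").Retryer(
		storage.WithPolicy(storage.RetryAlways),
		storage.WithBackoff(gax.Backoff{Initial: 2 * time.Second, Max: 30 * time.Second}),
	)

	r, err := o.NewReader(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer r.Close()
	if _, err := io.Copy(io.Discard, r); err != nil {
		log.Fatal(err)
	}
}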
45 changes: 34 additions & 11 deletions storage/grpc_client.go
@@ -943,6 +943,7 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange
// Store the content from the first Recv in the
// client buffer for reading later.
leftovers: msg.GetChecksummedData().GetContent(),
settings: s,
},
}

@@ -964,13 +965,19 @@ func (c *grpcStorageClient) NewRangeReader(ctx context.Context, params *newRange
}

func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storageOption) (*io.PipeWriter, error) {
s := callSettings(c.settings, opts...)

var offset int64
errorf := params.setError
progress := params.progress
setObj := params.setObj

pr, pw := io.Pipe()
gw := newGRPCWriter(c, params, pr)
gw.settings = s
if s.userProject != "" {
gw.ctx = setUserProjectMetadata(gw.ctx, s.userProject)
}

// This function reads the data sent to the pipe and sends sets of messages
// on the gRPC client-stream as the buffer is filled.
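
In the OpenWriter hunk above, the writer's context is wrapped with setUserProjectMetadata when a user project is configured. A hypothetical illustration (an assumption, not code from this diff) of what such a helper conventionally does: attach the billing project as outgoing gRPC metadata so requester-pays buckets bill the configured project.

package example

import (
	"context"

	"google.golang.org/grpc/metadata"
)

// setUserProjectMetadataSketch mirrors what the internal helper is assumed
// to do: add the x-goog-user-project header to the outgoing context.
func setUserProjectMetadataSketch(ctx context.Context, project string) context.Context {
	return metadata.AppendToOutgoingContext(ctx, "x-goog-user-project", project)
}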
@@ -1315,6 +1322,7 @@ type gRPCReader struct {
reopen func(seen int64) (*readStreamResponse, context.CancelFunc, error)
leftovers []byte
cancel context.CancelFunc
settings *settings
}

// Read reads bytes into the user's buffer from an open gRPC stream.
@@ -1390,7 +1398,11 @@ func (r *gRPCReader) Close() error {
// an attempt to reopen the stream.
func (r *gRPCReader) recv() (*storagepb.ReadObjectResponse, error) {
msg, err := r.stream.Recv()
if err != nil && ShouldRetry(err) {
var shouldRetry = ShouldRetry
if r.settings.retry != nil {
shouldRetry = r.settings.retry.shouldRetry
}
if err != nil && shouldRetry(err) {
// This will "close" the existing stream and immediately attempt to
// reopen the stream, but will backoff if further attempts are necessary.
// Reopening the stream Recvs the first message, so if retrying is
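
The recv() change above is what lets a caller-supplied predicate take effect on gRPC reads. A brief sketch (not from this diff; names are placeholders) of supplying such a predicate through the public API, which ends up as settings.retry.shouldRetry:

package example

import (
	"cloud.google.com/go/storage"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// configureCustomRetry attaches a custom retry predicate; recv() now consults
// it instead of always using the package default ShouldRetry.
func configureCustomRetry(client *storage.Client) *storage.ObjectHandle {
	return client.Bucket("my-bucket").Object("my-object").Retryer(
		storage.WithErrorFunc(func(err error) bool {
			// Example predicate: the library defaults plus gRPC Unknown.
			return storage.ShouldRetry(err) || status.Code(err) == codes.Unknown
		}),
	)
}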
@@ -1454,6 +1466,7 @@ type gRPCWriter struct {
attrs *ObjectAttrs
conds *Conditions
encryptionKey []byte
settings *settings

sendCRC32C bool

@@ -1471,21 +1484,27 @@ func (w *gRPCWriter) startResumableUpload() error {
if err != nil {
return err
}
upres, err := w.c.raw.StartResumableWrite(w.ctx, &storagepb.StartResumableWriteRequest{
WriteObjectSpec: spec,
})

w.upid = upres.GetUploadId()
return err
return run(w.ctx, func() error {
upres, err := w.c.raw.StartResumableWrite(w.ctx, &storagepb.StartResumableWriteRequest{
WriteObjectSpec: spec,
})
w.upid = upres.GetUploadId()
return err
}, w.settings.retry, w.settings.idempotent, setRetryHeaderGRPC(w.ctx))
}

// queryProgress is a helper that queries the status of the resumable upload
// associated with the given upload ID.
func (w *gRPCWriter) queryProgress() (int64, error) {
q, err := w.c.raw.QueryWriteStatus(w.ctx, &storagepb.QueryWriteStatusRequest{UploadId: w.upid})
var persistedSize int64
err := run(w.ctx, func() error {
q, err := w.c.raw.QueryWriteStatus(w.ctx, &storagepb.QueryWriteStatusRequest{UploadId: w.upid})
persistedSize = q.GetPersistedSize()
return err
}, w.settings.retry, true, setRetryHeaderGRPC(w.ctx))

// q.GetCommittedSize() will return 0 if q is nil.
return q.GetPersistedSize(), err
return persistedSize, err
}
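
Both resumable-upload RPCs above are now funneled through the package's run helper with the writer's retry settings. The real implementation lives elsewhere in the package; a simplified sketch of the general shape being relied on (an assumption, not the actual helper):

package example

import (
	"context"

	"github.com/googleapis/gax-go/v2"
)

// runSketch retries call with backoff while the predicate allows and the
// operation is considered idempotent; otherwise it issues the call once.
func runSketch(ctx context.Context, call func() error, shouldRetry func(error) bool, bo gax.Backoff, idempotent bool) error {
	for {
		err := call()
		if err == nil || !idempotent || !shouldRetry(err) {
			return err
		}
		// Back off before the next attempt; stop if the context is done.
		if cerr := gax.Sleep(ctx, bo.Pause()); cerr != nil {
			return cerr
		}
	}
}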

// uploadBuffer opens a Write stream and uploads the buffer at the given offset (if
@@ -1500,6 +1519,10 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
var err error
var finishWrite bool
var sent, limit int = 0, maxPerMessageWriteSize
var shouldRetry = ShouldRetry
if w.settings.retry != nil {
shouldRetry = w.settings.retry.shouldRetry
}
offset := start
toWrite := w.buf[:recvd]
for {
@@ -1570,7 +1593,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
// resend the entire buffer via a new stream.
// If not retriable, falling through will return the error received
// from closing the stream.
if ShouldRetry(err) {
if shouldRetry(err) {
sent = 0
finishWrite = false
// TODO: Add test case for failure modes of querying progress.
@@ -1601,7 +1624,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
// resend the entire buffer via a new stream.
// If not retriable, falling through will return the error received
// from closing the stream.
if ShouldRetry(err) {
if shouldRetry(err) {
sent = 0
finishWrite = false
offset, err = w.determineOffset(start)
3 changes: 1 addition & 2 deletions storage/http_client.go
@@ -1033,9 +1033,8 @@ func (c *httpStorageClient) OpenWriter(params *openWriterParams, opts ...storage
// there is no need to add retries here.

// Retry only when the operation is idempotent or the retry policy is RetryAlways.
isIdempotent := params.conds != nil && (params.conds.GenerationMatch >= 0 || params.conds.DoesNotExist == true)
var useRetry bool
if (s.retry == nil || s.retry.policy == RetryIdempotent) && isIdempotent {
if (s.retry == nil || s.retry.policy == RetryIdempotent) && s.idempotent {
useRetry = true
} else if s.retry != nil && s.retry.policy == RetryAlways {
useRetry = true
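
Condensed, the decision in the hunk above (assuming s.idempotent is computed once in callSettings from the write preconditions, e.g. GenerationMatch or DoesNotExist) reduces to a single predicate. A standalone restatement with hypothetical names:

package example

type retryPolicySketch int

const (
	retryIdempotentSketch retryPolicySketch = iota // the default when no retry config is set
	retryAlwaysSketch
	retryNeverSketch
)

// useRetrySketch restates the branch logic: retry when the policy is
// RetryAlways, or when it is RetryIdempotent and the call is idempotent.
func useRetrySketch(policy retryPolicySketch, idempotent bool) bool {
	switch policy {
	case retryAlwaysSketch:
		return true
	case retryIdempotentSketch:
		return idempotent
	default:
		return false
	}
}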
