Skip to content

Commit

Permalink
feat(gensupport): per-chunk deadline configs
Browse files Browse the repository at this point in the history
Allow users to configure the per-chunk deadline for retries
that's used during resumable uploads.

Needs to be exposed via the manual layer for storage.

Fixes googleapis#685
  • Loading branch information
tritone committed Feb 1, 2022
1 parent a584462 commit f21e27d
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 9 deletions.
21 changes: 20 additions & 1 deletion googleapi/googleapi.go
Expand Up @@ -15,6 +15,7 @@ import (
"net/http"
"net/url"
"strings"
"time"

"google.golang.org/api/internal/third_party/uritemplates"
)
Expand Down Expand Up @@ -245,12 +246,30 @@ func ChunkSize(size int) MediaOption {
return chunkSizeOption(size)
}

type chunkRetryDeadlineOption time.Duration

func (cd chunkRetryDeadlineOption) setOptions(o *MediaOptions) {
o.ChunkRetryDeadline = time.Duration(cd)
}

// ChunkRetryDeadline returns a MediaOption which sets a per-chunk retry
// deadline. If a single chunk has been attempting to upload for longer than
// this time and the request fails, it will no longer be retried, and the error
// will be returned to the caller.
// This is only applicable for files which are large enough to require
// a multi-chunk resumable upload.
// The default value is 32s.
// To set a deadline on the entire upload, use context timeout or cancellation.
func ChunkRetryDeadline(deadline time.Duration) MediaOption {
return chunkRetryDeadlineOption(deadline)
}

// MediaOptions stores options for customizing media upload. It is not used by developers directly.
type MediaOptions struct {
ContentType string
ForceEmptyContentType bool

ChunkSize int
ChunkRetryDeadline time.Duration
}

// ProcessMediaOptions stores options from opts in a MediaOptions.
Expand Down
4 changes: 4 additions & 0 deletions internal/gensupport/media.go
Expand Up @@ -15,6 +15,7 @@ import (
"net/textproto"
"strings"
"sync"
"time"

"google.golang.org/api/googleapi"
)
Expand Down Expand Up @@ -223,6 +224,7 @@ type MediaInfo struct {
mType string
size int64 // mediaSize, if known. Used only for calls to progressUpdater_.
progressUpdater googleapi.ProgressUpdater
chunkRetryDeadline time.Duration
}

// NewInfoFromMedia should be invoked from the Media method of a call. It returns a
Expand All @@ -234,6 +236,7 @@ func NewInfoFromMedia(r io.Reader, options []googleapi.MediaOption) *MediaInfo {
if !opts.ForceEmptyContentType {
r, mi.mType = DetermineContentType(r, opts.ContentType)
}
mi.chunkRetryDeadline = opts.ChunkRetryDeadline
mi.media, mi.buffer, mi.singleChunk = PrepareUpload(r, opts.ChunkSize)
return mi
}
Expand Down Expand Up @@ -356,6 +359,7 @@ func (mi *MediaInfo) ResumableUpload(locURI string) *ResumableUpload {
mi.progressUpdater(curr, mi.size)
}
},
ChunkRetryDeadline: mi.chunkRetryDeadline,
}
}

Expand Down
38 changes: 37 additions & 1 deletion internal/gensupport/media_test.go
Expand Up @@ -14,6 +14,7 @@ import (
"reflect"
"strings"
"testing"
"time"

"google.golang.org/api/googleapi"
)
Expand Down Expand Up @@ -155,6 +156,7 @@ func TestNewInfoFromMedia(t *testing.T) {
opts []googleapi.MediaOption
wantType string
wantMedia, wantBuffer, wantSingleChunk bool
wantDeadline time.Duration
}{
{
desc: "an empty reader results in a MediaBuffer with a single, empty chunk",
Expand All @@ -172,6 +174,15 @@ func TestNewInfoFromMedia(t *testing.T) {
wantBuffer: true,
wantSingleChunk: true,
},
{
desc: "ChunkRetryDeadline is observed",
r: new(bytes.Buffer),
opts: []googleapi.MediaOption{googleapi.ChunkRetryDeadline(time.Second)},
wantType: textType,
wantBuffer: true,
wantSingleChunk: true,
wantDeadline: time.Second,
},
{
desc: "chunk size of zero: don't use a MediaBuffer; upload as a single chunk",
r: strings.NewReader("12345"),
Expand Down Expand Up @@ -220,6 +231,9 @@ func TestNewInfoFromMedia(t *testing.T) {
if got, want := mi.singleChunk, test.wantSingleChunk; got != want {
t.Errorf("%s: singleChunk: got %t, want %t", test.desc, got, want)
}
if got, want := mi.chunkRetryDeadline, test.wantDeadline; got != want {
t.Errorf("%s: chunkRetryDeadline: got %v, want %v", test.desc, got, want)
}
}
}

Expand Down Expand Up @@ -341,6 +355,7 @@ func TestResumableUpload(t *testing.T) {
chunkSize int
wantUploadType string
wantResumableUpload bool
chunkRetryDeadline time.Duration
}{
{
desc: "chunk size of zero: don't use a MediaBuffer; upload as a single chunk",
Expand Down Expand Up @@ -372,14 +387,35 @@ func TestResumableUpload(t *testing.T) {
wantUploadType: "resumable",
wantResumableUpload: true,
},
{
desc: "confirm that ChunkRetryDeadline is carried to ResumableUpload",
r: &nullReader{2 * googleapi.MinUploadChunkSize},
chunkSize: 1,
wantUploadType: "resumable",
wantResumableUpload: true,
chunkRetryDeadline: 1 * time.Second,
},
} {
mi := NewInfoFromMedia(test.r, []googleapi.MediaOption{googleapi.ChunkSize(test.chunkSize)})
opts := []googleapi.MediaOption{googleapi.ChunkSize(test.chunkSize)}
if test.chunkRetryDeadline != 0 {
opts = append(opts, googleapi.ChunkRetryDeadline(test.chunkRetryDeadline))
}
mi := NewInfoFromMedia(test.r, opts)
if got, want := mi.UploadType(), test.wantUploadType; got != want {
t.Errorf("%s: upload type: got %q, want %q", test.desc, got, want)
}
if got, want := mi.ResumableUpload("") != nil, test.wantResumableUpload; got != want {
t.Errorf("%s: resumable upload non-nil: got %t, want %t", test.desc, got, want)
}
if test.chunkRetryDeadline != 0 {
if got := mi.ResumableUpload(""); got != nil {
if got.ChunkRetryDeadline != test.chunkRetryDeadline {
t.Errorf("%s: ChunkRetryDeadline: got %v, want %v", test.desc, got.ChunkRetryDeadline, test.chunkRetryDeadline)
}
} else {
t.Errorf("%s: test case invalid; resumable upload is nil", test.desc)
}
}
}
}

Expand Down
12 changes: 12 additions & 0 deletions internal/gensupport/resumable.go
Expand Up @@ -34,6 +34,10 @@ type ResumableUpload struct {

// Retry optionally configures retries for requests made against the upload.
Retry *RetryConfig

// ChunkRetryDeadline configures the per-chunk deadline after which no further
// retries should happen.
ChunkRetryDeadline time.Duration
}

// Progress returns the number of bytes uploaded at this point.
Expand Down Expand Up @@ -156,6 +160,14 @@ func (rx *ResumableUpload) Upload(ctx context.Context) (resp *http.Response, err
// Configure retryable error criteria.
errorFunc := rx.Retry.errorFunc()

// Configure per-chunk retry deadline.
var retryDeadline time.Duration
if rx.ChunkRetryDeadline != 0 {
retryDeadline = rx.ChunkRetryDeadline
} else {
retryDeadline = defaultRetryDeadline
}

// Send all chunks.
for {
var pause time.Duration
Expand Down
14 changes: 9 additions & 5 deletions internal/gensupport/resumable_test.go
Expand Up @@ -305,6 +305,9 @@ func TestRetry_EachChunkHasItsOwnRetryDeadline(t *testing.T) {
)
media := strings.NewReader(strings.Repeat("a", mediaSize))

// This transport returns multiple errors on both the first chunk and third
// chunk of the upload. If the timeout were not reset between chunks, the
// errors on the third chunk would not retry and cause a failure.
tr := &interruptibleTransport{
buf: make([]byte, 0, mediaSize),
events: []event{
Expand All @@ -320,8 +323,12 @@ func TestRetry_EachChunkHasItsOwnRetryDeadline(t *testing.T) {
// cum: 1s sleep <-- resets because it's a new chunk
{"bytes 90-179/*", 308},
// cum: 1s sleep <-- resets because it's a new chunk
{"bytes 180-269/*", http.StatusServiceUnavailable},
// cum: 1s sleep on later chunk
{"bytes 180-269/*", http.StatusServiceUnavailable},
// cum: 2s sleep on later chunk
{"bytes 180-269/*", 308},
// cum: 1s sleep <-- resets because it's a new chunk
// cum: 3s sleep <-- resets because it's a new chunk
{"bytes 270-299/300", 200},
},
bodies: bodyTracker{},
Expand All @@ -332,12 +339,9 @@ func TestRetry_EachChunkHasItsOwnRetryDeadline(t *testing.T) {
Media: NewMediaBuffer(media, chunkSize),
MediaType: "text/plain",
Callback: func(int64) {},
ChunkRetryDeadline: 5 * time.Second,
}

oldRetryDeadline := retryDeadline
retryDeadline = 5 * time.Second
defer func() { retryDeadline = oldRetryDeadline }()

oldBackoff := backoff
backoff = func() Backoff { return new(PauseOneSecond) }
defer func() { backoff = oldBackoff }()
Expand Down
4 changes: 2 additions & 2 deletions internal/gensupport/retry.go
Expand Up @@ -20,8 +20,8 @@ type Backoff interface {

// These are declared as global variables so that tests can overwrite them.
var (
// Per-chunk deadline for resumable uploads.
retryDeadline = 32 * time.Second
// Default per-chunk deadline for resumable uploads.
defaultRetryDeadline = 32 * time.Second
// Default backoff timer.
backoff = func() Backoff {
return &gax.Backoff{Initial: 100 * time.Millisecond}
Expand Down

0 comments on commit f21e27d

Please sign in to comment.