Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(gensupport): per-chunk deadline configs #1414

Merged
merged 4 commits into from Feb 1, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 21 additions & 2 deletions googleapi/googleapi.go
Expand Up @@ -15,6 +15,7 @@ import (
"net/http"
"net/url"
"strings"
"time"

"google.golang.org/api/internal/third_party/uritemplates"
)
Expand Down Expand Up @@ -245,12 +246,30 @@ func ChunkSize(size int) MediaOption {
return chunkSizeOption(size)
}

type chunkRetryDeadlineOption time.Duration

func (cd chunkRetryDeadlineOption) setOptions(o *MediaOptions) {
o.ChunkRetryDeadline = time.Duration(cd)
}

// ChunkRetryDeadline returns a MediaOption which sets a per-chunk retry
// deadline. If a single chunk has been attempting to upload for longer than
// this time and the request fails, it will no longer be retried, and the error
// will be returned to the caller.
// This is only applicable for files which are large enough to require
// a multi-chunk resumable upload.
// The default value is 32s.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Has 32s at per chunk been the default for a while now?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For 3ish years: 4fac4de

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, and there was a default of 16s prior to that apparently.

// To set a deadline on the entire upload, use context timeout or cancellation.
func ChunkRetryDeadline(deadline time.Duration) MediaOption {
codyoss marked this conversation as resolved.
Show resolved Hide resolved
return chunkRetryDeadlineOption(deadline)
}

// MediaOptions stores options for customizing media upload. It is not used by developers directly.
type MediaOptions struct {
ContentType string
ForceEmptyContentType bool

ChunkSize int
ChunkSize int
ChunkRetryDeadline time.Duration
}

// ProcessMediaOptions stores options from opts in a MediaOptions.
Expand Down
16 changes: 10 additions & 6 deletions internal/gensupport/media.go
Expand Up @@ -15,6 +15,7 @@ import (
"net/textproto"
"strings"
"sync"
"time"

"google.golang.org/api/googleapi"
)
Expand Down Expand Up @@ -217,12 +218,13 @@ func PrepareUpload(media io.Reader, chunkSize int) (r io.Reader, mb *MediaBuffer
// code only.
type MediaInfo struct {
// At most one of Media and MediaBuffer will be set.
media io.Reader
buffer *MediaBuffer
singleChunk bool
mType string
size int64 // mediaSize, if known. Used only for calls to progressUpdater_.
progressUpdater googleapi.ProgressUpdater
media io.Reader
buffer *MediaBuffer
singleChunk bool
mType string
size int64 // mediaSize, if known. Used only for calls to progressUpdater_.
progressUpdater googleapi.ProgressUpdater
chunkRetryDeadline time.Duration
}

// NewInfoFromMedia should be invoked from the Media method of a call. It returns a
Expand All @@ -234,6 +236,7 @@ func NewInfoFromMedia(r io.Reader, options []googleapi.MediaOption) *MediaInfo {
if !opts.ForceEmptyContentType {
r, mi.mType = DetermineContentType(r, opts.ContentType)
}
mi.chunkRetryDeadline = opts.ChunkRetryDeadline
mi.media, mi.buffer, mi.singleChunk = PrepareUpload(r, opts.ChunkSize)
return mi
}
Expand Down Expand Up @@ -356,6 +359,7 @@ func (mi *MediaInfo) ResumableUpload(locURI string) *ResumableUpload {
mi.progressUpdater(curr, mi.size)
}
},
ChunkRetryDeadline: mi.chunkRetryDeadline,
}
}

Expand Down
38 changes: 37 additions & 1 deletion internal/gensupport/media_test.go
Expand Up @@ -14,6 +14,7 @@ import (
"reflect"
"strings"
"testing"
"time"

"google.golang.org/api/googleapi"
)
Expand Down Expand Up @@ -155,6 +156,7 @@ func TestNewInfoFromMedia(t *testing.T) {
opts []googleapi.MediaOption
wantType string
wantMedia, wantBuffer, wantSingleChunk bool
wantDeadline time.Duration
}{
{
desc: "an empty reader results in a MediaBuffer with a single, empty chunk",
Expand All @@ -172,6 +174,15 @@ func TestNewInfoFromMedia(t *testing.T) {
wantBuffer: true,
wantSingleChunk: true,
},
{
desc: "ChunkRetryDeadline is observed",
r: new(bytes.Buffer),
opts: []googleapi.MediaOption{googleapi.ChunkRetryDeadline(time.Second)},
wantType: textType,
wantBuffer: true,
wantSingleChunk: true,
wantDeadline: time.Second,
},
{
desc: "chunk size of zero: don't use a MediaBuffer; upload as a single chunk",
r: strings.NewReader("12345"),
Expand Down Expand Up @@ -220,6 +231,9 @@ func TestNewInfoFromMedia(t *testing.T) {
if got, want := mi.singleChunk, test.wantSingleChunk; got != want {
t.Errorf("%s: singleChunk: got %t, want %t", test.desc, got, want)
}
if got, want := mi.chunkRetryDeadline, test.wantDeadline; got != want {
t.Errorf("%s: chunkRetryDeadline: got %v, want %v", test.desc, got, want)
}
}
}

Expand Down Expand Up @@ -341,6 +355,7 @@ func TestResumableUpload(t *testing.T) {
chunkSize int
wantUploadType string
wantResumableUpload bool
chunkRetryDeadline time.Duration
}{
{
desc: "chunk size of zero: don't use a MediaBuffer; upload as a single chunk",
Expand Down Expand Up @@ -372,14 +387,35 @@ func TestResumableUpload(t *testing.T) {
wantUploadType: "resumable",
wantResumableUpload: true,
},
{
desc: "confirm that ChunkRetryDeadline is carried to ResumableUpload",
r: &nullReader{2 * googleapi.MinUploadChunkSize},
chunkSize: 1,
wantUploadType: "resumable",
wantResumableUpload: true,
chunkRetryDeadline: 1 * time.Second,
},
} {
mi := NewInfoFromMedia(test.r, []googleapi.MediaOption{googleapi.ChunkSize(test.chunkSize)})
opts := []googleapi.MediaOption{googleapi.ChunkSize(test.chunkSize)}
if test.chunkRetryDeadline != 0 {
opts = append(opts, googleapi.ChunkRetryDeadline(test.chunkRetryDeadline))
}
mi := NewInfoFromMedia(test.r, opts)
if got, want := mi.UploadType(), test.wantUploadType; got != want {
t.Errorf("%s: upload type: got %q, want %q", test.desc, got, want)
}
if got, want := mi.ResumableUpload("") != nil, test.wantResumableUpload; got != want {
t.Errorf("%s: resumable upload non-nil: got %t, want %t", test.desc, got, want)
}
if test.chunkRetryDeadline != 0 {
if got := mi.ResumableUpload(""); got != nil {
if got.ChunkRetryDeadline != test.chunkRetryDeadline {
t.Errorf("%s: ChunkRetryDeadline: got %v, want %v", test.desc, got.ChunkRetryDeadline, test.chunkRetryDeadline)
}
} else {
t.Errorf("%s: test case invalid; resumable upload is nil", test.desc)
}
}
}
}

Expand Down
12 changes: 12 additions & 0 deletions internal/gensupport/resumable.go
Expand Up @@ -34,6 +34,10 @@ type ResumableUpload struct {

// Retry optionally configures retries for requests made against the upload.
Retry *RetryConfig

// ChunkRetryDeadline configures the per-chunk deadline after which no further
// retries should happen.
ChunkRetryDeadline time.Duration
}

// Progress returns the number of bytes uploaded at this point.
Expand Down Expand Up @@ -156,6 +160,14 @@ func (rx *ResumableUpload) Upload(ctx context.Context) (resp *http.Response, err
// Configure retryable error criteria.
errorFunc := rx.Retry.errorFunc()

// Configure per-chunk retry deadline.
var retryDeadline time.Duration
if rx.ChunkRetryDeadline != 0 {
retryDeadline = rx.ChunkRetryDeadline
} else {
retryDeadline = defaultRetryDeadline
}

// Send all chunks.
for {
var pause time.Duration
Expand Down
22 changes: 13 additions & 9 deletions internal/gensupport/resumable_test.go
Expand Up @@ -305,6 +305,9 @@ func TestRetry_EachChunkHasItsOwnRetryDeadline(t *testing.T) {
)
media := strings.NewReader(strings.Repeat("a", mediaSize))

// This transport returns multiple errors on both the first chunk and third
// chunk of the upload. If the timeout were not reset between chunks, the
// errors on the third chunk would not retry and cause a failure.
tr := &interruptibleTransport{
buf: make([]byte, 0, mediaSize),
events: []event{
Expand All @@ -320,24 +323,25 @@ func TestRetry_EachChunkHasItsOwnRetryDeadline(t *testing.T) {
// cum: 1s sleep <-- resets because it's a new chunk
{"bytes 90-179/*", 308},
// cum: 1s sleep <-- resets because it's a new chunk
{"bytes 180-269/*", http.StatusServiceUnavailable},
// cum: 1s sleep on later chunk
{"bytes 180-269/*", http.StatusServiceUnavailable},
// cum: 2s sleep on later chunk
{"bytes 180-269/*", 308},
// cum: 1s sleep <-- resets because it's a new chunk
// cum: 3s sleep <-- resets because it's a new chunk
{"bytes 270-299/300", 200},
},
bodies: bodyTracker{},
}

rx := &ResumableUpload{
Client: &http.Client{Transport: tr},
Media: NewMediaBuffer(media, chunkSize),
MediaType: "text/plain",
Callback: func(int64) {},
Client: &http.Client{Transport: tr},
Media: NewMediaBuffer(media, chunkSize),
MediaType: "text/plain",
Callback: func(int64) {},
ChunkRetryDeadline: 5 * time.Second,
}

oldRetryDeadline := retryDeadline
retryDeadline = 5 * time.Second
defer func() { retryDeadline = oldRetryDeadline }()

oldBackoff := backoff
backoff = func() Backoff { return new(PauseOneSecond) }
defer func() { backoff = oldBackoff }()
Expand Down
4 changes: 2 additions & 2 deletions internal/gensupport/retry.go
Expand Up @@ -20,8 +20,8 @@ type Backoff interface {

// These are declared as global variables so that tests can overwrite them.
var (
// Per-chunk deadline for resumable uploads.
retryDeadline = 32 * time.Second
// Default per-chunk deadline for resumable uploads.
defaultRetryDeadline = 32 * time.Second
// Default backoff timer.
backoff = func() Backoff {
return &gax.Backoff{Initial: 100 * time.Millisecond}
Expand Down