Skip to content

Commit

Permalink
gensupport: Allow user to provide his own buffer used for uploading
Browse files Browse the repository at this point in the history
  • Loading branch information
zimnx committed Sep 1, 2020
1 parent d4bd3aa commit b307dab
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 15 deletions.
27 changes: 27 additions & 0 deletions googleapi/googleapi.go
Expand Up @@ -211,6 +211,31 @@ type MediaOption interface {
setOptions(o *MediaOptions)
}

type noopOption struct{}

func (bp noopOption) setOptions(o *MediaOptions) {
}

type bufferOption []byte

func (bp bufferOption) setOptions(o *MediaOptions) {
o.Buffer = bp
}

// WithBuffer returns MediaOption which sets buffer used for media uploads.
// Buffer capacity needs to be at least MinUploadChunkSize, if it's not
// this option is a no op.
// If used together with ChunkSize, buffer needs to have at least ChunkSize capacity.
// If not set, each upload will allocate its own memory.
// Buffer can be reused only after request complete. Using the same buffer
// in concurrent calls will lead to data race.
func WithBuffer(buffer []byte) MediaOption {
if cap(buffer) < MinUploadChunkSize {
return noopOption{}
}
return bufferOption(buffer)
}

type contentTypeOption string

func (ct contentTypeOption) setOptions(o *MediaOptions) {
Expand Down Expand Up @@ -251,6 +276,8 @@ type MediaOptions struct {
ForceEmptyContentType bool

ChunkSize int

Buffer []byte
}

// ProcessMediaOptions stores options from opts in a MediaOptions.
Expand Down
12 changes: 12 additions & 0 deletions internal/gensupport/buffer.go
Expand Up @@ -28,6 +28,18 @@ func NewMediaBuffer(media io.Reader, chunkSize int) *MediaBuffer {
return &MediaBuffer{media: media, chunk: make([]byte, 0, chunkSize)}
}

// NewMediaBuffer initializes a MediaBuffer.
func NewMediaBufferWithBuffer(media io.Reader, chunkSize int, buffer []byte) *MediaBuffer {
// If buffer isn't long enough, allocate new one.
if cap(buffer) < chunkSize {
return NewMediaBuffer(media, chunkSize)
}

// Implementation expects buffer of zero length.
buffer = buffer[:0]
return &MediaBuffer{media: media, chunk: buffer}
}

// Chunk returns the current buffered chunk, the offset in the underlying media
// from which the chunk is drawn, and the size of the chunk.
// Successive calls to Chunk return the same chunk between calls to Next.
Expand Down
11 changes: 8 additions & 3 deletions internal/gensupport/media.go
Expand Up @@ -200,11 +200,16 @@ func typeHeader(contentType string) textproto.MIMEHeader {
//
// After PrepareUpload has been called, media should no longer be used: the
// media content should be accessed via one of the return values.
func PrepareUpload(media io.Reader, chunkSize int) (r io.Reader, mb *MediaBuffer, singleChunk bool) {
func PrepareUpload(media io.Reader, chunkSize int, buffer []byte) (r io.Reader, mb *MediaBuffer, singleChunk bool) {
if chunkSize == 0 { // do not chunk
return media, nil, true
}
mb = NewMediaBuffer(media, chunkSize)
if buffer != nil {
mb = NewMediaBufferWithBuffer(media, chunkSize, buffer)
} else {
mb = NewMediaBuffer(media, chunkSize)
}

_, _, _, err := mb.Chunk()
// If err is io.EOF, we can upload this in a single request. Otherwise, err is
// either nil or a non-EOF error. If it is the latter, then the next call to
Expand Down Expand Up @@ -234,7 +239,7 @@ func NewInfoFromMedia(r io.Reader, options []googleapi.MediaOption) *MediaInfo {
if !opts.ForceEmptyContentType {
r, mi.mType = DetermineContentType(r, opts.ContentType)
}
mi.media, mi.buffer, mi.singleChunk = PrepareUpload(r, opts.ChunkSize)
mi.media, mi.buffer, mi.singleChunk = PrepareUpload(r, opts.ChunkSize, opts.Buffer)
return mi
}

Expand Down
41 changes: 29 additions & 12 deletions internal/gensupport/media_test.go
Expand Up @@ -149,27 +149,29 @@ func TestDetermineContentType(t *testing.T) {

func TestNewInfoFromMedia(t *testing.T) {
const textType = "text/plain; charset=utf-8"
chunkBuffer := make([]byte, 0)
for _, test := range []struct {
desc string
r io.Reader
opts []googleapi.MediaOption
wantType string
wantMedia, wantBuffer, wantSingleChunk bool
desc string
r io.Reader
opts []googleapi.MediaOption
wantType string
wantChunkBuffer []byte
wantMedia, wantAnyBuffer, wantSingleChunk bool
}{
{
desc: "an empty reader results in a MediaBuffer with a single, empty chunk",
r: new(bytes.Buffer),
opts: nil,
wantType: textType,
wantBuffer: true,
wantAnyBuffer: true,
wantSingleChunk: true,
},
{
desc: "ContentType is observed",
r: new(bytes.Buffer),
opts: []googleapi.MediaOption{googleapi.ContentType("xyz")},
wantType: "xyz",
wantBuffer: true,
wantAnyBuffer: true,
wantSingleChunk: true,
},
{
Expand All @@ -185,15 +187,15 @@ func TestNewInfoFromMedia(t *testing.T) {
r: strings.NewReader("12345"),
opts: []googleapi.MediaOption{googleapi.ChunkSize(100)},
wantType: textType,
wantBuffer: true,
wantAnyBuffer: true,
wantSingleChunk: true,
},
{
desc: "chunk size == data size: MediaBuffer with single chunk",
r: &nullReader{googleapi.MinUploadChunkSize},
opts: []googleapi.MediaOption{googleapi.ChunkSize(1)},
wantType: "application/octet-stream",
wantBuffer: true,
wantAnyBuffer: true,
wantSingleChunk: true,
},
{
Expand All @@ -202,7 +204,15 @@ func TestNewInfoFromMedia(t *testing.T) {
r: &nullReader{2 * googleapi.MinUploadChunkSize},
opts: []googleapi.MediaOption{googleapi.ChunkSize(1)},
wantType: "application/octet-stream",
wantBuffer: true,
wantAnyBuffer: true,
wantSingleChunk: false,
},
{
desc: "WithBuffer is observed",
r: new(bytes.Buffer),
opts: []googleapi.MediaOption{googleapi.WithBuffer(chunkBuffer)},
wantType: textType,
wantChunkBuffer: chunkBuffer,
wantSingleChunk: false,
},
} {
Expand All @@ -214,8 +224,15 @@ func TestNewInfoFromMedia(t *testing.T) {
if got, want := (mi.media != nil), test.wantMedia; got != want {
t.Errorf("%s: media non-nil: got %t, want %t", test.desc, got, want)
}
if got, want := (mi.buffer != nil), test.wantBuffer; got != want {
t.Errorf("%s: buffer non-nil: got %t, want %t", test.desc, got, want)
if test.wantAnyBuffer {
if got, want := (mi.buffer != nil), test.wantAnyBuffer; got != want {
t.Errorf("%s: buffer non-nil: got %t, want %t", test.desc, got, want)
}
}
if test.wantChunkBuffer != nil {
if got, want := reflect.ValueOf(mi.buffer.chunk).Pointer(), reflect.ValueOf(test.wantChunkBuffer).Pointer(); got != want {
t.Errorf("%s: chunk buffer: got %v, want %v", test.desc, got, want)
}
}
if got, want := mi.singleChunk, test.wantSingleChunk; got != want {
t.Errorf("%s: singleChunk: got %t, want %t", test.desc, got, want)
Expand Down
46 changes: 46 additions & 0 deletions internal/gensupport/resumable_test.go
Expand Up @@ -364,3 +364,49 @@ func TestRetry_EachChunkHasItsOwnRetryDeadline(t *testing.T) {
}
}
}

func TestResumableUploadWithPredefinedBuffer(t *testing.T) {
const (
mediaSize = 256
)
media := strings.Repeat("a", mediaSize)
tr := &interruptibleTransport{
buf: make([]byte, 0, mediaSize),
events: []event{
{"bytes 0-255/*", 200},
},
bodies: bodyTracker{},
}
buffer := make([]byte, 0, mediaSize)
rx := &ResumableUpload{
Client: &http.Client{Transport: tr},
Media: NewMediaBufferWithBuffer(strings.NewReader(media), mediaSize, buffer),
MediaType: "text/plain",
Callback: func(int64) {},
}

res, err := rx.Upload(context.Background())
if err == nil {
res.Body.Close()
}
if err != nil || res == nil || res.StatusCode != http.StatusOK {
if res == nil {
t.Fatalf("Upload not successful, res=nil: %v", err)
} else {
t.Fatalf("Upload not successful, statusCode=%v, err=%v", res.StatusCode, err)
}
}
if !reflect.DeepEqual(tr.buf, []byte(media)) {
t.Fatalf("transferred contents:\ngot %s\nwant %s", tr.buf, []byte(media))
}
// Media fits in single chunk, input buffer should have media content inside.
if !reflect.DeepEqual(buffer[:cap(buffer)], []byte(media)) {
t.Fatalf("buffer contents:\ngot %v\nwant %v", buffer, []byte(media))
}
if len(tr.events) > 0 {
t.Fatalf("did not observe all expected events. leftover events: %v", tr.events)
}
if len(tr.bodies) > 0 {
t.Errorf("unclosed request bodies: %v", tr.bodies)
}
}

0 comments on commit b307dab

Please sign in to comment.