From b307dab300e2ff4e0ddaa7cc5864602accbc2e4e Mon Sep 17 00:00:00 2001 From: Maciej Zimnoch Date: Thu, 20 Aug 2020 17:17:15 +0200 Subject: [PATCH] gensupport: Allow user to provide his own buffer used for uploading --- googleapi/googleapi.go | 27 ++++++++++++++++ internal/gensupport/buffer.go | 12 +++++++ internal/gensupport/media.go | 11 +++++-- internal/gensupport/media_test.go | 41 +++++++++++++++++------- internal/gensupport/resumable_test.go | 46 +++++++++++++++++++++++++++ 5 files changed, 122 insertions(+), 15 deletions(-) diff --git a/googleapi/googleapi.go b/googleapi/googleapi.go index d1784f1a349..985f5d1feb5 100644 --- a/googleapi/googleapi.go +++ b/googleapi/googleapi.go @@ -211,6 +211,31 @@ type MediaOption interface { setOptions(o *MediaOptions) } +type noopOption struct{} + +func (bp noopOption) setOptions(o *MediaOptions) { +} + +type bufferOption []byte + +func (bp bufferOption) setOptions(o *MediaOptions) { + o.Buffer = bp +} + +// WithBuffer returns MediaOption which sets buffer used for media uploads. +// Buffer capacity needs to be at least MinUploadChunkSize, if it's not +// this option is a no op. +// If used together with ChunkSize, buffer needs to have at least ChunkSize capacity. +// If not set, each upload will allocate its own memory. +// Buffer can be reused only after request complete. Using the same buffer +// in concurrent calls will lead to data race. +func WithBuffer(buffer []byte) MediaOption { + if cap(buffer) < MinUploadChunkSize { + return noopOption{} + } + return bufferOption(buffer) +} + type contentTypeOption string func (ct contentTypeOption) setOptions(o *MediaOptions) { @@ -251,6 +276,8 @@ type MediaOptions struct { ForceEmptyContentType bool ChunkSize int + + Buffer []byte } // ProcessMediaOptions stores options from opts in a MediaOptions. diff --git a/internal/gensupport/buffer.go b/internal/gensupport/buffer.go index 3d0817ede98..8c060cf0bf4 100644 --- a/internal/gensupport/buffer.go +++ b/internal/gensupport/buffer.go @@ -28,6 +28,18 @@ func NewMediaBuffer(media io.Reader, chunkSize int) *MediaBuffer { return &MediaBuffer{media: media, chunk: make([]byte, 0, chunkSize)} } +// NewMediaBuffer initializes a MediaBuffer. +func NewMediaBufferWithBuffer(media io.Reader, chunkSize int, buffer []byte) *MediaBuffer { + // If buffer isn't long enough, allocate new one. + if cap(buffer) < chunkSize { + return NewMediaBuffer(media, chunkSize) + } + + // Implementation expects buffer of zero length. + buffer = buffer[:0] + return &MediaBuffer{media: media, chunk: buffer} +} + // Chunk returns the current buffered chunk, the offset in the underlying media // from which the chunk is drawn, and the size of the chunk. // Successive calls to Chunk return the same chunk between calls to Next. diff --git a/internal/gensupport/media.go b/internal/gensupport/media.go index 0460ab59406..e34ef3740ff 100644 --- a/internal/gensupport/media.go +++ b/internal/gensupport/media.go @@ -200,11 +200,16 @@ func typeHeader(contentType string) textproto.MIMEHeader { // // After PrepareUpload has been called, media should no longer be used: the // media content should be accessed via one of the return values. -func PrepareUpload(media io.Reader, chunkSize int) (r io.Reader, mb *MediaBuffer, singleChunk bool) { +func PrepareUpload(media io.Reader, chunkSize int, buffer []byte) (r io.Reader, mb *MediaBuffer, singleChunk bool) { if chunkSize == 0 { // do not chunk return media, nil, true } - mb = NewMediaBuffer(media, chunkSize) + if buffer != nil { + mb = NewMediaBufferWithBuffer(media, chunkSize, buffer) + } else { + mb = NewMediaBuffer(media, chunkSize) + } + _, _, _, err := mb.Chunk() // If err is io.EOF, we can upload this in a single request. Otherwise, err is // either nil or a non-EOF error. If it is the latter, then the next call to @@ -234,7 +239,7 @@ func NewInfoFromMedia(r io.Reader, options []googleapi.MediaOption) *MediaInfo { if !opts.ForceEmptyContentType { r, mi.mType = DetermineContentType(r, opts.ContentType) } - mi.media, mi.buffer, mi.singleChunk = PrepareUpload(r, opts.ChunkSize) + mi.media, mi.buffer, mi.singleChunk = PrepareUpload(r, opts.ChunkSize, opts.Buffer) return mi } diff --git a/internal/gensupport/media_test.go b/internal/gensupport/media_test.go index ee4f6d7ace4..f6ffbec7c22 100644 --- a/internal/gensupport/media_test.go +++ b/internal/gensupport/media_test.go @@ -149,19 +149,21 @@ func TestDetermineContentType(t *testing.T) { func TestNewInfoFromMedia(t *testing.T) { const textType = "text/plain; charset=utf-8" + chunkBuffer := make([]byte, 0) for _, test := range []struct { - desc string - r io.Reader - opts []googleapi.MediaOption - wantType string - wantMedia, wantBuffer, wantSingleChunk bool + desc string + r io.Reader + opts []googleapi.MediaOption + wantType string + wantChunkBuffer []byte + wantMedia, wantAnyBuffer, wantSingleChunk bool }{ { desc: "an empty reader results in a MediaBuffer with a single, empty chunk", r: new(bytes.Buffer), opts: nil, wantType: textType, - wantBuffer: true, + wantAnyBuffer: true, wantSingleChunk: true, }, { @@ -169,7 +171,7 @@ func TestNewInfoFromMedia(t *testing.T) { r: new(bytes.Buffer), opts: []googleapi.MediaOption{googleapi.ContentType("xyz")}, wantType: "xyz", - wantBuffer: true, + wantAnyBuffer: true, wantSingleChunk: true, }, { @@ -185,7 +187,7 @@ func TestNewInfoFromMedia(t *testing.T) { r: strings.NewReader("12345"), opts: []googleapi.MediaOption{googleapi.ChunkSize(100)}, wantType: textType, - wantBuffer: true, + wantAnyBuffer: true, wantSingleChunk: true, }, { @@ -193,7 +195,7 @@ func TestNewInfoFromMedia(t *testing.T) { r: &nullReader{googleapi.MinUploadChunkSize}, opts: []googleapi.MediaOption{googleapi.ChunkSize(1)}, wantType: "application/octet-stream", - wantBuffer: true, + wantAnyBuffer: true, wantSingleChunk: true, }, { @@ -202,7 +204,15 @@ func TestNewInfoFromMedia(t *testing.T) { r: &nullReader{2 * googleapi.MinUploadChunkSize}, opts: []googleapi.MediaOption{googleapi.ChunkSize(1)}, wantType: "application/octet-stream", - wantBuffer: true, + wantAnyBuffer: true, + wantSingleChunk: false, + }, + { + desc: "WithBuffer is observed", + r: new(bytes.Buffer), + opts: []googleapi.MediaOption{googleapi.WithBuffer(chunkBuffer)}, + wantType: textType, + wantChunkBuffer: chunkBuffer, wantSingleChunk: false, }, } { @@ -214,8 +224,15 @@ func TestNewInfoFromMedia(t *testing.T) { if got, want := (mi.media != nil), test.wantMedia; got != want { t.Errorf("%s: media non-nil: got %t, want %t", test.desc, got, want) } - if got, want := (mi.buffer != nil), test.wantBuffer; got != want { - t.Errorf("%s: buffer non-nil: got %t, want %t", test.desc, got, want) + if test.wantAnyBuffer { + if got, want := (mi.buffer != nil), test.wantAnyBuffer; got != want { + t.Errorf("%s: buffer non-nil: got %t, want %t", test.desc, got, want) + } + } + if test.wantChunkBuffer != nil { + if got, want := reflect.ValueOf(mi.buffer.chunk).Pointer(), reflect.ValueOf(test.wantChunkBuffer).Pointer(); got != want { + t.Errorf("%s: chunk buffer: got %v, want %v", test.desc, got, want) + } } if got, want := mi.singleChunk, test.wantSingleChunk; got != want { t.Errorf("%s: singleChunk: got %t, want %t", test.desc, got, want) diff --git a/internal/gensupport/resumable_test.go b/internal/gensupport/resumable_test.go index 44544f7d71c..d86cf8f0e51 100644 --- a/internal/gensupport/resumable_test.go +++ b/internal/gensupport/resumable_test.go @@ -364,3 +364,49 @@ func TestRetry_EachChunkHasItsOwnRetryDeadline(t *testing.T) { } } } + +func TestResumableUploadWithPredefinedBuffer(t *testing.T) { + const ( + mediaSize = 256 + ) + media := strings.Repeat("a", mediaSize) + tr := &interruptibleTransport{ + buf: make([]byte, 0, mediaSize), + events: []event{ + {"bytes 0-255/*", 200}, + }, + bodies: bodyTracker{}, + } + buffer := make([]byte, 0, mediaSize) + rx := &ResumableUpload{ + Client: &http.Client{Transport: tr}, + Media: NewMediaBufferWithBuffer(strings.NewReader(media), mediaSize, buffer), + MediaType: "text/plain", + Callback: func(int64) {}, + } + + res, err := rx.Upload(context.Background()) + if err == nil { + res.Body.Close() + } + if err != nil || res == nil || res.StatusCode != http.StatusOK { + if res == nil { + t.Fatalf("Upload not successful, res=nil: %v", err) + } else { + t.Fatalf("Upload not successful, statusCode=%v, err=%v", res.StatusCode, err) + } + } + if !reflect.DeepEqual(tr.buf, []byte(media)) { + t.Fatalf("transferred contents:\ngot %s\nwant %s", tr.buf, []byte(media)) + } + // Media fits in single chunk, input buffer should have media content inside. + if !reflect.DeepEqual(buffer[:cap(buffer)], []byte(media)) { + t.Fatalf("buffer contents:\ngot %v\nwant %v", buffer, []byte(media)) + } + if len(tr.events) > 0 { + t.Fatalf("did not observe all expected events. leftover events: %v", tr.events) + } + if len(tr.bodies) > 0 { + t.Errorf("unclosed request bodies: %v", tr.bodies) + } +}