Skip to content

Commit

Permalink
blob: Add Upload and Download methods that may be more efficient for …
Browse files Browse the repository at this point in the history
…some drivers
  • Loading branch information
vangent committed Jun 13, 2023
1 parent 54c7124 commit 0c8428b
Show file tree
Hide file tree
Showing 14 changed files with 2,017 additions and 57 deletions.
74 changes: 48 additions & 26 deletions blob/azureblob/azureblob.go
Expand Up @@ -815,9 +815,17 @@ type writer struct {
client *blockblob.Client
uploadOpts *azblob.UploadStreamOptions

w *io.PipeWriter
donec chan struct{}
err error
// Ends of an io.Pipe, created when the first byte is written.
pw *io.PipeWriter
pr *io.PipeReader

// Alternatively, upload is set to true when Upload was
// used to upload data.
upload bool

donec chan struct{} // closed when done writing
// The following fields will be written before donec closes:
err error
}

// escapeKey does all required escaping for UTF-8 strings to work with Azure.
Expand Down Expand Up @@ -916,50 +924,64 @@ func (b *bucket) NewTypedWriter(ctx context.Context, key string, contentType str
}, nil
}

// Write appends p to w. User must call Close to close the w after done writing.
// Write appends p to w.pw. User must call Close to close the w after done writing.
func (w *writer) Write(p []byte) (int, error) {
// Avoid opening the pipe for a zero-length write;
// the concrete can do these for empty blobs.
if len(p) == 0 {
return 0, nil
}
if w.w == nil {
pr, pw := io.Pipe()
w.w = pw
if err := w.open(pr); err != nil {
return 0, err
}
if w.pw == nil {
// We'll write into pw and use pr as an io.Reader for the
// Upload call to Azure.
w.pr, w.pw = io.Pipe()
w.open(w.pr, true)
}
return w.w.Write(p)
return w.pw.Write(p)
}

// Upload reads from r. Per the driver, it is guaranteed to be the only
// write call for this writer.
func (w *writer) Upload(r io.Reader) error {
w.upload = true
w.open(r, false)
return nil
}

func (w *writer) open(pr *io.PipeReader) error {
// r may be nil if we're Closing and no data was written.
// If closePipeOnError is true, w.pr will be closed if there's an
// error uploading to Azure.
func (w *writer) open(r io.Reader, closePipeOnError bool) {
go func() {
defer close(w.donec)

var body io.Reader
if pr == nil {
body = http.NoBody
} else {
body = pr
if r == nil {
r = http.NoBody
}
_, w.err = w.client.UploadStream(w.ctx, body, w.uploadOpts)
_, w.err = w.client.UploadStream(w.ctx, r, w.uploadOpts)
if w.err != nil {
if pr != nil {
pr.CloseWithError(w.err)
if closePipeOnError {
w.pr.CloseWithError(w.err)
w.pr = nil
}
return
}
}()
return nil
}

// Close completes the writer and closes it. Any error occurring during write will
// be returned. If a writer is closed before any Write is called, Close will
// create an empty file at the given key.
func (w *writer) Close() error {
if w.w == nil {
w.open(nil)
} else if err := w.w.Close(); err != nil {
return err
if !w.upload {
if w.pr != nil {
defer w.pr.Close()
}
if w.pw == nil {
// We never got any bytes written. We'll write an http.NoBody.
w.open(nil, false)
} else if err := w.pw.Close(); err != nil {
return err
}
}
<-w.donec
return w.err
Expand Down
273 changes: 273 additions & 0 deletions blob/azureblob/testdata/TestConformance/TestUploadDownload.replay

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 0c8428b

Please sign in to comment.