Skip to content

Commit

Permalink
Merge pull request #4066 from milosgajdos/optimise-s3-push
Browse files Browse the repository at this point in the history
Optimise push in S3 driver
  • Loading branch information
milosgajdos committed Sep 29, 2023
2 parents 23083ac + 4fce3c0 commit 735c161
Showing 1 changed file with 157 additions and 72 deletions.
229 changes: 157 additions & 72 deletions registry/storage/driver/s3-aws/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
Expand All @@ -48,6 +49,7 @@ const driverName = "s3aws"
const minChunkSize = 5 << 20

// maxChunkSize defines the maximum multipart upload chunk size allowed by S3.
// S3 API requires max upload chunk to be 5GB.
const maxChunkSize = 5 << 30

const defaultChunkSize = 2 * minChunkSize
Expand Down Expand Up @@ -166,6 +168,7 @@ type driver struct {
RootDirectory string
StorageClass string
ObjectACL string
pool *sync.Pool
}

type baseEmbed struct {
Expand Down Expand Up @@ -580,6 +583,13 @@ func New(params DriverParameters) (*Driver, error) {
RootDirectory: params.RootDirectory,
StorageClass: params.StorageClass,
ObjectACL: params.ObjectACL,
pool: &sync.Pool{
New: func() interface{} {
return &buffer{
data: make([]byte, 0, params.ChunkSize),
}
},
},
}

return &Driver{
Expand Down Expand Up @@ -1251,21 +1261,64 @@ func (d *driver) getStorageClass() *string {
return aws.String(d.StorageClass)
}

// buffer is a fixed-capacity byte buffer. Its capacity is set once at
// allocation time (to the driver's chunk size — see the pool's New func in
// New) and is never grown; instances are recycled through driver.pool.
type buffer struct {
	data []byte
}

// NewBuffer returns a bytes buffer from the driver's memory pool.
// The size of the buffer is static and set to params.ChunkSize.
// The buffer is cleared on the way out of the pool so callers never
// observe stale data, rather than trusting every Put site to Clear
// before returning a buffer to the pool.
func (d *driver) NewBuffer() *buffer {
	b := d.pool.Get().(*buffer)
	b.Clear()
	return b
}

// ReadFrom fills the buffer from r up to the buffer's fixed capacity,
// without ever growing it. It returns the number of bytes consumed
// from r. Unlike the general io.ReaderFrom contract it stops once the
// buffer is full; like io.ReaderFrom, it does not report io.EOF, so a
// short source is not an error.
// See: https://pkg.go.dev/io#ReaderFrom
func (b *buffer) ReadFrom(r io.Reader) (int64, error) {
	var total int64
	for {
		free := b.data[len(b.data):cap(b.data)]
		if len(free) == 0 {
			// buffer is full; stop without draining r
			return total, nil
		}
		n, err := r.Read(free)
		b.data = b.data[:len(b.data)+n]
		total += int64(n)
		if err == io.EOF {
			// io.ReaderFrom implementations swallow EOF
			return total, nil
		}
		if err != nil {
			return total, err
		}
	}
}

// Cap reports the fixed capacity of the buffer's underlying byte slice.
func (b *buffer) Cap() int {
	return cap(b.data)
}

// Len reports the number of bytes currently held in the buffer.
func (b *buffer) Len() int {
	return len(b.data)
}

// Clear empties the buffer, retaining the underlying storage for reuse.
func (b *buffer) Clear() {
	b.data = b.data[:0]
}

// writer attempts to upload parts to S3 in a buffered fashion where the last
// part is at least as large as the chunksize, so the multipart upload could be
// cleanly resumed in the future. This is violated if Close is called after less
// than a full chunk is written.
//
// NOTE(review): the scraped diff showed both the old ([]byte readyPart/
// pendingPart) and new (*buffer ready/pending) field sets interleaved;
// only the post-change fields are kept here so the struct compiles.
type writer struct {
	driver   *driver
	key      string
	uploadID string
	parts    []*s3.Part
	size     int64
	// ready and pending are chunk-sized staging buffers borrowed from
	// driver.pool: ready accumulates the next part to upload, pending
	// catches the overflow that becomes the part after it.
	ready   *buffer
	pending *buffer
	// lifecycle flags; closed is set by Close regardless of outcome,
	// and at most one of committed/cancelled is ever set.
	closed    bool
	committed bool
	cancelled bool
}

func (d *driver) newWriter(key, uploadID string, parts []*s3.Part) storagedriver.FileWriter {
Expand All @@ -1279,6 +1332,8 @@ func (d *driver) newWriter(key, uploadID string, parts []*s3.Part) storagedriver
uploadID: uploadID,
parts: parts,
size: size,
ready: d.NewBuffer(),
pending: d.NewBuffer(),
}
}

Expand All @@ -1300,12 +1355,12 @@ func (w *writer) Write(p []byte) (int, error) {
// If the last written part is smaller than minChunkSize, we need to make a
// new multipart upload :sadface:
if len(w.parts) > 0 && int(*w.parts[len(w.parts)-1].Size) < minChunkSize {
var completedUploadedParts completedParts
for _, part := range w.parts {
completedUploadedParts = append(completedUploadedParts, &s3.CompletedPart{
completedUploadedParts := make(completedParts, len(w.parts))
for i, part := range w.parts {
completedUploadedParts[i] = &s3.CompletedPart{
ETag: part.ETag,
PartNumber: part.PartNumber,
})
}
}

sort.Sort(completedUploadedParts)
Expand All @@ -1319,11 +1374,13 @@ func (w *writer) Write(p []byte) (int, error) {
},
})
if err != nil {
w.driver.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
if _, aErr := w.driver.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
Bucket: aws.String(w.driver.Bucket),
Key: aws.String(w.key),
UploadId: aws.String(w.uploadID),
})
}); aErr != nil {
return 0, errors.Join(err, aErr)
}
return 0, err
}

Expand Down Expand Up @@ -1351,11 +1408,18 @@ func (w *writer) Write(p []byte) (int, error) {
return 0, err
}
defer resp.Body.Close()

// reset uploaded parts
w.parts = nil
w.readyPart, err = io.ReadAll(resp.Body)
w.ready.Clear()

n, err := w.ready.ReadFrom(resp.Body)
if err != nil {
return 0, err
}
if resp.ContentLength != nil && n < *resp.ContentLength {
return 0, io.ErrShortBuffer
}
} else {
// Otherwise we can use the old file as the new first part
copyPartResp, err := w.driver.S3.UploadPartCopy(&s3.UploadPartCopyInput{
Expand All @@ -1380,51 +1444,60 @@ func (w *writer) Write(p []byte) (int, error) {

var n int

for len(p) > 0 {
// If no parts are ready to write, fill up the first part
if neededBytes := int(w.driver.ChunkSize) - len(w.readyPart); neededBytes > 0 {
if len(p) >= neededBytes {
w.readyPart = append(w.readyPart, p[:neededBytes]...)
n += neededBytes
p = p[neededBytes:]
} else {
w.readyPart = append(w.readyPart, p...)
n += len(p)
p = nil
}
defer func() { w.size += int64(n) }()

reader := bytes.NewReader(p)

for reader.Len() > 0 {
// NOTE(milosgajdos): we do some seemingly unsafe conversions
// from int64 to int in this for loop. These are fine as the
// offset returned from buffer.ReadFrom can only ever be
// maxChunkSize large which fits in to int. The reason why
// we return int64 is to play nice with Go interfaces where
// the buffer implements io.ReaderFrom interface.

// fill up the ready parts buffer
offset, err := w.ready.ReadFrom(reader)
n += int(offset)
if err != nil {
return n, err
}

if neededBytes := int(w.driver.ChunkSize) - len(w.pendingPart); neededBytes > 0 {
if len(p) >= neededBytes {
w.pendingPart = append(w.pendingPart, p[:neededBytes]...)
n += neededBytes
p = p[neededBytes:]
err := w.flushPart()
if err != nil {
w.size += int64(n)
return n, err
}
} else {
w.pendingPart = append(w.pendingPart, p...)
n += len(p)
p = nil
// try filling up the pending parts buffer
offset, err = w.pending.ReadFrom(reader)
n += int(offset)
if err != nil {
return n, err
}

// we filled up pending buffer, flush
if w.pending.Len() == w.pending.Cap() {
if err := w.flush(); err != nil {
return n, err
}
}
}
w.size += int64(n)

return n, nil
}

// Size returns the total number of bytes written through this writer,
// including parts already uploaded to S3 and data still buffered.
func (w *writer) Size() int64 {
	return w.size
}

// Close marks the writer closed and flushes any buffered data to S3.
// Its staging buffers are cleared and returned to the driver's pool
// even if the flush fails. Closing twice is an error.
//
// NOTE(review): the scraped diff left a removed `return w.flushPart()`
// line in the middle of this function, which made the defer and the
// flush call unreachable; only the post-change body is kept here.
func (w *writer) Close() error {
	if w.closed {
		return fmt.Errorf("already closed")
	}
	w.closed = true

	defer func() {
		w.ready.Clear()
		w.driver.pool.Put(w.ready)
		w.pending.Clear()
		w.driver.pool.Put(w.pending)
	}()

	return w.flush()
}

func (w *writer) Cancel(ctx context.Context) error {
Expand All @@ -1450,25 +1523,28 @@ func (w *writer) Commit() error {
} else if w.cancelled {
return fmt.Errorf("already cancelled")
}
err := w.flushPart()

err := w.flush()
if err != nil {
return err
}

w.committed = true

completedUploadedParts := make(completedParts, 0, len(w.parts))
for _, part := range w.parts {
completedUploadedParts = append(completedUploadedParts, &s3.CompletedPart{
completedUploadedParts := make(completedParts, len(w.parts))
for i, part := range w.parts {
completedUploadedParts[i] = &s3.CompletedPart{
ETag: part.ETag,
PartNumber: part.PartNumber,
})
}
}

// This is an edge case when we are trying to upload an empty chunk of data using
// a MultiPart upload. As a result we are trying to complete the MultipartUpload
// with an empty slice of `completedUploadedParts` which will always lead to 400
// being returned from S3 See: https://docs.aws.amazon.com/sdk-for-go/api/service/s3/#CompletedMultipartUpload
// Solution: we upload an empty i.e. 0 byte part as a single part and then append it
// This is an edge case when we are trying to upload an empty file as part of
// the MultiPart upload. We get a PUT with Content-Length: 0 and sad things happen.
// The result is we are trying to Complete MultipartUpload with an empty list of
// completedUploadedParts which will always lead to 400 being returned from S3
// See: https://docs.aws.amazon.com/sdk-for-go/api/service/s3/#CompletedMultipartUpload
// Solution: we upload the empty i.e. 0 byte part as a single part and then append it
// to the completedUploadedParts slice used to complete the Multipart upload.
if len(w.parts) == 0 {
resp, err := w.driver.S3.UploadPart(&s3.UploadPartInput{
Expand Down Expand Up @@ -1499,47 +1575,56 @@ func (w *writer) Commit() error {
},
})
if err != nil {
w.driver.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
if _, aErr := w.driver.S3.AbortMultipartUpload(&s3.AbortMultipartUploadInput{
Bucket: aws.String(w.driver.Bucket),
Key: aws.String(w.key),
UploadId: aws.String(w.uploadID),
})
}); aErr != nil {
return errors.Join(err, aErr)
}
return err
}
return nil
}

// flushPart flushes buffers to write a part to S3.
// Only called by Write (with both buffers full) and Close/Commit (always)
func (w *writer) flushPart() error {
if len(w.readyPart) == 0 && len(w.pendingPart) == 0 {
// nothing to write
// flush flushes all buffers to write a part to S3.
// flush is only called by Write (with both buffers full) and Close/Commit (always)
func (w *writer) flush() error {
if w.ready.Len() == 0 && w.pending.Len() == 0 {
return nil
}
if w.driver.MultipartCombineSmallPart && len(w.pendingPart) < int(w.driver.ChunkSize) {
// closing with a small pending part
// combine ready and pending to avoid writing a small part
w.readyPart = append(w.readyPart, w.pendingPart...)
w.pendingPart = nil

buf := bytes.NewBuffer(w.ready.data)
if w.driver.MultipartCombineSmallPart && (w.pending.Len() > 0 && w.pending.Len() < int(w.driver.ChunkSize)) {
if _, err := buf.Write(w.pending.data); err != nil {
return err
}
w.pending.Clear()
}

partSize := buf.Len()
partNumber := aws.Int64(int64(len(w.parts) + 1))

resp, err := w.driver.S3.UploadPart(&s3.UploadPartInput{
Bucket: aws.String(w.driver.Bucket),
Key: aws.String(w.key),
PartNumber: partNumber,
UploadId: aws.String(w.uploadID),
Body: bytes.NewReader(w.readyPart),
Body: bytes.NewReader(buf.Bytes()),
})
if err != nil {
return err
}

w.parts = append(w.parts, &s3.Part{
ETag: resp.ETag,
PartNumber: partNumber,
Size: aws.Int64(int64(len(w.readyPart))),
Size: aws.Int64(int64(partSize)),
})
w.readyPart = w.pendingPart
w.pendingPart = nil

// reset the flushed buffer and swap buffers
w.ready.Clear()
w.ready, w.pending = w.pending, w.ready

return nil
}

0 comments on commit 735c161

Please sign in to comment.