Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add parallel buffered multipart upload #1745

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
208 changes: 207 additions & 1 deletion api-put-object-streaming.go
Expand Up @@ -28,6 +28,7 @@ import (
"net/url"
"sort"
"strings"
"sync"

"github.com/google/uuid"
"github.com/minio/minio-go/v7/pkg/s3utils"
Expand All @@ -44,7 +45,9 @@ import (
func (c *Client) putObjectMultipartStream(ctx context.Context, bucketName, objectName string,
reader io.Reader, size int64, opts PutObjectOptions,
) (info UploadInfo, err error) {
if !isObject(reader) && isReadAt(reader) && !opts.SendContentMd5 {
if opts.ConcurrentStreamParts && opts.NumThreads > 1 {
info, err = c.putObjectMultipartStreamParallel(ctx, bucketName, objectName, reader, opts)
} else if !isObject(reader) && isReadAt(reader) && !opts.SendContentMd5 {
// Verify if the reader implements ReadAt and it is not a *minio.Object then we will use parallel uploader.
info, err = c.putObjectMultipartStreamFromReadAt(ctx, bucketName, objectName, reader.(io.ReaderAt), size, opts)
} else {
Expand Down Expand Up @@ -446,6 +449,209 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
return uploadInfo, nil
}

// putObjectMultipartStreamParallel uploads opts.NumThreads parts in parallel.
// This is expected to take opts.PartSize * opts.NumThreads * (GOGC / 100) bytes of buffer.
func (c *Client) putObjectMultipartStreamParallel(ctx context.Context, bucketName, objectName string,
reader io.Reader, opts PutObjectOptions) (info UploadInfo, err error) {
// Input validation.
if err = s3utils.CheckValidBucketName(bucketName); err != nil {
return UploadInfo{}, err
}

if err = s3utils.CheckValidObjectName(objectName); err != nil {
return UploadInfo{}, err
}

if !opts.SendContentMd5 {
if opts.UserMetadata == nil {
opts.UserMetadata = make(map[string]string, 1)
}
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
}

// Cancel all when an error occurs.
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Calculate the optimal parts info for a given size.
totalPartsCount, partSize, _, err := OptimalPartInfo(-1, opts.PartSize)
if err != nil {
return UploadInfo{}, err
}

// Initiates a new multipart request
uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
if err != nil {
return UploadInfo{}, err
}
delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm")

// Aborts the multipart upload if the function returns
// any error, since we do not resume we should purge
// the parts which have been uploaded to relinquish
// storage space.
defer func() {
if err != nil {
c.abortMultipartUpload(ctx, bucketName, objectName, uploadID)
}
}()

// Create checksums
// CRC32C is ~50% faster on AMD64 @ 30GB/s
var crcBytes []byte
crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
md5Hash := c.md5Hasher()
defer md5Hash.Close()

// Total data read and written to server. should be equal to 'size' at the end of the call.
var totalUploadedSize int64

// Initialize parts uploaded map.
partsInfo := make(map[int]ObjectPart)

// Create a buffer.
nBuffers := int64(opts.NumThreads)
bufs := make(chan []byte, nBuffers)
all := make([]byte, nBuffers*partSize)
for i := int64(0); i < nBuffers; i++ {
bufs <- all[i*partSize : i*partSize+partSize]
}

var wg sync.WaitGroup
var mu sync.Mutex
errCh := make(chan error, opts.NumThreads)

reader = newHook(reader, opts.Progress)

// Part number always starts with '1'.
var partNumber int
for partNumber = 1; partNumber <= totalPartsCount; partNumber++ {
// Proceed to upload the part.
var buf []byte
select {
case buf = <-bufs:
case err = <-errCh:
cancel()
wg.Wait()
return UploadInfo{}, err
}

if int64(len(buf)) != partSize {
return UploadInfo{}, fmt.Errorf("read buffer < %d than expected partSize: %d", len(buf), partSize)
}

length, rerr := readFull(reader, buf)
if rerr == io.EOF && partNumber > 1 {
// Done
break
}

if rerr != nil && rerr != io.ErrUnexpectedEOF && err != io.EOF {
cancel()
wg.Wait()
return UploadInfo{}, rerr
}

// Calculate md5sum.
customHeader := make(http.Header)
if !opts.SendContentMd5 {
// Add CRC32C instead.
crc.Reset()
crc.Write(buf[:length])
cSum := crc.Sum(nil)
customHeader.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(cSum))
crcBytes = append(crcBytes, cSum...)
}

wg.Add(1)
go func(partNumber int) {
// Avoid declaring variables in the for loop
var md5Base64 string

if opts.SendContentMd5 {
md5Hash.Reset()
md5Hash.Write(buf[:length])
md5Base64 = base64.StdEncoding.EncodeToString(md5Hash.Sum(nil))
}

defer wg.Done()
p := uploadPartParams{
bucketName: bucketName,
objectName: objectName,
uploadID: uploadID,
reader: bytes.NewReader(buf[:length]),
partNumber: partNumber,
md5Base64: md5Base64,
size: int64(length),
sse: opts.ServerSideEncryption,
streamSha256: !opts.DisableContentSha256,
customHeader: customHeader,
}
objPart, uerr := c.uploadPart(ctx, p)
if uerr != nil {
errCh <- uerr
}

// Save successfully uploaded part metadata.
mu.Lock()
partsInfo[partNumber] = objPart
mu.Unlock()

// Send buffer back so it can be reused.
bufs <- buf
}(partNumber)

// Save successfully uploaded size.
totalUploadedSize += int64(length)
}
wg.Wait()

// Collect any error
select {
case err = <-errCh:
return UploadInfo{}, err
default:
}

// Complete multipart upload.
var complMultipartUpload completeMultipartUpload

// Loop over total uploaded parts to save them in
// Parts array before completing the multipart request.
for i := 1; i < partNumber; i++ {
part, ok := partsInfo[i]
if !ok {
return UploadInfo{}, errInvalidArgument(fmt.Sprintf("Missing part number %d", i))
}
complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
ETag: part.ETag,
PartNumber: part.PartNumber,
ChecksumCRC32: part.ChecksumCRC32,
ChecksumCRC32C: part.ChecksumCRC32C,
ChecksumSHA1: part.ChecksumSHA1,
ChecksumSHA256: part.ChecksumSHA256,
})
}

// Sort all completed parts.
sort.Sort(completedParts(complMultipartUpload.Parts))

opts = PutObjectOptions{}
if len(crcBytes) > 0 {
// Add hash of hashes.
crc.Reset()
crc.Write(crcBytes)
opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
}
uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, opts)
if err != nil {
return UploadInfo{}, err
}

uploadInfo.Size = totalUploadedSize
return uploadInfo, nil
}

// putObject special function used Google Cloud Storage. This special function
// is used for Google Cloud Storage since Google's multipart API is not S3 compatible.
func (c *Client) putObject(ctx context.Context, bucketName, objectName string, reader io.Reader, size int64, opts PutObjectOptions) (info UploadInfo, err error) {
Expand Down
10 changes: 9 additions & 1 deletion api-put-object.go
Expand Up @@ -87,7 +87,12 @@ type PutObjectOptions struct {
SendContentMd5 bool
DisableContentSha256 bool
DisableMultipart bool
Internal AdvancedPutOptions

// ConcurrentStreamParts will create NumThreads buffers of PartSize bytes,
// fill them serially and upload them in parallel.
// This can be used for faster uploads on non-seekable or slow-to-seek input.
ConcurrentStreamParts bool
Internal AdvancedPutOptions
}

// getNumThreads - gets the number of threads to be used in the multipart
Expand Down Expand Up @@ -272,6 +277,9 @@ func (c *Client) putObjectCommon(ctx context.Context, bucketName, objectName str
if opts.DisableMultipart {
return UploadInfo{}, errors.New("no length provided and multipart disabled")
}
if opts.ConcurrentStreamParts && opts.NumThreads > 1 {
return c.putObjectMultipartStreamParallel(ctx, bucketName, objectName, reader, opts)
}
return c.putObjectMultipartStreamNoLength(ctx, bucketName, objectName, reader, opts)
}

Expand Down