Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add trailing header checksums #1705

Merged
merged 7 commits into from Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
71 changes: 44 additions & 27 deletions api-put-object-multipart.go
Expand Up @@ -159,8 +159,9 @@ func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, obj
crcBytes = append(crcBytes, cSum...)
}

p := uploadPartParams{bucketName: bucketName, objectName: objectName, uploadID: uploadID, reader: rd, partNumber: partNumber, md5Base64: md5Base64, sha256Hex: sha256Hex, size: int64(length), sse: opts.ServerSideEncryption, streamSha256: !opts.DisableContentSha256, customHeader: customHeader}
// Proceed to upload the part.
objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID, rd, partNumber, md5Base64, sha256Hex, int64(length), opts.ServerSideEncryption, !opts.DisableContentSha256, customHeader)
objPart, uerr := c.uploadPart(ctx, p)
if uerr != nil {
return UploadInfo{}, uerr
}
Expand Down Expand Up @@ -269,57 +270,73 @@ func (c *Client) initiateMultipartUpload(ctx context.Context, bucketName, object
return initiateMultipartUploadResult, nil
}

type uploadPartParams struct {
bucketName string
objectName string
uploadID string
reader io.Reader
partNumber int
md5Base64 string
sha256Hex string
size int64
sse encrypt.ServerSide
streamSha256 bool
customHeader http.Header
trailer http.Header
}

// uploadPart - Uploads a part in a multipart upload.
func (c *Client) uploadPart(ctx context.Context, bucketName string, objectName string, uploadID string, reader io.Reader, partNumber int, md5Base64 string, sha256Hex string, size int64, sse encrypt.ServerSide, streamSha256 bool, customHeader http.Header) (ObjectPart, error) {
func (c *Client) uploadPart(ctx context.Context, p uploadPartParams) (ObjectPart, error) {
// Input validation.
if err := s3utils.CheckValidBucketName(bucketName); err != nil {
if err := s3utils.CheckValidBucketName(p.bucketName); err != nil {
return ObjectPart{}, err
}
if err := s3utils.CheckValidObjectName(objectName); err != nil {
if err := s3utils.CheckValidObjectName(p.objectName); err != nil {
return ObjectPart{}, err
}
if size > maxPartSize {
return ObjectPart{}, errEntityTooLarge(size, maxPartSize, bucketName, objectName)
if p.size > maxPartSize {
return ObjectPart{}, errEntityTooLarge(p.size, maxPartSize, p.bucketName, p.objectName)
}
if size <= -1 {
return ObjectPart{}, errEntityTooSmall(size, bucketName, objectName)
if p.size <= -1 {
return ObjectPart{}, errEntityTooSmall(p.size, p.bucketName, p.objectName)
}
if partNumber <= 0 {
if p.partNumber <= 0 {
return ObjectPart{}, errInvalidArgument("Part number cannot be negative or equal to zero.")
}
if uploadID == "" {
if p.uploadID == "" {
return ObjectPart{}, errInvalidArgument("UploadID cannot be empty.")
}

// Get resources properly escaped and lined up before using them in http request.
urlValues := make(url.Values)
// Set part number.
urlValues.Set("partNumber", strconv.Itoa(partNumber))
urlValues.Set("partNumber", strconv.Itoa(p.partNumber))
// Set upload id.
urlValues.Set("uploadId", uploadID)
urlValues.Set("uploadId", p.uploadID)

// Set encryption headers, if any.
if customHeader == nil {
customHeader = make(http.Header)
if p.customHeader == nil {
p.customHeader = make(http.Header)
}
// https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html
// Server-side encryption is supported by the S3 Multipart Upload actions.
// Unless you are using a customer-provided encryption key, you don't need
// to specify the encryption parameters in each UploadPart request.
if sse != nil && sse.Type() == encrypt.SSEC {
sse.Marshal(customHeader)
if p.sse != nil && p.sse.Type() == encrypt.SSEC {
p.sse.Marshal(p.customHeader)
}

reqMetadata := requestMetadata{
bucketName: bucketName,
objectName: objectName,
bucketName: p.bucketName,
objectName: p.objectName,
queryValues: urlValues,
customHeader: customHeader,
contentBody: reader,
contentLength: size,
contentMD5Base64: md5Base64,
contentSHA256Hex: sha256Hex,
streamSha256: streamSha256,
customHeader: p.customHeader,
contentBody: p.reader,
contentLength: p.size,
contentMD5Base64: p.md5Base64,
contentSHA256Hex: p.sha256Hex,
streamSha256: p.streamSha256,
trailer: p.trailer,
}

// Execute PUT on each part.
Expand All @@ -330,7 +347,7 @@ func (c *Client) uploadPart(ctx context.Context, bucketName string, objectName s
}
if resp != nil {
if resp.StatusCode != http.StatusOK {
return ObjectPart{}, httpRespToErrorResponse(resp, bucketName, objectName)
return ObjectPart{}, httpRespToErrorResponse(resp, p.bucketName, p.objectName)
}
}
// Once successfully uploaded, return completed part.
Expand All @@ -341,8 +358,8 @@ func (c *Client) uploadPart(ctx context.Context, bucketName string, objectName s
ChecksumSHA1: h.Get("x-amz-checksum-sha1"),
ChecksumSHA256: h.Get("x-amz-checksum-sha256"),
}
objPart.Size = size
objPart.PartNumber = partNumber
objPart.Size = p.size
objPart.PartNumber = p.partNumber
// Trim off the odd double quotes from ETag in the beginning and end.
objPart.ETag = trimEtag(h.Get("ETag"))
return objPart, nil
Expand Down
55 changes: 49 additions & 6 deletions api-put-object-streaming.go
Expand Up @@ -107,11 +107,19 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
return UploadInfo{}, err
}

withChecksum := c.trailingHeaderSupport
if withChecksum {
if opts.UserMetadata == nil {
opts.UserMetadata = make(map[string]string, 1)
}
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
}
// Initiate a new multipart upload.
uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
if err != nil {
return UploadInfo{}, err
}
delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm")

// Aborts the multipart upload in progress, if the
// function returns any error, since we do not resume
Expand Down Expand Up @@ -177,14 +185,33 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
// As a special case if partNumber is lastPartNumber, we
// calculate the offset based on the last part size.
if uploadReq.PartNum == lastPartNumber {
readOffset = (size - lastPartSize)
readOffset = size - lastPartSize
partSize = lastPartSize
}

sectionReader := newHook(io.NewSectionReader(reader, readOffset, partSize), opts.Progress)
var trailer = make(http.Header, 1)
if withChecksum {
crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
trailer.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(crc.Sum(nil)))
sectionReader = newHashReaderWrapper(sectionReader, crc, func(hash []byte) {
trailer.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(hash))
})
}

// Proceed to upload the part.
objPart, err := c.uploadPart(ctx, bucketName, objectName, uploadID, sectionReader, uploadReq.PartNum, "", "", partSize, opts.ServerSideEncryption, !opts.DisableContentSha256, nil)
p := uploadPartParams{bucketName: bucketName,
objectName: objectName,
uploadID: uploadID,
reader: sectionReader,
partNumber: uploadReq.PartNum,
size: partSize,
sse: opts.ServerSideEncryption,
streamSha256: !opts.DisableContentSha256,
sha256Hex: "",
trailer: trailer,
}
objPart, err := c.uploadPart(ctx, p)
if err != nil {
uploadedPartsCh <- uploadedPartRes{
Error: err,
Expand Down Expand Up @@ -221,8 +248,12 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
// Update the totalUploadedSize.
totalUploadedSize += uploadRes.Size
complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
ETag: uploadRes.Part.ETag,
PartNumber: uploadRes.Part.PartNumber,
ETag: uploadRes.Part.ETag,
PartNumber: uploadRes.Part.PartNumber,
ChecksumCRC32: uploadRes.Part.ChecksumCRC32,
ChecksumCRC32C: uploadRes.Part.ChecksumCRC32C,
ChecksumSHA1: uploadRes.Part.ChecksumSHA1,
ChecksumSHA256: uploadRes.Part.ChecksumSHA256,
})
}
}
Expand All @@ -235,6 +266,18 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
// Sort all completed parts.
sort.Sort(completedParts(complMultipartUpload.Parts))

if withChecksum {
// Add hash of hashes.
crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
for _, part := range complMultipartUpload.Parts {
cs, err := base64.StdEncoding.DecodeString(part.ChecksumCRC32C)
if err == nil {
crc.Write(cs)
}
}
opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
}

uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, PutObjectOptions{})
if err != nil {
return UploadInfo{}, err
Expand Down Expand Up @@ -339,8 +382,8 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
// Update progress reader appropriately to the latest offset
// as we read from the source.
hooked := newHook(bytes.NewReader(buf[:length]), opts.Progress)

objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID, hooked, partNumber, md5Base64, "", partSize, opts.ServerSideEncryption, !opts.DisableContentSha256, customHeader)
p := uploadPartParams{bucketName: bucketName, objectName: objectName, uploadID: uploadID, reader: hooked, partNumber: partNumber, md5Base64: md5Base64, size: partSize, sse: opts.ServerSideEncryption, streamSha256: !opts.DisableContentSha256, customHeader: customHeader}
objPart, uerr := c.uploadPart(ctx, p)
if uerr != nil {
return UploadInfo{}, uerr
}
Expand Down
6 changes: 5 additions & 1 deletion api-put-object.go
Expand Up @@ -269,6 +269,9 @@ func (c *Client) putObjectCommon(ctx context.Context, bucketName, objectName str
}

if size < 0 {
if opts.DisableMultipart {
return UploadInfo{}, errors.New("no length provided and multipart disabled")
}
return c.putObjectMultipartStreamNoLength(ctx, bucketName, objectName, reader, opts)
}

Expand Down Expand Up @@ -366,7 +369,8 @@ func (c *Client) putObjectMultipartStreamNoLength(ctx context.Context, bucketNam
rd := newHook(bytes.NewReader(buf[:length]), opts.Progress)

// Proceed to upload the part.
objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID, rd, partNumber, md5Base64, "", int64(length), opts.ServerSideEncryption, !opts.DisableContentSha256, customHeader)
p := uploadPartParams{bucketName: bucketName, objectName: objectName, uploadID: uploadID, reader: rd, partNumber: partNumber, md5Base64: md5Base64, size: int64(length), sse: opts.ServerSideEncryption, streamSha256: !opts.DisableContentSha256, customHeader: customHeader}
objPart, uerr := c.uploadPart(ctx, p)
if uerr != nil {
return UploadInfo{}, uerr
}
Expand Down
8 changes: 4 additions & 4 deletions api-s3-datatypes.go
Expand Up @@ -323,10 +323,10 @@ type CompletePart struct {
ETag string

// Checksum values
ChecksumCRC32 string
ChecksumCRC32C string
ChecksumSHA1 string
ChecksumSHA256 string
ChecksumCRC32 string `xml:"ChecksumCRC32,omitempty"`
ChecksumCRC32C string `xml:"ChecksumCRC32C,omitempty"`
ChecksumSHA1 string `xml:"ChecksumSHA1,omitempty"`
ChecksumSHA256 string `xml:"ChecksumSHA256,omitempty"`
}

// completeMultipartUpload container for completing multipart upload.
Expand Down
44 changes: 39 additions & 5 deletions api.go
Expand Up @@ -20,8 +20,10 @@ package minio
import (
"bytes"
"context"
"encoding/base64"
"errors"
"fmt"
"hash/crc32"
"io"
"io/ioutil"
"math/rand"
Expand Down Expand Up @@ -93,6 +95,8 @@ type Client struct {
sha256Hasher func() md5simd.Hasher

healthStatus int32

trailingHeaderSupport bool
}

// Options for New method
Expand All @@ -103,6 +107,10 @@ type Options struct {
Region string
BucketLookup BucketLookupType

// TrailingHeaders indicates server support of trailing headers.
// Only supported for v4 signatures.
TrailingHeaders bool

// Custom hash routines. Leave nil to use standard.
CustomMD5 func() md5simd.Hasher
CustomSHA256 func() md5simd.Hasher
Expand Down Expand Up @@ -246,6 +254,9 @@ func privateNew(endpoint string, opts *Options) (*Client, error) {
if clnt.sha256Hasher == nil {
clnt.sha256Hasher = newSHA256Hasher
}

clnt.trailingHeaderSupport = opts.TrailingHeaders && clnt.overrideSignerType.IsV4()

// Sets bucket lookup style, whether server accepts DNS or Path lookup. Default is Auto - determined
// by the SDK. When Auto is specified, DNS lookup is used for Amazon/Google cloud endpoints and Path for all other endpoints.
clnt.lookup = opts.BucketLookup
Expand Down Expand Up @@ -419,6 +430,8 @@ type requestMetadata struct {
contentMD5Base64 string // carries base64 encoded md5sum
contentSHA256Hex string // carries hex encoded sha256sum
streamSha256 bool
addCrc bool
trailer http.Header // (http.Request).Trailer. Requires v4 signature.
}

// dumpHTTP - dump HTTP request and response.
Expand Down Expand Up @@ -581,6 +594,17 @@ func (c *Client) executeMethod(ctx context.Context, method string, metadata requ
}
}

if metadata.addCrc {
if metadata.trailer == nil {
metadata.trailer = make(http.Header, 1)
}
crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
metadata.contentBody = newHashReaderWrapper(metadata.contentBody, crc, func(hash []byte) {
// Update trailer when done.
metadata.trailer.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(hash))
})
metadata.trailer.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(crc.Sum(nil)))
}
// Instantiate a new request.
var req *http.Request
req, err = c.newRequest(ctx, method, metadata)
Expand All @@ -592,6 +616,7 @@ func (c *Client) executeMethod(ctx context.Context, method string, metadata requ

return nil, err
}

// Initiate the request.
res, err = c.do(req)
if err != nil {
Expand Down Expand Up @@ -632,7 +657,7 @@ func (c *Client) executeMethod(ctx context.Context, method string, metadata requ
// code dictates invalid region, we can retry the request
// with the new region.
//
// Additionally we should only retry if bucketLocation and custom
// Additionally, we should only retry if bucketLocation and custom
// region is empty.
if c.region == "" {
switch errResponse.Code {
Expand Down Expand Up @@ -814,21 +839,30 @@ func (c *Client) newRequest(ctx context.Context, method string, metadata request
// Add signature version '2' authorization header.
req = signer.SignV2(*req, accessKeyID, secretAccessKey, isVirtualHost)
case metadata.streamSha256 && !c.secure:
// Streaming signature is used by default for a PUT object request. Additionally we also
// look if the initialized client is secure, if yes then we don't need to perform
// streaming signature.
if len(metadata.trailer) > 0 {
req.Trailer = metadata.trailer
}
// Streaming signature is used by default for a PUT object request.
// Additionally, we also look if the initialized client is secure,
// if yes then we don't need to perform streaming signature.
req = signer.StreamingSignV4(req, accessKeyID,
secretAccessKey, sessionToken, location, metadata.contentLength, time.Now().UTC())
default:
// Set sha256 sum for signature calculation only with signature version '4'.
shaHeader := unsignedPayload
if metadata.contentSHA256Hex != "" {
shaHeader = metadata.contentSHA256Hex
if len(metadata.trailer) > 0 {
// Sanity check, we should not end up here if upstream is sane.
return nil, errors.New("internal error: contentSHA256Hex with trailer not supported")
}
} else if len(metadata.trailer) > 0 {
shaHeader = unsignedPayloadTrailer
}
req.Header.Set("X-Amz-Content-Sha256", shaHeader)

// Add signature version '4' authorization header.
req = signer.SignV4(*req, accessKeyID, secretAccessKey, sessionToken, location)
req = signer.SignV4Trailer(*req, accessKeyID, secretAccessKey, sessionToken, location, metadata.trailer)
}

// Return request.
Expand Down
4 changes: 4 additions & 0 deletions constants.go
Expand Up @@ -46,6 +46,10 @@ const maxMultipartPutObjectSize = 1024 * 1024 * 1024 * 1024 * 5
// we don't want to sign the request payload
const unsignedPayload = "UNSIGNED-PAYLOAD"

// unsignedPayloadTrailer value to be set to X-Amz-Content-Sha256 header when
// we don't want to sign the request payload, but have a trailer.
const unsignedPayloadTrailer = "STREAMING-UNSIGNED-PAYLOAD-TRAILER"

// Total number of parallel workers used for multipart operation.
const totalWorkers = 4

Expand Down