Skip to content

Commit

Permalink
Add trailing header checksums (#1705)
Browse files Browse the repository at this point in the history
  • Loading branch information
klauspost committed Oct 17, 2022
1 parent 7d71d94 commit bc82699
Show file tree
Hide file tree
Showing 12 changed files with 583 additions and 62 deletions.
71 changes: 44 additions & 27 deletions api-put-object-multipart.go
Expand Up @@ -159,8 +159,9 @@ func (c *Client) putObjectMultipartNoStream(ctx context.Context, bucketName, obj
crcBytes = append(crcBytes, cSum...)
}

p := uploadPartParams{bucketName: bucketName, objectName: objectName, uploadID: uploadID, reader: rd, partNumber: partNumber, md5Base64: md5Base64, sha256Hex: sha256Hex, size: int64(length), sse: opts.ServerSideEncryption, streamSha256: !opts.DisableContentSha256, customHeader: customHeader}
// Proceed to upload the part.
objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID, rd, partNumber, md5Base64, sha256Hex, int64(length), opts.ServerSideEncryption, !opts.DisableContentSha256, customHeader)
objPart, uerr := c.uploadPart(ctx, p)
if uerr != nil {
return UploadInfo{}, uerr
}
Expand Down Expand Up @@ -269,57 +270,73 @@ func (c *Client) initiateMultipartUpload(ctx context.Context, bucketName, object
return initiateMultipartUploadResult, nil
}

type uploadPartParams struct {
bucketName string
objectName string
uploadID string
reader io.Reader
partNumber int
md5Base64 string
sha256Hex string
size int64
sse encrypt.ServerSide
streamSha256 bool
customHeader http.Header
trailer http.Header
}

// uploadPart - Uploads a part in a multipart upload.
func (c *Client) uploadPart(ctx context.Context, bucketName string, objectName string, uploadID string, reader io.Reader, partNumber int, md5Base64 string, sha256Hex string, size int64, sse encrypt.ServerSide, streamSha256 bool, customHeader http.Header) (ObjectPart, error) {
func (c *Client) uploadPart(ctx context.Context, p uploadPartParams) (ObjectPart, error) {
// Input validation.
if err := s3utils.CheckValidBucketName(bucketName); err != nil {
if err := s3utils.CheckValidBucketName(p.bucketName); err != nil {
return ObjectPart{}, err
}
if err := s3utils.CheckValidObjectName(objectName); err != nil {
if err := s3utils.CheckValidObjectName(p.objectName); err != nil {
return ObjectPart{}, err
}
if size > maxPartSize {
return ObjectPart{}, errEntityTooLarge(size, maxPartSize, bucketName, objectName)
if p.size > maxPartSize {
return ObjectPart{}, errEntityTooLarge(p.size, maxPartSize, p.bucketName, p.objectName)
}
if size <= -1 {
return ObjectPart{}, errEntityTooSmall(size, bucketName, objectName)
if p.size <= -1 {
return ObjectPart{}, errEntityTooSmall(p.size, p.bucketName, p.objectName)
}
if partNumber <= 0 {
if p.partNumber <= 0 {
return ObjectPart{}, errInvalidArgument("Part number cannot be negative or equal to zero.")
}
if uploadID == "" {
if p.uploadID == "" {
return ObjectPart{}, errInvalidArgument("UploadID cannot be empty.")
}

// Get resources properly escaped and lined up before using them in http request.
urlValues := make(url.Values)
// Set part number.
urlValues.Set("partNumber", strconv.Itoa(partNumber))
urlValues.Set("partNumber", strconv.Itoa(p.partNumber))
// Set upload id.
urlValues.Set("uploadId", uploadID)
urlValues.Set("uploadId", p.uploadID)

// Set encryption headers, if any.
if customHeader == nil {
customHeader = make(http.Header)
if p.customHeader == nil {
p.customHeader = make(http.Header)
}
// https://docs.aws.amazon.com/AmazonS3/latest/API/mpUploadUploadPart.html
// Server-side encryption is supported by the S3 Multipart Upload actions.
// Unless you are using a customer-provided encryption key, you don't need
// to specify the encryption parameters in each UploadPart request.
if sse != nil && sse.Type() == encrypt.SSEC {
sse.Marshal(customHeader)
if p.sse != nil && p.sse.Type() == encrypt.SSEC {
p.sse.Marshal(p.customHeader)
}

reqMetadata := requestMetadata{
bucketName: bucketName,
objectName: objectName,
bucketName: p.bucketName,
objectName: p.objectName,
queryValues: urlValues,
customHeader: customHeader,
contentBody: reader,
contentLength: size,
contentMD5Base64: md5Base64,
contentSHA256Hex: sha256Hex,
streamSha256: streamSha256,
customHeader: p.customHeader,
contentBody: p.reader,
contentLength: p.size,
contentMD5Base64: p.md5Base64,
contentSHA256Hex: p.sha256Hex,
streamSha256: p.streamSha256,
trailer: p.trailer,
}

// Execute PUT on each part.
Expand All @@ -330,7 +347,7 @@ func (c *Client) uploadPart(ctx context.Context, bucketName string, objectName s
}
if resp != nil {
if resp.StatusCode != http.StatusOK {
return ObjectPart{}, httpRespToErrorResponse(resp, bucketName, objectName)
return ObjectPart{}, httpRespToErrorResponse(resp, p.bucketName, p.objectName)
}
}
// Once successfully uploaded, return completed part.
Expand All @@ -341,8 +358,8 @@ func (c *Client) uploadPart(ctx context.Context, bucketName string, objectName s
ChecksumSHA1: h.Get("x-amz-checksum-sha1"),
ChecksumSHA256: h.Get("x-amz-checksum-sha256"),
}
objPart.Size = size
objPart.PartNumber = partNumber
objPart.Size = p.size
objPart.PartNumber = p.partNumber
// Trim off the odd double quotes from ETag in the beginning and end.
objPart.ETag = trimEtag(h.Get("ETag"))
return objPart, nil
Expand Down
55 changes: 49 additions & 6 deletions api-put-object-streaming.go
Expand Up @@ -107,11 +107,19 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
return UploadInfo{}, err
}

withChecksum := c.trailingHeaderSupport
if withChecksum {
if opts.UserMetadata == nil {
opts.UserMetadata = make(map[string]string, 1)
}
opts.UserMetadata["X-Amz-Checksum-Algorithm"] = "CRC32C"
}
// Initiate a new multipart upload.
uploadID, err := c.newUploadID(ctx, bucketName, objectName, opts)
if err != nil {
return UploadInfo{}, err
}
delete(opts.UserMetadata, "X-Amz-Checksum-Algorithm")

// Aborts the multipart upload in progress, if the
// function returns any error, since we do not resume
Expand Down Expand Up @@ -177,14 +185,33 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
// As a special case if partNumber is lastPartNumber, we
// calculate the offset based on the last part size.
if uploadReq.PartNum == lastPartNumber {
readOffset = (size - lastPartSize)
readOffset = size - lastPartSize
partSize = lastPartSize
}

sectionReader := newHook(io.NewSectionReader(reader, readOffset, partSize), opts.Progress)
var trailer = make(http.Header, 1)
if withChecksum {
crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
trailer.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(crc.Sum(nil)))
sectionReader = newHashReaderWrapper(sectionReader, crc, func(hash []byte) {
trailer.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(hash))
})
}

// Proceed to upload the part.
objPart, err := c.uploadPart(ctx, bucketName, objectName, uploadID, sectionReader, uploadReq.PartNum, "", "", partSize, opts.ServerSideEncryption, !opts.DisableContentSha256, nil)
p := uploadPartParams{bucketName: bucketName,
objectName: objectName,
uploadID: uploadID,
reader: sectionReader,
partNumber: uploadReq.PartNum,
size: partSize,
sse: opts.ServerSideEncryption,
streamSha256: !opts.DisableContentSha256,
sha256Hex: "",
trailer: trailer,
}
objPart, err := c.uploadPart(ctx, p)
if err != nil {
uploadedPartsCh <- uploadedPartRes{
Error: err,
Expand Down Expand Up @@ -221,8 +248,12 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
// Update the totalUploadedSize.
totalUploadedSize += uploadRes.Size
complMultipartUpload.Parts = append(complMultipartUpload.Parts, CompletePart{
ETag: uploadRes.Part.ETag,
PartNumber: uploadRes.Part.PartNumber,
ETag: uploadRes.Part.ETag,
PartNumber: uploadRes.Part.PartNumber,
ChecksumCRC32: uploadRes.Part.ChecksumCRC32,
ChecksumCRC32C: uploadRes.Part.ChecksumCRC32C,
ChecksumSHA1: uploadRes.Part.ChecksumSHA1,
ChecksumSHA256: uploadRes.Part.ChecksumSHA256,
})
}
}
Expand All @@ -235,6 +266,18 @@ func (c *Client) putObjectMultipartStreamFromReadAt(ctx context.Context, bucketN
// Sort all completed parts.
sort.Sort(completedParts(complMultipartUpload.Parts))

if withChecksum {
// Add hash of hashes.
crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
for _, part := range complMultipartUpload.Parts {
cs, err := base64.StdEncoding.DecodeString(part.ChecksumCRC32C)
if err == nil {
crc.Write(cs)
}
}
opts.UserMetadata = map[string]string{"X-Amz-Checksum-Crc32c": base64.StdEncoding.EncodeToString(crc.Sum(nil))}
}

uploadInfo, err := c.completeMultipartUpload(ctx, bucketName, objectName, uploadID, complMultipartUpload, PutObjectOptions{})
if err != nil {
return UploadInfo{}, err
Expand Down Expand Up @@ -339,8 +382,8 @@ func (c *Client) putObjectMultipartStreamOptionalChecksum(ctx context.Context, b
// Update progress reader appropriately to the latest offset
// as we read from the source.
hooked := newHook(bytes.NewReader(buf[:length]), opts.Progress)

objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID, hooked, partNumber, md5Base64, "", partSize, opts.ServerSideEncryption, !opts.DisableContentSha256, customHeader)
p := uploadPartParams{bucketName: bucketName, objectName: objectName, uploadID: uploadID, reader: hooked, partNumber: partNumber, md5Base64: md5Base64, size: partSize, sse: opts.ServerSideEncryption, streamSha256: !opts.DisableContentSha256, customHeader: customHeader}
objPart, uerr := c.uploadPart(ctx, p)
if uerr != nil {
return UploadInfo{}, uerr
}
Expand Down
6 changes: 5 additions & 1 deletion api-put-object.go
Expand Up @@ -269,6 +269,9 @@ func (c *Client) putObjectCommon(ctx context.Context, bucketName, objectName str
}

if size < 0 {
if opts.DisableMultipart {
return UploadInfo{}, errors.New("no length provided and multipart disabled")
}
return c.putObjectMultipartStreamNoLength(ctx, bucketName, objectName, reader, opts)
}

Expand Down Expand Up @@ -366,7 +369,8 @@ func (c *Client) putObjectMultipartStreamNoLength(ctx context.Context, bucketNam
rd := newHook(bytes.NewReader(buf[:length]), opts.Progress)

// Proceed to upload the part.
objPart, uerr := c.uploadPart(ctx, bucketName, objectName, uploadID, rd, partNumber, md5Base64, "", int64(length), opts.ServerSideEncryption, !opts.DisableContentSha256, customHeader)
p := uploadPartParams{bucketName: bucketName, objectName: objectName, uploadID: uploadID, reader: rd, partNumber: partNumber, md5Base64: md5Base64, size: int64(length), sse: opts.ServerSideEncryption, streamSha256: !opts.DisableContentSha256, customHeader: customHeader}
objPart, uerr := c.uploadPart(ctx, p)
if uerr != nil {
return UploadInfo{}, uerr
}
Expand Down
8 changes: 4 additions & 4 deletions api-s3-datatypes.go
Expand Up @@ -323,10 +323,10 @@ type CompletePart struct {
ETag string

// Checksum values
ChecksumCRC32 string
ChecksumCRC32C string
ChecksumSHA1 string
ChecksumSHA256 string
ChecksumCRC32 string `xml:"ChecksumCRC32,omitempty"`
ChecksumCRC32C string `xml:"ChecksumCRC32C,omitempty"`
ChecksumSHA1 string `xml:"ChecksumSHA1,omitempty"`
ChecksumSHA256 string `xml:"ChecksumSHA256,omitempty"`
}

// completeMultipartUpload container for completing multipart upload.
Expand Down
44 changes: 39 additions & 5 deletions api.go
Expand Up @@ -20,8 +20,10 @@ package minio
import (
"bytes"
"context"
"encoding/base64"
"errors"
"fmt"
"hash/crc32"
"io"
"io/ioutil"
"math/rand"
Expand Down Expand Up @@ -93,6 +95,8 @@ type Client struct {
sha256Hasher func() md5simd.Hasher

healthStatus int32

trailingHeaderSupport bool
}

// Options for New method
Expand All @@ -103,6 +107,10 @@ type Options struct {
Region string
BucketLookup BucketLookupType

// TrailingHeaders indicates server support of trailing headers.
// Only supported for v4 signatures.
TrailingHeaders bool

// Custom hash routines. Leave nil to use standard.
CustomMD5 func() md5simd.Hasher
CustomSHA256 func() md5simd.Hasher
Expand Down Expand Up @@ -246,6 +254,9 @@ func privateNew(endpoint string, opts *Options) (*Client, error) {
if clnt.sha256Hasher == nil {
clnt.sha256Hasher = newSHA256Hasher
}

clnt.trailingHeaderSupport = opts.TrailingHeaders && clnt.overrideSignerType.IsV4()

// Sets bucket lookup style, whether server accepts DNS or Path lookup. Default is Auto - determined
// by the SDK. When Auto is specified, DNS lookup is used for Amazon/Google cloud endpoints and Path for all other endpoints.
clnt.lookup = opts.BucketLookup
Expand Down Expand Up @@ -419,6 +430,8 @@ type requestMetadata struct {
contentMD5Base64 string // carries base64 encoded md5sum
contentSHA256Hex string // carries hex encoded sha256sum
streamSha256 bool
addCrc bool
trailer http.Header // (http.Request).Trailer. Requires v4 signature.
}

// dumpHTTP - dump HTTP request and response.
Expand Down Expand Up @@ -581,6 +594,17 @@ func (c *Client) executeMethod(ctx context.Context, method string, metadata requ
}
}

if metadata.addCrc {
if metadata.trailer == nil {
metadata.trailer = make(http.Header, 1)
}
crc := crc32.New(crc32.MakeTable(crc32.Castagnoli))
metadata.contentBody = newHashReaderWrapper(metadata.contentBody, crc, func(hash []byte) {
// Update trailer when done.
metadata.trailer.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(hash))
})
metadata.trailer.Set("x-amz-checksum-crc32c", base64.StdEncoding.EncodeToString(crc.Sum(nil)))
}
// Instantiate a new request.
var req *http.Request
req, err = c.newRequest(ctx, method, metadata)
Expand All @@ -592,6 +616,7 @@ func (c *Client) executeMethod(ctx context.Context, method string, metadata requ

return nil, err
}

// Initiate the request.
res, err = c.do(req)
if err != nil {
Expand Down Expand Up @@ -632,7 +657,7 @@ func (c *Client) executeMethod(ctx context.Context, method string, metadata requ
// code dictates invalid region, we can retry the request
// with the new region.
//
// Additionally we should only retry if bucketLocation and custom
// Additionally, we should only retry if bucketLocation and custom
// region is empty.
if c.region == "" {
switch errResponse.Code {
Expand Down Expand Up @@ -814,21 +839,30 @@ func (c *Client) newRequest(ctx context.Context, method string, metadata request
// Add signature version '2' authorization header.
req = signer.SignV2(*req, accessKeyID, secretAccessKey, isVirtualHost)
case metadata.streamSha256 && !c.secure:
// Streaming signature is used by default for a PUT object request. Additionally we also
// look if the initialized client is secure, if yes then we don't need to perform
// streaming signature.
if len(metadata.trailer) > 0 {
req.Trailer = metadata.trailer
}
// Streaming signature is used by default for a PUT object request.
// Additionally, we also look if the initialized client is secure,
// if yes then we don't need to perform streaming signature.
req = signer.StreamingSignV4(req, accessKeyID,
secretAccessKey, sessionToken, location, metadata.contentLength, time.Now().UTC())
default:
// Set sha256 sum for signature calculation only with signature version '4'.
shaHeader := unsignedPayload
if metadata.contentSHA256Hex != "" {
shaHeader = metadata.contentSHA256Hex
if len(metadata.trailer) > 0 {
// Sanity check, we should not end up here if upstream is sane.
return nil, errors.New("internal error: contentSHA256Hex with trailer not supported")
}
} else if len(metadata.trailer) > 0 {
shaHeader = unsignedPayloadTrailer
}
req.Header.Set("X-Amz-Content-Sha256", shaHeader)

// Add signature version '4' authorization header.
req = signer.SignV4(*req, accessKeyID, secretAccessKey, sessionToken, location)
req = signer.SignV4Trailer(*req, accessKeyID, secretAccessKey, sessionToken, location, metadata.trailer)
}

// Return request.
Expand Down
4 changes: 4 additions & 0 deletions constants.go
Expand Up @@ -46,6 +46,10 @@ const maxMultipartPutObjectSize = 1024 * 1024 * 1024 * 1024 * 5
// we don't want to sign the request payload
const unsignedPayload = "UNSIGNED-PAYLOAD"

// unsignedPayloadTrailer value to be set to X-Amz-Content-Sha256 header when
// we don't want to sign the request payload, but have a trailer.
const unsignedPayloadTrailer = "STREAMING-UNSIGNED-PAYLOAD-TRAILER"

// Total number of parallel workers used for multipart operation.
const totalWorkers = 4

Expand Down

0 comments on commit bc82699

Please sign in to comment.