diff --git a/copy/blob.go b/copy/blob.go new file mode 100644 index 000000000..4aea636c6 --- /dev/null +++ b/copy/blob.go @@ -0,0 +1,173 @@ +package copy + +import ( + "context" + "io" + + "github.com/containers/image/v5/internal/private" + compressiontypes "github.com/containers/image/v5/pkg/compression/types" + "github.com/containers/image/v5/types" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +// copyBlobFromStream copies a blob with srcInfo (with known Digest and Annotations and possibly known Size) from srcReader to dest, +// perhaps sending a copy to an io.Writer if getOriginalLayerCopyWriter != nil, +// perhaps (de/re/)compressing it if canModifyBlob, +// and returns a complete blobInfo of the copied blob. +func (c *copier) copyBlobFromStream(ctx context.Context, srcReader io.Reader, srcInfo types.BlobInfo, + getOriginalLayerCopyWriter func(decompressor compressiontypes.DecompressorFunc) io.Writer, + canModifyBlob bool, isConfig bool, toEncrypt bool, bar *progressBar, layerIndex int, emptyLayer bool) (types.BlobInfo, error) { + if isConfig { // This is guaranteed by the caller, but set it here to be explicit. + canModifyBlob = false + } + + // The copying happens through a pipeline of connected io.Readers; + // that pipeline is built by updating stream. + // === Input: srcReader + stream := sourceStream{ + reader: srcReader, + info: srcInfo, + } + + // === Process input through digestingReader to validate against the expected digest. + // Be paranoid; in case PutBlob somehow managed to ignore an error from digestingReader, + // use a separate validation failure indicator. + // Note that for this check we don't use the stronger "validationSucceeded" indicator, because + // dest.PutBlob may detect that the layer already exists, in which case we don't + // read stream to the end, and validation does not happen. + digestingReader, err := newDigestingReader(stream.reader, srcInfo.Digest) + if err != nil { + return types.BlobInfo{}, errors.Wrapf(err, "preparing to verify blob %s", srcInfo.Digest) + } + stream.reader = digestingReader + + // === Update progress bars + stream.reader = bar.ProxyReader(stream.reader) + + // === Decrypt the stream, if required. + decryptionStep, err := c.blobPipelineDecryptionStep(&stream, srcInfo) + if err != nil { + return types.BlobInfo{}, err + } + + // === Detect compression of the input stream. + // This requires us to “peek ahead” into the stream to read the initial part, which requires us to chain through another io.Reader returned by DetectCompression. + detectedCompression, err := blobPipelineDetectCompressionStep(&stream, srcInfo) + if err != nil { + return types.BlobInfo{}, err + } + + // === Send a copy of the original, uncompressed, stream, to a separate path if necessary. + var originalLayerReader io.Reader // DO NOT USE this other than to drain the input if no other consumer in the pipeline has done so. + if getOriginalLayerCopyWriter != nil { + stream.reader = io.TeeReader(stream.reader, getOriginalLayerCopyWriter(detectedCompression.decompressor)) + originalLayerReader = stream.reader + } + + // === Deal with layer compression/decompression if necessary + // WARNING: If you are adding new reasons to change the blob, update also the OptimizeDestinationImageAlreadyExists + // short-circuit conditions + compressionStep, err := c.blobPipelineCompressionStep(&stream, canModifyBlob, detectedCompression) + if err != nil { + return types.BlobInfo{}, err + } + defer compressionStep.close() + + // === Encrypt the stream for valid mediatypes if ociEncryptConfig provided + if decryptionStep.decrypting && toEncrypt { + // If nothing else, we can only set uploadedInfo.CryptoOperation to a single value. + // Before relaxing this, see the original pull request’s review if there are other reasons to reject this. + return types.BlobInfo{}, errors.New("Unable to support both decryption and encryption in the same copy") + } + encryptionStep, err := c.blobPipelineEncryptionStep(&stream, toEncrypt, srcInfo, decryptionStep) + if err != nil { + return types.BlobInfo{}, err + } + + // === Report progress using the c.progress channel, if required. + if c.progress != nil && c.progressInterval > 0 { + progressReader := newProgressReader( + stream.reader, + c.progress, + c.progressInterval, + srcInfo, + ) + defer progressReader.reportDone() + stream.reader = progressReader + } + + // === Finally, send the layer stream to dest. + options := private.PutBlobOptions{ + Cache: c.blobInfoCache, + IsConfig: isConfig, + EmptyLayer: emptyLayer, + } + if !isConfig { + options.LayerIndex = &layerIndex + } + uploadedInfo, err := c.dest.PutBlobWithOptions(ctx, &errorAnnotationReader{stream.reader}, stream.info, options) + if err != nil { + return types.BlobInfo{}, errors.Wrap(err, "writing blob") + } + + uploadedInfo.Annotations = stream.info.Annotations + + compressionStep.updateCompressionEdits(&uploadedInfo.CompressionOperation, &uploadedInfo.CompressionAlgorithm, &uploadedInfo.Annotations) + decryptionStep.updateCryptoOperation(&uploadedInfo.CryptoOperation) + if err := encryptionStep.updateCryptoOperationAndAnnotations(&uploadedInfo.CryptoOperation, &uploadedInfo.Annotations); err != nil { + return types.BlobInfo{}, err + } + + // This is fairly horrible: the writer from getOriginalLayerCopyWriter wants to consume + // all of the input (to compute DiffIDs), even if dest.PutBlob does not need it. + // So, read everything from originalLayerReader, which will cause the rest to be + // sent there if we are not already at EOF. + if getOriginalLayerCopyWriter != nil { + logrus.Debugf("Consuming rest of the original blob to satisfy getOriginalLayerCopyWriter") + _, err := io.Copy(io.Discard, originalLayerReader) + if err != nil { + return types.BlobInfo{}, errors.Wrapf(err, "reading input blob %s", srcInfo.Digest) + } + } + + if digestingReader.validationFailed { // Coverage: This should never happen. + return types.BlobInfo{}, errors.Errorf("Internal error writing blob %s, digest verification failed but was ignored", srcInfo.Digest) + } + if stream.info.Digest != "" && uploadedInfo.Digest != stream.info.Digest { + return types.BlobInfo{}, errors.Errorf("Internal error writing blob %s, blob with digest %s saved with digest %s", srcInfo.Digest, stream.info.Digest, uploadedInfo.Digest) + } + if digestingReader.validationSucceeded { + if err := compressionStep.recordValidatedDigestData(c, uploadedInfo, srcInfo, encryptionStep, decryptionStep); err != nil { + return types.BlobInfo{}, err + } + } + + return uploadedInfo, nil +} + +// sourceStream encapsulates an input consumed by copyBlobFromStream, in progress of being built. +// This allows handles of individual aspects to build the copy pipeline without _too much_ +// specific cooperation by the caller. +// +// We are currently very far from a generalized plug-and-play API for building/consuming the pipeline +// without specific knowledge of various aspects in copyBlobFromStream; that may come one day. +type sourceStream struct { + reader io.Reader + info types.BlobInfo // corresponding to the data available in reader. +} + +// errorAnnotationReader wraps the io.Reader passed to PutBlob for annotating the error happened during read. +// These errors are reported as PutBlob errors, so we would otherwise misleadingly attribute them to the copy destination. +type errorAnnotationReader struct { + reader io.Reader +} + +// Read annotates the error happened during read +func (r errorAnnotationReader) Read(b []byte) (n int, err error) { + n, err = r.reader.Read(b) + if err != io.EOF { + return n, errors.Wrapf(err, "happened during read") + } + return n, err +} diff --git a/copy/compression.go b/copy/compression.go new file mode 100644 index 000000000..ddfe67982 --- /dev/null +++ b/copy/compression.go @@ -0,0 +1,312 @@ +package copy + +import ( + "io" + + internalblobinfocache "github.com/containers/image/v5/internal/blobinfocache" + "github.com/containers/image/v5/pkg/compression" + compressiontypes "github.com/containers/image/v5/pkg/compression/types" + "github.com/containers/image/v5/types" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +// bpDetectCompressionStepData contains data that the copy pipeline needs about the “detect compression” step. +type bpDetectCompressionStepData struct { + isCompressed bool + format compressiontypes.Algorithm // Valid if isCompressed + decompressor compressiontypes.DecompressorFunc // Valid if isCompressed + srcCompressorName string // Compressor name to possibly record in the blob info cache for the source blob. +} + +// blobPipelineDetectCompressionStep updates *stream to detect its current compression format. +// srcInfo is only used for error messages. +// Returns data for other steps. +func blobPipelineDetectCompressionStep(stream *sourceStream, srcInfo types.BlobInfo) (bpDetectCompressionStepData, error) { + // This requires us to “peek ahead” into the stream to read the initial part, which requires us to chain through another io.Reader returned by DetectCompression. + format, decompressor, reader, err := compression.DetectCompressionFormat(stream.reader) // We could skip this in some cases, but let's keep the code path uniform + if err != nil { + return bpDetectCompressionStepData{}, errors.Wrapf(err, "reading blob %s", srcInfo.Digest) + } + stream.reader = reader + + res := bpDetectCompressionStepData{ + isCompressed: decompressor != nil, + format: format, + decompressor: decompressor, + } + if res.isCompressed { + res.srcCompressorName = format.Name() + } else { + res.srcCompressorName = internalblobinfocache.Uncompressed + } + + if expectedFormat, known := expectedCompressionFormats[stream.info.MediaType]; known && res.isCompressed && format.Name() != expectedFormat.Name() { + logrus.Debugf("blob %s with type %s should be compressed with %s, but compressor appears to be %s", srcInfo.Digest.String(), srcInfo.MediaType, expectedFormat.Name(), format.Name()) + } + return res, nil +} + +// bpCompressionStepData contains data that the copy pipeline needs about the compression step. +type bpCompressionStepData struct { + operation types.LayerCompression // Operation to use for updating the blob metadata. + uploadedAlgorithm *compressiontypes.Algorithm // An algorithm parameter for the compressionOperation edits. + uploadedAnnotations map[string]string // Annotations that should be set on the uploaded blob. WARNING: This is only set after the srcStream.reader is fully consumed. + srcCompressorName string // Compressor name to record in the blob info cache for the source blob. + uploadedCompressorName string // Compressor name to record in the blob info cache for the uploaded blob. + closers []io.Closer // Objects to close after the upload is done, if any. +} + +// blobPipelineCompressionStep updates *stream to compress and/or decompress it. +// srcInfo is only used for error messages. +// Returns data for other steps; the caller should eventually call updateCompressionEdits and perhaps recordValidatedBlobData, +// and must eventually call close. +func (c *copier) blobPipelineCompressionStep(stream *sourceStream, canModifyBlob bool, + detected bpDetectCompressionStepData) (*bpCompressionStepData, error) { + // WARNING: If you are adding new reasons to change the blob, update also the OptimizeDestinationImageAlreadyExists + // short-circuit conditions + if canModifyBlob { + for _, fn := range []func(*sourceStream, bpDetectCompressionStepData) (*bpCompressionStepData, error){ + c.bpcPreserveEncrypted, + c.bpcCompressUncompressed, + c.bpcRecompressCompressed, + c.bpcDecompressCompressed, + } { + res, err := fn(stream, detected) + if err != nil { + return nil, err + } + if res != nil { + return res, nil + } + } + } + return c.bpcPreserveOriginal(stream, detected), nil +} + +// bpcPreserveEncrypted checks if the input is encrypted, and returns a *bpCompressionStepData if so. +func (c *copier) bpcPreserveEncrypted(stream *sourceStream, _ bpDetectCompressionStepData) (*bpCompressionStepData, error) { + if isOciEncrypted(stream.info.MediaType) { + // PreserveOriginal due to any compression not being able to be done on an encrypted blob unless decrypted + logrus.Debugf("Using original blob without modification for encrypted blob") + return &bpCompressionStepData{ + operation: types.PreserveOriginal, + uploadedAlgorithm: nil, + srcCompressorName: internalblobinfocache.UnknownCompression, + uploadedCompressorName: internalblobinfocache.UnknownCompression, + }, nil + } + return nil, nil +} + +// bpcCompressUncompressed checks if we should be compressing an uncompressed input, and returns a *bpCompressionStepData if so. +func (c *copier) bpcCompressUncompressed(stream *sourceStream, detected bpDetectCompressionStepData) (*bpCompressionStepData, error) { + if c.dest.DesiredLayerCompression() == types.Compress && !detected.isCompressed { + logrus.Debugf("Compressing blob on the fly") + var uploadedAlgorithm *compressiontypes.Algorithm + if c.compressionFormat != nil { + uploadedAlgorithm = c.compressionFormat + } else { + uploadedAlgorithm = defaultCompressionFormat + } + + reader, annotations := c.compressedStream(stream.reader, *uploadedAlgorithm) + // Note: reader must be closed on all return paths. + stream.reader = reader + stream.info = types.BlobInfo{ // FIXME? Should we preserve more data in src.info? + Digest: "", + Size: -1, + } + return &bpCompressionStepData{ + operation: types.Compress, + uploadedAlgorithm: uploadedAlgorithm, + uploadedAnnotations: annotations, + srcCompressorName: detected.srcCompressorName, + uploadedCompressorName: uploadedAlgorithm.Name(), + closers: []io.Closer{reader}, + }, nil + } + return nil, nil +} + +// bpcRecompressCompressed checks if we should be recompressing a compressed input to another format, and returns a *bpCompressionStepData if so. +func (c *copier) bpcRecompressCompressed(stream *sourceStream, detected bpDetectCompressionStepData) (*bpCompressionStepData, error) { + if c.dest.DesiredLayerCompression() == types.Compress && detected.isCompressed && + c.compressionFormat != nil && c.compressionFormat.Name() != detected.format.Name() { + // When the blob is compressed, but the desired format is different, it first needs to be decompressed and finally + // re-compressed using the desired format. + logrus.Debugf("Blob will be converted") + + decompressed, err := detected.decompressor(stream.reader) + if err != nil { + return nil, err + } + succeeded := false + defer func() { + if !succeeded { + decompressed.Close() + } + }() + + recompressed, annotations := c.compressedStream(decompressed, *c.compressionFormat) + // Note: recompressed must be closed on all return paths. + stream.reader = recompressed + stream.info = types.BlobInfo{ // FIXME? Should we preserve more data in src.info? + Digest: "", + Size: -1, + } + succeeded = true + return &bpCompressionStepData{ + operation: types.PreserveOriginal, + uploadedAlgorithm: c.compressionFormat, + uploadedAnnotations: annotations, + srcCompressorName: detected.srcCompressorName, + uploadedCompressorName: c.compressionFormat.Name(), + closers: []io.Closer{decompressed, recompressed}, + }, nil + } + return nil, nil +} + +// bpcDecompressCompressed checks if we should be decompressing a compressed input, and returns a *bpCompressionStepData if so. +func (c *copier) bpcDecompressCompressed(stream *sourceStream, detected bpDetectCompressionStepData) (*bpCompressionStepData, error) { + if c.dest.DesiredLayerCompression() == types.Decompress && detected.isCompressed { + logrus.Debugf("Blob will be decompressed") + s, err := detected.decompressor(stream.reader) + if err != nil { + return nil, err + } + // Note: s must be closed on all return paths. + stream.reader = s + stream.info = types.BlobInfo{ // FIXME? Should we preserve more data in src.info? + Digest: "", + Size: -1, + } + return &bpCompressionStepData{ + operation: types.Decompress, + uploadedAlgorithm: nil, + srcCompressorName: detected.srcCompressorName, + uploadedCompressorName: internalblobinfocache.Uncompressed, + closers: []io.Closer{s}, + }, nil + } + return nil, nil +} + +// bpcPreserveOriginal returns a *bpCompressionStepData for not changing the original blob. +func (c *copier) bpcPreserveOriginal(stream *sourceStream, detected bpDetectCompressionStepData) *bpCompressionStepData { + // PreserveOriginal might also need to recompress the original blob if the desired compression format is different. + logrus.Debugf("Using original blob without modification") + // Remember if the original blob was compressed, and if so how, so that if + // LayerInfosForCopy() returned something that differs from what was in the + // source's manifest, and UpdatedImage() needs to call UpdateLayerInfos(), + // it will be able to correctly derive the MediaType for the copied blob. + var algorithm *compressiontypes.Algorithm + if detected.isCompressed { + algorithm = &detected.format + } else { + algorithm = nil + } + return &bpCompressionStepData{ + operation: types.PreserveOriginal, + uploadedAlgorithm: algorithm, + srcCompressorName: detected.srcCompressorName, + uploadedCompressorName: detected.srcCompressorName, + } +} + +// updateCompressionEdits sets *operation, *algorithm and updates *annotations, if necessary. +func (d *bpCompressionStepData) updateCompressionEdits(operation *types.LayerCompression, algorithm **compressiontypes.Algorithm, annotations *map[string]string) { + *operation = d.operation + // If we can modify the layer's blob, set the desired algorithm for it to be set in the manifest. + *algorithm = d.uploadedAlgorithm + if *annotations == nil { + *annotations = map[string]string{} + } + for k, v := range d.uploadedAnnotations { + (*annotations)[k] = v + } +} + +// recordValidatedBlobData updates b.blobInfoCache with data about the created uploadedInfo adnd the original srcInfo. +// This must ONLY be called if all data has been validated by OUR code, and is not comming from third parties. +func (d *bpCompressionStepData) recordValidatedDigestData(c *copier, uploadedInfo types.BlobInfo, srcInfo types.BlobInfo, + encryptionStep *bpEncryptionStepData, decryptionStep *bpDecryptionStepData) error { + // Don’t record any associations that involve encrypted data. This is a bit crude, + // some blob substitutions (replacing pulls of encrypted data with local reuse of known decryption outcomes) + // might be safe, but it’s not trivially obvious, so let’s be conservative for now. + // This crude approach also means we don’t need to record whether a blob is encrypted + // in the blob info cache (which would probably be necessary for any more complex logic), + // and the simplicity is attractive. + if !encryptionStep.encrypting && !decryptionStep.decrypting { + // If d.operation != types.PreserveOriginal, we now have two reliable digest values: + // srcinfo.Digest describes the pre-d.operation input, verified by digestingReader + // uploadedInfo.Digest describes the post-d.operation output, computed by PutBlob + // (because stream.info.Digest == "", this must have been computed afresh). + switch d.operation { + case types.PreserveOriginal: + break // Do nothing, we have only one digest and we might not have even verified it. + case types.Compress: + c.blobInfoCache.RecordDigestUncompressedPair(uploadedInfo.Digest, srcInfo.Digest) + case types.Decompress: + c.blobInfoCache.RecordDigestUncompressedPair(srcInfo.Digest, uploadedInfo.Digest) + default: + return errors.Errorf("Internal error: Unexpected d.operation value %#v", d.operation) + } + } + if d.uploadedCompressorName != "" && d.uploadedCompressorName != internalblobinfocache.UnknownCompression { + c.blobInfoCache.RecordDigestCompressorName(uploadedInfo.Digest, d.uploadedCompressorName) + } + if srcInfo.Digest != "" && d.srcCompressorName != "" && d.srcCompressorName != internalblobinfocache.UnknownCompression { + c.blobInfoCache.RecordDigestCompressorName(srcInfo.Digest, d.srcCompressorName) + } + return nil +} + +// close closes objects that carry state throughout the compression/decompression operation. +func (d *bpCompressionStepData) close() { + for _, c := range d.closers { + c.Close() + } +} + +// doCompression reads all input from src and writes its compressed equivalent to dest. +func doCompression(dest io.Writer, src io.Reader, metadata map[string]string, compressionFormat compressiontypes.Algorithm, compressionLevel *int) error { + compressor, err := compression.CompressStreamWithMetadata(dest, metadata, compressionFormat, compressionLevel) + if err != nil { + return err + } + + buf := make([]byte, compressionBufferSize) + + _, err = io.CopyBuffer(compressor, src, buf) // Sets err to nil, i.e. causes dest.Close() + if err != nil { + compressor.Close() + return err + } + + return compressor.Close() +} + +// compressGoroutine reads all input from src and writes its compressed equivalent to dest. +func (c *copier) compressGoroutine(dest *io.PipeWriter, src io.Reader, metadata map[string]string, compressionFormat compressiontypes.Algorithm) { + err := errors.New("Internal error: unexpected panic in compressGoroutine") + defer func() { // Note that this is not the same as {defer dest.CloseWithError(err)}; we need err to be evaluated lazily. + _ = dest.CloseWithError(err) // CloseWithError(nil) is equivalent to Close(), always returns nil + }() + + err = doCompression(dest, src, metadata, compressionFormat, c.compressionLevel) +} + +// compressedStream returns a stream the input reader compressed using format, and a metadata map. +// The caller must close the returned reader. +// AFTER the stream is consumed, metadata will be updated with annotations to use on the data. +func (c *copier) compressedStream(reader io.Reader, algorithm compressiontypes.Algorithm) (io.ReadCloser, map[string]string) { + pipeReader, pipeWriter := io.Pipe() + annotations := map[string]string{} + // If this fails while writing data, it will do pipeWriter.CloseWithError(); if it fails otherwise, + // e.g. because we have exited and due to pipeReader.Close() above further writing to the pipe has failed, + // we don’t care. + go c.compressGoroutine(pipeWriter, reader, annotations, algorithm) // Closes pipeWriter + return pipeReader, annotations +} diff --git a/copy/copy.go b/copy/copy.go index 8c7941b58..d54859239 100644 --- a/copy/copy.go +++ b/copy/copy.go @@ -25,7 +25,6 @@ import ( "github.com/containers/image/v5/signature" "github.com/containers/image/v5/transports" "github.com/containers/image/v5/types" - "github.com/containers/ocicrypt" encconfig "github.com/containers/ocicrypt/config" digest "github.com/opencontainers/go-digest" imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1" @@ -1331,350 +1330,3 @@ func computeDiffID(stream io.Reader, decompressor compressiontypes.DecompressorF return digest.Canonical.FromReader(stream) } - -// errorAnnotationReader wraps the io.Reader passed to PutBlob for annotating the error happened during read. -// These errors are reported as PutBlob errors, so we would otherwise misleadingly attribute them to the copy destination. -type errorAnnotationReader struct { - reader io.Reader -} - -// Read annotates the error happened during read -func (r errorAnnotationReader) Read(b []byte) (n int, err error) { - n, err = r.reader.Read(b) - if err != io.EOF { - return n, errors.Wrapf(err, "happened during read") - } - return n, err -} - -// copyBlobFromStream copies a blob with srcInfo (with known Digest and Annotations and possibly known Size) from srcStream to dest, -// perhaps sending a copy to an io.Writer if getOriginalLayerCopyWriter != nil, -// perhaps (de/re/)compressing it if canModifyBlob, -// and returns a complete blobInfo of the copied blob. -func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo, - getOriginalLayerCopyWriter func(decompressor compressiontypes.DecompressorFunc) io.Writer, - canModifyBlob bool, isConfig bool, toEncrypt bool, bar *progressBar, layerIndex int, emptyLayer bool) (types.BlobInfo, error) { - if isConfig { // This is guaranteed by the caller, but set it here to be explicit. - canModifyBlob = false - } - - // The copying happens through a pipeline of connected io.Readers. - // === Input: srcStream - - // === Process input through digestingReader to validate against the expected digest. - // Be paranoid; in case PutBlob somehow managed to ignore an error from digestingReader, - // use a separate validation failure indicator. - // Note that for this check we don't use the stronger "validationSucceeded" indicator, because - // dest.PutBlob may detect that the layer already exists, in which case we don't - // read stream to the end, and validation does not happen. - digestingReader, err := newDigestingReader(srcStream, srcInfo.Digest) - if err != nil { - return types.BlobInfo{}, errors.Wrapf(err, "preparing to verify blob %s", srcInfo.Digest) - } - var destStream io.Reader = digestingReader - - // === Update progress bars - destStream = bar.ProxyReader(destStream) - - // === Decrypt the stream, if required. - var decrypted bool - if isOciEncrypted(srcInfo.MediaType) && c.ociDecryptConfig != nil { - newDesc := imgspecv1.Descriptor{ - Annotations: srcInfo.Annotations, - } - - var d digest.Digest - destStream, d, err = ocicrypt.DecryptLayer(c.ociDecryptConfig, destStream, newDesc, false) - if err != nil { - return types.BlobInfo{}, errors.Wrapf(err, "decrypting layer %s", srcInfo.Digest) - } - - srcInfo.Digest = d - srcInfo.Size = -1 - for k := range srcInfo.Annotations { - if strings.HasPrefix(k, "org.opencontainers.image.enc") { - delete(srcInfo.Annotations, k) - } - } - decrypted = true - } - - // === Detect compression of the input stream. - // This requires us to “peek ahead” into the stream to read the initial part, which requires us to chain through another io.Reader returned by DetectCompression. - compressionFormat, decompressor, destStream, err := compression.DetectCompressionFormat(destStream) // We could skip this in some cases, but let's keep the code path uniform - if err != nil { - return types.BlobInfo{}, errors.Wrapf(err, "reading blob %s", srcInfo.Digest) - } - isCompressed := decompressor != nil - if expectedCompressionFormat, known := expectedCompressionFormats[srcInfo.MediaType]; known && isCompressed && compressionFormat.Name() != expectedCompressionFormat.Name() { - logrus.Debugf("blob %s with type %s should be compressed with %s, but compressor appears to be %s", srcInfo.Digest.String(), srcInfo.MediaType, expectedCompressionFormat.Name(), compressionFormat.Name()) - } - - // === Send a copy of the original, uncompressed, stream, to a separate path if necessary. - var originalLayerReader io.Reader // DO NOT USE this other than to drain the input if no other consumer in the pipeline has done so. - if getOriginalLayerCopyWriter != nil { - destStream = io.TeeReader(destStream, getOriginalLayerCopyWriter(decompressor)) - originalLayerReader = destStream - } - - compressionMetadata := map[string]string{} - // === Deal with layer compression/decompression if necessary - // WARNING: If you are adding new reasons to change the blob, update also the OptimizeDestinationImageAlreadyExists - // short-circuit conditions - var inputInfo types.BlobInfo - var compressionOperation types.LayerCompression - var uploadCompressionFormat *compressiontypes.Algorithm - srcCompressorName := internalblobinfocache.Uncompressed - if isCompressed { - srcCompressorName = compressionFormat.Name() - } - var uploadCompressorName string - if canModifyBlob && isOciEncrypted(srcInfo.MediaType) { - // PreserveOriginal due to any compression not being able to be done on an encrypted blob unless decrypted - logrus.Debugf("Using original blob without modification for encrypted blob") - compressionOperation = types.PreserveOriginal - inputInfo = srcInfo - srcCompressorName = internalblobinfocache.UnknownCompression - uploadCompressionFormat = nil - uploadCompressorName = internalblobinfocache.UnknownCompression - } else if canModifyBlob && c.dest.DesiredLayerCompression() == types.Compress && !isCompressed { - logrus.Debugf("Compressing blob on the fly") - compressionOperation = types.Compress - pipeReader, pipeWriter := io.Pipe() - defer pipeReader.Close() - - if c.compressionFormat != nil { - uploadCompressionFormat = c.compressionFormat - } else { - uploadCompressionFormat = defaultCompressionFormat - } - // If this fails while writing data, it will do pipeWriter.CloseWithError(); if it fails otherwise, - // e.g. because we have exited and due to pipeReader.Close() above further writing to the pipe has failed, - // we don’t care. - go c.compressGoroutine(pipeWriter, destStream, compressionMetadata, *uploadCompressionFormat) // Closes pipeWriter - destStream = pipeReader - inputInfo.Digest = "" - inputInfo.Size = -1 - uploadCompressorName = uploadCompressionFormat.Name() - } else if canModifyBlob && c.dest.DesiredLayerCompression() == types.Compress && isCompressed && - c.compressionFormat != nil && c.compressionFormat.Name() != compressionFormat.Name() { - // When the blob is compressed, but the desired format is different, it first needs to be decompressed and finally - // re-compressed using the desired format. - logrus.Debugf("Blob will be converted") - - compressionOperation = types.PreserveOriginal - s, err := decompressor(destStream) - if err != nil { - return types.BlobInfo{}, err - } - defer s.Close() - - pipeReader, pipeWriter := io.Pipe() - defer pipeReader.Close() - - uploadCompressionFormat = c.compressionFormat - go c.compressGoroutine(pipeWriter, s, compressionMetadata, *uploadCompressionFormat) // Closes pipeWriter - - destStream = pipeReader - inputInfo.Digest = "" - inputInfo.Size = -1 - uploadCompressorName = uploadCompressionFormat.Name() - } else if canModifyBlob && c.dest.DesiredLayerCompression() == types.Decompress && isCompressed { - logrus.Debugf("Blob will be decompressed") - compressionOperation = types.Decompress - s, err := decompressor(destStream) - if err != nil { - return types.BlobInfo{}, err - } - defer s.Close() - destStream = s - inputInfo.Digest = "" - inputInfo.Size = -1 - uploadCompressionFormat = nil - uploadCompressorName = internalblobinfocache.Uncompressed - } else { - // PreserveOriginal might also need to recompress the original blob if the desired compression format is different. - logrus.Debugf("Using original blob without modification") - compressionOperation = types.PreserveOriginal - inputInfo = srcInfo - // Remember if the original blob was compressed, and if so how, so that if - // LayerInfosForCopy() returned something that differs from what was in the - // source's manifest, and UpdatedImage() needs to call UpdateLayerInfos(), - // it will be able to correctly derive the MediaType for the copied blob. - if isCompressed { - uploadCompressionFormat = &compressionFormat - } else { - uploadCompressionFormat = nil - } - uploadCompressorName = srcCompressorName - } - - // === Encrypt the stream for valid mediatypes if ociEncryptConfig provided - var ( - encrypted bool - finalizer ocicrypt.EncryptLayerFinalizer - ) - if toEncrypt { - if decrypted { - return types.BlobInfo{}, errors.New("Unable to support both decryption and encryption in the same copy") - } - - if !isOciEncrypted(srcInfo.MediaType) && c.ociEncryptConfig != nil { - var annotations map[string]string - if !decrypted { - annotations = srcInfo.Annotations - } - desc := imgspecv1.Descriptor{ - MediaType: srcInfo.MediaType, - Digest: srcInfo.Digest, - Size: srcInfo.Size, - Annotations: annotations, - } - - s, fin, err := ocicrypt.EncryptLayer(c.ociEncryptConfig, destStream, desc) - if err != nil { - return types.BlobInfo{}, errors.Wrapf(err, "encrypting blob %s", srcInfo.Digest) - } - - destStream = s - finalizer = fin - inputInfo.Digest = "" - inputInfo.Size = -1 - encrypted = true - } - } - - // === Report progress using the c.progress channel, if required. - if c.progress != nil && c.progressInterval > 0 { - progressReader := newProgressReader( - destStream, - c.progress, - c.progressInterval, - srcInfo, - ) - defer progressReader.reportDone() - destStream = progressReader - } - - // === Finally, send the layer stream to dest. - options := private.PutBlobOptions{ - Cache: c.blobInfoCache, - IsConfig: isConfig, - EmptyLayer: emptyLayer, - } - if !isConfig { - options.LayerIndex = &layerIndex - } - uploadedInfo, err := c.dest.PutBlobWithOptions(ctx, &errorAnnotationReader{destStream}, inputInfo, options) - if err != nil { - return types.BlobInfo{}, errors.Wrap(err, "writing blob") - } - - uploadedInfo.Annotations = srcInfo.Annotations - - uploadedInfo.CompressionOperation = compressionOperation - // If we can modify the layer's blob, set the desired algorithm for it to be set in the manifest. - uploadedInfo.CompressionAlgorithm = uploadCompressionFormat - if decrypted { - uploadedInfo.CryptoOperation = types.Decrypt - } else if encrypted { - encryptAnnotations, err := finalizer() - if err != nil { - return types.BlobInfo{}, errors.Wrap(err, "Unable to finalize encryption") - } - uploadedInfo.CryptoOperation = types.Encrypt - if uploadedInfo.Annotations == nil { - uploadedInfo.Annotations = map[string]string{} - } - for k, v := range encryptAnnotations { - uploadedInfo.Annotations[k] = v - } - } - - // This is fairly horrible: the writer from getOriginalLayerCopyWriter wants to consume - // all of the input (to compute DiffIDs), even if dest.PutBlob does not need it. - // So, read everything from originalLayerReader, which will cause the rest to be - // sent there if we are not already at EOF. - if getOriginalLayerCopyWriter != nil { - logrus.Debugf("Consuming rest of the original blob to satisfy getOriginalLayerCopyWriter") - _, err := io.Copy(io.Discard, originalLayerReader) - if err != nil { - return types.BlobInfo{}, errors.Wrapf(err, "reading input blob %s", srcInfo.Digest) - } - } - - if digestingReader.validationFailed { // Coverage: This should never happen. - return types.BlobInfo{}, errors.Errorf("Internal error writing blob %s, digest verification failed but was ignored", srcInfo.Digest) - } - if inputInfo.Digest != "" && uploadedInfo.Digest != inputInfo.Digest { - return types.BlobInfo{}, errors.Errorf("Internal error writing blob %s, blob with digest %s saved with digest %s", srcInfo.Digest, inputInfo.Digest, uploadedInfo.Digest) - } - if digestingReader.validationSucceeded { - // Don’t record any associations that involve encrypted data. This is a bit crude, - // some blob substitutions (replacing pulls of encrypted data with local reuse of known decryption outcomes) - // might be safe, but it’s not trivially obvious, so let’s be conservative for now. - // This crude approach also means we don’t need to record whether a blob is encrypted - // in the blob info cache (which would probably be necessary for any more complex logic), - // and the simplicity is attractive. - if !encrypted && !decrypted { - // If compressionOperation != types.PreserveOriginal, we now have two reliable digest values: - // srcinfo.Digest describes the pre-compressionOperation input, verified by digestingReader - // uploadedInfo.Digest describes the post-compressionOperation output, computed by PutBlob - // (because inputInfo.Digest == "", this must have been computed afresh). - switch compressionOperation { - case types.PreserveOriginal: - break // Do nothing, we have only one digest and we might not have even verified it. - case types.Compress: - c.blobInfoCache.RecordDigestUncompressedPair(uploadedInfo.Digest, srcInfo.Digest) - case types.Decompress: - c.blobInfoCache.RecordDigestUncompressedPair(srcInfo.Digest, uploadedInfo.Digest) - default: - return types.BlobInfo{}, errors.Errorf("Internal error: Unexpected compressionOperation value %#v", compressionOperation) - } - } - if uploadCompressorName != "" && uploadCompressorName != internalblobinfocache.UnknownCompression { - c.blobInfoCache.RecordDigestCompressorName(uploadedInfo.Digest, uploadCompressorName) - } - if srcInfo.Digest != "" && srcCompressorName != "" && srcCompressorName != internalblobinfocache.UnknownCompression { - c.blobInfoCache.RecordDigestCompressorName(srcInfo.Digest, srcCompressorName) - } - } - - // Copy all the metadata generated by the compressor into the annotations. - if uploadedInfo.Annotations == nil { - uploadedInfo.Annotations = map[string]string{} - } - for k, v := range compressionMetadata { - uploadedInfo.Annotations[k] = v - } - - return uploadedInfo, nil -} - -// doCompression reads all input from src and writes its compressed equivalent to dest. -func doCompression(dest io.Writer, src io.Reader, metadata map[string]string, compressionFormat compressiontypes.Algorithm, compressionLevel *int) error { - compressor, err := compression.CompressStreamWithMetadata(dest, metadata, compressionFormat, compressionLevel) - if err != nil { - return err - } - - buf := make([]byte, compressionBufferSize) - - _, err = io.CopyBuffer(compressor, src, buf) // Sets err to nil, i.e. causes dest.Close() - if err != nil { - compressor.Close() - return err - } - - return compressor.Close() -} - -// compressGoroutine reads all input from src and writes its compressed equivalent to dest. -func (c *copier) compressGoroutine(dest *io.PipeWriter, src io.Reader, metadata map[string]string, compressionFormat compressiontypes.Algorithm) { - err := errors.New("Internal error: unexpected panic in compressGoroutine") - defer func() { // Note that this is not the same as {defer dest.CloseWithError(err)}; we need err to be evaluated lazily. - _ = dest.CloseWithError(err) // CloseWithError(nil) is equivalent to Close(), always returns nil - }() - - err = doCompression(dest, src, metadata, compressionFormat, c.compressionLevel) -} diff --git a/copy/encrypt.go b/copy/encrypt.go deleted file mode 100644 index a18d6f151..000000000 --- a/copy/encrypt.go +++ /dev/null @@ -1,24 +0,0 @@ -package copy - -import ( - "strings" - - "github.com/containers/image/v5/types" -) - -// isOciEncrypted returns a bool indicating if a mediatype is encrypted -// This function will be moved to be part of OCI spec when adopted. -func isOciEncrypted(mediatype string) bool { - return strings.HasSuffix(mediatype, "+encrypted") -} - -// isEncrypted checks if an image is encrypted -func isEncrypted(i types.Image) bool { - layers := i.LayerInfos() - for _, l := range layers { - if isOciEncrypted(l.MediaType) { - return true - } - } - return false -} diff --git a/copy/encryption.go b/copy/encryption.go new file mode 100644 index 000000000..ae0576da4 --- /dev/null +++ b/copy/encryption.go @@ -0,0 +1,129 @@ +package copy + +import ( + "strings" + + "github.com/containers/image/v5/types" + "github.com/containers/ocicrypt" + imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" +) + +// isOciEncrypted returns a bool indicating if a mediatype is encrypted +// This function will be moved to be part of OCI spec when adopted. +func isOciEncrypted(mediatype string) bool { + return strings.HasSuffix(mediatype, "+encrypted") +} + +// isEncrypted checks if an image is encrypted +func isEncrypted(i types.Image) bool { + layers := i.LayerInfos() + for _, l := range layers { + if isOciEncrypted(l.MediaType) { + return true + } + } + return false +} + +// bpDecryptionStepData contains data that the copy pipeline needs about the decryption step. +type bpDecryptionStepData struct { + decrypting bool // We are actually decrypting the stream +} + +// blobPipelineDecryptionStep updates *stream to decrypt if, it necessary. +// srcInfo is only used for error messages. +// Returns data for other steps; the caller should eventually use updateCryptoOperation. +func (c *copier) blobPipelineDecryptionStep(stream *sourceStream, srcInfo types.BlobInfo) (*bpDecryptionStepData, error) { + if isOciEncrypted(stream.info.MediaType) && c.ociDecryptConfig != nil { + desc := imgspecv1.Descriptor{ + Annotations: stream.info.Annotations, + } + reader, decryptedDigest, err := ocicrypt.DecryptLayer(c.ociDecryptConfig, stream.reader, desc, false) + if err != nil { + return nil, errors.Wrapf(err, "decrypting layer %s", srcInfo.Digest) + } + + stream.reader = reader + stream.info.Digest = decryptedDigest + stream.info.Size = -1 + for k := range stream.info.Annotations { + if strings.HasPrefix(k, "org.opencontainers.image.enc") { + delete(stream.info.Annotations, k) + } + } + return &bpDecryptionStepData{ + decrypting: true, + }, nil + } + return &bpDecryptionStepData{ + decrypting: false, + }, nil +} + +// updateCryptoOperation sets *operation, if necessary. +func (d *bpDecryptionStepData) updateCryptoOperation(operation *types.LayerCrypto) { + if d.decrypting { + *operation = types.Decrypt + } +} + +// bpdData contains data that the copy pipeline needs about the encryption step. +type bpEncryptionStepData struct { + encrypting bool // We are actually encrypting the stream + finalizer ocicrypt.EncryptLayerFinalizer +} + +// blobPipelineEncryptionStep updates *stream to encrypt if, it required by toEncrypt. +// srcInfo is primarily used for error messages. +// Returns data for other steps; the caller should eventually call updateCryptoOperationAndAnnotations. +func (c *copier) blobPipelineEncryptionStep(stream *sourceStream, toEncrypt bool, srcInfo types.BlobInfo, + decryptionStep *bpDecryptionStepData) (*bpEncryptionStepData, error) { + if toEncrypt && !isOciEncrypted(srcInfo.MediaType) && c.ociEncryptConfig != nil { + var annotations map[string]string + if !decryptionStep.decrypting { + annotations = srcInfo.Annotations + } + desc := imgspecv1.Descriptor{ + MediaType: srcInfo.MediaType, + Digest: srcInfo.Digest, + Size: srcInfo.Size, + Annotations: annotations, + } + reader, finalizer, err := ocicrypt.EncryptLayer(c.ociEncryptConfig, stream.reader, desc) + if err != nil { + return nil, errors.Wrapf(err, "encrypting blob %s", srcInfo.Digest) + } + + stream.reader = reader + stream.info.Digest = "" + stream.info.Size = -1 + return &bpEncryptionStepData{ + encrypting: true, + finalizer: finalizer, + }, nil + } + return &bpEncryptionStepData{ + encrypting: false, + }, nil +} + +// updateCryptoOperationAndAnnotations sets *operation and updates *annotations, if necessary. +func (d *bpEncryptionStepData) updateCryptoOperationAndAnnotations(operation *types.LayerCrypto, annotations *map[string]string) error { + if !d.encrypting { + return nil + } + + encryptAnnotations, err := d.finalizer() + if err != nil { + return errors.Wrap(err, "Unable to finalize encryption") + } + *operation = types.Encrypt + if *annotations == nil { + *annotations = map[string]string{} + } + for k, v := range encryptAnnotations { + (*annotations)[k] = v + } + return nil +}