Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support OCI artifacts #1574

Merged
merged 14 commits into from Jun 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 14 additions & 17 deletions copy/blob.go
Expand Up @@ -15,13 +15,9 @@ import (
// perhaps sending a copy to an io.Writer if getOriginalLayerCopyWriter != nil,
// perhaps (de/re/)compressing it if canModifyBlob,
// and returns a complete blobInfo of the copied blob.
func (c *copier) copyBlobFromStream(ctx context.Context, srcReader io.Reader, srcInfo types.BlobInfo,
func (ic *imageCopier) copyBlobFromStream(ctx context.Context, srcReader io.Reader, srcInfo types.BlobInfo,
getOriginalLayerCopyWriter func(decompressor compressiontypes.DecompressorFunc) io.Writer,
canModifyBlob bool, isConfig bool, toEncrypt bool, bar *progressBar, layerIndex int, emptyLayer bool) (types.BlobInfo, error) {
if isConfig { // This is guaranteed by the caller, but set it here to be explicit.
canModifyBlob = false
}

isConfig bool, toEncrypt bool, bar *progressBar, layerIndex int, emptyLayer bool) (types.BlobInfo, error) {
// The copying happens through a pipeline of connected io.Readers;
// that pipeline is built by updating stream.
// === Input: srcReader
Expand All @@ -46,7 +42,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcReader io.Reader, sr
stream.reader = bar.ProxyReader(stream.reader)

// === Decrypt the stream, if required.
decryptionStep, err := c.blobPipelineDecryptionStep(&stream, srcInfo)
decryptionStep, err := ic.c.blobPipelineDecryptionStep(&stream, srcInfo)
if err != nil {
return types.BlobInfo{}, err
}
Expand All @@ -65,10 +61,11 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcReader io.Reader, sr
originalLayerReader = stream.reader
}

// === Deal with layer compression/decompression if necessary
// WARNING: If you are adding new reasons to change the blob, update also the OptimizeDestinationImageAlreadyExists
// short-circuit conditions
compressionStep, err := c.blobPipelineCompressionStep(&stream, canModifyBlob, detectedCompression)
canModifyBlob := !isConfig && ic.cannotModifyManifestReason == ""
// === Deal with layer compression/decompression if necessary
compressionStep, err := ic.blobPipelineCompressionStep(&stream, canModifyBlob, srcInfo, detectedCompression)
if err != nil {
return types.BlobInfo{}, err
}
Expand All @@ -80,17 +77,17 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcReader io.Reader, sr
// Before relaxing this, see the original pull request’s review if there are other reasons to reject this.
return types.BlobInfo{}, errors.New("Unable to support both decryption and encryption in the same copy")
}
encryptionStep, err := c.blobPipelineEncryptionStep(&stream, toEncrypt, srcInfo, decryptionStep)
encryptionStep, err := ic.c.blobPipelineEncryptionStep(&stream, toEncrypt, srcInfo, decryptionStep)
if err != nil {
return types.BlobInfo{}, err
}

// === Report progress using the c.progress channel, if required.
if c.progress != nil && c.progressInterval > 0 {
// === Report progress using the ic.c.progress channel, if required.
if ic.c.progress != nil && ic.c.progressInterval > 0 {
progressReader := newProgressReader(
stream.reader,
c.progress,
c.progressInterval,
ic.c.progress,
ic.c.progressInterval,
srcInfo,
)
defer progressReader.reportDone()
Expand All @@ -99,14 +96,14 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcReader io.Reader, sr

// === Finally, send the layer stream to dest.
options := private.PutBlobOptions{
Cache: c.blobInfoCache,
Cache: ic.c.blobInfoCache,
IsConfig: isConfig,
EmptyLayer: emptyLayer,
}
if !isConfig {
options.LayerIndex = &layerIndex
}
uploadedInfo, err := c.dest.PutBlobWithOptions(ctx, &errorAnnotationReader{stream.reader}, stream.info, options)
uploadedInfo, err := ic.c.dest.PutBlobWithOptions(ctx, &errorAnnotationReader{stream.reader}, stream.info, options)
if err != nil {
return types.BlobInfo{}, errors.Wrap(err, "writing blob")
}
Expand Down Expand Up @@ -138,7 +135,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcReader io.Reader, sr
return types.BlobInfo{}, errors.Errorf("Internal error writing blob %s, blob with digest %s saved with digest %s", srcInfo.Digest, stream.info.Digest, uploadedInfo.Digest)
}
if digestingReader.validationSucceeded {
if err := compressionStep.recordValidatedDigestData(c, uploadedInfo, srcInfo, encryptionStep, decryptionStep); err != nil {
if err := compressionStep.recordValidatedDigestData(ic.c, uploadedInfo, srcInfo, encryptionStep, decryptionStep); err != nil {
return types.BlobInfo{}, err
}
}
Expand Down
60 changes: 34 additions & 26 deletions copy/compression.go
Expand Up @@ -58,19 +58,23 @@ type bpCompressionStepData struct {
}

// blobPipelineCompressionStep updates *stream to compress and/or decompress it.
// srcInfo is only used for error messages.
// srcInfo is primarily used for error messages.
// Returns data for other steps; the caller should eventually call updateCompressionEdits and perhaps recordValidatedBlobData,
// and must eventually call close.
func (c *copier) blobPipelineCompressionStep(stream *sourceStream, canModifyBlob bool,
func (ic *imageCopier) blobPipelineCompressionStep(stream *sourceStream, canModifyBlob bool, srcInfo types.BlobInfo,
detected bpDetectCompressionStepData) (*bpCompressionStepData, error) {
// WARNING: If you are adding new reasons to change the blob, update also the OptimizeDestinationImageAlreadyExists
// short-circuit conditions
if canModifyBlob {
layerCompressionChangeSupported := ic.src.CanChangeLayerCompression(stream.info.MediaType)
if !layerCompressionChangeSupported {
logrus.Debugf("Compression change for blob %s (%q) not supported", srcInfo.Digest, stream.info.MediaType)
}
if canModifyBlob && layerCompressionChangeSupported {
for _, fn := range []func(*sourceStream, bpDetectCompressionStepData) (*bpCompressionStepData, error){
c.bpcPreserveEncrypted,
c.bpcCompressUncompressed,
c.bpcRecompressCompressed,
c.bpcDecompressCompressed,
ic.bpcPreserveEncrypted,
ic.bpcCompressUncompressed,
ic.bpcRecompressCompressed,
ic.bpcDecompressCompressed,
} {
res, err := fn(stream, detected)
if err != nil {
Expand All @@ -81,14 +85,14 @@ func (c *copier) blobPipelineCompressionStep(stream *sourceStream, canModifyBlob
}
}
}
return c.bpcPreserveOriginal(stream, detected), nil
return ic.bpcPreserveOriginal(stream, detected, layerCompressionChangeSupported), nil
}

// bpcPreserveEncrypted checks if the input is encrypted, and returns a *bpCompressionStepData if so.
func (c *copier) bpcPreserveEncrypted(stream *sourceStream, _ bpDetectCompressionStepData) (*bpCompressionStepData, error) {
func (ic *imageCopier) bpcPreserveEncrypted(stream *sourceStream, _ bpDetectCompressionStepData) (*bpCompressionStepData, error) {
if isOciEncrypted(stream.info.MediaType) {
// PreserveOriginal due to any compression not being able to be done on an encrypted blob unless decrypted
logrus.Debugf("Using original blob without modification for encrypted blob")
// PreserveOriginal due to any compression not being able to be done on an encrypted blob unless decrypted
return &bpCompressionStepData{
operation: types.PreserveOriginal,
uploadedAlgorithm: nil,
Expand All @@ -100,17 +104,17 @@ func (c *copier) bpcPreserveEncrypted(stream *sourceStream, _ bpDetectCompressio
}

// bpcCompressUncompressed checks if we should be compressing an uncompressed input, and returns a *bpCompressionStepData if so.
func (c *copier) bpcCompressUncompressed(stream *sourceStream, detected bpDetectCompressionStepData) (*bpCompressionStepData, error) {
if c.dest.DesiredLayerCompression() == types.Compress && !detected.isCompressed {
func (ic *imageCopier) bpcCompressUncompressed(stream *sourceStream, detected bpDetectCompressionStepData) (*bpCompressionStepData, error) {
if ic.c.dest.DesiredLayerCompression() == types.Compress && !detected.isCompressed {
logrus.Debugf("Compressing blob on the fly")
var uploadedAlgorithm *compressiontypes.Algorithm
if c.compressionFormat != nil {
uploadedAlgorithm = c.compressionFormat
if ic.c.compressionFormat != nil {
uploadedAlgorithm = ic.c.compressionFormat
} else {
uploadedAlgorithm = defaultCompressionFormat
}

reader, annotations := c.compressedStream(stream.reader, *uploadedAlgorithm)
reader, annotations := ic.c.compressedStream(stream.reader, *uploadedAlgorithm)
// Note: reader must be closed on all return paths.
stream.reader = reader
stream.info = types.BlobInfo{ // FIXME? Should we preserve more data in src.info?
Expand All @@ -130,9 +134,9 @@ func (c *copier) bpcCompressUncompressed(stream *sourceStream, detected bpDetect
}

// bpcRecompressCompressed checks if we should be recompressing a compressed input to another format, and returns a *bpCompressionStepData if so.
func (c *copier) bpcRecompressCompressed(stream *sourceStream, detected bpDetectCompressionStepData) (*bpCompressionStepData, error) {
if c.dest.DesiredLayerCompression() == types.Compress && detected.isCompressed &&
c.compressionFormat != nil && c.compressionFormat.Name() != detected.format.Name() {
func (ic *imageCopier) bpcRecompressCompressed(stream *sourceStream, detected bpDetectCompressionStepData) (*bpCompressionStepData, error) {
if ic.c.dest.DesiredLayerCompression() == types.Compress && detected.isCompressed &&
ic.c.compressionFormat != nil && ic.c.compressionFormat.Name() != detected.format.Name() {
// When the blob is compressed, but the desired format is different, it first needs to be decompressed and finally
// re-compressed using the desired format.
logrus.Debugf("Blob will be converted")
Expand All @@ -148,7 +152,7 @@ func (c *copier) bpcRecompressCompressed(stream *sourceStream, detected bpDetect
}
}()

recompressed, annotations := c.compressedStream(decompressed, *c.compressionFormat)
recompressed, annotations := ic.c.compressedStream(decompressed, *ic.c.compressionFormat)
// Note: recompressed must be closed on all return paths.
stream.reader = recompressed
stream.info = types.BlobInfo{ // FIXME? Should we preserve more data in src.info?
Expand All @@ -158,19 +162,19 @@ func (c *copier) bpcRecompressCompressed(stream *sourceStream, detected bpDetect
succeeded = true
return &bpCompressionStepData{
operation: types.PreserveOriginal,
uploadedAlgorithm: c.compressionFormat,
uploadedAlgorithm: ic.c.compressionFormat,
uploadedAnnotations: annotations,
srcCompressorName: detected.srcCompressorName,
uploadedCompressorName: c.compressionFormat.Name(),
uploadedCompressorName: ic.c.compressionFormat.Name(),
closers: []io.Closer{decompressed, recompressed},
}, nil
}
return nil, nil
}

// bpcDecompressCompressed checks if we should be decompressing a compressed input, and returns a *bpCompressionStepData if so.
func (c *copier) bpcDecompressCompressed(stream *sourceStream, detected bpDetectCompressionStepData) (*bpCompressionStepData, error) {
if c.dest.DesiredLayerCompression() == types.Decompress && detected.isCompressed {
func (ic *imageCopier) bpcDecompressCompressed(stream *sourceStream, detected bpDetectCompressionStepData) (*bpCompressionStepData, error) {
if ic.c.dest.DesiredLayerCompression() == types.Decompress && detected.isCompressed {
logrus.Debugf("Blob will be decompressed")
s, err := detected.decompressor(stream.reader)
if err != nil {
Expand All @@ -194,15 +198,19 @@ func (c *copier) bpcDecompressCompressed(stream *sourceStream, detected bpDetect
}

// bpcPreserveOriginal returns a *bpCompressionStepData for not changing the original blob.
func (c *copier) bpcPreserveOriginal(stream *sourceStream, detected bpDetectCompressionStepData) *bpCompressionStepData {
// PreserveOriginal might also need to recompress the original blob if the desired compression format is different.
func (ic *imageCopier) bpcPreserveOriginal(stream *sourceStream, detected bpDetectCompressionStepData,
layerCompressionChangeSupported bool) *bpCompressionStepData {
logrus.Debugf("Using original blob without modification")
// Remember if the original blob was compressed, and if so how, so that if
// LayerInfosForCopy() returned something that differs from what was in the
// source's manifest, and UpdatedImage() needs to call UpdateLayerInfos(),
// it will be able to correctly derive the MediaType for the copied blob.
//
// But don’t touch blobs in objects where we can’t change compression,
// so that src.UpdatedImage() doesn’t fail; assume that for such blobs
// LayerInfosForCopy() should not be making any changes in the first place.
var algorithm *compressiontypes.Algorithm
if detected.isCompressed {
if layerCompressionChangeSupported && detected.isCompressed {
algorithm = &detected.format
} else {
algorithm = nil
Expand Down
36 changes: 21 additions & 15 deletions copy/copy.go
Expand Up @@ -678,12 +678,14 @@ func (c *copier) copyOneImage(ctx context.Context, policyContext *signature.Poli
cannotModifyManifestReason: cannotModifyManifestReason,
ociEncryptLayers: options.OciEncryptLayers,
}
// Ensure _this_ copy sees exactly the intended data when either processing a signed image or signing it.
// This may be too conservative, but for now, better safe than sorry, _especially_ on the SignBy path:
// The signature makes the content non-repudiable, so it very much matters that the signature is made over exactly what the user intended.
// We do intend the RecordDigestUncompressedPair calls to only work with reliable data, but at least there’s a risk
// that the compressed version coming from a third party may be designed to attack some other decompressor implementation,
// and we would reuse and sign it.
// Decide whether we can substitute blobs with semantic equivalents:
// - Don’t do that if we can’t modify the manifest at all
// - Ensure _this_ copy sees exactly the intended data when either processing a signed image or signing it.
// This may be too conservative, but for now, better safe than sorry, _especially_ on the SignBy path:
// The signature makes the content non-repudiable, so it very much matters that the signature is made over exactly what the user intended.
// We do intend the RecordDigestUncompressedPair calls to only work with reliable data, but at least there’s a risk
// that the compressed version coming from a third party may be designed to attack some other decompressor implementation,
// and we would reuse and sign it.
ic.canSubstituteBlobs = ic.cannotModifyManifestReason == "" && options.SignBy == ""

if err := ic.updateEmbeddedDockerReference(); err != nil {
Expand Down Expand Up @@ -1044,7 +1046,7 @@ func (ic *imageCopier) copyUpdatedConfigAndManifest(ctx context.Context, instanc
return nil, "", errors.Wrap(err, "reading manifest")
}

if err := ic.c.copyConfig(ctx, pendingImage); err != nil {
if err := ic.copyConfig(ctx, pendingImage); err != nil {
return nil, "", err
}

Expand All @@ -1064,27 +1066,27 @@ func (ic *imageCopier) copyUpdatedConfigAndManifest(ctx context.Context, instanc
}

// copyConfig copies config.json, if any, from src to dest.
func (c *copier) copyConfig(ctx context.Context, src types.Image) error {
func (ic *imageCopier) copyConfig(ctx context.Context, src types.Image) error {
srcInfo := src.ConfigInfo()
if srcInfo.Digest != "" {
if err := c.concurrentBlobCopiesSemaphore.Acquire(ctx, 1); err != nil {
if err := ic.c.concurrentBlobCopiesSemaphore.Acquire(ctx, 1); err != nil {
// This can only fail with ctx.Err(), so no need to blame acquiring the semaphore.
return fmt.Errorf("copying config: %w", err)
}
defer c.concurrentBlobCopiesSemaphore.Release(1)
defer ic.c.concurrentBlobCopiesSemaphore.Release(1)

destInfo, err := func() (types.BlobInfo, error) { // A scope for defer
progressPool := c.newProgressPool()
progressPool := ic.c.newProgressPool()
defer progressPool.Wait()
bar := c.createProgressBar(progressPool, false, srcInfo, "config", "done")
bar := ic.c.createProgressBar(progressPool, false, srcInfo, "config", "done")
defer bar.Abort(false)

configBlob, err := src.ConfigBlob(ctx)
if err != nil {
return types.BlobInfo{}, errors.Wrapf(err, "reading config blob %s", srcInfo.Digest)
}

destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, false, true, false, bar, -1, false)
destInfo, err := ic.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, true, false, bar, -1, false)
if err != nil {
return types.BlobInfo{}, err
}
Expand Down Expand Up @@ -1143,6 +1145,10 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to

// Don’t read the layer from the source if we already have the blob, and optimizations are acceptable.
if canAvoidProcessingCompleteLayer {
canChangeLayerCompression := ic.src.CanChangeLayerCompression(srcInfo.MediaType)
logrus.Debugf("Checking if we can reuse blob %s: general substitution = %v, compression for MIME type %q = %v",
srcInfo.Digest, ic.canSubstituteBlobs, srcInfo.MediaType, canChangeLayerCompression)
canSubstitute := ic.canSubstituteBlobs && ic.src.CanChangeLayerCompression(srcInfo.MediaType)
// TODO: at this point we don't know whether or not a blob we end up reusing is compressed using an algorithm
// that is acceptable for use on layers in the manifest that we'll be writing later, so if we end up reusing
// a blob that's compressed with e.g. zstd, but we're only allowed to write a v2s2 manifest, this will cause
Expand All @@ -1151,7 +1157,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
// the ImageDestination interface lets us pass in.
reused, blobInfo, err := ic.c.dest.TryReusingBlobWithOptions(ctx, srcInfo, private.TryReusingBlobOptions{
Cache: ic.c.blobInfoCache,
CanSubstitute: ic.canSubstituteBlobs,
CanSubstitute: canSubstitute,
EmptyLayer: emptyLayer,
LayerIndex: &layerIndex,
SrcRef: srcRef,
Expand Down Expand Up @@ -1300,7 +1306,7 @@ func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Rea
}
}

blobInfo, err := ic.c.copyBlobFromStream(ctx, srcStream, srcInfo, getDiffIDRecorder, ic.cannotModifyManifestReason == "", false, toEncrypt, bar, layerIndex, emptyLayer) // Sets err to nil on success
blobInfo, err := ic.copyBlobFromStream(ctx, srcStream, srcInfo, getDiffIDRecorder, false, toEncrypt, bar, layerIndex, emptyLayer) // Sets err to nil on success
return blobInfo, diffIDChan, err
// We need the defer … pipeWriter.CloseWithError() to happen HERE so that the caller can block on reading from diffIDChan
}
Expand Down
9 changes: 9 additions & 0 deletions internal/image/docker_schema1.go
Expand Up @@ -246,3 +246,12 @@ func (m *manifestSchema1) convertToManifestOCI1(ctx context.Context, options *ty
func (m *manifestSchema1) SupportsEncryption(context.Context) bool {
return false
}

// CanChangeLayerCompression returns true if we can compress/decompress layers with mimeType in the current image
// (and the code can handle that).
// NOTE: Even if this returns true, the relevant format might not accept all compression algorithms; the set of accepted
// algorithms depends not on the current format, but possibly on the target of a conversion (if UpdatedImage converts
// to a different manifest format).
func (m *manifestSchema1) CanChangeLayerCompression(mimeType string) bool {
return true // There are no MIME types in the manifest, so we must assume a valid image.
}
9 changes: 9 additions & 0 deletions internal/image/docker_schema1_test.go
Expand Up @@ -667,3 +667,12 @@ func TestConvertSchema1ToManifestOCIWithAnnotations(t *testing.T) {
require.NoError(t, err)
assert.NotEqual(t, res.LayerInfos(), layerInfoOverwrites)
}

func TestManifestSchema1CanChangeLayerCompression(t *testing.T) {
for _, m := range []genericManifest{
manifestSchema1FromFixture(t, "schema1.json"),
manifestSchema1FromComponentsLikeFixture(t),
} {
assert.True(t, m.CanChangeLayerCompression(""))
}
}