Skip to content

Commit

Permalink
copy: use blobinfocache2 compression info for reused blobs
Browse files Browse the repository at this point in the history
Teach BlobInfoCache2 implementations to be able to retrieve the type of
compression that was applied to a blob with a given digest, if we know
it.

If we successfully reuse a blob while writing an image, make sure we
update the returned BlobInfo with compression information so that the
MIME type in the updated image will be derived correctly.  When we're
copying a blob unmodified, return its BlobInfo with compression
information for the same reason.

In the containers-storage transport and while copying, when we
decompress a blob, record that its diffID is the digest of a blob that
we know is not compressed.

Signed-off-by: Nalin Dahyabhai <nalin@redhat.com>
  • Loading branch information
nalind committed Feb 5, 2021
1 parent 0d8f725 commit 21ae051
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 17 deletions.
19 changes: 18 additions & 1 deletion copy/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -1087,6 +1087,18 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
return types.BlobInfo{}, "", errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest)
}
if reused {
// Update the BlobInfo with information about how the reused blob is compressed.
compressorName := ic.c.blobInfoCache.DigestCompressorName(blobInfo.Digest)
if compressorName == internalblobinfocache.UnknownCompression {
return types.BlobInfo{}, "", errors.Errorf("Tried to reuse blob %s with unknown compression", blobInfo.Digest)
}
compressionOperation, compressionAlgorithm, err := internalblobinfocache.OperationAndAlgorithmForCompressor(compressorName)
if err != nil {
return types.BlobInfo{}, "", errors.Wrapf(err, "Error determining type of compression for reused blob with digest %s", blobInfo.Digest)
}
blobInfo.CompressionOperation = compressionOperation
blobInfo.CompressionAlgorithm = compressionAlgorithm

logrus.Debugf("Skipping blob %s (already present):", srcInfo.Digest)
bar := ic.c.createProgressBar(pool, srcInfo, "blob", "skipped: already exists")
bar.SetTotal(0, true)
Expand Down Expand Up @@ -1129,6 +1141,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
// This is safe because we have just computed diffIDResult.Digest ourselves, and in the process
// we have read all of the input blob, so srcInfo.Digest must have been validated by digestingReader.
ic.c.blobInfoCache.RecordDigestUncompressedPair(srcInfo.Digest, diffIDResult.digest)
ic.c.blobInfoCache.RecordDigestCompressorName(diffIDResult.digest, internalblobinfocache.Uncompressed)
diffID = diffIDResult.digest
}
}
Expand Down Expand Up @@ -1354,7 +1367,11 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
compressionOperation = types.PreserveOriginal
inputInfo = srcInfo
uploadCompressorName = srcCompressorName
uploadCompressionFormat = nil
if isCompressed {
uploadCompressionFormat = &compressionFormat
} else {
uploadCompressionFormat = nil
}
}

// === Encrypt the stream for valid mediatypes if ociEncryptConfig provided
Expand Down
30 changes: 14 additions & 16 deletions docker/docker_image_dest.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,18 +295,22 @@ func (d *dockerImageDestination) TryReusingBlob(ctx context.Context, info types.
return false, types.BlobInfo{}, errors.Errorf(`"Can not check for a blob with unknown digest`)
}

// First, check whether the blob happens to already exist at the destination.
exists, size, err := d.blobExists(ctx, d.ref.ref, info.Digest, nil)
if err != nil {
return false, types.BlobInfo{}, err
}
if exists {
cache.RecordKnownLocation(d.ref.Transport(), bicTransportScope(d.ref), info.Digest, newBICLocationReference(d.ref))
return true, types.BlobInfo{Digest: info.Digest, MediaType: info.MediaType, Size: size}, nil
// First, if we know how it'd be compressed check whether the blob happens to already exist
// at the destination. If we don't know, we want to fall back to trying known locations
// with known compression types.
bic := blobinfocache.FromBlobInfoCache(cache)
if compressorName := bic.DigestCompressorName(info.Digest); compressorName != blobinfocache.UnknownCompression {
exists, size, err := d.blobExists(ctx, d.ref.ref, info.Digest, nil)
if err != nil {
return false, types.BlobInfo{}, err
}
if exists {
cache.RecordKnownLocation(d.ref.Transport(), bicTransportScope(d.ref), info.Digest, newBICLocationReference(d.ref))
return true, types.BlobInfo{Digest: info.Digest, MediaType: info.MediaType, Size: size}, nil
}
}

// Then try reusing blobs from other locations.
bic := blobinfocache.FromBlobInfoCache(cache)
candidates := bic.CandidateLocations2(d.ref.Transport(), bicTransportScope(d.ref), info.Digest, canSubstitute)
for _, candidate := range candidates {
candidateRepo, err := parseBICLocationReference(candidate.Location)
Expand Down Expand Up @@ -363,13 +367,7 @@ func (d *dockerImageDestination) TryReusingBlob(ctx context.Context, info types.

bic.RecordKnownLocation(d.ref.Transport(), bicTransportScope(d.ref), candidate.Digest, newBICLocationReference(d.ref))

compressionOperation, compressionAlgorithm, err := blobinfocache.OperationAndAlgorithmForCompressor(candidate.CompressorName)
if err != nil {
logrus.Debugf("... Failed: %v", err)
continue
}

return true, types.BlobInfo{Digest: candidate.Digest, MediaType: info.MediaType, Size: size, CompressionOperation: compressionOperation, CompressionAlgorithm: compressionAlgorithm}, nil
return true, types.BlobInfo{Digest: candidate.Digest, MediaType: info.MediaType, Size: size}, nil
}

return false, types.BlobInfo{}, nil
Expand Down
4 changes: 4 additions & 0 deletions internal/blobinfocache/blobinfocache.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ type v1OnlyBlobInfoCache struct {
// RecordDigestCompressorName is a no-op: a v1-only BlobInfoCache has no place to
// store compressor information, so the record is deliberately discarded.
func (bic *v1OnlyBlobInfoCache) RecordDigestCompressorName(anyDigest digest.Digest, compressorName string) {
}

// DigestCompressorName always reports UnknownCompression: a v1-only BlobInfoCache
// never stores compressor information, so there is nothing to look up.
func (bic *v1OnlyBlobInfoCache) DigestCompressorName(anyDigest digest.Digest) string {
	return UnknownCompression
}

// CandidateLocations2 returns no candidates: the v2 candidate data (with
// compressor names) is unavailable when wrapping a v1-only cache.
func (bic *v1OnlyBlobInfoCache) CandidateLocations2(transport types.ImageTransport, scope types.BICTransportScope, digest digest.Digest, canSubstitute bool) []BICReplacementCandidate2 {
	return nil
}
Expand Down
3 changes: 3 additions & 0 deletions internal/blobinfocache/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ type BlobInfoCache2 interface {
// otherwise the cache could be poisoned and cause us to make incorrect edits to type
// information in a manifest.
RecordDigestCompressorName(anyDigest digest.Digest, compressorName string)
// DigestCompressorName returns the type of compression that we know is applied to the
// blob with the specified digest, or Uncompressed or UnknownCompression.
DigestCompressorName(anyDigest digest.Digest) string
// CandidateLocations2 returns a prioritized, limited, number of blobs and their locations
// that could possibly be reused within the specified (transport scope) (if they still
// exist, which is not guaranteed).
Expand Down
20 changes: 20 additions & 0 deletions pkg/blobinfocache/boltdb/boltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,26 @@ func (bdc *cache) RecordDigestCompressorName(anyDigest digest.Digest, compressor
}) // FIXME? Log error (but throttle the log volume on repeated accesses)?
}

// DigestCompressorName returns the type of compression that we know is applied to the
// blob with the specified digest, or Uncompressed or UnknownCompression.
func (bdc *cache) DigestCompressorName(anyDigest digest.Digest) string {
	// Default to "unknown"; the read-only transaction below overwrites it
	// only when the bucket actually contains an entry for this digest.
	result := blobinfocache.UnknownCompression
	err := bdc.view(func(tx *bolt.Tx) error {
		bucket := tx.Bucket(digestCompressorBucket)
		if bucket == nil {
			// Bucket was never created: nothing recorded yet.
			return nil
		}
		if value := bucket.Get([]byte(anyDigest.String())); len(value) > 0 {
			result = string(value)
		}
		return nil
	})
	if err != nil { // Including os.IsNotExist(err)
		return blobinfocache.UnknownCompression // FIXME? Log err (but throttle the log volume on repeated accesses)?
	}
	return result
}

// RecordKnownLocation records that a blob with the specified digest exists within the specified (transport, scope) scope,
// and can be reused given the opaque location data.
func (bdc *cache) RecordKnownLocation(transport types.ImageTransport, scope types.BICTransportScope, blobDigest digest.Digest, location types.BICLocationReference) {
Expand Down
8 changes: 8 additions & 0 deletions pkg/blobinfocache/internal/test/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ func testGenericCandidateLocations2(t *testing.T, cache blobinfocache.BlobInfoCa
for _, e := range digestNameSet {
cache.RecordDigestCompressorName(e.d, blobinfocache.UnknownCompression)
}
for _, e := range digestNameSet {
assert.Equal(t, blobinfocache.UnknownCompression, cache.DigestCompressorName(e.d))
}
}

// No substitutions allowed:
Expand Down Expand Up @@ -276,6 +279,11 @@ func testGenericCandidateLocations2(t *testing.T, cache blobinfocache.BlobInfoCa
cache.RecordDigestCompressorName(e.d, e.m)
}

// Check that we can retrieve compression values
for _, e := range digestNameSet {
assert.Equal(t, e.m, cache.DigestCompressorName(e.d))
}

// No substitutions allowed:
for _, e := range digestNameSet {
assertCandidatesMatch(t, scopeName, []candidate{
Expand Down
11 changes: 11 additions & 0 deletions pkg/blobinfocache/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,17 @@ func (mem *cache) RecordDigestCompressorName(blobDigest digest.Digest, compresso
mem.compressors[blobDigest] = compressorName
}

// DigestCompressorName returns the type of compression that we know is applied to the
// blob with the specified digest, or Uncompressed or UnknownCompression.
func (mem *cache) DigestCompressorName(blobDigest digest.Digest) string {
	// Hold the lock for the whole lookup; the map is shared with writers.
	mem.mutex.Lock()
	defer mem.mutex.Unlock()
	name, known := mem.compressors[blobDigest]
	if !known {
		return blobinfocache.UnknownCompression
	}
	return name
}

// appendReplacementCandiates creates prioritize.CandidateWithTime values for (transport, scope, digest), and returns the result of appending them to candidates.
func (mem *cache) appendReplacementCandidates(candidates []prioritize.CandidateWithTime, transport types.ImageTransport, scope types.BICTransportScope, digest digest.Digest, requireCompressionInfo bool) []prioritize.CandidateWithTime {
locations := mem.knownLocations[locationKey{transport: transport.Name(), scope: scope, blobDigest: digest}] // nil if not present
Expand Down
3 changes: 3 additions & 0 deletions storage/storage_image.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/containers/image/v5/docker/reference"
"github.com/containers/image/v5/image"
"github.com/containers/image/v5/internal/blobinfocache"
"github.com/containers/image/v5/internal/tmpdir"
"github.com/containers/image/v5/manifest"
"github.com/containers/image/v5/pkg/blobinfocache/none"
Expand Down Expand Up @@ -452,6 +453,8 @@ func (s *storageImageDestination) PutBlob(ctx context.Context, stream io.Reader,
}
// This is safe because we have just computed both values ourselves.
cache.RecordDigestUncompressedPair(blobDigest, diffID.Digest())
bic := blobinfocache.FromBlobInfoCache(cache)
bic.RecordDigestCompressorName(diffID.Digest(), blobinfocache.Uncompressed)
return types.BlobInfo{
Digest: blobDigest,
Size: blobSize,
Expand Down

0 comments on commit 21ae051

Please sign in to comment.