Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Record (TOC digest → DiffID) mapping in BlobInfoCache #2321

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
10 changes: 10 additions & 0 deletions copy/compression.go
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/containers/image/v5/pkg/compression"
compressiontypes "github.com/containers/image/v5/pkg/compression/types"
"github.com/containers/image/v5/types"
chunkedToc "github.com/containers/storage/pkg/chunked/toc"
imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -308,6 +309,15 @@ func (d *bpCompressionStepData) recordValidatedDigestData(c *copier, uploadedInf
// No useful information
case bpcOpCompressUncompressed:
c.blobInfoCache.RecordDigestUncompressedPair(uploadedInfo.Digest, srcInfo.Digest)
if d.uploadedAnnotations != nil {
tocDigest, err := chunkedToc.GetTOCDigest(d.uploadedAnnotations)
if err != nil {
return fmt.Errorf("parsing a just-created compression annotations: %w", err)
}
if tocDigest != nil {
c.blobInfoCache.RecordTOCUncompressedPair(*tocDigest, srcInfo.Digest)
}
}
case bpcOpDecompressCompressed:
c.blobInfoCache.RecordDigestUncompressedPair(srcInfo.Digest, uploadedInfo.Digest)
case bpcOpRecompressCompressed, bpcOpPreserveCompressed:
Expand Down
7 changes: 7 additions & 0 deletions internal/blobinfocache/blobinfocache.go
Expand Up @@ -27,6 +27,13 @@ func (bic *v1OnlyBlobInfoCache) Open() {
// Close destroys state created by Open(). A no-op: this wrapper keeps no open state.
func (bic *v1OnlyBlobInfoCache) Close() {
}

// UncompressedDigestForTOC returns an uncompressed digest corresponding to tocDigest.
// Always returns "" (unknown): the v1-only wrapper stores no TOC mapping.
func (bic *v1OnlyBlobInfoCache) UncompressedDigestForTOC(tocDigest digest.Digest) digest.Digest {
	return ""
}

// RecordTOCUncompressedPair records that tocDigest corresponds to uncompressed.
// A no-op: the v1-only wrapper has nowhere to store BlobInfoCache2 data.
func (bic *v1OnlyBlobInfoCache) RecordTOCUncompressedPair(tocDigest digest.Digest, uncompressed digest.Digest) {
}

// RecordDigestCompressorName records a compressor for anyDigest.
// A no-op: the v1-only wrapper has nowhere to store BlobInfoCache2 data.
func (bic *v1OnlyBlobInfoCache) RecordDigestCompressorName(anyDigest digest.Digest, compressorName string) {
}

Expand Down
9 changes: 9 additions & 0 deletions internal/blobinfocache/types.go
Expand Up @@ -26,6 +26,15 @@ type BlobInfoCache2 interface {
// Close destroys state created by Open().
Close()

// UncompressedDigestForTOC returns an uncompressed digest corresponding to tocDigest.
// Returns "" if the uncompressed digest is unknown.
UncompressedDigestForTOC(tocDigest digest.Digest) digest.Digest
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TOC digest is the checksum of the uncompressed JSON document, so I think the compression should not matter in this case

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree we probably don’t need that right now (with GetTOCDigest refusing to work on manifests which contain multiple TOC digest annotations, and presumably with the zstd / estargz code being unable to decompress the other one).

This comment is looking a bit more into the future, for lookups in the other direction, where we will want to look up (UncompressedDigest → (compressed digest, TOC digest, algorithm)) and match that against “the user wants the destination to contain zstd:chunked” (i.e. reject estargz matches).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for lookups in the other direction,

That will be done in a separate data structure (an extension of RecordDigestCompressorName): we need the full set of annotations for reuse of a TOC-compressed blob, so this simple mapping is not sufficient anyway. And the other structure does record the algorithm.

// RecordTOCUncompressedPair records that the tocDigest corresponds to uncompressed.
// WARNING: Only call this for LOCALLY VERIFIED data; don’t record a digest pair just because some remote author claims so (e.g.
// because a manifest/config pair exists); otherwise the cache could be poisoned and allow substituting unexpected blobs.
// (Eventually, the DiffIDs in image config could detect the substitution, but that may be too late, and not all image formats contain that data.)
RecordTOCUncompressedPair(tocDigest digest.Digest, uncompressed digest.Digest)

// RecordDigestCompressorName records a compressor for the blob with the specified digest,
// or Uncompressed or UnknownCompression.
// WARNING: Only call this with LOCALLY VERIFIED data; don’t record a compressor for a
Expand Down
52 changes: 52 additions & 0 deletions pkg/blobinfocache/boltdb/boltdb.go
Expand Up @@ -25,6 +25,8 @@ var (

// uncompressedDigestBucket stores a mapping from any digest to an uncompressed digest.
uncompressedDigestBucket = []byte("uncompressedDigest")
// uncompressedDigestByTOCBucket stores a mapping from a TOC digest to an uncompressed digest.
uncompressedDigestByTOCBucket = []byte("uncompressedDigestByTOC")
// digestCompressorBucket stores a mapping from any digest to a compressor, or blobinfocache.Uncompressed (not blobinfocache.UnknownCompression).
// It may not exist in caches created by older versions, even if uncompressedDigestBucket is present.
digestCompressorBucket = []byte("digestCompressor")
Expand Down Expand Up @@ -243,6 +245,56 @@ func (bdc *cache) RecordDigestUncompressedPair(anyDigest digest.Digest, uncompre
}) // FIXME? Log error (but throttle the log volume on repeated accesses)?
}

// UncompressedDigestForTOC returns an uncompressed digest corresponding to tocDigest.
// Returns "" if the uncompressed digest is unknown.
func (bdc *cache) UncompressedDigestForTOC(tocDigest digest.Digest) digest.Digest {
	var res digest.Digest // Stays "" (unknown) unless a valid entry is found.
	if err := bdc.view(func(tx *bolt.Tx) error {
		// The bucket may not exist in caches created by older versions.
		if b := tx.Bucket(uncompressedDigestByTOCBucket); b != nil {
			if uncompressedBytes := b.Get([]byte(tocDigest.String())); uncompressedBytes != nil {
				d, err := digest.Parse(string(uncompressedBytes))
				if err == nil {
					res = d
				}
				// FIXME? Log err (but throttle the log volume on repeated accesses)?
			}
		}
		return nil
	}); err != nil { // Including os.IsNotExist(err)
		return "" // FIXME? Log err (but throttle the log volume on repeated accesses)?
	}
	return res
}

// RecordTOCUncompressedPair records that the tocDigest corresponds to uncompressed.
// WARNING: Only call this for LOCALLY VERIFIED data; don’t record a digest pair just because some remote author claims so (e.g.
// because a manifest/config pair exists); otherwise the cache could be poisoned and allow substituting unexpected blobs.
// (Eventually, the DiffIDs in image config could detect the substitution, but that may be too late, and not all image formats contain that data.)
func (bdc *cache) RecordTOCUncompressedPair(tocDigest digest.Digest, uncompressed digest.Digest) {
	_ = bdc.update(func(tx *bolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists(uncompressedDigestByTOCBucket)
		if err != nil {
			return err
		}
		key := []byte(tocDigest.String())
		// Warn (but still overwrite) if a conflicting mapping was previously recorded;
		// that would indicate either data corruption or a digest collision.
		if previousBytes := b.Get(key); previousBytes != nil {
			previous, err := digest.Parse(string(previousBytes))
			if err != nil {
				return err
			}
			if previous != uncompressed {
				logrus.Warnf("Uncompressed digest for blob with TOC %q previously recorded as %q, now %q", tocDigest, previous, uncompressed)
			}
		}
		return b.Put(key, []byte(uncompressed.String()))
	}) // FIXME? Log error (but throttle the log volume on repeated accesses)?
}

// RecordDigestCompressorName records that the blob with digest anyDigest was compressed with the specified
// compressor, or is blobinfocache.Uncompressed.
// WARNING: Only call this for LOCALLY VERIFIED data; don’t record a digest pair just because some remote author claims so (e.g.
Expand Down
24 changes: 24 additions & 0 deletions pkg/blobinfocache/internal/test/test.go
Expand Up @@ -43,6 +43,8 @@ func GenericCache(t *testing.T, newTestCache func(t *testing.T) blobinfocache.Bl
}{
{"UncompressedDigest", testGenericUncompressedDigest},
{"RecordDigestUncompressedPair", testGenericRecordDigestUncompressedPair},
{"UncompressedDigestForTOC", testGenericUncompressedDigestForTOC},
{"RecordTOCUncompressedPair", testGenericRecordTOCUncompressedPair},
{"RecordKnownLocations", testGenericRecordKnownLocations},
{"CandidateLocations", testGenericCandidateLocations},
{"CandidateLocations2", testGenericCandidateLocations2},
Expand Down Expand Up @@ -99,6 +101,28 @@ func testGenericRecordDigestUncompressedPair(t *testing.T, cache blobinfocache.B
}
}

// testGenericUncompressedDigestForTOC verifies TOC→uncompressed lookups:
// unknown digests map to "", and recorded pairs are returned.
func testGenericUncompressedDigestForTOC(t *testing.T, cache blobinfocache.BlobInfoCache2) {
	// An unrecorded TOC digest must look up as "" (unknown).
	assert.Equal(t, digest.Digest(""), cache.UncompressedDigestForTOC(digestUnknown))

	// Record two TOC digests mapping to the same uncompressed digest…
	for _, toc := range []digest.Digest{digestCompressedA, digestCompressedB} {
		cache.RecordTOCUncompressedPair(toc, digestUncompressed)
	}
	// …and verify both lookups return it.
	for _, toc := range []digest.Digest{digestCompressedA, digestCompressedB} {
		assert.Equal(t, digestUncompressed, cache.UncompressedDigestForTOC(toc))
	}
}

// testGenericRecordTOCUncompressedPair verifies that recording TOC→uncompressed
// pairs is idempotent and supports multiple TOC digests for one uncompressed digest.
func testGenericRecordTOCUncompressedPair(t *testing.T, cache blobinfocache.BlobInfoCache2) {
	for i := 0; i < 2; i++ { // Record the same data twice to ensure redundant writes don’t break things.
		for _, toc := range []digest.Digest{digestCompressedA, digestCompressedB} {
			// Each recorded mapping must be immediately visible to lookups,
			// including when several TOC digests share one uncompressed digest.
			cache.RecordTOCUncompressedPair(toc, digestUncompressed)
			assert.Equal(t, digestUncompressed, cache.UncompressedDigestForTOC(toc))
		}
	}
}

func testGenericRecordKnownLocations(t *testing.T, cache blobinfocache.BlobInfoCache2) {
transport := mocks.NameImageTransport("==BlobInfocache transport mock")
for i := 0; i < 2; i++ { // Record the same data twice to ensure redundant writes don’t break things.
Expand Down
42 changes: 34 additions & 8 deletions pkg/blobinfocache/memory/memory.go
Expand Up @@ -24,10 +24,11 @@ type locationKey struct {
// cache is an in-memory-only BlobInfoCache implementation.
// NOTE: the scraped diff contained both the pre- and post-change field lists,
// which would be duplicate (invalid) field declarations; only the updated
// field list is kept here.
type cache struct {
	mutex sync.Mutex
	// The following fields can only be accessed with mutex held.
	uncompressedDigests      map[digest.Digest]digest.Digest
	uncompressedDigestsByTOC map[digest.Digest]digest.Digest           // stores an uncompressed digest for each TOC digest
	digestsByUncompressed    map[digest.Digest]*set.Set[digest.Digest] // stores a set of digests for each uncompressed digest
	knownLocations           map[locationKey]map[types.BICLocationReference]time.Time // stores last known existence time for each location reference
	compressors              map[digest.Digest]string // stores a compressor name, or blobinfocache.Unknown (not blobinfocache.UnknownCompression), for each digest
}

// New returns a BlobInfoCache implementation which is in-memory only.
Expand All @@ -44,10 +45,11 @@ func New() types.BlobInfoCache {

// new2 returns a fresh in-memory cache with every map initialized,
// so callers never write to a nil map.
// NOTE: the scraped diff contained both the pre- and post-change initializer
// lines, which would be duplicate map keys; only the updated set is kept.
func new2() *cache {
	return &cache{
		uncompressedDigests:      map[digest.Digest]digest.Digest{},
		uncompressedDigestsByTOC: map[digest.Digest]digest.Digest{},
		digestsByUncompressed:    map[digest.Digest]*set.Set[digest.Digest]{},
		knownLocations:           map[locationKey]map[types.BICLocationReference]time.Time{},
		compressors:              map[digest.Digest]string{},
	}
}

Expand Down Expand Up @@ -104,6 +106,30 @@ func (mem *cache) RecordDigestUncompressedPair(anyDigest digest.Digest, uncompre
anyDigestSet.Add(anyDigest)
}

// UncompressedDigestForTOC returns an uncompressed digest corresponding to tocDigest.
// Returns "" if the uncompressed digest is unknown.
func (mem *cache) UncompressedDigestForTOC(tocDigest digest.Digest) digest.Digest {
	mem.mutex.Lock()
	defer mem.mutex.Unlock()
	// A missing key yields the zero value "", which is exactly the
	// "unknown" return value, so no explicit ok-check is needed.
	return mem.uncompressedDigestsByTOC[tocDigest]
}

// RecordTOCUncompressedPair records that the tocDigest corresponds to uncompressed.
// WARNING: Only call this for LOCALLY VERIFIED data; don’t record a digest pair just because some remote author claims so (e.g.
// because a manifest/config pair exists); otherwise the cache could be poisoned and allow substituting unexpected blobs.
// (Eventually, the DiffIDs in image config could detect the substitution, but that may be too late, and not all image formats contain that data.)
func (mem *cache) RecordTOCUncompressedPair(tocDigest digest.Digest, uncompressed digest.Digest) {
	mem.mutex.Lock()
	defer mem.mutex.Unlock()
	// Warn (but still overwrite) on a conflicting previous mapping; that would
	// indicate either data corruption or a digest collision.
	if previous, ok := mem.uncompressedDigestsByTOC[tocDigest]; ok && previous != uncompressed {
		logrus.Warnf("Uncompressed digest for blob with TOC %q previously recorded as %q, now %q", tocDigest, previous, uncompressed)
	}
	mem.uncompressedDigestsByTOC[tocDigest] = uncompressed
}

// RecordKnownLocation records that a blob with the specified digest exists within the specified (transport, scope) scope,
// and can be reused given the opaque location data.
func (mem *cache) RecordKnownLocation(transport types.ImageTransport, scope types.BICTransportScope, blobDigest digest.Digest, location types.BICLocationReference) {
Expand Down
13 changes: 13 additions & 0 deletions pkg/blobinfocache/none/none.go
Expand Up @@ -34,6 +34,19 @@ func (noCache) UncompressedDigest(anyDigest digest.Digest) digest.Digest {
// RecordDigestUncompressedPair records that anyDigest corresponds to uncompressed.
// A no-op: noCache intentionally records nothing.
func (noCache) RecordDigestUncompressedPair(anyDigest digest.Digest, uncompressed digest.Digest) {
}

// UncompressedDigestForTOC returns an uncompressed digest corresponding to tocDigest.
// Always returns "" (unknown): noCache stores nothing.
func (noCache) UncompressedDigestForTOC(tocDigest digest.Digest) digest.Digest {
	return ""
}

// RecordTOCUncompressedPair records that the tocDigest corresponds to uncompressed.
// A no-op: noCache intentionally records nothing.
// WARNING: Only call this for LOCALLY VERIFIED data; don’t record a digest pair just because some remote author claims so (e.g.
// because a manifest/config pair exists); otherwise the cache could be poisoned and allow substituting unexpected blobs.
// (Eventually, the DiffIDs in image config could detect the substitution, but that may be too late, and not all image formats contain that data.)
func (noCache) RecordTOCUncompressedPair(tocDigest digest.Digest, uncompressed digest.Digest) {
}

// RecordKnownLocation records that a blob with the specified digest exists within the specified (transport, scope) scope,
// and can be reused given the opaque location data.
func (noCache) RecordKnownLocation(transport types.ImageTransport, scope types.BICTransportScope, blobDigest digest.Digest, location types.BICLocationReference) {
Expand Down
59 changes: 59 additions & 0 deletions pkg/blobinfocache/sqlite/sqlite.go
Expand Up @@ -295,6 +295,14 @@ func ensureDBHasCurrentSchema(db *sql.DB) error {
`PRIMARY KEY (transport, scope, digest, location)
)`,
},
{
"DigestTOCUncompressedPairs",
`CREATE TABLE IF NOT EXISTS DigestTOCUncompressedPairs(` +
// index implied by PRIMARY KEY
`tocDigest TEXT PRIMARY KEY NOT NULL,` +
`uncompressedDigest TEXT NOT NULL
)`,
},
}

_, err := dbTransaction(db, func(tx *sql.Tx) (void, error) {
Expand Down Expand Up @@ -385,6 +393,57 @@ func (sqc *cache) RecordDigestUncompressedPair(anyDigest digest.Digest, uncompre
}) // FIXME? Log error (but throttle the log volume on repeated accesses)?
}

// UncompressedDigestForTOC returns an uncompressed digest corresponding to tocDigest.
// Returns "" if the uncompressed digest is unknown.
func (sqc *cache) UncompressedDigestForTOC(tocDigest digest.Digest) digest.Digest {
	res, err := transaction(sqc, func(tx *sql.Tx) (digest.Digest, error) {
		uncompressedString, found, err := querySingleValue[string](tx, "SELECT uncompressedDigest FROM DigestTOCUncompressedPairs WHERE tocDigest = ?", tocDigest.String())
		if err != nil {
			return "", err
		}
		if !found {
			return "", nil // Not an error: the mapping is simply unknown.
		}
		d, err := digest.Parse(uncompressedString)
		if err != nil {
			return "", err
		}
		return d, nil
	})
	if err != nil {
		return "" // FIXME? Log err (but throttle the log volume on repeated accesses)?
	}
	return res
}

// RecordTOCUncompressedPair records that the tocDigest corresponds to uncompressed.
// WARNING: Only call this for LOCALLY VERIFIED data; don’t record a digest pair just because some remote author claims so (e.g.
// because a manifest/config pair exists); otherwise the cache could be poisoned and allow substituting unexpected blobs.
// (Eventually, the DiffIDs in image config could detect the substitution, but that may be too late, and not all image formats contain that data.)
func (sqc *cache) RecordTOCUncompressedPair(tocDigest digest.Digest, uncompressed digest.Digest) {
	_, _ = transaction(sqc, func(tx *sql.Tx) (void, error) {
		previousString, gotPrevious, err := querySingleValue[string](tx, "SELECT uncompressedDigest FROM DigestTOCUncompressedPairs WHERE tocDigest = ?", tocDigest.String())
		if err != nil {
			// Bug fix: the original dropped the underlying err; wrap it with %w.
			return void{}, fmt.Errorf("looking for uncompressed digest for blob with TOC %q: %w", tocDigest, err)
		}
		// Warn (but still overwrite) on a conflicting previous mapping; that would
		// indicate either data corruption or a digest collision.
		if gotPrevious {
			previous, err := digest.Parse(previousString)
			if err != nil {
				return void{}, err
			}
			if previous != uncompressed {
				// %q for consistency with the boltdb and memory implementations.
				logrus.Warnf("Uncompressed digest for blob with TOC %q previously recorded as %q, now %q", tocDigest, previous, uncompressed)
			}
		}
		if _, err := tx.Exec("INSERT OR REPLACE INTO DigestTOCUncompressedPairs(tocDigest, uncompressedDigest) VALUES (?, ?)",
			tocDigest.String(), uncompressed.String()); err != nil {
			return void{}, fmt.Errorf("recording uncompressed digest %q for blob with TOC %q: %w", uncompressed, tocDigest, err)
		}
		return void{}, nil
	}) // FIXME? Log error (but throttle the log volume on repeated accesses)?
}

// RecordKnownLocation records that a blob with the specified digest exists within the specified (transport, scope) scope,
// and can be reused given the opaque location data.
func (sqc *cache) RecordKnownLocation(transport types.ImageTransport, scope types.BICTransportScope, digest digest.Digest, location types.BICLocationReference) {
Expand Down