Skip to content

Commit

Permalink
Merge pull request #1468 from mtrmac/storage-private-primary
Browse files Browse the repository at this point in the history
Reorganize storageImageDestination to prioritize the private …WithOptions methods
  • Loading branch information
vrothberg committed Feb 15, 2022
2 parents 721e7b7 + 4da6615 commit 1ca5fad
Showing 1 changed file with 62 additions and 60 deletions.
122 changes: 62 additions & 60 deletions storage/storage_image.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,11 @@ func (s *storageImageDestination) computeNextBlobCacheFile() string {
return filepath.Join(s.directory, fmt.Sprintf("%d", atomic.AddInt32(&s.nextTempFileID, 1)))
}

// HasThreadSafePutBlob indicates whether PutBlob can be executed concurrently.
func (s *storageImageDestination) HasThreadSafePutBlob() bool {
return true
}

// PutBlobWithOptions writes contents of stream and returns data representing the result.
// inputInfo.Digest can be optionally provided if known; if provided, and stream is read to the end without error, the digest MUST match the stream contents.
// inputInfo.Size is the expected length of stream, if known.
Expand All @@ -454,7 +459,7 @@ func (s *storageImageDestination) computeNextBlobCacheFile() string {
// to any other readers for download using the supplied digest.
// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
func (s *storageImageDestination) PutBlobWithOptions(ctx context.Context, stream io.Reader, blobinfo types.BlobInfo, options private.PutBlobOptions) (types.BlobInfo, error) {
info, err := s.PutBlob(ctx, stream, blobinfo, options.Cache, options.IsConfig)
info, err := s.putBlobToPendingFile(ctx, stream, blobinfo, &options)
if err != nil {
return info, err
}
Expand All @@ -466,11 +471,6 @@ func (s *storageImageDestination) PutBlobWithOptions(ctx context.Context, stream
return info, s.queueOrCommit(ctx, info, *options.LayerIndex, options.EmptyLayer)
}

// HasThreadSafePutBlob indicates whether PutBlob can be executed concurrently.
func (s *storageImageDestination) HasThreadSafePutBlob() bool {
return true
}

// PutBlob writes contents of stream and returns data representing the result.
// inputInfo.Digest can be optionally provided if known; if provided, and stream is read to the end without error, the digest MUST match the stream contents.
// inputInfo.Size is the expected length of stream, if known.
Expand All @@ -480,6 +480,15 @@ func (s *storageImageDestination) HasThreadSafePutBlob() bool {
// to any other readers for download using the supplied digest.
// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
func (s *storageImageDestination) PutBlob(ctx context.Context, stream io.Reader, blobinfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
return s.PutBlobWithOptions(ctx, stream, blobinfo, private.PutBlobOptions{
Cache: cache,
IsConfig: isConfig,
})
}

// putBlobToPendingFile implements ImageDestination.PutBlobWithOptions, storing stream into an on-disk file.
// The caller must arrange the blob to be eventually commited using s.commitLayer().
func (s *storageImageDestination) putBlobToPendingFile(ctx context.Context, stream io.Reader, blobinfo types.BlobInfo, options *private.PutBlobOptions) (types.BlobInfo, error) {
// Stores a layer or data blob in our temporary directory, checking that any information
// in the blobinfo matches the incoming data.
errorBlobInfo := types.BlobInfo{
Expand Down Expand Up @@ -534,57 +543,14 @@ func (s *storageImageDestination) PutBlob(ctx context.Context, stream io.Reader,
s.lock.Unlock()
// This is safe because we have just computed diffID, and blobDigest was either computed
// by us, or validated by the caller (usually copy.digestingReader).
cache.RecordDigestUncompressedPair(blobDigest, diffID.Digest())
options.Cache.RecordDigestUncompressedPair(blobDigest, diffID.Digest())
return types.BlobInfo{
Digest: blobDigest,
Size: blobSize,
MediaType: blobinfo.MediaType,
}, nil
}

// TryReusingBlobWithOptions checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination
// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree).
// info.Digest must not be empty.
// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size, and may
// include CompressionOperation and CompressionAlgorithm fields to indicate that a change to the compression type should be
// reflected in the manifest that will be written.
// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure.
func (s *storageImageDestination) TryReusingBlobWithOptions(ctx context.Context, blobinfo types.BlobInfo, options private.TryReusingBlobOptions) (bool, types.BlobInfo, error) {
reused, info, err := s.tryReusingBlobWithSrcRef(ctx, blobinfo, options.Cache, options.CanSubstitute, options.SrcRef)
if err != nil || !reused || options.LayerIndex == nil {
return reused, info, err
}

return reused, info, s.queueOrCommit(ctx, info, *options.LayerIndex, options.EmptyLayer)
}

// tryReusingBlobWithSrcRef is a wrapper around TryReusingBlob.
// If ref is provided, this function first tries to get layer from Additional Layer Store.
func (s *storageImageDestination) tryReusingBlobWithSrcRef(ctx context.Context, blobinfo types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool, ref reference.Named) (bool, types.BlobInfo, error) {
// lock the entire method as it executes fairly quickly
s.lock.Lock()
defer s.lock.Unlock()

if ref != nil {
// Check if we have the layer in the underlying additional layer store.
aLayer, err := s.imageRef.transport.store.LookupAdditionalLayer(blobinfo.Digest, ref.String())
if err != nil && errors.Cause(err) != storage.ErrLayerUnknown {
return false, types.BlobInfo{}, errors.Wrapf(err, `looking for compressed layers with digest %q and labels`, blobinfo.Digest)
} else if err == nil {
// Record the uncompressed value so that we can use it to calculate layer IDs.
s.blobDiffIDs[blobinfo.Digest] = aLayer.UncompressedDigest()
s.blobAdditionalLayer[blobinfo.Digest] = aLayer
return true, types.BlobInfo{
Digest: blobinfo.Digest,
Size: aLayer.CompressedSize(),
MediaType: blobinfo.MediaType,
}, nil
}
}

return s.tryReusingBlobLocked(ctx, blobinfo, cache, canSubstitute)
}

type zstdFetcher struct {
chunkAccessor private.BlobChunkAccessor
ctx context.Context
Expand Down Expand Up @@ -643,6 +609,22 @@ func (s *storageImageDestination) PutBlobPartial(ctx context.Context, chunkAcces
return srcInfo, nil
}

// TryReusingBlobWithOptions checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination
// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree).
// info.Digest must not be empty.
// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size, and may
// include CompressionOperation and CompressionAlgorithm fields to indicate that a change to the compression type should be
// reflected in the manifest that will be written.
// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure.
func (s *storageImageDestination) TryReusingBlobWithOptions(ctx context.Context, blobinfo types.BlobInfo, options private.TryReusingBlobOptions) (bool, types.BlobInfo, error) {
reused, info, err := s.tryReusingBlobAsPending(ctx, blobinfo, &options)
if err != nil || !reused || options.LayerIndex == nil {
return reused, info, err
}

return reused, info, s.queueOrCommit(ctx, info, *options.LayerIndex, options.EmptyLayer)
}

// TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination
// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree).
// info.Digest must not be empty.
Expand All @@ -653,16 +635,36 @@ func (s *storageImageDestination) PutBlobPartial(ctx context.Context, chunkAcces
// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure.
// May use and/or update cache.
func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
return s.TryReusingBlobWithOptions(ctx, blobinfo, private.TryReusingBlobOptions{
Cache: cache,
CanSubstitute: canSubstitute,
})
}

// tryReusingBlobAsPending implements TryReusingBlobWithOptions, filling s.blobDiffIDs and other metadata.
// The caller must arrange the blob to be eventually commited using s.commitLayer().
func (s *storageImageDestination) tryReusingBlobAsPending(ctx context.Context, blobinfo types.BlobInfo, options *private.TryReusingBlobOptions) (bool, types.BlobInfo, error) {
// lock the entire method as it executes fairly quickly
s.lock.Lock()
defer s.lock.Unlock()

return s.tryReusingBlobLocked(ctx, blobinfo, cache, canSubstitute)
}
if options.SrcRef != nil {
// Check if we have the layer in the underlying additional layer store.
aLayer, err := s.imageRef.transport.store.LookupAdditionalLayer(blobinfo.Digest, options.SrcRef.String())
if err != nil && errors.Cause(err) != storage.ErrLayerUnknown {
return false, types.BlobInfo{}, errors.Wrapf(err, `looking for compressed layers with digest %q and labels`, blobinfo.Digest)
} else if err == nil {
// Record the uncompressed value so that we can use it to calculate layer IDs.
s.blobDiffIDs[blobinfo.Digest] = aLayer.UncompressedDigest()
s.blobAdditionalLayer[blobinfo.Digest] = aLayer
return true, types.BlobInfo{
Digest: blobinfo.Digest,
Size: aLayer.CompressedSize(),
MediaType: blobinfo.MediaType,
}, nil
}
}

// tryReusingBlobLocked implements a core functionality of TryReusingBlob.
// This must be called with a lock being held on storageImageDestination.
func (s *storageImageDestination) tryReusingBlobLocked(ctx context.Context, blobinfo types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
if blobinfo.Digest == "" {
return false, types.BlobInfo{}, errors.Errorf(`Can not check for a blob with unknown digest`)
}
Expand Down Expand Up @@ -711,9 +713,9 @@ func (s *storageImageDestination) tryReusingBlobLocked(ctx context.Context, blob

// Does the blob correspond to a known DiffID which we already have available?
// Because we must return the size, which is unknown for unavailable compressed blobs, the returned BlobInfo refers to the
// uncompressed layer, and that can happen only if canSubstitute, or if the incoming manifest already specifies the size.
if canSubstitute || blobinfo.Size != -1 {
if uncompressedDigest := cache.UncompressedDigest(blobinfo.Digest); uncompressedDigest != "" && uncompressedDigest != blobinfo.Digest {
// uncompressed layer, and that can happen only if options.CanSubstitute, or if the incoming manifest already specifies the size.
if options.CanSubstitute || blobinfo.Size != -1 {
if uncompressedDigest := options.Cache.UncompressedDigest(blobinfo.Digest); uncompressedDigest != "" && uncompressedDigest != blobinfo.Digest {
layers, err := s.imageRef.transport.store.LayersByUncompressedDigest(uncompressedDigest)
if err != nil && errors.Cause(err) != storage.ErrLayerUnknown {
return false, types.BlobInfo{}, errors.Wrapf(err, `looking for layers with digest %q`, uncompressedDigest)
Expand All @@ -723,8 +725,8 @@ func (s *storageImageDestination) tryReusingBlobLocked(ctx context.Context, blob
s.blobDiffIDs[blobinfo.Digest] = layers[0].UncompressedDigest
return true, blobinfo, nil
}
if !canSubstitute {
return false, types.BlobInfo{}, fmt.Errorf("Internal error: canSubstitute was expected to be true for blobInfo %v", blobinfo)
if !options.CanSubstitute {
return false, types.BlobInfo{}, fmt.Errorf("Internal error: options.CanSubstitute was expected to be true for blobInfo %v", blobinfo)
}
s.blobDiffIDs[uncompressedDigest] = layers[0].UncompressedDigest
return true, types.BlobInfo{
Expand Down

0 comments on commit 1ca5fad

Please sign in to comment.