
Reorganize storageImageDestination to prioritize the private …WithOptions methods #1468

Merged
12 commits merged on Feb 15, 2022
storage/storage_image.go: 122 changes (62 additions, 60 deletions)
@@ -446,6 +446,11 @@ func (s *storageImageDestination) computeNextBlobCacheFile() string {
return filepath.Join(s.directory, fmt.Sprintf("%d", atomic.AddInt32(&s.nextTempFileID, 1)))
}

// HasThreadSafePutBlob indicates whether PutBlob can be executed concurrently.
func (s *storageImageDestination) HasThreadSafePutBlob() bool {
return true
}
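
Since HasThreadSafePutBlob reports true, callers such as copy.Image are allowed to drive PutBlob for several layers at once. A minimal caller-side sketch of that pattern follows; it is not part of this PR, and the layerInput type plus the dest, layers, and cache values are hypothetical stand-ins for whatever the copier actually holds.

// layerInput pairs a blob stream with its metadata; it exists only for this sketch.
type layerInput struct {
	stream io.Reader
	info   types.BlobInfo
}

// pushLayers uploads layers concurrently when the destination supports it.
// Assumes "context", "io", "golang.org/x/sync/errgroup", and
// "github.com/containers/image/v5/types" are imported.
func pushLayers(ctx context.Context, dest types.ImageDestination, layers []layerInput, cache types.BlobInfoCache) error {
	if !dest.HasThreadSafePutBlob() {
		// Destinations that are not thread-safe get sequential uploads.
		for _, l := range layers {
			if _, err := dest.PutBlob(ctx, l.stream, l.info, cache, false); err != nil {
				return err
			}
		}
		return nil
	}
	g, gCtx := errgroup.WithContext(ctx)
	for _, l := range layers {
		l := l // capture the loop variable for the goroutine
		g.Go(func() error {
			_, err := dest.PutBlob(gCtx, l.stream, l.info, cache, false)
			return err
		})
	}
	return g.Wait()
}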

// PutBlobWithOptions writes contents of stream and returns data representing the result.
// inputInfo.Digest can be optionally provided if known; if provided, and stream is read to the end without error, the digest MUST match the stream contents.
// inputInfo.Size is the expected length of stream, if known.
@@ -454,7 +459,7 @@ func (s *storageImageDestination) computeNextBlobCacheFile() string {
// to any other readers for download using the supplied digest.
// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
func (s *storageImageDestination) PutBlobWithOptions(ctx context.Context, stream io.Reader, blobinfo types.BlobInfo, options private.PutBlobOptions) (types.BlobInfo, error) {
info, err := s.PutBlob(ctx, stream, blobinfo, options.Cache, options.IsConfig)
info, err := s.putBlobToPendingFile(ctx, stream, blobinfo, &options)
if err != nil {
return info, err
}
@@ -466,11 +471,6 @@ func (s *storageImageDestination) PutBlobWithOptions(ctx context.Context, stream
return info, s.queueOrCommit(ctx, info, *options.LayerIndex, options.EmptyLayer)
}

// HasThreadSafePutBlob indicates whether PutBlob can be executed concurrently.
func (s *storageImageDestination) HasThreadSafePutBlob() bool {
return true
}

// PutBlob writes contents of stream and returns data representing the result.
// inputInfo.Digest can be optionally provided if known; if provided, and stream is read to the end without error, the digest MUST match the stream contents.
// inputInfo.Size is the expected length of stream, if known.
@@ -480,6 +480,15 @@ func (s *storageImageDestination) HasThreadSafePutBlob() bool {
// to any other readers for download using the supplied digest.
// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
func (s *storageImageDestination) PutBlob(ctx context.Context, stream io.Reader, blobinfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
return s.PutBlobWithOptions(ctx, stream, blobinfo, private.PutBlobOptions{
Cache: cache,
IsConfig: isConfig,
})
}
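
With this reorganization, the legacy PutBlob signature above is only a thin adapter: it packs its positional arguments into a private.PutBlobOptions and defers to PutBlobWithOptions. Based solely on the option fields this diff touches (Cache, IsConfig, LayerIndex, EmptyLayer), an internal caller in the copy pipeline might construct the options roughly as below; this is a sketch, not the actual copy code, and layerStream, blobInfo, and cache are placeholders.

layerIndex := 2 // hypothetical position of this layer within the image
options := private.PutBlobOptions{
	Cache:      cache,       // types.BlobInfoCache used for digest/diffID bookkeeping
	IsConfig:   false,       // true only when writing the image's config blob
	EmptyLayer: false,       // true for layers known to contain no filesystem changes
	LayerIndex: &layerIndex, // non-nil lets the destination order layers via queueOrCommit, per the call above
}
info, err := dest.PutBlobWithOptions(ctx, layerStream, blobInfo, options)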

// putBlobToPendingFile implements ImageDestination.PutBlobWithOptions, storing stream into an on-disk file.
// The caller must arrange the blob to be eventually committed using s.commitLayer().
func (s *storageImageDestination) putBlobToPendingFile(ctx context.Context, stream io.Reader, blobinfo types.BlobInfo, options *private.PutBlobOptions) (types.BlobInfo, error) {
// Stores a layer or data blob in our temporary directory, checking that any information
// in the blobinfo matches the incoming data.
errorBlobInfo := types.BlobInfo{
@@ -534,57 +543,14 @@ func (s *storageImageDestination) PutBlob(ctx context.Context, stream io.Reader,
s.lock.Unlock()
// This is safe because we have just computed diffID, and blobDigest was either computed
// by us, or validated by the caller (usually copy.digestingReader).
cache.RecordDigestUncompressedPair(blobDigest, diffID.Digest())
options.Cache.RecordDigestUncompressedPair(blobDigest, diffID.Digest())
return types.BlobInfo{
Digest: blobDigest,
Size: blobSize,
MediaType: blobinfo.MediaType,
}, nil
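
The RecordDigestUncompressedPair call above is one half of the bookkeeping that the reuse path relies on: tryReusingBlobAsPending (further down in this diff) asks the same cache, via options.Cache.UncompressedDigest, whether a compressed digest maps to a locally known diffID. A rough round-trip sketch against the in-memory cache implementation, assuming pkg/blobinfocache/memory is available and using placeholder digests:

// Assumes "github.com/containers/image/v5/pkg/blobinfocache/memory" and
// "github.com/opencontainers/go-digest" are imported.
cache := memory.New() // an in-process types.BlobInfoCache
compressed := digest.FromString("compressed layer bytes")     // placeholder digest
uncompressed := digest.FromString("uncompressed layer bytes") // placeholder diffID

// What putBlobToPendingFile records after verifying the stream:
cache.RecordDigestUncompressedPair(compressed, uncompressed)

// What the reuse path asks later, to decide whether an already-stored
// uncompressed layer can stand in for the compressed blob:
if diffID := cache.UncompressedDigest(compressed); diffID != "" {
	// diffID == uncompressed here; layers can now be looked up by this digest.
	_ = diffID
}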
}

// TryReusingBlobWithOptions checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination
// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree).
// info.Digest must not be empty.
// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size, and may
// include CompressionOperation and CompressionAlgorithm fields to indicate that a change to the compression type should be
// reflected in the manifest that will be written.
// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure.
func (s *storageImageDestination) TryReusingBlobWithOptions(ctx context.Context, blobinfo types.BlobInfo, options private.TryReusingBlobOptions) (bool, types.BlobInfo, error) {
reused, info, err := s.tryReusingBlobWithSrcRef(ctx, blobinfo, options.Cache, options.CanSubstitute, options.SrcRef)
if err != nil || !reused || options.LayerIndex == nil {
return reused, info, err
}

return reused, info, s.queueOrCommit(ctx, info, *options.LayerIndex, options.EmptyLayer)
}

// tryReusingBlobWithSrcRef is a wrapper around TryReusingBlob.
// If ref is provided, this function first tries to get layer from Additional Layer Store.
func (s *storageImageDestination) tryReusingBlobWithSrcRef(ctx context.Context, blobinfo types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool, ref reference.Named) (bool, types.BlobInfo, error) {
// lock the entire method as it executes fairly quickly
s.lock.Lock()
defer s.lock.Unlock()

if ref != nil {
// Check if we have the layer in the underlying additional layer store.
aLayer, err := s.imageRef.transport.store.LookupAdditionalLayer(blobinfo.Digest, ref.String())
if err != nil && errors.Cause(err) != storage.ErrLayerUnknown {
return false, types.BlobInfo{}, errors.Wrapf(err, `looking for compressed layers with digest %q and labels`, blobinfo.Digest)
} else if err == nil {
// Record the uncompressed value so that we can use it to calculate layer IDs.
s.blobDiffIDs[blobinfo.Digest] = aLayer.UncompressedDigest()
s.blobAdditionalLayer[blobinfo.Digest] = aLayer
return true, types.BlobInfo{
Digest: blobinfo.Digest,
Size: aLayer.CompressedSize(),
MediaType: blobinfo.MediaType,
}, nil
}
}

return s.tryReusingBlobLocked(ctx, blobinfo, cache, canSubstitute)
}

type zstdFetcher struct {
chunkAccessor private.BlobChunkAccessor
ctx context.Context
@@ -643,6 +609,22 @@ func (s *storageImageDestination) PutBlobPartial(ctx context.Context, chunkAcces
return srcInfo, nil
}

// TryReusingBlobWithOptions checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination
// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree).
// info.Digest must not be empty.
// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size, and may
// include CompressionOperation and CompressionAlgorithm fields to indicate that a change to the compression type should be
// reflected in the manifest that will be written.
// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure.
func (s *storageImageDestination) TryReusingBlobWithOptions(ctx context.Context, blobinfo types.BlobInfo, options private.TryReusingBlobOptions) (bool, types.BlobInfo, error) {
reused, info, err := s.tryReusingBlobAsPending(ctx, blobinfo, &options)
if err != nil || !reused || options.LayerIndex == nil {
return reused, info, err
}

return reused, info, s.queueOrCommit(ctx, info, *options.LayerIndex, options.EmptyLayer)
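
The reuse path mirrors the PutBlob split: TryReusingBlobWithOptions above handles the options-aware path (including queueOrCommit when a layer index is supplied), while the legacy TryReusingBlob signature below just wraps it. A hedged caller-side sketch of the usual check-then-upload flow, where dest, blobInfo, stream, cache, and recordLayer are hypothetical:

reused, reusedInfo, err := dest.TryReusingBlob(ctx, blobInfo, cache, true /* canSubstitute */)
if err != nil {
	return err
}
if reused {
	// The destination already has (or can cheaply materialize) this layer;
	// reusedInfo carries the digest/size to record in the manifest.
	recordLayer(reusedInfo) // hypothetical bookkeeping helper
	return nil
}
// Otherwise upload the blob contents.
_, err = dest.PutBlob(ctx, stream, blobInfo, cache, false)
return err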
}

// TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination
// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree).
// info.Digest must not be empty.
@@ -653,16 +635,36 @@ func (s *storageImageDestination) PutBlobPartial(ctx context.Context, chunkAcces
// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure.
// May use and/or update cache.
func (s *storageImageDestination) TryReusingBlob(ctx context.Context, blobinfo types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
return s.TryReusingBlobWithOptions(ctx, blobinfo, private.TryReusingBlobOptions{
Cache: cache,
CanSubstitute: canSubstitute,
})
}

// tryReusingBlobAsPending implements TryReusingBlobWithOptions, filling s.blobDiffIDs and other metadata.
// The caller must arrange the blob to be eventually committed using s.commitLayer().
func (s *storageImageDestination) tryReusingBlobAsPending(ctx context.Context, blobinfo types.BlobInfo, options *private.TryReusingBlobOptions) (bool, types.BlobInfo, error) {
// lock the entire method as it executes fairly quickly
s.lock.Lock()
defer s.lock.Unlock()

return s.tryReusingBlobLocked(ctx, blobinfo, cache, canSubstitute)
}
if options.SrcRef != nil {
// Check if we have the layer in the underlying additional layer store.
aLayer, err := s.imageRef.transport.store.LookupAdditionalLayer(blobinfo.Digest, options.SrcRef.String())
if err != nil && errors.Cause(err) != storage.ErrLayerUnknown {
return false, types.BlobInfo{}, errors.Wrapf(err, `looking for compressed layers with digest %q and labels`, blobinfo.Digest)
} else if err == nil {
// Record the uncompressed value so that we can use it to calculate layer IDs.
s.blobDiffIDs[blobinfo.Digest] = aLayer.UncompressedDigest()
s.blobAdditionalLayer[blobinfo.Digest] = aLayer
return true, types.BlobInfo{
Digest: blobinfo.Digest,
Size: aLayer.CompressedSize(),
MediaType: blobinfo.MediaType,
}, nil
}
}

// tryReusingBlobLocked implements a core functionality of TryReusingBlob.
// This must be called with a lock being held on storageImageDestination.
func (s *storageImageDestination) tryReusingBlobLocked(ctx context.Context, blobinfo types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
if blobinfo.Digest == "" {
return false, types.BlobInfo{}, errors.Errorf(`Can not check for a blob with unknown digest`)
}
@@ -711,9 +713,9 @@ func (s *storageImageDestination) tryReusingBlobLocked(ctx context.Context, blob

// Does the blob correspond to a known DiffID which we already have available?
// Because we must return the size, which is unknown for unavailable compressed blobs, the returned BlobInfo refers to the
// uncompressed layer, and that can happen only if canSubstitute, or if the incoming manifest already specifies the size.
if canSubstitute || blobinfo.Size != -1 {
if uncompressedDigest := cache.UncompressedDigest(blobinfo.Digest); uncompressedDigest != "" && uncompressedDigest != blobinfo.Digest {
// uncompressed layer, and that can happen only if options.CanSubstitute, or if the incoming manifest already specifies the size.
if options.CanSubstitute || blobinfo.Size != -1 {
if uncompressedDigest := options.Cache.UncompressedDigest(blobinfo.Digest); uncompressedDigest != "" && uncompressedDigest != blobinfo.Digest {
layers, err := s.imageRef.transport.store.LayersByUncompressedDigest(uncompressedDigest)
if err != nil && errors.Cause(err) != storage.ErrLayerUnknown {
return false, types.BlobInfo{}, errors.Wrapf(err, `looking for layers with digest %q`, uncompressedDigest)
@@ -723,8 +725,8 @@ func (s *storageImageDestination) tryReusingBlobLocked(ctx context.Context, blob
s.blobDiffIDs[blobinfo.Digest] = layers[0].UncompressedDigest
return true, blobinfo, nil
}
if !canSubstitute {
return false, types.BlobInfo{}, fmt.Errorf("Internal error: canSubstitute was expected to be true for blobInfo %v", blobinfo)
if !options.CanSubstitute {
return false, types.BlobInfo{}, fmt.Errorf("Internal error: options.CanSubstitute was expected to be true for blobInfo %v", blobinfo)
}
s.blobDiffIDs[uncompressedDigest] = layers[0].UncompressedDigest
return true, types.BlobInfo{