diff --git a/pkg/blobcache/blobcache_test.go b/pkg/blobcache/blobcache_test.go index 63eb3b53e..7f9d0f9ba 100644 --- a/pkg/blobcache/blobcache_test.go +++ b/pkg/blobcache/blobcache_test.go @@ -29,10 +29,11 @@ import ( ) var ( - _ types.ImageReference = &BlobCache{} - _ types.ImageSource = &blobCacheSource{} - _ private.ImageSource = (*blobCacheSource)(nil) - _ types.ImageDestination = &blobCacheDestination{} + _ types.ImageReference = &BlobCache{} + _ types.ImageSource = &blobCacheSource{} + _ private.ImageSource = (*blobCacheSource)(nil) + _ types.ImageDestination = &blobCacheDestination{} + _ private.ImageDestination = (*blobCacheDestination)(nil) ) func TestMain(m *testing.M) { diff --git a/pkg/blobcache/dest.go b/pkg/blobcache/dest.go index 68e2b6652..8bae8a24b 100644 --- a/pkg/blobcache/dest.go +++ b/pkg/blobcache/dest.go @@ -8,6 +8,10 @@ import ( "path/filepath" "sync" + "github.com/containers/image/v5/internal/blobinfocache" + "github.com/containers/image/v5/internal/imagedestination" + "github.com/containers/image/v5/internal/imagedestination/impl" + "github.com/containers/image/v5/internal/private" "github.com/containers/image/v5/manifest" "github.com/containers/image/v5/transports" "github.com/containers/image/v5/types" @@ -19,8 +23,10 @@ import ( ) type blobCacheDestination struct { + impl.Compat + reference *BlobCache - destination types.ImageDestination + destination private.ImageDestination } func (b *BlobCache) NewImageDestination(ctx context.Context, sys *types.SystemContext) (types.ImageDestination, error) { @@ -29,7 +35,9 @@ func (b *BlobCache) NewImageDestination(ctx context.Context, sys *types.SystemCo return nil, perrors.Wrapf(err, "error creating new image destination %q", transports.ImageName(b.reference)) } logrus.Debugf("starting to write to image %q using blob cache in %q", transports.ImageName(b.reference), b.directory) - return &blobCacheDestination{reference: b, destination: dest}, nil + d := &blobCacheDestination{reference: b, destination: imagedestination.FromPublic(dest)} + d.Compat = impl.AddCompat(d) + return d, nil } func (d *blobCacheDestination) Reference() types.ImageReference { @@ -119,7 +127,14 @@ func (d *blobCacheDestination) HasThreadSafePutBlob() bool { return d.destination.HasThreadSafePutBlob() } -func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) { +// PutBlobWithOptions writes contents of stream and returns data representing the result. +// inputInfo.Digest can be optionally provided if known; if provided, and stream is read to the end without error, the digest MUST match the stream contents. +// inputInfo.Size is the expected length of stream, if known. +// inputInfo.MediaType describes the blob format, if known. +// WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available +// to any other readers for download using the supplied digest. +// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far. +func (d *blobCacheDestination) PutBlobWithOptions(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, options private.PutBlobOptions) (types.BlobInfo, error) { var tempfile *os.File var err error var n int @@ -129,7 +144,7 @@ func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, in needToWait := false compression := archive.Uncompressed if inputInfo.Digest != "" { - filename := d.reference.blobPath(inputInfo.Digest, isConfig) + filename := d.reference.blobPath(inputInfo.Digest, options.IsConfig) tempfile, err = os.CreateTemp(filepath.Dir(filename), filepath.Base(filename)) if err == nil { stream = io.TeeReader(stream, tempfile) @@ -151,7 +166,7 @@ func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, in } else { logrus.Debugf("error while creating a temporary file under %q to hold blob %q: %v", filepath.Dir(filename), inputInfo.Digest.String(), err) } - if !isConfig { + if !options.IsConfig { initial := make([]byte, 8) n, err = stream.Read(initial) if n > 0 { @@ -177,13 +192,13 @@ func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, in // Let saveStream() close the reading end and handle the temporary file. wg.Add(1) needToWait = true - go d.saveStream(wg, decompressReader, decompressedTemp, filename, inputInfo.Digest, isConfig, &alternateDigest) + go d.saveStream(wg, decompressReader, decompressedTemp, filename, inputInfo.Digest, options.IsConfig, &alternateDigest) } } } } } - newBlobInfo, err := d.destination.PutBlob(ctx, stream, inputInfo, cache, isConfig) + newBlobInfo, err := d.destination.PutBlobWithOptions(ctx, stream, inputInfo, options) if closer != nil { closer.Close() } @@ -201,8 +216,29 @@ func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, in return newBlobInfo, nil } -func (d *blobCacheDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) { - present, reusedInfo, err := d.destination.TryReusingBlob(ctx, info, cache, canSubstitute) +// SupportsPutBlobPartial returns true if PutBlobPartial is supported. +func (d *blobCacheDestination) SupportsPutBlobPartial() bool { + return d.destination.SupportsPutBlobPartial() +} + +// PutBlobPartial attempts to create a blob using the data that is already present +// at the destination. chunkAccessor is accessed in a non-sequential way to retrieve the missing chunks. +// It is available only if SupportsPutBlobPartial(). +// Even if SupportsPutBlobPartial() returns true, the call can fail, in which case the caller +// should fall back to PutBlobWithOptions. +func (d *blobCacheDestination) PutBlobPartial(ctx context.Context, chunkAccessor private.BlobChunkAccessor, srcInfo types.BlobInfo, cache blobinfocache.BlobInfoCache2) (types.BlobInfo, error) { + return d.destination.PutBlobPartial(ctx, chunkAccessor, srcInfo, cache) +} + +// TryReusingBlobWithOptions checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination +// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree). +// info.Digest must not be empty. +// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size, and may +// include CompressionOperation and CompressionAlgorithm fields to indicate that a change to the compression type should be +// reflected in the manifest that will be written. +// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure. +func (d *blobCacheDestination) TryReusingBlobWithOptions(ctx context.Context, info types.BlobInfo, options private.TryReusingBlobOptions) (bool, types.BlobInfo, error) { + present, reusedInfo, err := d.destination.TryReusingBlobWithOptions(ctx, info, options) if err != nil || present { return present, reusedInfo, err } @@ -215,7 +251,12 @@ func (d *blobCacheDestination) TryReusingBlob(ctx context.Context, info types.Bl f, err := os.Open(blobPath) if err == nil { defer f.Close() - uploadedInfo, err := d.destination.PutBlob(ctx, f, info, cache, isConfig) + uploadedInfo, err := d.destination.PutBlobWithOptions(ctx, f, info, private.PutBlobOptions{ + Cache: options.Cache, + IsConfig: isConfig, + EmptyLayer: options.EmptyLayer, + LayerIndex: options.LayerIndex, + }) if err != nil { return false, types.BlobInfo{}, err }