Skip to content

Commit

Permalink
Implement private.ImageDestination in blobCacheDestination
Browse files Browse the repository at this point in the history
Signed-off-by: Miloslav Trmač <mitr@redhat.com>
  • Loading branch information
mtrmac committed Jul 5, 2022
1 parent f7703dd commit 305e6df
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 14 deletions.
9 changes: 5 additions & 4 deletions pkg/blobcache/blobcache_test.go
Expand Up @@ -29,10 +29,11 @@ import (
)

var (
_ types.ImageReference = &BlobCache{}
_ types.ImageSource = &blobCacheSource{}
_ private.ImageSource = (*blobCacheSource)(nil)
_ types.ImageDestination = &blobCacheDestination{}
_ types.ImageReference = &BlobCache{}
_ types.ImageSource = &blobCacheSource{}
_ private.ImageSource = (*blobCacheSource)(nil)
_ types.ImageDestination = &blobCacheDestination{}
_ private.ImageDestination = (*blobCacheDestination)(nil)
)

func TestMain(m *testing.M) {
Expand Down
61 changes: 51 additions & 10 deletions pkg/blobcache/dest.go
Expand Up @@ -8,6 +8,10 @@ import (
"path/filepath"
"sync"

"github.com/containers/image/v5/internal/blobinfocache"
"github.com/containers/image/v5/internal/imagedestination"
"github.com/containers/image/v5/internal/imagedestination/impl"
"github.com/containers/image/v5/internal/private"
"github.com/containers/image/v5/manifest"
"github.com/containers/image/v5/transports"
"github.com/containers/image/v5/types"
Expand All @@ -19,8 +23,10 @@ import (
)

type blobCacheDestination struct {
impl.Compat

reference *BlobCache
destination types.ImageDestination
destination private.ImageDestination
}

func (b *BlobCache) NewImageDestination(ctx context.Context, sys *types.SystemContext) (types.ImageDestination, error) {
Expand All @@ -29,7 +35,9 @@ func (b *BlobCache) NewImageDestination(ctx context.Context, sys *types.SystemCo
return nil, perrors.Wrapf(err, "error creating new image destination %q", transports.ImageName(b.reference))
}
logrus.Debugf("starting to write to image %q using blob cache in %q", transports.ImageName(b.reference), b.directory)
return &blobCacheDestination{reference: b, destination: dest}, nil
d := &blobCacheDestination{reference: b, destination: imagedestination.FromPublic(dest)}
d.Compat = impl.AddCompat(d)
return d, nil
}

func (d *blobCacheDestination) Reference() types.ImageReference {
Expand Down Expand Up @@ -119,7 +127,14 @@ func (d *blobCacheDestination) HasThreadSafePutBlob() bool {
return d.destination.HasThreadSafePutBlob()
}

func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
// PutBlobWithOptions writes contents of stream and returns data representing the result.
// inputInfo.Digest can be optionally provided if known; if provided, and stream is read to the end without error, the digest MUST match the stream contents.
// inputInfo.Size is the expected length of stream, if known.
// inputInfo.MediaType describes the blob format, if known.
// WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available
// to any other readers for download using the supplied digest.
// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
func (d *blobCacheDestination) PutBlobWithOptions(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, options private.PutBlobOptions) (types.BlobInfo, error) {
var tempfile *os.File
var err error
var n int
Expand All @@ -129,7 +144,7 @@ func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, in
needToWait := false
compression := archive.Uncompressed
if inputInfo.Digest != "" {
filename := d.reference.blobPath(inputInfo.Digest, isConfig)
filename := d.reference.blobPath(inputInfo.Digest, options.IsConfig)
tempfile, err = os.CreateTemp(filepath.Dir(filename), filepath.Base(filename))
if err == nil {
stream = io.TeeReader(stream, tempfile)
Expand All @@ -151,7 +166,7 @@ func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, in
} else {
logrus.Debugf("error while creating a temporary file under %q to hold blob %q: %v", filepath.Dir(filename), inputInfo.Digest.String(), err)
}
if !isConfig {
if !options.IsConfig {
initial := make([]byte, 8)
n, err = stream.Read(initial)
if n > 0 {
Expand All @@ -177,13 +192,13 @@ func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, in
// Let saveStream() close the reading end and handle the temporary file.
wg.Add(1)
needToWait = true
go d.saveStream(wg, decompressReader, decompressedTemp, filename, inputInfo.Digest, isConfig, &alternateDigest)
go d.saveStream(wg, decompressReader, decompressedTemp, filename, inputInfo.Digest, options.IsConfig, &alternateDigest)
}
}
}
}
}
newBlobInfo, err := d.destination.PutBlob(ctx, stream, inputInfo, cache, isConfig)
newBlobInfo, err := d.destination.PutBlobWithOptions(ctx, stream, inputInfo, options)
if closer != nil {
closer.Close()
}
Expand All @@ -201,8 +216,29 @@ func (d *blobCacheDestination) PutBlob(ctx context.Context, stream io.Reader, in
return newBlobInfo, nil
}

func (d *blobCacheDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
present, reusedInfo, err := d.destination.TryReusingBlob(ctx, info, cache, canSubstitute)
// SupportsPutBlobPartial returns true if PutBlobPartial is supported.
// The blob cache adds no partial-pull capability of its own; this simply
// reports whether the wrapped destination supports it.
func (d *blobCacheDestination) SupportsPutBlobPartial() bool {
	return d.destination.SupportsPutBlobPartial()
}

// PutBlobPartial attempts to create a blob using the data that is already present
// at the destination. chunkAccessor is accessed in a non-sequential way to retrieve the missing chunks.
// It is available only if SupportsPutBlobPartial().
// Even if SupportsPutBlobPartial() returns true, the call can fail, in which case the caller
// should fall back to PutBlobWithOptions.
//
// NOTE: this is a pure delegation to the wrapped destination; unlike
// PutBlobWithOptions, nothing is written to the local blob cache on this path.
func (d *blobCacheDestination) PutBlobPartial(ctx context.Context, chunkAccessor private.BlobChunkAccessor, srcInfo types.BlobInfo, cache blobinfocache.BlobInfoCache2) (types.BlobInfo, error) {
	return d.destination.PutBlobPartial(ctx, chunkAccessor, srcInfo, cache)
}

// TryReusingBlobWithOptions checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination
// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree).
// info.Digest must not be empty.
// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size, and may
// include CompressionOperation and CompressionAlgorithm fields to indicate that a change to the compression type should be
// reflected in the manifest that will be written.
// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure.
func (d *blobCacheDestination) TryReusingBlobWithOptions(ctx context.Context, info types.BlobInfo, options private.TryReusingBlobOptions) (bool, types.BlobInfo, error) {
present, reusedInfo, err := d.destination.TryReusingBlobWithOptions(ctx, info, options)
if err != nil || present {
return present, reusedInfo, err
}
Expand All @@ -215,7 +251,12 @@ func (d *blobCacheDestination) TryReusingBlob(ctx context.Context, info types.Bl
f, err := os.Open(blobPath)
if err == nil {
defer f.Close()
uploadedInfo, err := d.destination.PutBlob(ctx, f, info, cache, isConfig)
uploadedInfo, err := d.destination.PutBlobWithOptions(ctx, f, info, private.PutBlobOptions{
Cache: options.Cache,
IsConfig: isConfig,
EmptyLayer: options.EmptyLayer,
LayerIndex: options.LayerIndex,
})
if err != nil {
return false, types.BlobInfo{}, err
}
Expand Down

0 comments on commit 305e6df

Please sign in to comment.