Skip to content

Commit

Permalink
Forward all of private.ImageSource by blobCacheSource
Browse files Browse the repository at this point in the history
Also add support for reading blob chunks from a (complete,
not partial) cached blob.

Signed-off-by: Miloslav Trmač <mitr@redhat.com>
  • Loading branch information
mtrmac committed Jul 5, 2022
1 parent b3bf595 commit f7703dd
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 2 deletions.
2 changes: 2 additions & 0 deletions pkg/blobcache/blobcache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
cp "github.com/containers/image/v5/copy"
"github.com/containers/image/v5/directory"
"github.com/containers/image/v5/internal/image"
"github.com/containers/image/v5/internal/private"
"github.com/containers/image/v5/pkg/blobinfocache/none"
"github.com/containers/image/v5/signature"
"github.com/containers/image/v5/types"
Expand All @@ -30,6 +31,7 @@ import (
var (
_ types.ImageReference = &BlobCache{}
_ types.ImageSource = &blobCacheSource{}
_ private.ImageSource = (*blobCacheSource)(nil)
_ types.ImageDestination = &blobCacheDestination{}
)

Expand Down
89 changes: 87 additions & 2 deletions pkg/blobcache/src.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package blobcache

import (
"context"
"fmt"
"io"
"os"
"sync"

"github.com/containers/image/v5/internal/image"
"github.com/containers/image/v5/internal/imagesource"
"github.com/containers/image/v5/internal/private"
"github.com/containers/image/v5/manifest"
"github.com/containers/image/v5/pkg/compression"
"github.com/containers/image/v5/transports"
Expand All @@ -19,7 +22,7 @@ import (

type blobCacheSource struct {
reference *BlobCache
source types.ImageSource
source private.ImageSource
sys types.SystemContext
// this mutex synchronizes the counters below
mu sync.Mutex
Expand All @@ -34,7 +37,7 @@ func (b *BlobCache) NewImageSource(ctx context.Context, sys *types.SystemContext
return nil, perrors.Wrapf(err, "error creating new image source %q", transports.ImageName(b.reference))
}
logrus.Debugf("starting to read from image %q using blob cache in %q (compression=%v)", transports.ImageName(b.reference), b.directory, b.compress)
return &blobCacheSource{reference: b, source: src, sys: *sys}, nil
return &blobCacheSource{reference: b, source: imagesource.FromPublic(src), sys: *sys}, nil
}

func (s *blobCacheSource) Reference() types.ImageReference {
Expand Down Expand Up @@ -174,3 +177,85 @@ func (s *blobCacheSource) LayerInfosForCopy(ctx context.Context, instanceDigest

return infos, nil
}

// SupportsGetBlobAt returns true if GetBlobAt (BlobChunkAccessor) is supported.
// Chunks of fully cached blobs are always served locally (see GetBlobAt), but
// cache misses are forwarded to the underlying source, so this delegates the
// capability check to it.
func (s *blobCacheSource) SupportsGetBlobAt() bool {
	return s.source.SupportsGetBlobAt()
}

// streamChunksFromFile generates the channels returned by GetBlobAt for chunks
// of a seekable file. It closes both channels (and the file) when done.
func streamChunksFromFile(streams chan io.ReadCloser, errs chan error, file io.ReadSeekCloser,
	chunks []private.ImageSourceChunk) {
	defer close(streams)
	defer close(errs)
	defer file.Close()

	for _, chunk := range chunks {
		// Seek explicitly for every chunk; that way a consumer that did not
		// drain the previous chunk, or chunks whose offsets go backwards,
		// cannot leave the file at the wrong position.
		if _, err := file.Seek(int64(chunk.Offset), io.SeekStart); err != nil {
			errs <- err
			return
		}
		reader := signalCloseReader{
			closed: make(chan interface{}),
			stream: io.LimitReader(file, int64(chunk.Length)),
		}
		streams <- reader

		// Block until the consumer closes this chunk's reader before
		// handing out the next one.
		<-reader.closed
	}
}

// signalCloseReader wraps an io.Reader and signals, by closing a channel,
// when the consumer calls Close(); streamChunksFromFile waits on that signal
// before producing the next chunk.
type signalCloseReader struct {
	closed chan interface{} // closed (the channel itself) when Close() is called
	stream io.Reader        // the underlying data for this chunk
}

// Read reads from the underlying stream.
func (s signalCloseReader) Read(p []byte) (int, error) {
	return s.stream.Read(p)
}

// Close signals that the consumer is done with this chunk by closing s.closed;
// it always returns nil. Note that Close must be called at most once — a second
// call would panic (close of a closed channel).
func (s signalCloseReader) Close() error {
	close(s.closed)
	return nil
}

// GetBlobAt returns a sequential channel of readers that contain data for the requested
// blob chunks, and a channel that might get a single error value.
// The specified chunks must be not overlapping and sorted by their offset.
// The readers must be fully consumed, in the order they are returned, before blocking
// to read the next chunk.
func (s *blobCacheSource) GetBlobAt(ctx context.Context, info types.BlobInfo, chunks []private.ImageSourceChunk) (chan io.ReadCloser, chan error, error) {
	blobFile, _, _, err := s.reference.findBlob(info)
	if err != nil {
		return nil, nil, err
	}
	if blobFile != "" {
		switch f, err := os.Open(blobFile); {
		case err == nil:
			// Cache hit: stream the requested chunks from the local copy.
			s.mu.Lock()
			s.cacheHits++
			s.mu.Unlock()
			chunkStreams := make(chan io.ReadCloser)
			chunkErrs := make(chan error)
			go streamChunksFromFile(chunkStreams, chunkErrs, f, chunks)
			return chunkStreams, chunkErrs, nil
		case !os.IsNotExist(err):
			// Any failure other than "not cached" counts as a cache error.
			s.mu.Lock()
			s.cacheErrors++
			s.mu.Unlock()
			return nil, nil, fmt.Errorf("checking for cache: %w", err)
		}
		// Otherwise the blob is simply not cached; fall through to the source.
	}
	// Cache miss: forward the chunk request to the underlying image source.
	s.mu.Lock()
	s.cacheMisses++
	s.mu.Unlock()
	chunkStreams, chunkErrs, err := s.source.GetBlobAt(ctx, info, chunks)
	if err != nil {
		return chunkStreams, chunkErrs, fmt.Errorf("error reading blob chunks from source image %q: %w", transports.ImageName(s.reference), err)
	}
	return chunkStreams, chunkErrs, nil
}
57 changes: 57 additions & 0 deletions pkg/blobcache/src_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package blobcache

import (
"bytes"
"io"
"io/ioutil"
"testing"

"github.com/containers/image/v5/internal/private"
"github.com/stretchr/testify/assert"
)

func readNextStream(streams chan io.ReadCloser, errs chan error) ([]byte, error) {
select {
case r := <-streams:
if r == nil {
return nil, nil
}
defer r.Close()
return ioutil.ReadAll(r)
case err := <-errs:
return nil, err
}
}

// readSeekerNopCloser adds a no-op Close() method to a readSeeker, so that a
// bytes.Reader can be used where an io.ReadSeekCloser is required (as in
// streamChunksFromFile).
type readSeekerNopCloser struct {
	io.ReadSeeker
}

// Close does nothing and always succeeds.
func (c *readSeekerNopCloser) Close() error {
	return nil
}

// TestStreamChunksFromFile verifies that chunks are streamed in order from the
// requested offsets, and that a nil stream marks the end of the sequence.
func TestStreamChunksFromFile(t *testing.T) {
	file := &readSeekerNopCloser{bytes.NewReader([]byte("123456789"))}
	streams := make(chan io.ReadCloser)
	errs := make(chan error)
	chunks := []private.ImageSourceChunk{
		{Offset: 1, Length: 2},
		{Offset: 4, Length: 1},
	}
	go streamChunksFromFile(streams, errs, file, chunks)

	steps := []struct {
		expectedData  []byte
		expectedError error
	}{
		{expectedData: []byte("23")},
		{expectedData: []byte("5")},
		{expectedData: nil, expectedError: nil}, // channels closed after the last chunk
	}
	for _, step := range steps {
		data, err := readNextStream(streams, errs)
		assert.Equal(t, step.expectedData, data)
		assert.Equal(t, step.expectedError, err)
	}
}

0 comments on commit f7703dd

Please sign in to comment.