Skip to content

Commit

Permalink
fs: allow pre/background-fetch to be called before layer verification
Browse files Browse the repository at this point in the history
Signed-off-by: Kohei Tokunaga <ktokunaga.mail@gmail.com>
  • Loading branch information
ktock committed Sep 27, 2021
1 parent 30b5083 commit 6a8daf5
Show file tree
Hide file tree
Showing 6 changed files with 478 additions and 242 deletions.
9 changes: 9 additions & 0 deletions estargz/estargz.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,10 @@ func (r *Reader) getOrCreateDir(d string) *TOCEntry {
return e
}

// TOCDigest returns the digest of the TOC JSON of this blob, as computed
// when the blob was opened. It does not perform any verification; use
// VerifyTOC to check it against an expected digest.
func (r *Reader) TOCDigest() digest.Digest {
	return r.tocDigest
}

// VerifyTOC checks that the TOC JSON in the passed blob matches the
// passed digests and that the TOC JSON contains digests for all chunks
contained in the blob. If the verification succeeds, this function
Expand All @@ -335,7 +339,12 @@ func (r *Reader) VerifyTOC(tocDigest digest.Digest) (TOCEntryVerifier, error) {
if r.tocDigest != tocDigest {
return nil, fmt.Errorf("invalid TOC JSON %q; want %q", r.tocDigest, tocDigest)
}
return r.Verifiers()
}

// Verifiers returns a TOCEntryVerifier for this blob. Prefer VerifyTOC in most
// cases, because Verifiers does not verify the TOC itself.
func (r *Reader) Verifiers() (TOCEntryVerifier, error) {
chunkDigestMap := make(map[int64]digest.Digest) // map from chunk offset to the chunk digest
regDigestMap := make(map[int64]digest.Digest) // map from chunk offset to the reg file digest
var chunkDigestMapIncomplete bool
Expand Down
74 changes: 34 additions & 40 deletions fs/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,13 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s
return fmt.Errorf("source must be passed")
}

defaultPrefetchSize := fs.prefetchSize
if psStr, ok := labels[config.TargetPrefetchSizeLabel]; ok {
if ps, err := strconv.ParseInt(psStr, 10, 64); err == nil {
defaultPrefetchSize = ps
}
}

// Resolve the target layer
var (
resultChan = make(chan layer.Layer)
Expand All @@ -188,10 +195,10 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s
l, err := fs.resolver.Resolve(ctx, s.Hosts, s.Name, s.Target)
if err == nil {
resultChan <- l
fs.prefetch(ctx, l, defaultPrefetchSize, start)
return
}
rErr = errors.Wrapf(rErr, "failed to resolve layer %q from %q: %v",
s.Target.Digest, s.Name, err)
rErr = errors.Wrapf(rErr, "failed to resolve layer %q from %q: %v", s.Target.Digest, s.Name, err)
}
errChan <- rErr
}()
Expand All @@ -202,12 +209,17 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s
desc := desc
go func() {
// Avoids to get canceled by client.
ctx := log.WithLogger(context.Background(),
log.G(ctx).WithField("mountpoint", mountpoint))
err := fs.resolver.Cache(ctx, preResolve.Hosts, preResolve.Name, desc)
ctx := log.WithLogger(context.Background(), log.G(ctx).WithField("mountpoint", mountpoint))
l, err := fs.resolver.Resolve(ctx, preResolve.Hosts, preResolve.Name, desc)
if err != nil {
log.G(ctx).WithError(err).Debug("failed to pre-resolve")
return
}
fs.prefetch(ctx, l, defaultPrefetchSize, start)

// Release this layer because it isn't the target layer and is no longer used here.
// However, it will remain in the resolver cache until eviction.
l.Done()
}()
}

Expand Down Expand Up @@ -271,41 +283,6 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s
fs.layerMu.Unlock()
fs.metricsController.Add(mountpoint, l)

// Prefetch this layer. We prefetch several layers in parallel. The first
// Check() for this layer waits for the prefetch completion.
if !fs.noprefetch {
prefetchSize := fs.prefetchSize
if psStr, ok := labels[config.TargetPrefetchSizeLabel]; ok {
if ps, err := strconv.ParseInt(psStr, 10, 64); err == nil {
prefetchSize = ps
}
}
go func() {
fs.backgroundTaskManager.DoPrioritizedTask()
defer fs.backgroundTaskManager.DonePrioritizedTask()
if err := l.Prefetch(prefetchSize); err != nil {
log.G(ctx).WithError(err).Warnf("failed to prefetch layer=%v", digest)
return
}
log.G(ctx).Debug("completed to prefetch")
}()
}

// Fetch whole layer aggressively in background. We use background
// reader for this so prioritized tasks(Mount, Check, etc...) can
// interrupt the reading. This can avoid disturbing prioritized tasks
// about NW traffic.
if !fs.noBackgroundFetch {
go func() {
if err := l.BackgroundFetch(); err != nil {
log.G(ctx).WithError(err).Warnf("failed to fetch whole layer=%v", digest)
return
}
commonmetrics.LogLatencyForLastOnDemandFetch(ctx, digest, start, l.Info().ReadTime) // write log record for the latency between mount start and last on demand fetch
log.G(ctx).Debug("completed to fetch all layer data in background")
}()
}

// mount the node to the specified mountpoint
// TODO: bind mount the state directory as a read-only fs on snapshotter's side
rawFS := fusefs.NewNodeFS(node, &fusefs.Options{
Expand Down Expand Up @@ -422,6 +399,23 @@ func (fs *filesystem) Unmount(ctx context.Context, mountpoint string) error {
return syscall.Unmount(mountpoint, syscall.MNT_FORCE)
}

// prefetch kicks off the prefetch and the whole-layer background fetch for
// the given layer, unless disabled by fs.noprefetch / fs.noBackgroundFetch.
// Both tasks run in their own goroutines; this function returns immediately.
// start is the mount start time, used to log the mount-to-last-on-demand-fetch
// latency once the background fetch succeeds.
func (fs *filesystem) prefetch(ctx context.Context, l layer.Layer, defaultPrefetchSize int64, start time.Time) {
	// Prefetch a layer. The first Check() for this layer waits for the prefetch completion.
	if !fs.noprefetch {
		// Prefetch logs its own failures, so the returned error is not handled here.
		go l.Prefetch(defaultPrefetchSize)
	}

	// Fetch whole layer aggressively in background.
	if !fs.noBackgroundFetch {
		go func() {
			if err := l.BackgroundFetch(); err == nil {
				// Fetch Info once instead of twice: each Info() call recomputes
				// blob sizes (see layer.Info), and Digest/ReadTime come from the
				// same snapshot.
				info := l.Info()
				// write log record for the latency between mount start and last on demand fetch
				commonmetrics.LogLatencyForLastOnDemandFetch(ctx, info.Digest, start, info.ReadTime)
			}
		}()
	}
}

// neighboringLayers returns layer descriptors except the `target` layer in the specified manifest.
func neighboringLayers(manifest ocispec.Manifest, target ocispec.Descriptor) (descs []ocispec.Descriptor) {
for _, desc := range manifest.Layers {
Expand Down
93 changes: 56 additions & 37 deletions fs/layer/layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,15 @@ type Layer interface {
Refresh(ctx context.Context, hosts source.RegistryHosts, refspec reference.Spec, desc ocispec.Descriptor) error

// Verify verifies this layer using the passed TOC Digest.
// Nop if Verify() or SkipVerify() was already called.
Verify(tocDigest digest.Digest) (err error)

// SkipVerify skips verification for this layer.
// Nop if Verify() or SkipVerify() was already called.
SkipVerify()

// Prefetch prefetches the specified size. If the layer is eStargz and contains landmark files,
// the range indicated by these files is respected.
// Calling this function before calling Verify or SkipVerify will fail.
Prefetch(prefetchSize int64) error

// ReadAt reads this layer.
Expand All @@ -93,7 +94,6 @@ type Layer interface {

// BackgroundFetch fetches the entire layer contents to the cache.
// Fetching contents is done as a background task.
// Calling this function before calling Verify or SkipVerify will fail.
BackgroundFetch() error

// Done releases the reference to this layer. The resources related to this layer will be
Expand Down Expand Up @@ -350,18 +350,6 @@ func (r *Resolver) resolveBlob(ctx context.Context, hosts source.RegistryHosts,
return &blobRef{cachedB.(remote.Blob), done}, nil
}

// Cache is similar to Resolve but the result isn't returned. Instead, it'll be stored in the cache.
func (r *Resolver) Cache(ctx context.Context, hosts source.RegistryHosts, refspec reference.Spec, desc ocispec.Descriptor) error {
l, err := r.Resolve(ctx, hosts, refspec, desc)
if err != nil {
return err
}
// Release this layer. However, this will remain on the cache until eviction.
// Until then, the client can reuse this (already pre-resolved) layer.
l.Done()
return nil
}

func newLayer(
resolver *Resolver,
desc ocispec.Descriptor,
Expand Down Expand Up @@ -391,15 +379,22 @@ type layer struct {

closed bool
closedMu sync.Mutex

prefetchOnce sync.Once
backgroundFetchOnce sync.Once
}

// Info returns the current status of this layer. The sizes are recomputed
// from the underlying blob on every call. ReadTime stays the zero value
// until l.r has been initialized (l.r is set by Verify/SkipVerify —
// TODO(review): confirm no other writer exists).
func (l *layer) Info() Info {
	var readTime time.Time
	if l.r != nil {
		// Before the reader exists there has been no on-demand read to report,
		// so leave readTime as the zero time instead of dereferencing nil.
		readTime = l.r.LastOnDemandReadTime()
	}
	return Info{
		Digest:       l.desc.Digest,
		Size:         l.blob.Size(),
		FetchedSize:  l.blob.FetchedSize(),
		PrefetchSize: l.prefetchedSize(),
		ReadTime:     readTime,
	}
}

Expand Down Expand Up @@ -428,34 +423,50 @@ func (l *layer) Verify(tocDigest digest.Digest) (err error) {
if l.isClosed() {
return fmt.Errorf("layer is already closed")
}
if l.r != nil {
return nil
}
l.r, err = l.verifiableReader.VerifyTOC(tocDigest)
return
}

// SkipVerify marks this layer as usable without TOC verification.
// It is a no-op when a reader has already been installed by an earlier
// call to Verify or SkipVerify.
func (l *layer) SkipVerify() {
	if l.r == nil {
		l.r = l.verifiableReader.SkipVerify()
	}
}

func (l *layer) Prefetch(prefetchSize int64) error {
// Prefetch fetches the prefetch range of this layer, at most once
// (guarded by sync.Once). Only the first call runs the fetch and returns
// its error; subsequent calls return nil immediately without re-running,
// because the named return is only assigned inside the Once body.
func (l *layer) Prefetch(prefetchSize int64) (err error) {
	l.prefetchOnce.Do(func() {
		ctx := context.Background()
		// Run as a prioritized task so concurrent background fetches yield
		// bandwidth to this prefetch.
		l.resolver.backgroundTaskManager.DoPrioritizedTask()
		defer l.resolver.backgroundTaskManager.DonePrioritizedTask()
		err = l.prefetch(ctx, prefetchSize)
		if err != nil {
			log.G(ctx).WithError(err).Warnf("failed to prefetch layer=%v", l.desc.Digest)
			return
		}
		log.G(ctx).Debug("completed to prefetch")
	})
	return
}

func (l *layer) prefetch(ctx context.Context, prefetchSize int64) error {
defer l.prefetchWaiter.done() // Notify the completion
ctx := context.Background()
// Measuring the total time to complete prefetch (use defer func() because l.Info().PrefetchSize is set later)
start := time.Now()
defer func() {
commonmetrics.WriteLatencyWithBytesLogValue(ctx, l.Info().Digest, commonmetrics.PrefetchTotal, start, commonmetrics.PrefetchSize, l.Info().PrefetchSize)
commonmetrics.WriteLatencyWithBytesLogValue(ctx, l.desc.Digest, commonmetrics.PrefetchTotal, start, commonmetrics.PrefetchSize, l.prefetchedSize())
}()

if l.isClosed() {
return fmt.Errorf("layer is already closed")
}
if l.r == nil {
return fmt.Errorf("layer hasn't been verified yet")
}
lr := l.r
if _, ok := lr.Lookup(estargz.NoPrefetchLandmark); ok {
if _, ok := l.verifiableReader.Lookup(estargz.NoPrefetchLandmark); ok {
// do not prefetch this layer
return nil
} else if e, ok := lr.Lookup(estargz.PrefetchLandmark); ok {
} else if e, ok := l.verifiableReader.Lookup(estargz.PrefetchLandmark); ok {
// override the prefetch size with optimized value
prefetchSize = e.Offset
} else if prefetchSize > l.blob.Size() {
Expand All @@ -466,7 +477,7 @@ func (l *layer) Prefetch(prefetchSize int64) error {
// Fetch the target range
downloadStart := time.Now()
err := l.blob.Cache(0, prefetchSize)
commonmetrics.WriteLatencyLogValue(ctx, l.Info().Digest, commonmetrics.PrefetchDownload, downloadStart) // time to download prefetch data
commonmetrics.WriteLatencyLogValue(ctx, l.desc.Digest, commonmetrics.PrefetchDownload, downloadStart) // time to download prefetch data

if err != nil {
return errors.Wrap(err, "failed to prefetch layer")
Expand All @@ -479,10 +490,10 @@ func (l *layer) Prefetch(prefetchSize int64) error {

// Cache uncompressed contents of the prefetched range
decompressStart := time.Now()
err = lr.Cache(reader.WithFilter(func(e *estargz.TOCEntry) bool {
err = l.verifiableReader.Cache(reader.WithFilter(func(e *estargz.TOCEntry) bool {
return e.Offset < prefetchSize // Cache only prefetch target
}))
commonmetrics.WriteLatencyLogValue(ctx, l.Info().Digest, commonmetrics.PrefetchDecompress, decompressStart) // time to decompress prefetch data
commonmetrics.WriteLatencyLogValue(ctx, l.desc.Digest, commonmetrics.PrefetchDecompress, decompressStart) // time to decompress prefetch data

if err != nil {
return errors.Wrap(err, "failed to cache prefetched layer")
Expand All @@ -498,20 +509,28 @@ func (l *layer) WaitForPrefetchCompletion() error {
return l.prefetchWaiter.wait(l.resolver.prefetchTimeout)
}

func (l *layer) BackgroundFetch() error {
ctx := context.Background()
defer commonmetrics.WriteLatencyLogValue(ctx, l.Info().Digest, commonmetrics.BackgroundFetchTotal, time.Now())
// BackgroundFetch fetches the entire layer contents to the cache, at most
// once (guarded by sync.Once). Only the first call runs the fetch and
// returns its error; subsequent calls return nil immediately, because the
// named return is only assigned inside the Once body.
func (l *layer) BackgroundFetch() (err error) {
	l.backgroundFetchOnce.Do(func() {
		ctx := context.Background()
		err = l.backgroundFetch(ctx)
		if err != nil {
			log.G(ctx).WithError(err).Warnf("failed to fetch whole layer=%v", l.desc.Digest)
			return
		}
		log.G(ctx).Debug("completed to fetch all layer data in background")
	})
	return
}

func (l *layer) backgroundFetch(ctx context.Context) error {
defer commonmetrics.WriteLatencyLogValue(ctx, l.desc.Digest, commonmetrics.BackgroundFetchTotal, time.Now())
if l.isClosed() {
return fmt.Errorf("layer is already closed")
}
if l.r == nil {
return fmt.Errorf("layer hasn't been verified yet")
}
lr := l.r
br := io.NewSectionReader(readerAtFunc(func(p []byte, offset int64) (retN int, retErr error) {
l.resolver.backgroundTaskManager.InvokeBackgroundTask(func(ctx context.Context) {
// Measuring the time to download background fetch data (in milliseconds)
defer commonmetrics.MeasureLatency(commonmetrics.BackgroundFetchDownload, l.Info().Digest, time.Now()) // time to download background fetch data
defer commonmetrics.MeasureLatency(commonmetrics.BackgroundFetchDownload, l.desc.Digest, time.Now()) // time to download background fetch data
retN, retErr = l.blob.ReadAt(
p,
offset,
Expand All @@ -521,8 +540,8 @@ func (l *layer) BackgroundFetch() error {
}, 120*time.Second)
return
}), 0, l.blob.Size())
defer commonmetrics.WriteLatencyLogValue(ctx, l.Info().Digest, commonmetrics.BackgroundFetchDecompress, time.Now()) // time to decompress background fetch data (in milliseconds)
return lr.Cache(
defer commonmetrics.WriteLatencyLogValue(ctx, l.desc.Digest, commonmetrics.BackgroundFetchDecompress, time.Now()) // time to decompress background fetch data (in milliseconds)
return l.verifiableReader.Cache(
reader.WithReader(br), // Read contents in background
reader.WithCacheOpts(cache.Direct()), // Do not pollute mem cache
)
Expand Down
4 changes: 3 additions & 1 deletion fs/layer/layer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/containerd/stargz-snapshotter/fs/reader"
"github.com/containerd/stargz-snapshotter/fs/remote"
"github.com/containerd/stargz-snapshotter/fs/source"
"github.com/containerd/stargz-snapshotter/task"
"github.com/containerd/stargz-snapshotter/util/testutil"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
Expand Down Expand Up @@ -123,7 +124,8 @@ func TestPrefetch(t *testing.T) {
}
l := newLayer(
&Resolver{
prefetchTimeout: time.Second,
prefetchTimeout: time.Second,
backgroundTaskManager: task.NewBackgroundTaskManager(10, 5*time.Second),
},
ocispec.Descriptor{Digest: testStateLayerDigest},
&blobRef{blob, func() {}},
Expand Down

0 comments on commit 6a8daf5

Please sign in to comment.