fs: allow pre/background-fetch to be called before layer verification #467

Merged 1 commit on Sep 28, 2021
estargz/estargz.go: 9 additions, 0 deletions
@@ -326,6 +326,10 @@ func (r *Reader) getOrCreateDir(d string) *TOCEntry {
return e
}

// TOCDigest returns the digest of the TOC JSON of this blob.
func (r *Reader) TOCDigest() digest.Digest {
return r.tocDigest
}

// VerifyTOC checks that the TOC JSON in the passed blob matches the
// passed digests and that the TOC JSON contains digests for all chunks
// contained in the blob. If the verification succeeds, this function
@@ -335,7 +339,12 @@ func (r *Reader) VerifyTOC(tocDigest digest.Digest) (TOCEntryVerifier, error) {
if r.tocDigest != tocDigest {
return nil, fmt.Errorf("invalid TOC JSON %q; want %q", r.tocDigest, tocDigest)
}
return r.Verifiers()
}

// Verifiers returns a TOCEntryVerifier for this blob. Use VerifyTOC instead in most cases
// because this method doesn't verify the TOC itself.
func (r *Reader) Verifiers() (TOCEntryVerifier, error) {
chunkDigestMap := make(map[int64]digest.Digest) // map from chunk offset to the chunk digest
regDigestMap := make(map[int64]digest.Digest) // map from chunk offset to the reg file digest
var chunkDigestMapIncomplete bool
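The hunk above separates producing verifiers from checking the TOC digest. A compilable sketch of how a caller might combine the two new entry points; `verifiersThenCheck` is a hypothetical helper, not part of this PR:

```go
package sketch

import (
	"fmt"

	"github.com/containerd/stargz-snapshotter/estargz"
	digest "github.com/opencontainers/go-digest"
)

// verifiersThenCheck shows the new split: Verifiers() hands out chunk
// verifiers without validating the TOC, and TOCDigest() lets the caller
// run the digest check later, once the expected value is known. Doing
// both immediately is equivalent to the existing VerifyTOC.
func verifiersThenCheck(r *estargz.Reader, want digest.Digest) (estargz.TOCEntryVerifier, error) {
	v, err := r.Verifiers() // no TOC verification yet; fetching can begin
	if err != nil {
		return nil, err
	}
	if r.TOCDigest() != want { // deferred verification
		return nil, fmt.Errorf("invalid TOC JSON %q; want %q", r.TOCDigest(), want)
	}
	return v, nil
}
```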
fs/fs.go: 34 additions, 40 deletions
@@ -177,6 +177,13 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s
return fmt.Errorf("source must be passed")
}

defaultPrefetchSize := fs.prefetchSize
if psStr, ok := labels[config.TargetPrefetchSizeLabel]; ok {
if ps, err := strconv.ParseInt(psStr, 10, 64); err == nil {
defaultPrefetchSize = ps
}
}

// Resolve the target layer
var (
resultChan = make(chan layer.Layer)
@@ -188,10 +195,10 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s
l, err := fs.resolver.Resolve(ctx, s.Hosts, s.Name, s.Target)
if err == nil {
resultChan <- l
fs.prefetch(ctx, l, defaultPrefetchSize, start)
return
}
rErr = errors.Wrapf(rErr, "failed to resolve layer %q from %q: %v",
s.Target.Digest, s.Name, err)
rErr = errors.Wrapf(rErr, "failed to resolve layer %q from %q: %v", s.Target.Digest, s.Name, err)
}
errChan <- rErr
}()
@@ -202,12 +209,17 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s
desc := desc
go func() {
// Avoid getting canceled by the client.
ctx := log.WithLogger(context.Background(),
log.G(ctx).WithField("mountpoint", mountpoint))
err := fs.resolver.Cache(ctx, preResolve.Hosts, preResolve.Name, desc)
ctx := log.WithLogger(context.Background(), log.G(ctx).WithField("mountpoint", mountpoint))
l, err := fs.resolver.Resolve(ctx, preResolve.Hosts, preResolve.Name, desc)
if err != nil {
log.G(ctx).WithError(err).Debug("failed to pre-resolve")
return
}
fs.prefetch(ctx, l, defaultPrefetchSize, start)

// Release this layer because it isn't the target layer and we don't use it anymore here.
// However, this will remain on the resolver cache until eviction.
l.Done()
}()
}

@@ -271,41 +283,6 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s
fs.layerMu.Unlock()
fs.metricsController.Add(mountpoint, l)

// Prefetch this layer. We prefetch several layers in parallel. The first
// Check() for this layer waits for the prefetch completion.
if !fs.noprefetch {
prefetchSize := fs.prefetchSize
if psStr, ok := labels[config.TargetPrefetchSizeLabel]; ok {
if ps, err := strconv.ParseInt(psStr, 10, 64); err == nil {
prefetchSize = ps
}
}
go func() {
fs.backgroundTaskManager.DoPrioritizedTask()
defer fs.backgroundTaskManager.DonePrioritizedTask()
if err := l.Prefetch(prefetchSize); err != nil {
log.G(ctx).WithError(err).Warnf("failed to prefetch layer=%v", digest)
return
}
log.G(ctx).Debug("completed to prefetch")
}()
}

// Fetch whole layer aggressively in background. We use background
// reader for this so prioritized tasks(Mount, Check, etc...) can
// interrupt the reading. This can avoid disturbing prioritized tasks
// about NW traffic.
if !fs.noBackgroundFetch {
go func() {
if err := l.BackgroundFetch(); err != nil {
log.G(ctx).WithError(err).Warnf("failed to fetch whole layer=%v", digest)
return
}
commonmetrics.LogLatencyForLastOnDemandFetch(ctx, digest, start, l.Info().ReadTime) // write log record for the latency between mount start and last on demand fetch
log.G(ctx).Debug("completed to fetch all layer data in background")
}()
}

// mount the node to the specified mountpoint
// TODO: bind mount the state directory as a read-only fs on snapshotter's side
rawFS := fusefs.NewNodeFS(node, &fusefs.Options{
@@ -422,6 +399,23 @@ func (fs *filesystem) Unmount(ctx context.Context, mountpoint string) error {
return syscall.Unmount(mountpoint, syscall.MNT_FORCE)
}

func (fs *filesystem) prefetch(ctx context.Context, l layer.Layer, defaultPrefetchSize int64, start time.Time) {
// Prefetch a layer. The first Check() for this layer waits for the prefetch completion.
if !fs.noprefetch {
go l.Prefetch(defaultPrefetchSize)
}

// Fetch whole layer aggressively in background.
if !fs.noBackgroundFetch {
go func() {
if err := l.BackgroundFetch(); err == nil {
// write log record for the latency between mount start and last on demand fetch
commonmetrics.LogLatencyForLastOnDemandFetch(ctx, l.Info().Digest, start, l.Info().ReadTime)
}
}()
}
}

// neighboringLayers returns layer descriptors except the `target` layer in the specified manifest.
func neighboringLayers(manifest ocispec.Manifest, target ocispec.Descriptor) (descs []ocispec.Descriptor) {
for _, desc := range manifest.Layers {
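Two things moved in this file: the per-image prefetch-size override is now computed once at the top of Mount, and both the target layer and pre-resolved neighboring layers call fs.prefetch right after Resolve, before any Verify/SkipVerify. A runnable sketch of the override logic under illustrative names (the real label key is config.TargetPrefetchSizeLabel):

```go
package main

import (
	"fmt"
	"strconv"
)

// resolvePrefetchSize mirrors the hunk hoisted to the top of Mount: a
// per-image label, when present and parseable as an int64, overrides the
// daemon-wide default prefetch size; otherwise the default stands.
func resolvePrefetchSize(defaultSize int64, labels map[string]string, key string) int64 {
	if s, ok := labels[key]; ok {
		if v, err := strconv.ParseInt(s, 10, 64); err == nil {
			return v
		}
	}
	return defaultSize
}

func main() {
	labels := map[string]string{"stargz.prefetch": "1048576"}
	fmt.Println(resolvePrefetchSize(10<<20, labels, "stargz.prefetch")) // 1048576
	fmt.Println(resolvePrefetchSize(10<<20, nil, "stargz.prefetch"))    // 10485760
}
```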
fs/layer/layer.go: 56 additions, 37 deletions
@@ -75,14 +75,15 @@ type Layer interface {
Refresh(ctx context.Context, hosts source.RegistryHosts, refspec reference.Spec, desc ocispec.Descriptor) error

// Verify verifies this layer using the passed TOC Digest.
// Nop if Verify() or SkipVerify() was already called.
Verify(tocDigest digest.Digest) (err error)

// SkipVerify skips verification for this layer.
// Nop if Verify() or SkipVerify() was already called.
SkipVerify()

// Prefetch prefetches the specified size. If the layer is eStargz and contains landmark files,
// the range indicated by these files is respected.
// Calling this function before calling Verify or SkipVerify will fail.
Prefetch(prefetchSize int64) error

// ReadAt reads this layer.
Expand All @@ -93,7 +94,6 @@ type Layer interface {

// BackgroundFetch fetches the entire layer contents to the cache.
// Fetching contents is done as a background task.
// Calling this function before calling Verify or SkipVerify will fail.
BackgroundFetch() error

// Done releases the reference to this layer. The resources related to this layer will be
@@ -350,18 +350,6 @@ func (r *Resolver) resolveBlob(ctx context.Context, hosts source.RegistryHosts,
return &blobRef{cachedB.(remote.Blob), done}, nil
}

// Cache is similar to Resolve but the result isn't returned. Instead, it'll be stored in the cache.
func (r *Resolver) Cache(ctx context.Context, hosts source.RegistryHosts, refspec reference.Spec, desc ocispec.Descriptor) error {
l, err := r.Resolve(ctx, hosts, refspec, desc)
if err != nil {
return err
}
// Release this layer. However, this will remain on the cache until eviction.
// Until then, the client can reuse this (already pre-resolved) layer.
l.Done()
return nil
}

func newLayer(
resolver *Resolver,
desc ocispec.Descriptor,
@@ -391,15 +379,22 @@ type layer struct {

closed bool
closedMu sync.Mutex

prefetchOnce sync.Once
backgroundFetchOnce sync.Once
}

func (l *layer) Info() Info {
var readTime time.Time
if l.r != nil {
readTime = l.r.LastOnDemandReadTime()
}
return Info{
Digest: l.desc.Digest,
Size: l.blob.Size(),
FetchedSize: l.blob.FetchedSize(),
PrefetchSize: l.prefetchedSize(),
ReadTime: l.r.LastOnDemandReadTime(),
ReadTime: readTime,
}
}

@@ -428,34 +423,50 @@ func (l *layer) Verify(tocDigest digest.Digest) (err error) {
if l.isClosed() {
return fmt.Errorf("layer is already closed")
}
if l.r != nil {
return nil
}
l.r, err = l.verifiableReader.VerifyTOC(tocDigest)
return
}

func (l *layer) SkipVerify() {
if l.r != nil {
return
}
l.r = l.verifiableReader.SkipVerify()
}

func (l *layer) Prefetch(prefetchSize int64) error {
func (l *layer) Prefetch(prefetchSize int64) (err error) {
l.prefetchOnce.Do(func() {
ctx := context.Background()
l.resolver.backgroundTaskManager.DoPrioritizedTask()
defer l.resolver.backgroundTaskManager.DonePrioritizedTask()
err = l.prefetch(ctx, prefetchSize)
if err != nil {
log.G(ctx).WithError(err).Warnf("failed to prefetch layer=%v", l.desc.Digest)
return
}
log.G(ctx).Debug("completed to prefetch")
})
return
}

func (l *layer) prefetch(ctx context.Context, prefetchSize int64) error {
defer l.prefetchWaiter.done() // Notify the completion
ctx := context.Background()
// Measuring the total time to complete prefetch (use defer func() because l.Info().PrefetchSize is set later)
start := time.Now()
defer func() {
commonmetrics.WriteLatencyWithBytesLogValue(ctx, l.Info().Digest, commonmetrics.PrefetchTotal, start, commonmetrics.PrefetchSize, l.Info().PrefetchSize)
commonmetrics.WriteLatencyWithBytesLogValue(ctx, l.desc.Digest, commonmetrics.PrefetchTotal, start, commonmetrics.PrefetchSize, l.prefetchedSize())
}()

if l.isClosed() {
return fmt.Errorf("layer is already closed")
}
if l.r == nil {
return fmt.Errorf("layer hasn't been verified yet")
}
lr := l.r
if _, ok := lr.Lookup(estargz.NoPrefetchLandmark); ok {
if _, ok := l.verifiableReader.Lookup(estargz.NoPrefetchLandmark); ok {
// do not prefetch this layer
return nil
} else if e, ok := lr.Lookup(estargz.PrefetchLandmark); ok {
} else if e, ok := l.verifiableReader.Lookup(estargz.PrefetchLandmark); ok {
// override the prefetch size with optimized value
prefetchSize = e.Offset
} else if prefetchSize > l.blob.Size() {
@@ -466,7 +477,7 @@ func (l *layer) Prefetch(prefetchSize int64) error {
// Fetch the target range
downloadStart := time.Now()
err := l.blob.Cache(0, prefetchSize)
commonmetrics.WriteLatencyLogValue(ctx, l.Info().Digest, commonmetrics.PrefetchDownload, downloadStart) // time to download prefetch data
commonmetrics.WriteLatencyLogValue(ctx, l.desc.Digest, commonmetrics.PrefetchDownload, downloadStart) // time to download prefetch data

if err != nil {
return errors.Wrap(err, "failed to prefetch layer")
@@ -479,10 +490,10 @@

// Cache uncompressed contents of the prefetched range
decompressStart := time.Now()
err = lr.Cache(reader.WithFilter(func(e *estargz.TOCEntry) bool {
err = l.verifiableReader.Cache(reader.WithFilter(func(e *estargz.TOCEntry) bool {
return e.Offset < prefetchSize // Cache only prefetch target
}))
commonmetrics.WriteLatencyLogValue(ctx, l.Info().Digest, commonmetrics.PrefetchDecompress, decompressStart) // time to decompress prefetch data
commonmetrics.WriteLatencyLogValue(ctx, l.desc.Digest, commonmetrics.PrefetchDecompress, decompressStart) // time to decompress prefetch data

if err != nil {
return errors.Wrap(err, "failed to cache prefetched layer")
@@ -498,20 +509,28 @@ func (l *layer) WaitForPrefetchCompletion() error {
return l.prefetchWaiter.wait(l.resolver.prefetchTimeout)
}

func (l *layer) BackgroundFetch() error {
ctx := context.Background()
defer commonmetrics.WriteLatencyLogValue(ctx, l.Info().Digest, commonmetrics.BackgroundFetchTotal, time.Now())
func (l *layer) BackgroundFetch() (err error) {
l.backgroundFetchOnce.Do(func() {
ctx := context.Background()
err = l.backgroundFetch(ctx)
if err != nil {
log.G(ctx).WithError(err).Warnf("failed to fetch whole layer=%v", l.desc.Digest)
return
}
log.G(ctx).Debug("completed to fetch all layer data in background")
})
return
}

func (l *layer) backgroundFetch(ctx context.Context) error {
defer commonmetrics.WriteLatencyLogValue(ctx, l.desc.Digest, commonmetrics.BackgroundFetchTotal, time.Now())
if l.isClosed() {
return fmt.Errorf("layer is already closed")
}
if l.r == nil {
return fmt.Errorf("layer hasn't been verified yet")
}
lr := l.r
br := io.NewSectionReader(readerAtFunc(func(p []byte, offset int64) (retN int, retErr error) {
l.resolver.backgroundTaskManager.InvokeBackgroundTask(func(ctx context.Context) {
// Measuring the time to download background fetch data (in milliseconds)
defer commonmetrics.MeasureLatency(commonmetrics.BackgroundFetchDownload, l.Info().Digest, time.Now()) // time to download background fetch data
defer commonmetrics.MeasureLatency(commonmetrics.BackgroundFetchDownload, l.desc.Digest, time.Now()) // time to download background fetch data
retN, retErr = l.blob.ReadAt(
p,
offset,
@@ -521,8 +540,8 @@ func (l *layer) BackgroundFetch() error {
}, 120*time.Second)
return
}), 0, l.blob.Size())
defer commonmetrics.WriteLatencyLogValue(ctx, l.Info().Digest, commonmetrics.BackgroundFetchDecompress, time.Now()) // time to decompress background fetch data (in milliseconds)
return lr.Cache(
defer commonmetrics.WriteLatencyLogValue(ctx, l.desc.Digest, commonmetrics.BackgroundFetchDecompress, time.Now()) // time to decompress background fetch data (in milliseconds)
return l.verifiableReader.Cache(
reader.WithReader(br), // Read contents in background
reader.WithCacheOpts(cache.Direct()), // Do not pollute mem cache
)
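The key change in this file is that Prefetch and BackgroundFetch are now guarded by sync.Once and read through verifiableReader rather than the verified reader l.r, so they no longer fail when called before Verify or SkipVerify. A runnable sketch of the once-guard pattern, using a hypothetical fetcher type rather than the real layer:

```go
package main

import (
	"fmt"
	"sync"
)

// fetcher stands in for layer: sync.Once makes Prefetch idempotent, so
// fs.Mount can fire it right after resolution and a later caller (e.g.
// the first Check()) won't trigger duplicate work. One consequence,
// visible in the diff as well: only the first call observes the error;
// subsequent calls skip the body and return the zero value.
type fetcher struct {
	prefetchOnce sync.Once
}

func (f *fetcher) Prefetch() (err error) {
	f.prefetchOnce.Do(func() {
		err = fmt.Errorf("network error") // stand-in for the real fetch
	})
	return
}

func main() {
	f := &fetcher{}
	fmt.Println(f.Prefetch()) // network error (first call runs the body)
	fmt.Println(f.Prefetch()) // <nil> (body skipped, error not replayed)
}
```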
fs/layer/layer_test.go: 3 additions, 1 deletion
@@ -36,6 +36,7 @@ import (
"github.com/containerd/stargz-snapshotter/fs/reader"
"github.com/containerd/stargz-snapshotter/fs/remote"
"github.com/containerd/stargz-snapshotter/fs/source"
"github.com/containerd/stargz-snapshotter/task"
"github.com/containerd/stargz-snapshotter/util/testutil"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
@@ -123,7 +124,8 @@ func TestPrefetch(t *testing.T) {
}
l := newLayer(
&Resolver{
prefetchTimeout: time.Second,
prefetchTimeout: time.Second,
backgroundTaskManager: task.NewBackgroundTaskManager(10, 5*time.Second),
},
ocispec.Descriptor{Digest: testStateLayerDigest},
&blobRef{blob, func() {}},
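The test now constructs a background task manager because the reworked Prefetch acquires one from the resolver (DoPrioritizedTask/DonePrioritizedTask) before fetching; a zero-value Resolver would hit a nil pointer. A generic illustration of that hazard, not the snapshotter's actual types:

```go
package main

import "fmt"

type manager struct{}

func (m *manager) DoPrioritizedTask()   { fmt.Println("acquired") }
func (m *manager) DonePrioritizedTask() { fmt.Println("released") }

// resolver mimics the shape of the dependency: prefetch dereferences the
// manager, so leaving it nil (as the old test setup effectively did)
// would panic once Prefetch runs the prioritized-task bookkeeping.
type resolver struct{ mgr *manager }

func (r *resolver) prefetch() {
	r.mgr.DoPrioritizedTask() // panics if r.mgr is nil
	defer r.mgr.DonePrioritizedTask()
	fmt.Println("prefetching")
}

func main() {
	(&resolver{mgr: &manager{}}).prefetch()
	// (&resolver{}).prefetch() // would panic: nil pointer dereference
}
```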