Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve progress bars, and update mpb #1530

Merged
merged 11 commits into from Apr 27, 2022
129 changes: 25 additions & 104 deletions copy/copy.go
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/vbauerster/mpb/v7"
"github.com/vbauerster/mpb/v7/decor"
"golang.org/x/sync/semaphore"
"golang.org/x/term"
)
Expand Down Expand Up @@ -1069,85 +1068,6 @@ func (ic *imageCopier) copyUpdatedConfigAndManifest(ctx context.Context, instanc
return man, manifestDigest, nil
}

// newProgressPool creates a *mpb.Progress.
// The caller must eventually call pool.Wait() after the pool will no longer be updated.
// NOTE: Every progress bar created within the progress pool must either successfully
// complete or be aborted, or pool.Wait() will hang. That is typically done
// using "defer bar.Abort(false)", which must be called BEFORE pool.Wait() is called.
func (c *copier) newProgressPool() *mpb.Progress {
return mpb.New(mpb.WithWidth(40), mpb.WithOutput(c.progressOutput))
}

// customPartialBlobDecorFunc implements mpb.DecorFunc for the partial blobs retrieval progress bar
func customPartialBlobDecorFunc(s decor.Statistics) string {
if s.Total == 0 {
pairFmt := "%.1f / %.1f (skipped: %.1f)"
return fmt.Sprintf(pairFmt, decor.SizeB1024(s.Current), decor.SizeB1024(s.Total), decor.SizeB1024(s.Refill))
}
pairFmt := "%.1f / %.1f (skipped: %.1f = %.2f%%)"
percentage := 100.0 * float64(s.Refill) / float64(s.Total)
return fmt.Sprintf(pairFmt, decor.SizeB1024(s.Current), decor.SizeB1024(s.Total), decor.SizeB1024(s.Refill), percentage)
}

// createProgressBar creates a mpb.Bar in pool. Note that if the copier's reportWriter
// is io.Discard, the progress bar's output will be discarded
// NOTE: Every progress bar created within a progress pool must either successfully
// complete or be aborted, or pool.Wait() will hang. That is typically done
// using "defer bar.Abort(false)", which must happen BEFORE pool.Wait() is called.
func (c *copier) createProgressBar(pool *mpb.Progress, partial bool, info types.BlobInfo, kind string, onComplete string) *mpb.Bar {
// shortDigestLen is the length of the digest used for blobs.
const shortDigestLen = 12

prefix := fmt.Sprintf("Copying %s %s", kind, info.Digest.Encoded())
// Truncate the prefix (chopping of some part of the digest) to make all progress bars aligned in a column.
maxPrefixLen := len("Copying blob ") + shortDigestLen
if len(prefix) > maxPrefixLen {
prefix = prefix[:maxPrefixLen]
}

// onComplete will replace prefix once the bar/spinner has completed
onComplete = prefix + " " + onComplete

// Use a normal progress bar when we know the size (i.e., size > 0).
// Otherwise, use a spinner to indicate that something's happening.
var bar *mpb.Bar
if info.Size > 0 {
if partial {
bar = pool.AddBar(info.Size,
mpb.BarFillerClearOnComplete(),
mpb.PrependDecorators(
decor.OnComplete(decor.Name(prefix), onComplete),
),
mpb.AppendDecorators(
decor.Any(customPartialBlobDecorFunc),
),
)
} else {
bar = pool.AddBar(info.Size,
mpb.BarFillerClearOnComplete(),
mpb.PrependDecorators(
decor.OnComplete(decor.Name(prefix), onComplete),
),
mpb.AppendDecorators(
decor.OnComplete(decor.CountersKibiByte("%.1f / %.1f"), ""),
),
)
}
} else {
bar = pool.New(0,
mpb.SpinnerStyle(".", "..", "...", "....", "").PositionLeft(),
mpb.BarFillerClearOnComplete(),
mpb.PrependDecorators(
decor.OnComplete(decor.Name(prefix), onComplete),
),
)
}
if c.progressOutput == io.Discard {
c.Printf("Copying %s %s\n", kind, info.Digest)
}
return bar
}

// copyConfig copies config.json, if any, from src to dest.
func (c *copier) copyConfig(ctx context.Context, src types.Image) error {
srcInfo := src.ConfigInfo()
Expand All @@ -1158,22 +1078,23 @@ func (c *copier) copyConfig(ctx context.Context, src types.Image) error {
}
defer c.concurrentBlobCopiesSemaphore.Release(1)

configBlob, err := src.ConfigBlob(ctx)
if err != nil {
return errors.Wrapf(err, "reading config blob %s", srcInfo.Digest)
}

destInfo, err := func() (types.BlobInfo, error) { // A scope for defer
progressPool := c.newProgressPool()
defer progressPool.Wait()
bar := c.createProgressBar(progressPool, false, srcInfo, "config", "done")
defer bar.Abort(false)

configBlob, err := src.ConfigBlob(ctx)
if err != nil {
return types.BlobInfo{}, errors.Wrapf(err, "reading config blob %s", srcInfo.Digest)
}

destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, false, true, false, bar, -1, false)
if err != nil {
return types.BlobInfo{}, err
}
bar.SetTotal(int64(len(configBlob)), true)

bar.mark100PercentComplete()
return destInfo, nil
}()
if err != nil {
Expand Down Expand Up @@ -1242,9 +1163,9 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
if reused {
logrus.Debugf("Skipping blob %s (already present):", srcInfo.Digest)
func() { // A scope for defer
bar := ic.c.createProgressBar(pool, false, srcInfo, "blob", "skipped: already exists")
bar := ic.c.createProgressBar(pool, false, types.BlobInfo{Digest: blobInfo.Digest, Size: 0}, "blob", "skipped: already exists")
defer bar.Abort(false)
bar.SetTotal(0, true)
bar.mark100PercentComplete()
}()

// Throw an event that the layer has been skipped
Expand Down Expand Up @@ -1287,12 +1208,12 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
wrapped: ic.c.rawSource,
bar: bar,
}
bar.SetTotal(srcInfo.Size, false)
info, err := ic.c.dest.PutBlobPartial(ctx, &proxy, srcInfo, ic.c.blobInfoCache)
if err == nil {
bar.SetRefill(srcInfo.Size - bar.Current())
bar.SetCurrent(srcInfo.Size)
bar.SetTotal(srcInfo.Size, true)
if srcInfo.Size != -1 {
bar.SetRefill(srcInfo.Size - bar.Current())
}
bar.mark100PercentComplete()
hideProgressBar = false
logrus.Debugf("Retrieved partial blob %v", srcInfo.Digest)
return true, info
Expand All @@ -1305,16 +1226,16 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
}

// Fallback: copy the layer, computing the diffID if we need to do so
srcStream, srcBlobSize, err := ic.c.rawSource.GetBlob(ctx, srcInfo, ic.c.blobInfoCache)
if err != nil {
return types.BlobInfo{}, "", errors.Wrapf(err, "reading blob %s", srcInfo.Digest)
}
defer srcStream.Close()

return func() (types.BlobInfo, digest.Digest, error) { // A scope for defer
bar := ic.c.createProgressBar(pool, false, srcInfo, "blob", "done")
defer bar.Abort(false)

srcStream, srcBlobSize, err := ic.c.rawSource.GetBlob(ctx, srcInfo, ic.c.blobInfoCache)
if err != nil {
return types.BlobInfo{}, "", errors.Wrapf(err, "reading blob %s", srcInfo.Digest)
}
defer srcStream.Close()

blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize, MediaType: srcInfo.MediaType, Annotations: srcInfo.Annotations}, diffIDIsNeeded, toEncrypt, bar, layerIndex, emptyLayer)
if err != nil {
return types.BlobInfo{}, "", err
Expand All @@ -1337,7 +1258,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
}
}

bar.SetTotal(srcInfo.Size, true)
bar.mark100PercentComplete()
return blobInfo, diffID, nil
}()
}
Expand All @@ -1347,7 +1268,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
// perhaps (de/re/)compressing the stream,
// and returns a complete blobInfo of the copied blob and perhaps a <-chan diffIDResult if diffIDIsNeeded, to be read by the caller.
func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo,
diffIDIsNeeded bool, toEncrypt bool, bar *mpb.Bar, layerIndex int, emptyLayer bool) (types.BlobInfo, <-chan diffIDResult, error) {
diffIDIsNeeded bool, toEncrypt bool, bar *progressBar, layerIndex int, emptyLayer bool) (types.BlobInfo, <-chan diffIDResult, error) {
var getDiffIDRecorder func(compressiontypes.DecompressorFunc) io.Writer // = nil
var diffIDChan chan diffIDResult

Expand Down Expand Up @@ -1424,7 +1345,7 @@ func (r errorAnnotationReader) Read(b []byte) (n int, err error) {
// and returns a complete blobInfo of the copied blob.
func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo,
getOriginalLayerCopyWriter func(decompressor compressiontypes.DecompressorFunc) io.Writer,
canModifyBlob bool, isConfig bool, toEncrypt bool, bar *mpb.Bar, layerIndex int, emptyLayer bool) (types.BlobInfo, error) {
canModifyBlob bool, isConfig bool, toEncrypt bool, bar *progressBar, layerIndex int, emptyLayer bool) (types.BlobInfo, error) {
if isConfig { // This is guaranteed by the caller, but set it here to be explicit.
canModifyBlob = false
}
Expand All @@ -1444,6 +1365,9 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
}
var destStream io.Reader = digestingReader

// === Update progress bars
destStream = bar.ProxyReader(destStream)

// === Decrypt the stream, if required.
var decrypted bool
if isOciEncrypted(srcInfo.MediaType) && c.ociDecryptConfig != nil {
Expand Down Expand Up @@ -1478,9 +1402,6 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
logrus.Debugf("blob %s with type %s should be compressed with %s, but compressor appears to be %s", srcInfo.Digest.String(), srcInfo.MediaType, expectedCompressionFormat.Name(), compressionFormat.Name())
}

// === Update progress bars
destStream = bar.ProxyReader(destStream)

// === Send a copy of the original, uncompressed, stream, to a separate path if necessary.
var originalLayerReader io.Reader // DO NOT USE this other than to drain the input if no other consumer in the pipeline has done so.
if getOriginalLayerCopyWriter != nil {
Expand Down
148 changes: 148 additions & 0 deletions copy/progress_bars.go
@@ -0,0 +1,148 @@
package copy

import (
"context"
"fmt"
"io"

"github.com/containers/image/v5/internal/private"
"github.com/containers/image/v5/types"
"github.com/vbauerster/mpb/v7"
"github.com/vbauerster/mpb/v7/decor"
)

// newProgressPool creates a *mpb.Progress.
// The caller must eventually call pool.Wait() after the pool will no longer be updated.
// NOTE: Every progress bar created within the progress pool must either successfully
// complete or be aborted, or pool.Wait() will hang. That is typically done
// using "defer bar.Abort(false)", which must be called BEFORE pool.Wait() is called.
func (c *copier) newProgressPool() *mpb.Progress {
return mpb.New(mpb.WithWidth(40), mpb.WithOutput(c.progressOutput))
}

// customPartialBlobDecorFunc implements mpb.DecorFunc for the partial blobs retrieval progress bar
func customPartialBlobDecorFunc(s decor.Statistics) string {
if s.Total == 0 {
pairFmt := "%.1f / %.1f (skipped: %.1f)"
return fmt.Sprintf(pairFmt, decor.SizeB1024(s.Current), decor.SizeB1024(s.Total), decor.SizeB1024(s.Refill))
}
pairFmt := "%.1f / %.1f (skipped: %.1f = %.2f%%)"
percentage := 100.0 * float64(s.Refill) / float64(s.Total)
return fmt.Sprintf(pairFmt, decor.SizeB1024(s.Current), decor.SizeB1024(s.Total), decor.SizeB1024(s.Refill), percentage)
}

// progressBar wraps a *mpb.Bar, allowing us to add extra state and methods.
type progressBar struct {
*mpb.Bar
originalSize int64 // or -1 if unknown
}

// createProgressBar creates a progressBar in pool. Note that if the copier's reportWriter
// is io.Discard, the progress bar's output will be discarded
//
// NOTE: Every progress bar created within a progress pool must either successfully
// complete or be aborted, or pool.Wait() will hang. That is typically done
// using "defer bar.Abort(false)", which must happen BEFORE pool.Wait() is called.
//
// As a convention, most users of progress bars should call mark100PercentComplete on full success;
// by convention, we don't leave progress bars in partial state when fully done
// (even if we copied much less data than anticipated).
func (c *copier) createProgressBar(pool *mpb.Progress, partial bool, info types.BlobInfo, kind string, onComplete string) *progressBar {
// shortDigestLen is the length of the digest used for blobs.
const shortDigestLen = 12

prefix := fmt.Sprintf("Copying %s %s", kind, info.Digest.Encoded())
// Truncate the prefix (chopping of some part of the digest) to make all progress bars aligned in a column.
maxPrefixLen := len("Copying blob ") + shortDigestLen
if len(prefix) > maxPrefixLen {
prefix = prefix[:maxPrefixLen]
}

// onComplete will replace prefix once the bar/spinner has completed
onComplete = prefix + " " + onComplete

// Use a normal progress bar when we know the size (i.e., size > 0).
// Otherwise, use a spinner to indicate that something's happening.
var bar *mpb.Bar
if info.Size > 0 {
if partial {
bar = pool.AddBar(info.Size,
mpb.BarFillerClearOnComplete(),
mpb.PrependDecorators(
decor.OnComplete(decor.Name(prefix), onComplete),
),
mpb.AppendDecorators(
decor.Any(customPartialBlobDecorFunc),
),
)
} else {
bar = pool.AddBar(info.Size,
mpb.BarFillerClearOnComplete(),
mpb.PrependDecorators(
decor.OnComplete(decor.Name(prefix), onComplete),
),
mpb.AppendDecorators(
decor.OnComplete(decor.CountersKibiByte("%.1f / %.1f"), ""),
),
)
}
} else {
bar = pool.New(0,
mpb.SpinnerStyle(".", "..", "...", "....", "").PositionLeft(),
mpb.BarFillerClearOnComplete(),
mpb.PrependDecorators(
decor.OnComplete(decor.Name(prefix), onComplete),
),
)
}
if c.progressOutput == io.Discard {
c.Printf("Copying %s %s\n", kind, info.Digest)
}
return &progressBar{
Bar: bar,
originalSize: info.Size,
}
}

// mark100PercentComplete marks the progres bars as 100% complete;
// it may do so by possibly advancing the current state if it is below the known total.
func (bar *progressBar) mark100PercentComplete() {
if bar.originalSize > 0 {
// We can't call bar.SetTotal even if we wanted to; the total can not be changed
// after a progress bar is created with a definite total.
bar.SetCurrent(bar.originalSize) // This triggers the completion condition.
} else {
// -1 = unknown size
// 0 is somewhat of a a special case: Unlike c/image, where 0 is a definite known
// size (possible at least in theory), in mpb, zero-sized progress bars are treated
// as unknown size, in particular they are not configured to be marked as
// complete on bar.Current() reaching bar.total (because that would happen already
// when creating the progress bar).
// That means that we are both _allowed_ to call SetTotal, and we _have to_.
bar.SetTotal(-1, true) // total < 0 = set it to bar.Current(), report it; and mark the bar as complete.
}
}

// blobChunkAccessorProxy wraps a BlobChunkAccessor and updates a *progressBar
// with the number of received bytes.
type blobChunkAccessorProxy struct {
wrapped private.BlobChunkAccessor // The underlying BlobChunkAccessor
bar *progressBar // A progress bar updated with the number of bytes read so far
}

// GetBlobAt returns a sequential channel of readers that contain data for the requested
// blob chunks, and a channel that might get a single error value.
// The specified chunks must be not overlapping and sorted by their offset.
// The readers must be fully consumed, in the order they are returned, before blocking
// to read the next chunk.
func (s *blobChunkAccessorProxy) GetBlobAt(ctx context.Context, info types.BlobInfo, chunks []private.ImageSourceChunk) (chan io.ReadCloser, chan error, error) {
rc, errs, err := s.wrapped.GetBlobAt(ctx, info, chunks)
if err == nil {
total := int64(0)
for _, c := range chunks {
total += int64(c.Length)
}
s.bar.IncrInt64(total)
}
return rc, errs, err
}