Merge pull request #1393 from lyft/precompute-digests
Precompute digests option prior to registry upload
mtrmac committed Oct 21, 2021
2 parents b736197 + a618725 · commit 2541165
Showing 6 changed files with 99 additions and 20 deletions.
14 changes: 14 additions & 0 deletions docker/docker_image_dest.go
@@ -17,6 +17,7 @@ import (
 	"github.com/containers/image/v5/docker/reference"
 	"github.com/containers/image/v5/internal/blobinfocache"
 	"github.com/containers/image/v5/internal/putblobdigest"
+	"github.com/containers/image/v5/internal/streamdigest"
 	"github.com/containers/image/v5/internal/uploadreader"
 	"github.com/containers/image/v5/manifest"
 	"github.com/containers/image/v5/types"
@@ -130,6 +131,19 @@ func (d *dockerImageDestination) HasThreadSafePutBlob() bool {
 // to any other readers for download using the supplied digest.
 // If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
 func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
+	// If requested, precompute the blob digest to prevent uploading layers that already exist on the registry.
+	// This functionality is particularly useful when BlobInfoCache has not been populated with compressed digests,
+	// the source blob is uncompressed, and the destination blob is being compressed "on the fly".
+	if inputInfo.Digest == "" && d.c.sys.DockerRegistryPushPrecomputeDigests {
+		logrus.Debugf("Precomputing digest layer for %s", reference.Path(d.ref.ref))
+		streamCopy, cleanup, err := streamdigest.ComputeBlobInfo(d.c.sys, stream, &inputInfo)
+		if err != nil {
+			return types.BlobInfo{}, err
+		}
+		defer cleanup()
+		stream = streamCopy
+	}
+
 	if inputInfo.Digest != "" {
 		// This should not really be necessary, at least the copy code calls TryReusingBlob automatically.
 		// Still, we need to check, if only because the "initiate upload" endpoint does not have a documented "blob already exists" return value.
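
As an aside, the idea behind this change can be sketched outside the codebase: spool the incoming stream to a temporary file while hashing it, then ask the registry whether a blob with that digest already exists before initiating an upload. The following stand-alone Go sketch uses only the standard library; the package name, helper names, and the unauthenticated HEAD request are illustrative (real registries require auth tokens, which the docker client code handles elsewhere).

package precompute

import (
	"crypto/sha256"
	"fmt"
	"io"
	"net/http"
	"os"
)

// precomputeDigest spools stream to a temporary file while hashing it, and
// returns the digest, a rewound reader over the spooled copy, and a cleanup
// function that closes and removes the temporary file.
func precomputeDigest(stream io.Reader) (string, io.Reader, func(), error) {
	tmp, err := os.CreateTemp("", "blob-")
	if err != nil {
		return "", nil, nil, err
	}
	cleanup := func() {
		tmp.Close()
		os.Remove(tmp.Name())
	}
	h := sha256.New()
	// TeeReader feeds every byte destined for the temporary file through the hash.
	if _, err := io.Copy(tmp, io.TeeReader(stream, h)); err != nil {
		cleanup()
		return "", nil, nil, err
	}
	if _, err := tmp.Seek(0, io.SeekStart); err != nil {
		cleanup()
		return "", nil, nil, err
	}
	return fmt.Sprintf("sha256:%x", h.Sum(nil)), tmp, cleanup, nil
}

// blobExists does a HEAD request against the registry's blob endpoint;
// per the distribution spec, a 200 response means the blob is already present.
func blobExists(registry, repo, digest string) (bool, error) {
	resp, err := http.Head(fmt.Sprintf("https://%s/v2/%s/blobs/%s", registry, repo, digest))
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()
	return resp.StatusCode == http.StatusOK, nil
}

The in-tree implementation differs in that it reuses the putblobdigest and tmpdir helpers (via the new internal/streamdigest package below) and lets the existing TryReusingBlob/PutBlob machinery perform the existence check.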
23 changes: 3 additions & 20 deletions docker/internal/tarfile/dest.go
@@ -5,13 +5,10 @@ import (
 	"context"
 	"encoding/json"
 	"io"
-	"io/ioutil"
-	"os"
 
 	"github.com/containers/image/v5/docker/reference"
 	"github.com/containers/image/v5/internal/iolimits"
-	"github.com/containers/image/v5/internal/putblobdigest"
-	"github.com/containers/image/v5/internal/tmpdir"
+	"github.com/containers/image/v5/internal/streamdigest"
 	"github.com/containers/image/v5/manifest"
 	"github.com/containers/image/v5/types"
 	"github.com/opencontainers/go-digest"
@@ -98,25 +95,11 @@ func (d *Destination) PutBlob(ctx context.Context, stream io.Reader, inputInfo t
 	// When the layer is decompressed, we also have to generate the digest on uncompressed data.
 	if inputInfo.Size == -1 || inputInfo.Digest == "" {
 		logrus.Debugf("docker tarfile: input with unknown size, streaming to disk first ...")
-		streamCopy, err := ioutil.TempFile(tmpdir.TemporaryDirectoryForBigFiles(d.sysCtx), "docker-tarfile-blob")
+		streamCopy, cleanup, err := streamdigest.ComputeBlobInfo(d.sysCtx, stream, &inputInfo)
 		if err != nil {
 			return types.BlobInfo{}, err
 		}
-		defer os.Remove(streamCopy.Name())
-		defer streamCopy.Close()
-
-		digester, stream2 := putblobdigest.DigestIfUnknown(stream, inputInfo)
-		// TODO: This can take quite some time, and should ideally be cancellable using ctx.Done().
-		size, err := io.Copy(streamCopy, stream2)
-		if err != nil {
-			return types.BlobInfo{}, err
-		}
-		_, err = streamCopy.Seek(0, io.SeekStart)
-		if err != nil {
-			return types.BlobInfo{}, err
-		}
-		inputInfo.Size = size // inputInfo is a struct, so we are only modifying our copy.
-		inputInfo.Digest = digester.Digest()
+		defer cleanup()
 		stream = streamCopy
 		logrus.Debugf("... streaming done")
 	}
1 change: 1 addition & 0 deletions internal/streamdigest/fixtures/Hello.uncompressed
@@ -0,0 +1 @@
+Hello
41 changes: 41 additions & 0 deletions internal/streamdigest/stream_digest.go
@@ -0,0 +1,41 @@
+package streamdigest
+
+import (
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+
+	"github.com/containers/image/v5/internal/putblobdigest"
+	"github.com/containers/image/v5/internal/tmpdir"
+	"github.com/containers/image/v5/types"
+)
+
+// ComputeBlobInfo streams a blob to a temporary file and populates Digest and Size in inputInfo.
+// The temporary file is returned as an io.Reader along with a cleanup function.
+// It is the caller's responsibility to call the cleanup function, which closes and removes the temporary file.
+// If an error occurs, inputInfo is not modified.
+func ComputeBlobInfo(sys *types.SystemContext, stream io.Reader, inputInfo *types.BlobInfo) (io.Reader, func(), error) {
+	diskBlob, err := ioutil.TempFile(tmpdir.TemporaryDirectoryForBigFiles(sys), "stream-blob")
+	if err != nil {
+		return nil, nil, fmt.Errorf("creating temporary on-disk layer: %w", err)
+	}
+	cleanup := func() {
+		diskBlob.Close()
+		os.Remove(diskBlob.Name())
+	}
+	digester, stream := putblobdigest.DigestIfCanonicalUnknown(stream, *inputInfo)
+	written, err := io.Copy(diskBlob, stream)
+	if err != nil {
+		cleanup()
+		return nil, nil, fmt.Errorf("writing to temporary on-disk layer: %w", err)
+	}
+	_, err = diskBlob.Seek(0, io.SeekStart)
+	if err != nil {
+		cleanup()
+		return nil, nil, fmt.Errorf("rewinding temporary on-disk layer: %w", err)
+	}
+	inputInfo.Digest = digester.Digest()
+	inputInfo.Size = written
+	return diskBlob, cleanup, nil
+}
36 changes: 36 additions & 0 deletions internal/streamdigest/stream_digest_test.go
@@ -0,0 +1,36 @@
+package streamdigest
+
+import (
+	"io/ioutil"
+	"os"
+	"testing"
+
+	"github.com/containers/image/v5/types"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+)
+
+func TestComputeBlobInfo(t *testing.T) {
+	inputInfo := types.BlobInfo{Digest: "", Size: -1}
+	fixtureFname := "fixtures/Hello.uncompressed"
+	fixtureInfo := types.BlobInfo{Digest: "sha256:185f8db32271fe25f561a6fc938b2e264306ec304eda518007d1764826381969", Size: 5}
+	fixtureBytes := []byte("Hello")
+
+	// open fixture
+	stream, err := os.Open(fixtureFname)
+	require.NoError(t, err, fixtureFname)
+	defer stream.Close()
+
+	// fill in Digest and Size for inputInfo
+	streamCopy, cleanup, err := ComputeBlobInfo(nil, stream, &inputInfo)
+	require.NoError(t, err)
+	defer cleanup()
+
+	// ensure inputInfo has been filled in with Digest and Size of fixture
+	assert.Equal(t, inputInfo, fixtureInfo)
+
+	// ensure streamCopy is the same as fixture
+	b, err := ioutil.ReadAll(streamCopy)
+	require.NoError(t, err)
+	assert.Equal(t, b, fixtureBytes)
+}
4 changes: 4 additions & 0 deletions types/types.go
@@ -622,6 +622,10 @@ type SystemContext struct {
 	DockerLogMirrorChoice bool
 	// Directory to use for OSTree temporary files
 	OSTreeTmpDirPath string
+	// If true, all blobs will have precomputed digests to ensure layers are not uploaded that already exist on the registry.
+	// Note that this requires writing blobs to temporary files, and takes more time than the default behavior,
+	// when the digest for a blob is unknown.
+	DockerRegistryPushPrecomputeDigests bool
 
 	// === docker/daemon.Transport overrides ===
 	// A directory containing a CA certificate (ending with ".crt"),
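
For library callers, the new SystemContext field is opt-in. Here is a minimal sketch of enabling it through the public copy API; the source/destination references and the accept-anything policy are placeholders, not part of this commit.

package main

import (
	"context"
	"log"

	"github.com/containers/image/v5/copy"
	"github.com/containers/image/v5/signature"
	"github.com/containers/image/v5/transports/alltransports"
	"github.com/containers/image/v5/types"
)

func main() {
	// Source and destination are illustrative.
	srcRef, err := alltransports.ParseImageName("dir:/tmp/myimage")
	if err != nil {
		log.Fatal(err)
	}
	destRef, err := alltransports.ParseImageName("docker://registry.example.com/myorg/myimage:latest")
	if err != nil {
		log.Fatal(err)
	}
	policyContext, err := signature.NewPolicyContext(&signature.Policy{
		Default: []signature.PolicyRequirement{signature.NewPRInsecureAcceptAnything()},
	})
	if err != nil {
		log.Fatal(err)
	}
	defer policyContext.Destroy()

	// Opt in to digest precomputation on the push side; blobs with unknown
	// digests are spooled to disk and digested before the upload begins.
	_, err = copy.Image(context.Background(), policyContext, destRef, srcRef, &copy.Options{
		DestinationCtx: &types.SystemContext{
			DockerRegistryPushPrecomputeDigests: true,
		},
	})
	if err != nil {
		log.Fatal(err)
	}
}

The trade-off, as the field's doc comment notes, is extra disk I/O: blobs with unknown digests are written to a temporary file before the upload starts.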
