Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate digest in repo for pull by digest #44327

Merged
merged 3 commits into from Oct 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
107 changes: 101 additions & 6 deletions distribution/manifest.go
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"io"
"strings"

"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
Expand All @@ -14,15 +15,22 @@ import (
"github.com/docker/distribution/manifest/manifestlist"
"github.com/docker/distribution/manifest/schema1"
"github.com/docker/distribution/manifest/schema2"
"github.com/docker/distribution/reference"
"github.com/docker/docker/registry"
"github.com/opencontainers/go-digest"
specs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

// labelDistributionSource describes the source blob comes from.
const labelDistributionSource = "containerd.io/distribution.source"

// This is used by manifestStore to pare down the requirements to implement a
// full distribution.ManifestService, since `Get` is all we use here.
type manifestGetter interface {
Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error)
Exists(ctx context.Context, dgst digest.Digest) (bool, error)
}

type manifestStore struct {
Expand All @@ -39,15 +47,98 @@ type ContentStore interface {
content.Provider
Info(ctx context.Context, dgst digest.Digest) (content.Info, error)
Abort(ctx context.Context, ref string) error
Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error)
}

func makeDistributionSourceLabel(ref reference.Named) (string, string) {
domain := reference.Domain(ref)
if domain == "" {
domain = registry.DefaultNamespace
}
repo := reference.Path(ref)

return fmt.Sprintf("%s.%s", labelDistributionSource, domain), repo
}

func (m *manifestStore) getLocal(ctx context.Context, desc specs.Descriptor) (distribution.Manifest, error) {
// Taken from https://github.com/containerd/containerd/blob/e079e4a155c86f07bbd602fe6753ecacc78198c2/remotes/docker/handler.go#L84-L108
func appendDistributionSourceLabel(originLabel, repo string) string {
repos := []string{}
if originLabel != "" {
repos = strings.Split(originLabel, ",")
}
repos = append(repos, repo)

// use empty string to present duplicate items
for i := 1; i < len(repos); i++ {
tmp, j := repos[i], i-1
for ; j >= 0 && repos[j] >= tmp; j-- {
if repos[j] == tmp {
tmp = ""
}
repos[j+1] = repos[j]
}
repos[j+1] = tmp
}

i := 0
for ; i < len(repos) && repos[i] == ""; i++ {
}

return strings.Join(repos[i:], ",")
}

func hasDistributionSource(label, repo string) bool {
sources := strings.Split(label, ",")
for _, s := range sources {
if s == repo {
return true
}
}
return false
}

func (m *manifestStore) getLocal(ctx context.Context, desc specs.Descriptor, ref reference.Named) (distribution.Manifest, error) {
ra, err := m.local.ReaderAt(ctx, desc)
if err != nil {
return nil, errors.Wrap(err, "error getting content store reader")
}
defer ra.Close()

distKey, distRepo := makeDistributionSourceLabel(ref)
info, err := m.local.Info(ctx, desc.Digest)
if err != nil {
return nil, errors.Wrap(err, "error getting content info")
}

if _, ok := ref.(reference.Canonical); ok {
// Since this is specified by digest...
// We know we have the content locally, we need to check if we've seen this content at the specified repository before.
// If we have, we can just return the manifest from the local content store.
// If we haven't, we need to check the remote repository to see if it has the content, otherwise we can end up returning
// a manifest that has never even existed in the remote before.
if !hasDistributionSource(info.Labels[distKey], distRepo) {
logrus.WithField("ref", ref).Debug("found manifest but no mataching source repo is listed, checking with remote")
exists, err := m.remote.Exists(ctx, desc.Digest)
if err != nil {
return nil, errors.Wrap(err, "error checking if remote exists")
}

if !exists {
return nil, errors.Wrapf(errdefs.ErrNotFound, "manifest %v not found", desc.Digest)
}

}
}

// Update the distribution sources since we now know the content exists in the remote.
if info.Labels == nil {
info.Labels = map[string]string{}
}
info.Labels[distKey] = appendDistributionSourceLabel(info.Labels[distKey], distRepo)
if _, err := m.local.Update(ctx, info, "labels."+distKey); err != nil {
logrus.WithError(err).WithField("ref", ref).Warn("Could not update content distribution source")
}

r := io.NewSectionReader(ra, 0, ra.Size())
data, err := io.ReadAll(r)
if err != nil {
Expand All @@ -58,6 +149,7 @@ func (m *manifestStore) getLocal(ctx context.Context, desc specs.Descriptor) (di
if err != nil {
return nil, errors.Wrap(err, "error unmarshaling manifest from content store")
}

return manifest, nil
}

Expand All @@ -75,7 +167,7 @@ func (m *manifestStore) getMediaType(ctx context.Context, desc specs.Descriptor)
return mt, nil
}

func (m *manifestStore) Get(ctx context.Context, desc specs.Descriptor) (distribution.Manifest, error) {
func (m *manifestStore) Get(ctx context.Context, desc specs.Descriptor, ref reference.Named) (distribution.Manifest, error) {
l := log.G(ctx)

if desc.MediaType == "" {
Expand Down Expand Up @@ -103,7 +195,7 @@ func (m *manifestStore) Get(ctx context.Context, desc specs.Descriptor) (distrib
if err != nil {
if errdefs.IsAlreadyExists(err) {
var manifest distribution.Manifest
if manifest, err = m.getLocal(ctx, desc); err == nil {
if manifest, err = m.getLocal(ctx, desc, ref); err == nil {
return manifest, nil
}
}
Expand All @@ -125,7 +217,7 @@ func (m *manifestStore) Get(ctx context.Context, desc specs.Descriptor) (distrib

if w != nil {
// if `w` is nil here, something happened with the content store, so don't bother trying to persist.
if err := m.Put(ctx, manifest, desc, w); err != nil {
if err := m.Put(ctx, manifest, desc, w, ref); err != nil {
if err := m.local.Abort(ctx, key); err != nil {
l.WithError(err).Warn("error aborting content ingest")
}
Expand All @@ -135,7 +227,7 @@ func (m *manifestStore) Get(ctx context.Context, desc specs.Descriptor) (distrib
return manifest, nil
}

func (m *manifestStore) Put(ctx context.Context, manifest distribution.Manifest, desc specs.Descriptor, w content.Writer) error {
func (m *manifestStore) Put(ctx context.Context, manifest distribution.Manifest, desc specs.Descriptor, w content.Writer, ref reference.Named) error {
mt, payload, err := manifest.Payload()
if err != nil {
return err
Expand All @@ -147,7 +239,10 @@ func (m *manifestStore) Put(ctx context.Context, manifest distribution.Manifest,
return errors.Wrap(err, "error writing manifest to content store")
}

if err := w.Commit(ctx, desc.Size, desc.Digest); err != nil {
distKey, distSource := makeDistributionSourceLabel(ref)
if err := w.Commit(ctx, desc.Size, desc.Digest, content.WithLabels(map[string]string{
distKey: distSource,
})); err != nil {
return errors.Wrap(err, "error committing manifest to content store")
}
return nil
Expand Down
62 changes: 42 additions & 20 deletions distribution/manifest_test.go
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/docker/distribution/manifest/ocischema"
"github.com/docker/distribution/manifest/schema1"
"github.com/docker/distribution/manifest/schema2"
"github.com/docker/distribution/reference"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/opencontainers/go-digest"
specs "github.com/opencontainers/image-spec/specs-go/v1"
Expand All @@ -39,6 +40,11 @@ func (m *mockManifestGetter) Get(ctx context.Context, dgst digest.Digest, option
return manifest, nil
}

func (m *mockManifestGetter) Exists(ctx context.Context, dgst digest.Digest) (bool, error) {
_, ok := m.manifests[dgst]
return ok, nil
}

type memoryLabelStore struct {
mu sync.Mutex
labels map[digest.Digest]map[string]string
Expand Down Expand Up @@ -76,7 +82,9 @@ func (s *memoryLabelStore) Update(dgst digest.Digest, update map[string]string)
for k, v := range update {
labels[k] = v
}

if s.labels == nil {
s.labels = map[digest.Digest]map[string]string{}
}
s.labels[dgst] = labels

return labels, nil
Expand Down Expand Up @@ -125,7 +133,7 @@ func TestManifestStore(t *testing.T) {
assert.NilError(t, err)
dgst := digest.Canonical.FromBytes(serialized)

setupTest := func(t *testing.T) (specs.Descriptor, *mockManifestGetter, *manifestStore, content.Store, func(*testing.T)) {
setupTest := func(t *testing.T) (reference.Named, specs.Descriptor, *mockManifestGetter, *manifestStore, content.Store, func(*testing.T)) {
root, err := os.MkdirTemp("", strings.ReplaceAll(t.Name(), "/", "_"))
assert.NilError(t, err)
defer func() {
Expand All @@ -141,7 +149,10 @@ func TestManifestStore(t *testing.T) {
store := &manifestStore{local: cs, remote: mg}
desc := specs.Descriptor{Digest: dgst, MediaType: specs.MediaTypeImageManifest, Size: int64(len(serialized))}

return desc, mg, store, cs, func(t *testing.T) {
ref, err := reference.Parse("foo/bar")
assert.NilError(t, err)

return ref.(reference.Named), desc, mg, store, cs, func(t *testing.T) {
assert.Check(t, os.RemoveAll(root))
}
}
Expand Down Expand Up @@ -181,22 +192,22 @@ func TestManifestStore(t *testing.T) {
}

t.Run("no remote or local", func(t *testing.T) {
desc, _, store, cs, teardown := setupTest(t)
ref, desc, _, store, cs, teardown := setupTest(t)
defer teardown(t)

_, err = store.Get(ctx, desc)
_, err = store.Get(ctx, desc, ref)
checkIngest(t, cs, desc)
// This error is what our digest getter returns when it doesn't know about the manifest
assert.Error(t, err, distribution.ErrManifestUnknown{Tag: dgst.String()}.Error())
})

t.Run("no local cache", func(t *testing.T) {
desc, mg, store, cs, teardown := setupTest(t)
ref, desc, mg, store, cs, teardown := setupTest(t)
defer teardown(t)

mg.manifests[desc.Digest] = m

m2, err := store.Get(ctx, desc)
m2, err := store.Get(ctx, desc, ref)
checkIngest(t, cs, desc)
assert.NilError(t, err)
assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
Expand All @@ -206,23 +217,34 @@ func TestManifestStore(t *testing.T) {
assert.NilError(t, err)
assert.Check(t, cmp.Equal(i.Digest, desc.Digest))

distKey, distSource := makeDistributionSourceLabel(ref)
assert.Check(t, hasDistributionSource(i.Labels[distKey], distSource))

// Now check again, this should not hit the remote
m2, err = store.Get(ctx, desc)
m2, err = store.Get(ctx, desc, ref)
checkIngest(t, cs, desc)
assert.NilError(t, err)
assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
assert.Check(t, cmp.Equal(mg.gets, 1))

t.Run("digested", func(t *testing.T) {
ref, err := reference.WithDigest(ref, desc.Digest)
assert.NilError(t, err)

_, err = store.Get(ctx, desc, ref)
assert.NilError(t, err)
})
})

t.Run("with local cache", func(t *testing.T) {
desc, mg, store, cs, teardown := setupTest(t)
ref, desc, mg, store, cs, teardown := setupTest(t)
defer teardown(t)

// first add the manifest to the coontent store
writeManifest(t, cs, desc)

// now do the get
m2, err := store.Get(ctx, desc)
m2, err := store.Get(ctx, desc, ref)
checkIngest(t, cs, desc)
assert.NilError(t, err)
assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
Expand All @@ -236,13 +258,13 @@ func TestManifestStore(t *testing.T) {
// This is for the case of pull by digest where we don't know the media type of the manifest until it's actually pulled.
t.Run("unknown media type", func(t *testing.T) {
t.Run("no cache", func(t *testing.T) {
desc, mg, store, cs, teardown := setupTest(t)
ref, desc, mg, store, cs, teardown := setupTest(t)
defer teardown(t)

mg.manifests[desc.Digest] = m
desc.MediaType = ""

m2, err := store.Get(ctx, desc)
m2, err := store.Get(ctx, desc, ref)
checkIngest(t, cs, desc)
assert.NilError(t, err)
assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
Expand All @@ -251,27 +273,27 @@ func TestManifestStore(t *testing.T) {

t.Run("with cache", func(t *testing.T) {
t.Run("cached manifest has media type", func(t *testing.T) {
desc, mg, store, cs, teardown := setupTest(t)
ref, desc, mg, store, cs, teardown := setupTest(t)
defer teardown(t)

writeManifest(t, cs, desc)
desc.MediaType = ""

m2, err := store.Get(ctx, desc)
m2, err := store.Get(ctx, desc, ref)
checkIngest(t, cs, desc)
assert.NilError(t, err)
assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
assert.Check(t, cmp.Equal(mg.gets, 0))
})

t.Run("cached manifest has no media type", func(t *testing.T) {
desc, mg, store, cs, teardown := setupTest(t)
ref, desc, mg, store, cs, teardown := setupTest(t)
defer teardown(t)

desc.MediaType = ""
writeManifest(t, cs, desc)

m2, err := store.Get(ctx, desc)
m2, err := store.Get(ctx, desc, ref)
checkIngest(t, cs, desc)
assert.NilError(t, err)
assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
Expand All @@ -286,14 +308,14 @@ func TestManifestStore(t *testing.T) {
// Also makes sure the ingests are aborted.
t.Run("error persisting manifest", func(t *testing.T) {
t.Run("error on writer", func(t *testing.T) {
desc, mg, store, cs, teardown := setupTest(t)
ref, desc, mg, store, cs, teardown := setupTest(t)
defer teardown(t)
mg.manifests[desc.Digest] = m

csW := &testingContentStoreWrapper{ContentStore: store.local, errorOnWriter: errors.New("random error")}
store.local = csW

m2, err := store.Get(ctx, desc)
m2, err := store.Get(ctx, desc, ref)
checkIngest(t, cs, desc)
assert.NilError(t, err)
assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
Expand All @@ -305,14 +327,14 @@ func TestManifestStore(t *testing.T) {
})

t.Run("error on commit", func(t *testing.T) {
desc, mg, store, cs, teardown := setupTest(t)
ref, desc, mg, store, cs, teardown := setupTest(t)
defer teardown(t)
mg.manifests[desc.Digest] = m

csW := &testingContentStoreWrapper{ContentStore: store.local, errorOnCommit: errors.New("random error")}
store.local = csW

m2, err := store.Get(ctx, desc)
m2, err := store.Get(ctx, desc, ref)
checkIngest(t, cs, desc)
assert.NilError(t, err)
assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{})))
Expand Down