diff --git a/content/helpers.go b/content/helpers.go index 00fae1fc80d87..2af4159a456ee 100644 --- a/content/helpers.go +++ b/content/helpers.go @@ -25,11 +25,17 @@ import ( "time" "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/log" "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" ) +// maxResets is the no.of times the Copy() method can tolerate a reset of the body +const maxResets = 5 + +var ErrReset = errors.New("writer has been reset") + var bufPool = sync.Pool{ New: func() interface{} { buffer := make([]byte, 1<<20) @@ -80,7 +86,7 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, desc o return errors.Wrap(err, "failed to open writer") } - return nil // all ready present + return nil // already present } defer cw.Close() @@ -131,35 +137,63 @@ func OpenWriter(ctx context.Context, cs Ingester, opts ...WriterOpt) (Writer, er // the size or digest is unknown, these values may be empty. // // Copy is buffered, so no need to wrap reader in buffered io. -func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected digest.Digest, opts ...Opt) error { +func Copy(ctx context.Context, cw Writer, or io.Reader, size int64, expected digest.Digest, opts ...Opt) error { ws, err := cw.Status() if err != nil { return errors.Wrap(err, "failed to get status") } - + r := or if ws.Offset > 0 { - r, err = seekReader(r, ws.Offset, size) + r, err = seekReader(or, ws.Offset, size) if err != nil { return errors.Wrapf(err, "unable to resume write to %v", ws.Ref) } } - copied, err := copyWithBuffer(cw, r) - if err != nil { - return errors.Wrap(err, "failed to copy") - } - if size != 0 && copied < size-ws.Offset { - // Short writes would return its own error, this indicates a read failure - return errors.Wrapf(io.ErrUnexpectedEOF, "failed to read expected number of bytes") - } - - if err := cw.Commit(ctx, size, expected, opts...); err != nil { - if !errdefs.IsAlreadyExists(err) { - return errors.Wrapf(err, "failed commit on ref %q", ws.Ref) + for i := 0; i < maxResets; i++ { + if i >= 1 { + log.G(ctx).WithField("digest", expected).Debugf("retrying copy due to reset") } + copied, err := copyWithBuffer(cw, r) + if errors.Is(err, ErrReset) { + ws, err := cw.Status() + if err != nil { + return errors.Wrap(err, "failed to get status") + } + r, err = seekReader(or, ws.Offset, size) + if err != nil { + return errors.Wrapf(err, "unable to resume write to %v", ws.Ref) + } + continue + } + if err != nil { + return errors.Wrap(err, "failed to copy") + } + if size != 0 && copied < size-ws.Offset { + // Short writes would return its own error, this indicates a read failure + errors.Wrapf(io.ErrUnexpectedEOF, "failed to read expected number of bytes") + } + if err := cw.Commit(ctx, size, expected, opts...); err != nil { + if errors.Is(err, ErrReset) { + ws, err := cw.Status() + if err != nil { + return errors.Wrap(err, "failed to get status: %w") + } + r, err = seekReader(or, ws.Offset, size) + if err != nil { + return errors.Wrapf(err, "unable to resume write to %v", ws.Ref) + } + continue + } + if !errdefs.IsAlreadyExists(err) { + return errors.Wrapf(err, "failed commit on ref %q", ws.Ref) + } + } + return nil } - return nil + log.G(ctx).WithField("digest", expected).Errorf("failed to copy after %d retries", maxResets) + return errors.Errorf("failed to copy after %d retries", maxResets) } // CopyReaderAt copies to a writer from a given reader at for the given diff --git a/content/helpers_test.go b/content/helpers_test.go index be52f043ed420..f4da531fc9c45 100644 --- a/content/helpers_test.go +++ b/content/helpers_test.go @@ -26,6 +26,7 @@ import ( "github.com/containerd/containerd/errdefs" "github.com/opencontainers/go-digest" + "github.com/pkg/errors" "gotest.tools/v3/assert" is "gotest.tools/v3/assert/cmp" ) @@ -39,38 +40,107 @@ type copySource struct { func TestCopy(t *testing.T) { defaultSource := newCopySource("this is the source to copy") + cf1 := func(buf *bytes.Buffer, st Status) commitFunction { + i := 0 + return func() error { + // function resets the first time + if i == 0 { + // this is the case where, the pipewriter to which the data was being written has + // changed. which means we need to clear the buffer + i++ + buf.Reset() + st.Offset = 0 + return ErrReset + } + return nil + } + } + + cf2 := func(buf *bytes.Buffer, st Status) commitFunction { + i := 0 + return func() error { + // function resets for more than the maxReset value + if i < maxResets+1 { + // this is the case where, the pipewriter to which the data was being written has + // changed. which means we need to clear the buffer + i++ + buf.Reset() + st.Offset = 0 + return ErrReset + } + return nil + } + } + + s1 := Status{} + s2 := Status{} + b1 := bytes.Buffer{} + b2 := bytes.Buffer{} + var testcases = []struct { - name string - source copySource - writer fakeWriter - expected string + name string + source copySource + writer fakeWriter + expected string + expectedErr error }{ { - name: "copy no offset", - source: defaultSource, - writer: fakeWriter{}, + name: "copy no offset", + source: defaultSource, + writer: fakeWriter{ + Buffer: &bytes.Buffer{}, + }, expected: "this is the source to copy", }, { - name: "copy with offset from seeker", - source: defaultSource, - writer: fakeWriter{status: Status{Offset: 8}}, + name: "copy with offset from seeker", + source: defaultSource, + writer: fakeWriter{ + Buffer: &bytes.Buffer{}, + status: Status{Offset: 8}, + }, expected: "the source to copy", }, { - name: "copy with offset from unseekable source", - source: copySource{reader: bytes.NewBufferString("foobar"), size: 6}, - writer: fakeWriter{status: Status{Offset: 3}}, + name: "copy with offset from unseekable source", + source: copySource{reader: bytes.NewBufferString("foobar"), size: 6}, + writer: fakeWriter{ + Buffer: &bytes.Buffer{}, + status: Status{Offset: 3}, + }, expected: "bar", }, { name: "commit already exists", source: newCopySource("this already exists"), - writer: fakeWriter{commitFunc: func() error { - return errdefs.ErrAlreadyExists - }}, + writer: fakeWriter{ + Buffer: &bytes.Buffer{}, + commitFunc: func() error { + return errdefs.ErrAlreadyExists + }}, expected: "this already exists", }, + { + name: "commit fails first time with ErrReset", + source: newCopySource("content to copy"), + writer: fakeWriter{ + Buffer: &b1, + status: s1, + commitFunc: cf1(&b1, s1), + }, + expected: "content to copy", + }, + { + name: "write fails more than maxReset times due to reset", + source: newCopySource("content to copy"), + writer: fakeWriter{ + Buffer: &b2, + status: s2, + commitFunc: cf2(&b2, s2), + }, + expected: "", + expectedErr: errors.Errorf("failed to copy after %d retries", maxResets), + }, } for _, testcase := range testcases { @@ -81,6 +151,12 @@ func TestCopy(t *testing.T) { testcase.source.size, testcase.source.digest) + // if an error is expected then further comparisons are not required + if testcase.expectedErr != nil { + assert.Check(t, is.Equal(testcase.expectedErr.Error(), err.Error())) + return + } + assert.NilError(t, err) assert.Check(t, is.Equal(testcase.source.digest, testcase.writer.committedDigest)) assert.Check(t, is.Equal(testcase.expected, testcase.writer.String())) @@ -96,11 +172,13 @@ func newCopySource(raw string) copySource { } } +type commitFunction func() error + type fakeWriter struct { - bytes.Buffer + *bytes.Buffer committedDigest digest.Digest status Status - commitFunc func() error + commitFunc commitFunction } func (f *fakeWriter) Close() error { diff --git a/remotes/docker/pusher.go b/remotes/docker/pusher.go index da3652b4f8363..c44d1d91bd293 100644 --- a/remotes/docker/pusher.go +++ b/remotes/docker/pusher.go @@ -19,7 +19,6 @@ package docker import ( "context" "io" - "io/ioutil" "net/http" "net/url" "strings" @@ -261,27 +260,20 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str // TODO: Support chunked upload - pr, pw := io.Pipe() - respC := make(chan response, 1) - body := ioutil.NopCloser(pr) + pushw := newPushWriter(p.dockerBase, ref, desc.Digest, p.tracker, isManifest) req.body = func() (io.ReadCloser, error) { - if body == nil { - return nil, errors.New("cannot reuse body, request must be retried") - } - // Only use the body once since pipe cannot be seeked - ob := body - body = nil - return ob, nil + pr, pw := io.Pipe() + pushw.setPipe(pw) + return io.NopCloser(pr), nil } req.size = desc.Size go func() { - defer close(respC) resp, err := req.doWithRetries(ctx, nil) if err != nil { - respC <- response{err: err} - pr.CloseWithError(err) + pushw.setError(err) + pushw.Close() return } @@ -290,20 +282,13 @@ func (p dockerPusher) push(ctx context.Context, desc ocispec.Descriptor, ref str default: err := remoteserrors.NewUnexpectedStatusErr(resp) log.G(ctx).WithField("resp", resp).WithField("body", string(err.(remoteserrors.ErrUnexpectedStatus).Body)).Debug("unexpected response") - pr.CloseWithError(err) + pushw.setError(err) + pushw.Close() } - respC <- response{Response: resp} + pushw.setResponse(resp) }() - return &pushWriter{ - base: p.dockerBase, - ref: ref, - pipe: pw, - responseC: respC, - isManifest: isManifest, - expected: desc.Digest, - tracker: p.tracker, - }, nil + return pushw, nil } func getManifestPath(object string, dgst digest.Digest) []string { @@ -325,28 +310,80 @@ func getManifestPath(object string, dgst digest.Digest) []string { return []string{"manifests", object} } -type response struct { - *http.Response - err error -} - type pushWriter struct { base *dockerBase ref string - pipe *io.PipeWriter - responseC <-chan response + pipe *io.PipeWriter + + pipeC chan *io.PipeWriter + respC chan *http.Response + errC chan error + isManifest bool expected digest.Digest tracker StatusTracker } +func newPushWriter(db *dockerBase, ref string, expected digest.Digest, tracker StatusTracker, isManifest bool) *pushWriter { + // Initialize and create response + return &pushWriter{ + base: db, + ref: ref, + expected: expected, + tracker: tracker, + pipeC: make(chan *io.PipeWriter, 1), + respC: make(chan *http.Response, 1), + errC: make(chan error, 1), + isManifest: isManifest, + } +} + +func (pw *pushWriter) setPipe(p *io.PipeWriter) { + pw.pipeC <- p +} + +func (pw *pushWriter) setError(err error) { + pw.errC <- err +} +func (pw *pushWriter) setResponse(resp *http.Response) { + pw.respC <- resp +} + func (pw *pushWriter) Write(p []byte) (n int, err error) { status, err := pw.tracker.GetStatus(pw.ref) if err != nil { return n, err } + + if pw.pipe == nil { + p, ok := <-pw.pipeC + if !ok { + return 0, io.ErrClosedPipe + } + pw.pipe = p + } else { + select { + case p, ok := <-pw.pipeC: + if !ok { + return 0, io.ErrClosedPipe + } + pw.pipe.CloseWithError(content.ErrReset) + pw.pipe = p + + // If content has already been written, the bytes + // cannot be written and the caller must reset + if status.Offset > 0 { + status.Offset = 0 + status.UpdatedAt = time.Now() + pw.tracker.SetStatus(pw.ref, status) + return 0, content.ErrReset + } + default: + } + } + n, err = pw.pipe.Write(p) status.Offset += int64(n) status.UpdatedAt = time.Now() @@ -355,13 +392,26 @@ func (pw *pushWriter) Write(p []byte) (n int, err error) { } func (pw *pushWriter) Close() error { - status, err := pw.tracker.GetStatus(pw.ref) - if err == nil && !status.Committed { - // Closing an incomplete writer. Record this as an error so that following write can retry it. - status.ErrClosed = errors.New("closed incomplete writer") - pw.tracker.SetStatus(pw.ref, status) + // Ensure pipeC is closed but handle `Close()` being + // called multiple times without panicking + select { + case _, ok := <-pw.pipeC: + if ok { + close(pw.pipeC) + } + default: + close(pw.pipeC) } - return pw.pipe.Close() + if pw.pipe != nil { + status, err := pw.tracker.GetStatus(pw.ref) + if err == nil && !status.Committed { + // Closing an incomplete writer. Record this as an error so that following write can retry it. + status.ErrClosed = errors.New("closed incomplete writer") + pw.tracker.SetStatus(pw.ref, status) + } + return pw.pipe.Close() + } + return nil } func (pw *pushWriter) Status() (content.Status, error) { @@ -388,18 +438,43 @@ func (pw *pushWriter) Commit(ctx context.Context, size int64, expected digest.Di return err } // TODO: timeout waiting for response - resp := <-pw.responseC - if resp.err != nil { - return resp.err + var resp *http.Response + select { + case err := <-pw.errC: + if err != nil { + return err + } + case resp = <-pw.respC: + defer resp.Body.Close() + case p, ok := <-pw.pipeC: + // check whether the pipe has changed in the commit, because sometimes Write + // can complete successfully, but the pipe may have changed. In that case, the + // content needs to be reset. + if !ok { + return io.ErrClosedPipe + } + pw.pipe.CloseWithError(content.ErrReset) + pw.pipe = p + status, err := pw.tracker.GetStatus(pw.ref) + if err != nil { + return err + } + // If content has already been written, the bytes + // cannot be written again and the caller must reset + if status.Offset > 0 { + status.Offset = 0 + status.UpdatedAt = time.Now() + pw.tracker.SetStatus(pw.ref, status) + return content.ErrReset + } } - defer resp.Response.Body.Close() // 201 is specified return status, some registries return // 200, 202 or 204. switch resp.StatusCode { case http.StatusOK, http.StatusCreated, http.StatusNoContent, http.StatusAccepted: default: - return remoteserrors.NewUnexpectedStatusErr(resp.Response) + return remoteserrors.NewUnexpectedStatusErr(resp) } status, err := pw.tracker.GetStatus(pw.ref) diff --git a/remotes/docker/pusher_test.go b/remotes/docker/pusher_test.go index 2dfe9a8d4731b..cf169222d8316 100644 --- a/remotes/docker/pusher_test.go +++ b/remotes/docker/pusher_test.go @@ -29,8 +29,12 @@ import ( "testing" "github.com/containerd/containerd/content" - digest "github.com/opencontainers/go-digest" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/remotes" + "github.com/opencontainers/go-digest" ocispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" + "github.com/stretchr/testify/assert" ) func TestGetManifestPath(t *testing.T) { @@ -81,6 +85,60 @@ func TestPusherErrClosedRetry(t *testing.T) { } } +// TestPusherErrReset tests the push method if the request needs to be retried +// i.e when ErrReset occurs +func TestPusherErrReset(t *testing.T) { + p, reg, done := samplePusher(t) + defer done() + + p.object = "latest@sha256:55d31f3af94c797b65b310569803cacc1c9f4a34bf61afcdc8138f89345c8308" + + reg.uploadable = true + reg.putHandlerFunc = func() func(w http.ResponseWriter, r *http.Request) bool { + // sets whether the request should timeout so that a reset error can occur and + // request will be retried + shouldTimeout := true + return func(w http.ResponseWriter, r *http.Request) bool { + if shouldTimeout { + shouldTimeout = !shouldTimeout + w.WriteHeader(http.StatusRequestTimeout) + return true + } + return false + } + }() + + ct := []byte("manifest-content") + + desc := ocispec.Descriptor{ + MediaType: ocispec.MediaTypeImageManifest, + Digest: digest.FromBytes(ct), + Size: int64(len(ct)), + } + + w, err := p.push(context.Background(), desc, remotes.MakeRefKey(context.Background(), desc), false) + assert.Equal(t, err, nil, "no error should be there") + + w.Write(ct) + + pw, _ := w.(*pushWriter) + + select { + case p := <-pw.pipeC: + p.Write(ct) + case e := <-pw.errC: + assert.Failf(t, "error: %v while retrying request", e.Error()) + } + + select { + case resp := <-pw.respC: + assert.Equalf(t, resp.StatusCode, http.StatusCreated, + "201 should be the response code when uploading new content") + case <-pw.errC: + assert.Fail(t, "should not give error") + } +} + func tryUpload(ctx context.Context, t *testing.T, p dockerPusher, layerContent []byte) error { desc := ocispec.Descriptor{ MediaType: ocispec.MediaTypeImageLayerGzip, @@ -99,7 +157,9 @@ func tryUpload(ctx context.Context, t *testing.T, p dockerPusher, layerContent [ } func samplePusher(t *testing.T) (dockerPusher, *uploadableMockRegistry, func()) { - reg := &uploadableMockRegistry{} + reg := &uploadableMockRegistry{ + availableContents: make([]string, 0), + } s := httptest.NewServer(reg) u, err := url.Parse(s.URL) if err != nil { @@ -124,40 +184,205 @@ func samplePusher(t *testing.T) (dockerPusher, *uploadableMockRegistry, func()) } var manifestRegexp = regexp.MustCompile(`/([a-z0-9]+)/manifests/(.*)`) -var blobUploadRegexp = regexp.MustCompile(`/([a-z0-9]+)/blobs/uploads/`) +var blobUploadRegexp = regexp.MustCompile(`/([a-z0-9]+)/blobs/uploads/(.*)`) // uploadableMockRegistry provides minimal registry APIs which are enough to serve requests from dockerPusher. type uploadableMockRegistry struct { - uploadable bool + availableContents []string + uploadable bool + putHandlerFunc func(w http.ResponseWriter, r *http.Request) bool } func (u *uploadableMockRegistry) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if r.Method == "POST" { + if r.Method == http.MethodPut && u.putHandlerFunc != nil { + // if true return the response witout calling default handler + if u.putHandlerFunc(w, r) { + return + } + } + u.defaultHandler(w, r) +} + +func (u *uploadableMockRegistry) defaultHandler(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodPost { if matches := blobUploadRegexp.FindStringSubmatch(r.URL.Path); len(matches) != 0 { if u.uploadable { w.Header().Set("Location", "/upload") } else { w.Header().Set("Location", "/cannotupload") } - w.WriteHeader(202) + dgstr := digest.Canonical.Digester() + if _, err := io.Copy(dgstr.Hash(), r.Body); err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + u.availableContents = append(u.availableContents, dgstr.Digest().String()) + w.WriteHeader(http.StatusAccepted) return } - } else if r.Method == "PUT" { + } else if r.Method == http.MethodPut { mfstMatches := manifestRegexp.FindStringSubmatch(r.URL.Path) if len(mfstMatches) != 0 || strings.HasPrefix(r.URL.Path, "/upload") { dgstr := digest.Canonical.Digester() if _, err := io.Copy(dgstr.Hash(), r.Body); err != nil { - w.WriteHeader(500) + w.WriteHeader(http.StatusInternalServerError) return } + u.availableContents = append(u.availableContents, dgstr.Digest().String()) w.Header().Set("Docker-Content-Digest", dgstr.Digest().String()) - w.WriteHeader(201) + w.WriteHeader(http.StatusCreated) return } else if r.URL.Path == "/cannotupload" { - w.WriteHeader(500) + w.WriteHeader(http.StatusInternalServerError) return } + } else if r.Method == http.MethodHead { + var content string + // check for both manifest and blob paths + if manifestMatch := manifestRegexp.FindStringSubmatch(r.URL.Path); len(manifestMatch) == 3 { + content = manifestMatch[2] + } else if blobMatch := blobUploadRegexp.FindStringSubmatch(r.URL.Path); len(blobMatch) == 3 { + content = blobMatch[2] + } + // if content is not found or if the path is not manifest or blob + // we return 404 + if u.isContentAlreadyExist(content) { + w.WriteHeader(http.StatusOK) + } else { + w.WriteHeader(http.StatusNotFound) + } + return } fmt.Println(r) - w.WriteHeader(404) + w.WriteHeader(http.StatusNotFound) +} + +// checks if the content is already present in the registry +func (u *uploadableMockRegistry) isContentAlreadyExist(c string) bool { + for _, ct := range u.availableContents { + if ct == c { + return true + } + } + return false +} + +func Test_dockerPusher_push(t *testing.T) { + + p, reg, done := samplePusher(t) + defer done() + + reg.uploadable = true + + manifestContent := []byte("manifest-content") + manifestContentDigest := digest.FromBytes(manifestContent) + layerContent := []byte("layer-content") + layerContentDigest := digest.FromBytes(layerContent) + + // using a random object here + baseObject := "latest@sha256:55d31f3af94c797b65b310569803cacc1c9f4a34bf61afcdc8138f89345c8308" + + type args struct { + content []byte + mediatype string + ref string + unavailableOnFail bool + } + tests := []struct { + name string + dp dockerPusher + dockerBaseObject string + args args + checkerFunc func(writer pushWriter) bool + wantErr error + }{ + { + name: "when a manifest is pushed", + dp: p, + dockerBaseObject: baseObject, + args: args{ + content: manifestContent, + mediatype: ocispec.MediaTypeImageManifest, + ref: fmt.Sprintf("manifest-%s", manifestContentDigest.String()), + unavailableOnFail: false, + }, + checkerFunc: func(writer pushWriter) bool { + select { + case resp := <-writer.respC: + // 201 should be the response code when uploading a new manifest + return resp.StatusCode == http.StatusCreated + case <-writer.errC: + return false + } + }, + wantErr: nil, + }, + { + name: "trying to push content that already exists", + dp: p, + dockerBaseObject: baseObject, + args: args{ + content: manifestContent, + mediatype: ocispec.MediaTypeImageManifest, + ref: fmt.Sprintf("manifest-%s", manifestContentDigest.String()), + unavailableOnFail: false, + }, + wantErr: errors.Wrapf(errdefs.ErrAlreadyExists, "content %v on remote", digest.FromBytes(manifestContent)), + }, + { + name: "trying to push a blob layer", + dp: p, + // Not needed to set the base object as it is used to generate path only in case of manifests + // dockerBaseObject: + args: args{ + content: layerContent, + mediatype: ocispec.MediaTypeImageLayer, + ref: fmt.Sprintf("layer-%s", layerContentDigest.String()), + unavailableOnFail: false, + }, + checkerFunc: func(writer pushWriter) bool { + select { + case resp := <-writer.respC: + // 201 should be the response code when uploading a new blob + return resp.StatusCode == http.StatusCreated + case <-writer.errC: + return false + } + }, + wantErr: nil, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + desc := ocispec.Descriptor{ + MediaType: test.args.mediatype, + Digest: digest.FromBytes(test.args.content), + Size: int64(len(test.args.content)), + } + + test.dp.object = test.dockerBaseObject + + got, err := test.dp.push(context.Background(), desc, test.args.ref, test.args.unavailableOnFail) + + // if an error is expected, further comparisons are not required. + if test.wantErr != nil { + assert.Equal(t, test.wantErr.Error(), err.Error()) + return + } + + assert.NoError(t, err) + + // write the content to the writer, this will be done when a Read() is called on the body of the request + got.Write(test.args.content) + + pw, ok := got.(*pushWriter) + if !ok { + assert.Errorf(t, errors.New("unable to cast content.Writer to pushWriter"), "got %v instead of pushwriter", got) + } + + // test whether a proper response has been received after the push operation + assert.True(t, test.checkerFunc(*pw)) + + }) + } }