Skip to content

Commit

Permalink
retry request on writer reset
Browse files Browse the repository at this point in the history
when a put request is retried due to the response from registry,
the body of the request should be seekable. A dynamic pipe is added
to the body so that the content of the body can be read again.
Currently a maximum of 5 resets are allowed, above which will fail the
request. A new error ErrReset is introduced which informs that a
reset has occured and request needs to be retried.

also added tests for Copy() and push() to test the new functionality

Signed-off-by: Akhil Mohan <makhil@vmware.com>
(cherry picked from commit 8f4c23b)
Signed-off-by: Akhil Mohan <makhil@vmware.com>
  • Loading branch information
akhilerm committed Oct 5, 2022
1 parent a389eb6 commit fd0f406
Show file tree
Hide file tree
Showing 4 changed files with 501 additions and 88 deletions.
68 changes: 51 additions & 17 deletions content/helpers.go
Expand Up @@ -25,11 +25,17 @@ import (
"time"

"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)

// maxResets is the no.of times the Copy() method can tolerate a reset of the body
const maxResets = 5

var ErrReset = errors.New("writer has been reset")

var bufPool = sync.Pool{
New: func() interface{} {
buffer := make([]byte, 1<<20)
Expand Down Expand Up @@ -80,7 +86,7 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, desc o
return errors.Wrap(err, "failed to open writer")
}

return nil // all ready present
return nil // already present
}
defer cw.Close()

Expand Down Expand Up @@ -131,35 +137,63 @@ func OpenWriter(ctx context.Context, cs Ingester, opts ...WriterOpt) (Writer, er
// the size or digest is unknown, these values may be empty.
//
// Copy is buffered, so no need to wrap reader in buffered io.
func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected digest.Digest, opts ...Opt) error {
func Copy(ctx context.Context, cw Writer, or io.Reader, size int64, expected digest.Digest, opts ...Opt) error {
ws, err := cw.Status()
if err != nil {
return errors.Wrap(err, "failed to get status")
}

r := or
if ws.Offset > 0 {
r, err = seekReader(r, ws.Offset, size)
r, err = seekReader(or, ws.Offset, size)
if err != nil {
return errors.Wrapf(err, "unable to resume write to %v", ws.Ref)
}
}

copied, err := copyWithBuffer(cw, r)
if err != nil {
return errors.Wrap(err, "failed to copy")
}
if size != 0 && copied < size-ws.Offset {
// Short writes would return its own error, this indicates a read failure
return errors.Wrapf(io.ErrUnexpectedEOF, "failed to read expected number of bytes")
}

if err := cw.Commit(ctx, size, expected, opts...); err != nil {
if !errdefs.IsAlreadyExists(err) {
return errors.Wrapf(err, "failed commit on ref %q", ws.Ref)
for i := 0; i < maxResets; i++ {
if i >= 1 {
log.G(ctx).WithField("digest", expected).Debugf("retrying copy due to reset")
}
copied, err := copyWithBuffer(cw, r)
if errors.Is(err, ErrReset) {
ws, err := cw.Status()
if err != nil {
return errors.Wrap(err, "failed to get status")
}
r, err = seekReader(or, ws.Offset, size)
if err != nil {
return errors.Wrapf(err, "unable to resume write to %v", ws.Ref)
}
continue
}
if err != nil {
return errors.Wrap(err, "failed to copy")
}
if size != 0 && copied < size-ws.Offset {
// Short writes would return its own error, this indicates a read failure
errors.Wrapf(io.ErrUnexpectedEOF, "failed to read expected number of bytes")
}
if err := cw.Commit(ctx, size, expected, opts...); err != nil {
if errors.Is(err, ErrReset) {
ws, err := cw.Status()
if err != nil {
return errors.Wrap(err, "failed to get status: %w")
}
r, err = seekReader(or, ws.Offset, size)
if err != nil {
return errors.Wrapf(err, "unable to resume write to %v", ws.Ref)
}
continue
}
if !errdefs.IsAlreadyExists(err) {
return errors.Wrapf(err, "failed commit on ref %q", ws.Ref)
}
}
return nil
}

return nil
log.G(ctx).WithField("digest", expected).Errorf("failed to copy after %d retries", maxResets)
return errors.Errorf("failed to copy after %d retries", maxResets)
}

// CopyReaderAt copies to a writer from a given reader at for the given
Expand Down
114 changes: 96 additions & 18 deletions content/helpers_test.go
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/containerd/containerd/errdefs"
"github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"gotest.tools/v3/assert"
is "gotest.tools/v3/assert/cmp"
)
Expand All @@ -39,38 +40,107 @@ type copySource struct {
func TestCopy(t *testing.T) {
defaultSource := newCopySource("this is the source to copy")

cf1 := func(buf *bytes.Buffer, st Status) commitFunction {
i := 0
return func() error {
// function resets the first time
if i == 0 {
// this is the case where, the pipewriter to which the data was being written has
// changed. which means we need to clear the buffer
i++
buf.Reset()
st.Offset = 0
return ErrReset
}
return nil
}
}

cf2 := func(buf *bytes.Buffer, st Status) commitFunction {
i := 0
return func() error {
// function resets for more than the maxReset value
if i < maxResets+1 {
// this is the case where, the pipewriter to which the data was being written has
// changed. which means we need to clear the buffer
i++
buf.Reset()
st.Offset = 0
return ErrReset
}
return nil
}
}

s1 := Status{}
s2 := Status{}
b1 := bytes.Buffer{}
b2 := bytes.Buffer{}

var testcases = []struct {
name string
source copySource
writer fakeWriter
expected string
name string
source copySource
writer fakeWriter
expected string
expectedErr error
}{
{
name: "copy no offset",
source: defaultSource,
writer: fakeWriter{},
name: "copy no offset",
source: defaultSource,
writer: fakeWriter{
Buffer: &bytes.Buffer{},
},
expected: "this is the source to copy",
},
{
name: "copy with offset from seeker",
source: defaultSource,
writer: fakeWriter{status: Status{Offset: 8}},
name: "copy with offset from seeker",
source: defaultSource,
writer: fakeWriter{
Buffer: &bytes.Buffer{},
status: Status{Offset: 8},
},
expected: "the source to copy",
},
{
name: "copy with offset from unseekable source",
source: copySource{reader: bytes.NewBufferString("foobar"), size: 6},
writer: fakeWriter{status: Status{Offset: 3}},
name: "copy with offset from unseekable source",
source: copySource{reader: bytes.NewBufferString("foobar"), size: 6},
writer: fakeWriter{
Buffer: &bytes.Buffer{},
status: Status{Offset: 3},
},
expected: "bar",
},
{
name: "commit already exists",
source: newCopySource("this already exists"),
writer: fakeWriter{commitFunc: func() error {
return errdefs.ErrAlreadyExists
}},
writer: fakeWriter{
Buffer: &bytes.Buffer{},
commitFunc: func() error {
return errdefs.ErrAlreadyExists
}},
expected: "this already exists",
},
{
name: "commit fails first time with ErrReset",
source: newCopySource("content to copy"),
writer: fakeWriter{
Buffer: &b1,
status: s1,
commitFunc: cf1(&b1, s1),
},
expected: "content to copy",
},
{
name: "write fails more than maxReset times due to reset",
source: newCopySource("content to copy"),
writer: fakeWriter{
Buffer: &b2,
status: s2,
commitFunc: cf2(&b2, s2),
},
expected: "",
expectedErr: errors.Errorf("failed to copy after %d retries", maxResets),
},
}

for _, testcase := range testcases {
Expand All @@ -81,6 +151,12 @@ func TestCopy(t *testing.T) {
testcase.source.size,
testcase.source.digest)

// if an error is expected then further comparisons are not required
if testcase.expectedErr != nil {
assert.Check(t, is.Equal(testcase.expectedErr.Error(), err.Error()))
return
}

assert.NilError(t, err)
assert.Check(t, is.Equal(testcase.source.digest, testcase.writer.committedDigest))
assert.Check(t, is.Equal(testcase.expected, testcase.writer.String()))
Expand All @@ -96,11 +172,13 @@ func newCopySource(raw string) copySource {
}
}

type commitFunction func() error

type fakeWriter struct {
bytes.Buffer
*bytes.Buffer
committedDigest digest.Digest
status Status
commitFunc func() error
commitFunc commitFunction
}

func (f *fakeWriter) Close() error {
Expand Down

0 comments on commit fd0f406

Please sign in to comment.