[release/1.6] retry request on writer reset #7461

Merged: 1 commit merged on Oct 5, 2022
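In short, this backport teaches content.Copy to tolerate a writer reset: when a copy or commit attempt reports the new ErrReset sentinel, Copy re-reads the writer's Status, re-seeks the source reader to the reported offset, and tries again, up to maxResets (5) attempts, logging each retry at debug level. The sketch below shows the caller-side flow around the helpers touched in this diff (OpenWriter and Copy appear in the hunks; the store, ref, reader, and descriptor are assumed to come from the caller, and the WriterOpt names are the usual content-package options, not part of this PR):

```go
// Minimal caller-side sketch (not part of this PR): push a blob through the
// helpers touched here. With this change, content.Copy retries up to
// maxResets times when the writer reports content.ErrReset, instead of
// failing the whole upload.
package example

import (
	"context"
	"io"

	"github.com/containerd/containerd/content"
	ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

func pushBlob(ctx context.Context, store content.Ingester, ref string, r io.Reader, desc ocispec.Descriptor) error {
	// Open (or resume) a writer for this ref in the content store.
	cw, err := content.OpenWriter(ctx, store, content.WithRef(ref), content.WithDescriptor(desc))
	if err != nil {
		return err
	}
	defer cw.Close()

	// Copy resumes from the writer's current offset and, after this PR,
	// restarts from the reset offset whenever ErrReset is returned.
	return content.Copy(ctx, cw, r, desc.Size, desc.Digest)
}
```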
68 changes: 51 additions & 17 deletions content/helpers.go
@@ -26,10 +26,16 @@ import (
"time"

"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/log"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
)

// maxResets is the no.of times the Copy() method can tolerate a reset of the body
const maxResets = 5

var ErrReset = errors.New("writer has been reset")

var bufPool = sync.Pool{
New: func() interface{} {
buffer := make([]byte, 1<<20)
@@ -80,7 +86,7 @@ func WriteBlob(ctx context.Context, cs Ingester, ref string, r io.Reader, desc o
return fmt.Errorf("failed to open writer: %w", err)
}

return nil // all ready present
return nil // already present
}
defer cw.Close()

@@ -131,35 +137,63 @@ func OpenWriter(ctx context.Context, cs Ingester, opts ...WriterOpt) (Writer, er
// the size or digest is unknown, these values may be empty.
//
// Copy is buffered, so no need to wrap reader in buffered io.
func Copy(ctx context.Context, cw Writer, r io.Reader, size int64, expected digest.Digest, opts ...Opt) error {
func Copy(ctx context.Context, cw Writer, or io.Reader, size int64, expected digest.Digest, opts ...Opt) error {
ws, err := cw.Status()
if err != nil {
return fmt.Errorf("failed to get status: %w", err)
}

r := or
if ws.Offset > 0 {
r, err = seekReader(r, ws.Offset, size)
r, err = seekReader(or, ws.Offset, size)
if err != nil {
return fmt.Errorf("unable to resume write to %v: %w", ws.Ref, err)
}
}

copied, err := copyWithBuffer(cw, r)
if err != nil {
return fmt.Errorf("failed to copy: %w", err)
}
if size != 0 && copied < size-ws.Offset {
// Short writes would return its own error, this indicates a read failure
return fmt.Errorf("failed to read expected number of bytes: %w", io.ErrUnexpectedEOF)
}

if err := cw.Commit(ctx, size, expected, opts...); err != nil {
if !errdefs.IsAlreadyExists(err) {
return fmt.Errorf("failed commit on ref %q: %w", ws.Ref, err)
for i := 0; i < maxResets; i++ {
if i >= 1 {
log.G(ctx).WithField("digest", expected).Debugf("retrying copy due to reset")
}
copied, err := copyWithBuffer(cw, r)
if errors.Is(err, ErrReset) {
ws, err := cw.Status()
if err != nil {
return fmt.Errorf("failed to get status: %w", err)
}
r, err = seekReader(or, ws.Offset, size)
if err != nil {
return fmt.Errorf("unable to resume write to %v: %w", ws.Ref, err)
}
continue
}
if err != nil {
return fmt.Errorf("failed to copy: %w", err)
}
if size != 0 && copied < size-ws.Offset {
// Short writes would return its own error, this indicates a read failure
return fmt.Errorf("failed to read expected number of bytes: %w", io.ErrUnexpectedEOF)
}
if err := cw.Commit(ctx, size, expected, opts...); err != nil {
if errors.Is(err, ErrReset) {
ws, err := cw.Status()
if err != nil {
return fmt.Errorf("failed to get status: %w", err)
}
r, err = seekReader(or, ws.Offset, size)
if err != nil {
return fmt.Errorf("unable to resume write to %v: %w", ws.Ref, err)
}
continue
}
if !errdefs.IsAlreadyExists(err) {
return fmt.Errorf("failed commit on ref %q: %w", ws.Ref, err)
}
}
return nil
}

return nil
log.G(ctx).WithField("digest", expected).Errorf("failed to copy after %d retries", maxResets)
return fmt.Errorf("failed to copy after %d retries", maxResets)
}

// CopyReaderAt copies to a writer from a given reader at for the given
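To make the retry behaviour concrete outside of containerd's own types, here is a self-contained sketch of the same pattern: a writer that discards its buffered data and reports a reset on Commit, and a copy loop that rewinds the source and retries a bounded number of times. The names (errReset, maxResets, copyWithRetry) mirror the diff above, but this is an illustration of the pattern, not the containerd implementation; the real Copy re-seeks to the offset reported by the writer's Status rather than always rewinding to zero.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"strings"
)

// errReset mirrors content.ErrReset: the writer was reset and the data
// written so far must be sent again.
var errReset = errors.New("writer has been reset")

// maxResets mirrors the constant added in helpers.go.
const maxResets = 5

// resettableWriter stands in for a content writer whose backing connection
// can drop, discarding everything buffered so far.
type resettableWriter struct {
	buf      bytes.Buffer
	failures int // how many more times Commit will report a reset
}

func (w *resettableWriter) Write(p []byte) (int, error) { return w.buf.Write(p) }

func (w *resettableWriter) Commit() error {
	if w.failures > 0 {
		w.failures--
		w.buf.Reset() // a reset throws away the data written so far
		return errReset
	}
	return nil
}

// copyWithRetry copies src into w, restarting from the beginning of the
// source whenever the writer reports a reset, up to maxResets attempts.
func copyWithRetry(w *resettableWriter, src io.ReadSeeker) error {
	for i := 0; i < maxResets; i++ {
		if _, err := src.Seek(0, io.SeekStart); err != nil {
			return fmt.Errorf("unable to resume write: %w", err)
		}
		if _, err := io.Copy(w, src); err != nil {
			return fmt.Errorf("failed to copy: %w", err)
		}
		if err := w.Commit(); err != nil {
			if errors.Is(err, errReset) {
				continue // start over from the reset offset
			}
			return fmt.Errorf("failed commit: %w", err)
		}
		return nil
	}
	return fmt.Errorf("failed to copy after %d retries", maxResets)
}

func main() {
	w := &resettableWriter{failures: 2}
	if err := copyWithRetry(w, strings.NewReader("this is the source to copy")); err != nil {
		panic(err)
	}
	fmt.Println(w.buf.String()) // prints the full source after two retries
}
```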
114 changes: 96 additions & 18 deletions content/helpers_test.go
@@ -20,6 +20,7 @@ import (
"bytes"
"context"
_ "crypto/sha256" // required by go-digest
"fmt"
"io"
"strings"
"testing"
@@ -39,38 +40,107 @@ type copySource struct {
func TestCopy(t *testing.T) {
defaultSource := newCopySource("this is the source to copy")

cf1 := func(buf *bytes.Buffer, st Status) commitFunction {
i := 0
return func() error {
// function resets the first time
if i == 0 {
// this is the case where, the pipewriter to which the data was being written has
// changed. which means we need to clear the buffer
i++
buf.Reset()
st.Offset = 0
return ErrReset
}
return nil
}
}

cf2 := func(buf *bytes.Buffer, st Status) commitFunction {
i := 0
return func() error {
// function resets for more than the maxReset value
if i < maxResets+1 {
// this is the case where, the pipewriter to which the data was being written has
// changed. which means we need to clear the buffer
i++
buf.Reset()
st.Offset = 0
return ErrReset
}
return nil
}
}

s1 := Status{}
s2 := Status{}
b1 := bytes.Buffer{}
b2 := bytes.Buffer{}

var testcases = []struct {
name string
source copySource
writer fakeWriter
expected string
name string
source copySource
writer fakeWriter
expected string
expectedErr error
}{
{
name: "copy no offset",
source: defaultSource,
writer: fakeWriter{},
name: "copy no offset",
source: defaultSource,
writer: fakeWriter{
Buffer: &bytes.Buffer{},
},
expected: "this is the source to copy",
},
{
name: "copy with offset from seeker",
source: defaultSource,
writer: fakeWriter{status: Status{Offset: 8}},
name: "copy with offset from seeker",
source: defaultSource,
writer: fakeWriter{
Buffer: &bytes.Buffer{},
status: Status{Offset: 8},
},
expected: "the source to copy",
},
{
name: "copy with offset from unseekable source",
source: copySource{reader: bytes.NewBufferString("foobar"), size: 6},
writer: fakeWriter{status: Status{Offset: 3}},
name: "copy with offset from unseekable source",
source: copySource{reader: bytes.NewBufferString("foobar"), size: 6},
writer: fakeWriter{
Buffer: &bytes.Buffer{},
status: Status{Offset: 3},
},
expected: "bar",
},
{
name: "commit already exists",
source: newCopySource("this already exists"),
writer: fakeWriter{commitFunc: func() error {
return errdefs.ErrAlreadyExists
}},
writer: fakeWriter{
Buffer: &bytes.Buffer{},
commitFunc: func() error {
return errdefs.ErrAlreadyExists
}},
expected: "this already exists",
},
{
name: "commit fails first time with ErrReset",
source: newCopySource("content to copy"),
writer: fakeWriter{
Buffer: &b1,
status: s1,
commitFunc: cf1(&b1, s1),
},
expected: "content to copy",
},
{
name: "write fails more than maxReset times due to reset",
source: newCopySource("content to copy"),
writer: fakeWriter{
Buffer: &b2,
status: s2,
commitFunc: cf2(&b2, s2),
},
expected: "",
expectedErr: fmt.Errorf("failed to copy after %d retries", maxResets),
},
}

for _, testcase := range testcases {
@@ -81,6 +151,12 @@ func TestCopy(t *testing.T) {
testcase.source.size,
testcase.source.digest)

// if an error is expected then further comparisons are not required
(Inline review comment from the PR author) @mikebrow There was one merge conflict while cherry-picking here in the tests. Other than that it was clean.

if testcase.expectedErr != nil {
assert.Check(t, is.Equal(testcase.expectedErr.Error(), err.Error()))
return
}

assert.NilError(t, err)
assert.Check(t, is.Equal(testcase.source.digest, testcase.writer.committedDigest))
assert.Check(t, is.Equal(testcase.expected, testcase.writer.String()))
@@ -96,11 +172,13 @@ func newCopySource(raw string) copySource {
}
}

type commitFunction func() error

type fakeWriter struct {
bytes.Buffer
*bytes.Buffer
committedDigest digest.Digest
status Status
commitFunc func() error
commitFunc commitFunction
}

func (f *fakeWriter) Close() error {
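The test changes switch fakeWriter to an external *bytes.Buffer and introduce commitFunction factories (cf1, cf2), so a case can simulate either a single reset that Copy must absorb, or more resets than maxResets, which must surface as the "failed to copy after N retries" error. Below is a self-contained sketch of that factory pattern with illustrative names, separate from the actual test helpers above:

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// errReset stands in for content.ErrReset.
var errReset = errors.New("writer has been reset")

// failingCommit returns a commit function that simulates a writer reset for
// its first n calls: it wipes the buffer (as a real reset discards buffered
// data) and returns the sentinel error, then succeeds on later calls.
func failingCommit(buf *bytes.Buffer, n int) func() error {
	calls := 0
	return func() error {
		if calls < n {
			calls++
			buf.Reset()
			return errReset
		}
		return nil
	}
}

func main() {
	var buf bytes.Buffer
	commit := failingCommit(&buf, 1)

	buf.WriteString("content to copy")
	fmt.Println(commit()) // "writer has been reset"; buffer is now empty

	buf.WriteString("content to copy") // the caller re-copies after the reset
	fmt.Println(commit())              // <nil>; commit succeeds this time
	fmt.Println(buf.String())          // "content to copy"
}
```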