Skip to content

Commit

Permalink
distribution: retry downloading schema config on retryable error
Browse files Browse the repository at this point in the history
fixes moby#43267

Signed-off-by: Pete Woods <pete.woods@circleci.com>
  • Loading branch information
pete-woods committed Feb 24, 2022
1 parent 48d08a7 commit d3f5dcc
Show file tree
Hide file tree
Showing 3 changed files with 198 additions and 1 deletion.
34 changes: 33 additions & 1 deletion distribution/pull_v2.go
Expand Up @@ -7,6 +7,7 @@ import (
"io"
"os"
"runtime"
"time"

"github.com/containerd/containerd/log"
"github.com/containerd/containerd/platforms"
Expand Down Expand Up @@ -856,9 +857,15 @@ func (p *v2Puller) pullManifestList(ctx context.Context, ref reference.Named, mf
return id, manifestListDigest, err
}

const defaultSchemaPullBackoff = 250 * time.Millisecond
const defaultMaxSchemaPullAttempts = 5

func (p *v2Puller) pullSchema2Config(ctx context.Context, dgst digest.Digest) (configJSON []byte, err error) {
blobs := p.repo.Blobs(ctx)
configJSON, err = blobs.Get(ctx, dgst)
err = retry(ctx, defaultMaxSchemaPullAttempts, defaultSchemaPullBackoff, func(ctx context.Context) (err error) {
configJSON, err = blobs.Get(ctx, dgst)
return err
})
if err != nil {
return nil, err
}
Expand All @@ -877,6 +884,31 @@ func (p *v2Puller) pullSchema2Config(ctx context.Context, dgst digest.Digest) (c
return configJSON, nil
}

func retry(ctx context.Context, maxAttempts int, sleep time.Duration, f func(ctx context.Context) error) (err error) {
for attempt := 0; attempt < maxAttempts; attempt++ {
err = retryOnError(f(ctx))
if err == nil {
return nil
}
if xfer.IsDoNotRetryError(err) {
return err
}

if attempt+1 < maxAttempts {
timer := time.NewTicker(sleep)
select {
case <-ctx.Done():
timer.Stop()
return ctx.Err()
case <-timer.C:
logrus.WithError(err).WithField("attempts", attempt+1).Error("retrying after error")
sleep *= 2
}
}
}
return errors.Wrapf(err, "download failed after attempts=%d", maxAttempts)
}

// schema2ManifestDigest computes the manifest digest, and, if pulling by
// digest, ensures that it matches the requested digest.
func schema2ManifestDigest(ref reference.Named, mfst distribution.Manifest) (digest.Digest, error) {
Expand Down
158 changes: 158 additions & 0 deletions distribution/pull_v2_test.go
@@ -1,13 +1,18 @@
package distribution // import "github.com/docker/docker/distribution"

import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"os"
"reflect"
"regexp"
"runtime"
"strings"
"sync/atomic"
"testing"

"github.com/docker/distribution/manifest/schema1"
Expand All @@ -16,6 +21,11 @@ import (
specs "github.com/opencontainers/image-spec/specs-go/v1"
"gotest.tools/v3/assert"
is "gotest.tools/v3/assert/cmp"

"github.com/docker/docker/api/types"
registrytypes "github.com/docker/docker/api/types/registry"
"github.com/docker/docker/image"
"github.com/docker/docker/registry"
)

// TestFixManifestLayers checks that fixManifestLayers removes a duplicate
Expand Down Expand Up @@ -205,3 +215,151 @@ func TestFormatPlatform(t *testing.T) {
}
}
}

func TestPullSchema2Config(t *testing.T) {
ctx := context.Background()

const imageJSON = `{
"architecture": "amd64",
"os": "linux",
"config": {},
"rootfs": {
"type": "layers",
"diff_ids": []
}
}`
expectedDigest := digest.Digest("sha256:66ad98165d38f53ee73868f82bd4eed60556ddfee824810a4062c4f777b20a5b")
var callCount int64

tests := []struct {
name string
handler http.HandlerFunc
expectError string
}{
{
name: "success first time",
handler: func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(imageJSON))
},
},
{
name: "500 status",
handler: func(w http.ResponseWriter, r *http.Request) {
count := atomic.AddInt64(&callCount, 1)
if count == 1 {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(imageJSON))
},
},
{
name: "EOF",
handler: func(w http.ResponseWriter, r *http.Request) {
count := atomic.AddInt64(&callCount, 1)
if count == 1 {
panic("intentional panic")
}
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte(imageJSON))
},
},
{
name: "unauthorized",
handler: func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusUnauthorized)
},
expectError: "unauthorized: authentication required",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
callCount = 0
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
t.Logf("HTTP %s %s", r.Method, r.URL.Path)
defer r.Body.Close()
switch {
case r.Method == "GET" && r.URL.Path == "/v2":
w.WriteHeader(http.StatusOK)
case r.Method == "GET" && r.URL.Path == "/v2/docker.io/library/testremotename/blobs/"+expectedDigest.String():
tt.handler(w, r)
default:
w.WriteHeader(http.StatusNotFound)
}
}))
defer ts.Close()

p := testNewPuller(t, ts.URL)

config, err := p.pullSchema2Config(ctx, expectedDigest)
if tt.expectError == "" {
if err != nil {
t.Fatal(err)
}

_, err = image.NewFromJSON(config)
if err != nil {
t.Fatal(err)
}
} else {
if !strings.Contains(err.Error(), tt.expectError) {
t.Fatalf("expected error=%q to contain %q", err, tt.expectError)
}
}

})
}
}

func testNewPuller(t *testing.T, rawurl string) *v2Puller {
t.Helper()

uri, err := url.Parse(rawurl)
if err != nil {
t.Fatalf("could not parse url from test server: %v", err)
}

endpoint := registry.APIEndpoint{
Mirror: false,
URL: uri,
Version: 2,
Official: false,
TrimHostname: false,
TLSConfig: nil,
}
n, _ := reference.ParseNormalizedNamed("testremotename")
repoInfo := &registry.RepositoryInfo{
Name: n,
Index: &registrytypes.IndexInfo{
Name: "testrepo",
Mirrors: nil,
Secure: false,
Official: false,
},
Official: false,
}
imagePullConfig := &ImagePullConfig{
Config: Config{
MetaHeaders: http.Header{},
AuthConfig: &types.AuthConfig{
RegistryToken: secretRegistryToken,
},
},
Schema2Types: ImageTypes,
}

puller, err := newPuller(endpoint, repoInfo, imagePullConfig, nil)
if err != nil {
t.Fatal(err)
}
p := puller.(*v2Puller)

p.repo, err = NewV2Repository(context.Background(), p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "pull")
if err != nil {
t.Fatal(err)
}
return p
}
7 changes: 7 additions & 0 deletions distribution/xfer/transfer.go
Expand Up @@ -5,6 +5,8 @@ import (
"runtime"
"sync"

"github.com/pkg/errors"

"github.com/docker/docker/pkg/progress"
)

Expand All @@ -19,6 +21,11 @@ func (e DoNotRetry) Error() string {
return e.Err.Error()
}

func IsDoNotRetryError(err error) bool {
var dnr DoNotRetry
return errors.As(err, &dnr)
}

// Watcher is returned by Watch and can be passed to Release to stop watching.
type Watcher struct {
// signalChan is used to signal to the watcher goroutine that
Expand Down

0 comments on commit d3f5dcc

Please sign in to comment.