From 1b4ebd9378064571e01fc7d922dd15781da7519c Mon Sep 17 00:00:00 2001 From: Pete Woods Date: Thu, 24 Feb 2022 12:18:27 +0000 Subject: [PATCH] distribution: retry downloading schema config on retryable error fixes #43267 Signed-off-by: Pete Woods --- distribution/pull_v2.go | 30 +++++- distribution/pull_v2_test.go | 197 +++++++++++++++++++++++++++++++++++ 2 files changed, 226 insertions(+), 1 deletion(-) diff --git a/distribution/pull_v2.go b/distribution/pull_v2.go index 07cc1e5b864b8..ff25f5801556d 100644 --- a/distribution/pull_v2.go +++ b/distribution/pull_v2.go @@ -7,6 +7,7 @@ import ( "io" "os" "runtime" + "time" "github.com/containerd/containerd/log" "github.com/containerd/containerd/platforms" @@ -858,7 +859,10 @@ func (p *v2Puller) pullManifestList(ctx context.Context, ref reference.Named, mf func (p *v2Puller) pullSchema2Config(ctx context.Context, dgst digest.Digest) (configJSON []byte, err error) { blobs := p.repo.Blobs(ctx) - configJSON, err = blobs.Get(ctx, dgst) + err = retry(ctx, 5, 250*time.Millisecond, func(ctx context.Context) (err error) { + configJSON, err = blobs.Get(ctx, dgst) + return err + }) if err != nil { return nil, err } @@ -877,6 +881,30 @@ func (p *v2Puller) pullSchema2Config(ctx context.Context, dgst digest.Digest) (c return configJSON, nil } +func retry(ctx context.Context, attempts int, sleep time.Duration, f func(ctx context.Context) error) (err error) { + for i := 0; i < attempts; i++ { + err = retryOnError(f(ctx)) + if err == nil { + return nil + } + if _, isDNR := err.(xfer.DoNotRetry); isDNR { + logrus.WithError(err).Errorf("download failed after %d attempts", attempts) + return err + } + + timer := time.NewTimer(sleep) + select { + case <-ctx.Done(): + timer.Stop() + return ctx.Err() + case <-timer.C: + logrus.WithError(err).Error("retrying after error") + sleep *= 2 + } + } + return fmt.Errorf("download failed after %d attempts: %w", attempts, err) +} + // schema2ManifestDigest computes the manifest digest, and, if pulling 
by // digest, ensures that it matches the requested digest. func schema2ManifestDigest(ref reference.Named, mfst distribution.Manifest) (digest.Digest, error) { diff --git a/distribution/pull_v2_test.go b/distribution/pull_v2_test.go index ac42858a7a349..d026561ac4bd2 100644 --- a/distribution/pull_v2_test.go +++ b/distribution/pull_v2_test.go @@ -1,13 +1,18 @@ package distribution // import "github.com/docker/docker/distribution" import ( + "context" "encoding/json" "fmt" + "net/http" + "net/http/httptest" + "net/url" "os" "reflect" "regexp" "runtime" "strings" + "sync/atomic" "testing" "github.com/docker/distribution/manifest/schema1" @@ -16,6 +21,11 @@ import ( specs "github.com/opencontainers/image-spec/specs-go/v1" "gotest.tools/v3/assert" is "gotest.tools/v3/assert/cmp" + + "github.com/docker/docker/api/types" + registrytypes "github.com/docker/docker/api/types/registry" + "github.com/docker/docker/image" + "github.com/docker/docker/registry" ) // TestFixManifestLayers checks that fixManifestLayers removes a duplicate @@ -205,3 +215,190 @@ func TestFormatPlatform(t *testing.T) { } } } + +func Test_v2Puller_pullSchema2Config_Basic(t *testing.T) { + ctx := context.Background() + + const imageJSON = `{ + "architecture": "amd64", + "os": "linux", + "config": {}, + "rootfs": { + "type": "layers", + "diff_ids": [] + } +}` + expectedDigest := digest.Digest("sha256:66ad98165d38f53ee73868f82bd4eed60556ddfee824810a4062c4f777b20a5b") + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + switch { + case r.Method == "GET" && r.URL.Path == "/v2": + w.WriteHeader(http.StatusOK) + case r.Method == "GET" && r.URL.Path == "/v2/docker.io/library/testremotename/blobs/"+expectedDigest.String(): + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(imageJSON)) + default: + w.WriteHeader(http.StatusNotFound) + } + })) + defer ts.Close() + + p := testNewPuller(t, ts.URL) + + config, err := p.pullSchema2Config(ctx, 
expectedDigest) + if err != nil { + t.Fatal(err) + } + + _, err = image.NewFromJSON(config) + if err != nil { + t.Fatal(err) + } +} + +func Test_v2Puller_pullSchema2Config_RetryEOF(t *testing.T) { + ctx := context.Background() + + const imageJSON = `{ + "architecture": "amd64", + "os": "linux", + "config": {}, + "rootfs": { + "type": "layers", + "diff_ids": [] + } +}` + expectedDigest := digest.Digest("sha256:66ad98165d38f53ee73868f82bd4eed60556ddfee824810a4062c4f777b20a5b") + + var callCount int64 + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + switch { + case r.Method == "GET" && r.URL.Path == "/v2": + w.WriteHeader(http.StatusOK) + case r.Method == "GET" && r.URL.Path == "/v2/docker.io/library/testremotename/blobs/"+expectedDigest.String(): + count := atomic.AddInt64(&callCount, 1) + if count == 1 { + panic("intended panic") + return + } + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(imageJSON)) + default: + w.WriteHeader(http.StatusNotFound) + } + })) + defer ts.Close() + + p := testNewPuller(t, ts.URL) + + config, err := p.pullSchema2Config(ctx, expectedDigest) + if err != nil { + t.Fatal(err) + } + + _, err = image.NewFromJSON(config) + if err != nil { + t.Fatal(err) + } +} + +func Test_v2Puller_pullSchema2Config_Retry500(t *testing.T) { + ctx := context.Background() + + const imageJSON = `{ + "architecture": "amd64", + "os": "linux", + "config": {}, + "rootfs": { + "type": "layers", + "diff_ids": [] + } +}` + expectedDigest := digest.Digest("sha256:66ad98165d38f53ee73868f82bd4eed60556ddfee824810a4062c4f777b20a5b") + + var callCount int64 + + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + switch { + case r.Method == "GET" && r.URL.Path == "/v2": + w.WriteHeader(http.StatusOK) + case r.Method == "GET" && r.URL.Path == "/v2/docker.io/library/testremotename/blobs/"+expectedDigest.String(): + count := 
atomic.AddInt64(&callCount, 1) + if count == 1 { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(imageJSON)) + default: + w.WriteHeader(http.StatusNotFound) + } + })) + defer ts.Close() + + p := testNewPuller(t, ts.URL) + + config, err := p.pullSchema2Config(ctx, expectedDigest) + if err != nil { + t.Fatal(err) + } + + _, err = image.NewFromJSON(config) + if err != nil { + t.Fatal(err) + } +} + +func testNewPuller(t *testing.T, rawurl string) *v2Puller { + t.Helper() + + uri, err := url.Parse(rawurl) + if err != nil { + t.Fatalf("could not parse url from test server: %v", err) + } + + endpoint := registry.APIEndpoint{ + Mirror: false, + URL: uri, + Version: 2, + Official: false, + TrimHostname: false, + TLSConfig: nil, + } + n, _ := reference.ParseNormalizedNamed("testremotename") + repoInfo := &registry.RepositoryInfo{ + Name: n, + Index: &registrytypes.IndexInfo{ + Name: "testrepo", + Mirrors: nil, + Secure: false, + Official: false, + }, + Official: false, + } + imagePullConfig := &ImagePullConfig{ + Config: Config{ + MetaHeaders: http.Header{}, + AuthConfig: &types.AuthConfig{ + RegistryToken: secretRegistryToken, + }, + }, + Schema2Types: ImageTypes, + } + + puller, err := newPuller(endpoint, repoInfo, imagePullConfig, nil) + if err != nil { + t.Fatal(err) + } + p := puller.(*v2Puller) + + p.repo, err = NewV2Repository(context.Background(), p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "pull") + if err != nil { + t.Fatal(err) + } + return p +}