Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

new implementation for limiting tcp connections #2259

Merged
merged 4 commits into from
Jul 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion cache/remotecache/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/util/contentutil"
"github.com/moby/buildkit/util/resolver"
"github.com/moby/buildkit/util/resolver/limited"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
specs "github.com/opencontainers/image-spec/specs-go/v1"
Expand Down Expand Up @@ -72,7 +73,7 @@ func ResolveCacheImporterFunc(sm *session.Manager, cs content.Store, hosts docke
return nil, specs.Descriptor{}, err
}
src := &withDistributionSourceLabel{
Provider: contentutil.FromFetcher(fetcher),
Provider: contentutil.FromFetcher(limited.Default.WrapFetcher(fetcher, ref)),
ref: ref,
source: cs,
}
Expand Down
24 changes: 18 additions & 6 deletions util/contentutil/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ import (

"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/remotes"
"github.com/moby/buildkit/util/resolver/limited"
"github.com/moby/buildkit/util/resolver/retryhandler"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)

func Copy(ctx context.Context, ingester content.Ingester, provider content.Provider, desc ocispec.Descriptor, ref string, logger func([]byte)) error {
if _, err := retryhandler.New(remotes.FetchHandler(ingester, &localFetcher{provider}), ref, logger)(ctx, desc); err != nil {
if _, err := retryhandler.New(limited.FetchHandler(ingester, &localFetcher{provider}, ref), logger)(ctx, desc); err != nil {
return err
}
return nil
Expand All @@ -34,18 +34,30 @@ func (f *localFetcher) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.R

type rc struct {
content.ReaderAt
offset int
offset int64
}

func (r *rc) Read(b []byte) (int, error) {
n, err := r.ReadAt(b, int64(r.offset))
r.offset += n
n, err := r.ReadAt(b, r.offset)
r.offset += int64(n)
if n > 0 && err == io.EOF {
err = nil
}
return n, err
}

func (r *rc) Seek(offset int64, whence int) (int64, error) {
switch whence {
case io.SeekStart:
r.offset = offset
case io.SeekCurrent:
r.offset += offset
case io.SeekEnd:
r.offset = r.Size() - offset
}
return r.offset, nil
}

func CopyChain(ctx context.Context, ingester content.Ingester, provider content.Provider, desc ocispec.Descriptor) error {
var m sync.Mutex
manifestStack := []ocispec.Descriptor{}
Expand All @@ -65,7 +77,7 @@ func CopyChain(ctx context.Context, ingester content.Ingester, provider content.
handlers := []images.Handler{
images.ChildrenHandler(provider),
filterHandler,
retryhandler.New(remotes.FetchHandler(ingester, &localFetcher{provider}), "", func(_ []byte) {}),
retryhandler.New(limited.FetchHandler(ingester, &localFetcher{provider}, ""), func(_ []byte) {}),
}

if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion util/imageutil/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/moby/buildkit/util/leaseutil"
"github.com/moby/buildkit/util/resolver/limited"
"github.com/moby/buildkit/util/resolver/retryhandler"
digest "github.com/opencontainers/go-digest"
specs "github.com/opencontainers/image-spec/specs-go/v1"
Expand Down Expand Up @@ -101,7 +102,7 @@ func Config(ctx context.Context, str string, resolver remotes.Resolver, cache Co
children := childrenConfigHandler(cache, platform)

handlers := []images.Handler{
retryhandler.New(remotes.FetchHandler(cache, fetcher), str, func(_ []byte) {}),
retryhandler.New(limited.FetchHandler(cache, fetcher, str), func(_ []byte) {}),
children,
}
if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion util/pull/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/moby/buildkit/util/progress/logs"
"github.com/moby/buildkit/util/pull/pullprogress"
"github.com/moby/buildkit/util/resolver"
"github.com/moby/buildkit/util/resolver/limited"
"github.com/moby/buildkit/util/resolver/retryhandler"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
Expand Down Expand Up @@ -148,7 +149,7 @@ func (p *Puller) PullManifests(ctx context.Context) (*PulledManifests, error) {
}
handlers = append(handlers,
filterLayerBlobs(metadata, &mu),
retryhandler.New(remotes.FetchHandler(p.ContentStore, fetcher), p.ref, logs.LoggerFromContext(ctx)),
retryhandler.New(limited.FetchHandler(p.ContentStore, fetcher, p.ref), logs.LoggerFromContext(ctx)),
childrenHandler,
dslHandler,
)
Expand Down
4 changes: 2 additions & 2 deletions util/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/docker/distribution/reference"
"github.com/moby/buildkit/session"
Expand All @@ -20,6 +19,7 @@ import (
"github.com/moby/buildkit/util/progress"
"github.com/moby/buildkit/util/progress/logs"
"github.com/moby/buildkit/util/resolver"
"github.com/moby/buildkit/util/resolver/limited"
"github.com/moby/buildkit/util/resolver/retryhandler"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
Expand Down Expand Up @@ -86,7 +86,7 @@ func Push(ctx context.Context, sm *session.Manager, sid string, provider content
}
})

pushHandler := retryhandler.New(remotes.PushHandler(pusher, provider), ref, logs.LoggerFromContext(ctx))
pushHandler := retryhandler.New(limited.PushHandler(pusher, provider, ref), logs.LoggerFromContext(ctx))
pushUpdateSourceHandler, err := updateDistributionSourceHandler(manager, pushHandler, ref)
if err != nil {
return err
Expand Down
209 changes: 209 additions & 0 deletions util/resolver/limited/group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
package limited

import (
"context"
"io"
"runtime"
"strings"
"sync"

"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/remotes"
"github.com/docker/distribution/reference"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
)

var Default = New(4)

type Group struct {
mu sync.Mutex
size int
sem map[string][2]*semaphore.Weighted
}

type req struct {
g *Group
ref string
}

func (r *req) acquire(ctx context.Context, desc ocispec.Descriptor) (func(), error) {
// json request get one additional connection
highPriority := strings.HasSuffix(desc.MediaType, "+json")

r.g.mu.Lock()
s, ok := r.g.sem[r.ref]
if !ok {
s = [2]*semaphore.Weighted{
semaphore.NewWeighted(int64(r.g.size)),
semaphore.NewWeighted(int64(r.g.size + 1)),
}
r.g.sem[r.ref] = s
}
r.g.mu.Unlock()
if !highPriority {
if err := s[0].Acquire(ctx, 1); err != nil {
return nil, err
}
}
if err := s[1].Acquire(ctx, 1); err != nil {
if !highPriority {
s[0].Release(1)
}
return nil, err
}
return func() {
s[1].Release(1)
if !highPriority {
s[0].Release(1)
}
}, nil
}

func New(size int) *Group {
return &Group{
size: size,
sem: make(map[string][2]*semaphore.Weighted),
}
}

func (g *Group) req(ref string) *req {
return &req{g: g, ref: domain(ref)}
}

func (g *Group) WrapFetcher(f remotes.Fetcher, ref string) remotes.Fetcher {
return &fetcher{Fetcher: f, req: g.req(ref)}
}

func (g *Group) WrapPusher(p remotes.Pusher, ref string) remotes.Pusher {
return &pusher{Pusher: p, req: g.req(ref)}
}

type pusher struct {
remotes.Pusher
req *req
}

func (p *pusher) Push(ctx context.Context, desc ocispec.Descriptor) (content.Writer, error) {
release, err := p.req.acquire(ctx, desc)
if err != nil {
return nil, err
}
w, err := p.Pusher.Push(ctx, desc)
if err != nil {
release()
return nil, err
}
ww := &writer{Writer: w}
closer := func() {
if !ww.closed {
logrus.Warnf("writer not closed cleanly: %s", desc.Digest)
}
release()
}
ww.release = closer
runtime.SetFinalizer(ww, func(rc *writer) {
rc.close()
})
return ww, nil
}

type writer struct {
content.Writer
once sync.Once
release func()
closed bool
}

func (w *writer) Close() error {
w.closed = true
w.close()
return w.Writer.Close()
}

func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
w.closed = true
w.close()
return w.Writer.Commit(ctx, size, expected, opts...)
}

func (w *writer) close() {
w.once.Do(w.release)
}

type fetcher struct {
remotes.Fetcher
req *req
}

func (f *fetcher) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error) {
release, err := f.req.acquire(ctx, desc)
if err != nil {
return nil, err
}
rc, err := f.Fetcher.Fetch(ctx, desc)
if err != nil {
release()
return nil, err
}

rcw := &readCloser{ReadCloser: rc}
closer := func() {
if !rcw.closed {
logrus.Warnf("fetcher not closed cleanly: %s", desc.Digest)
}
release()
}
rcw.release = closer
runtime.SetFinalizer(rcw, func(rc *readCloser) {
rc.close()
})

if s, ok := rc.(io.Seeker); ok {
return &readCloserSeeker{rcw, s}, nil
}

return rcw, nil
}

type readCloserSeeker struct {
*readCloser
io.Seeker
}

type readCloser struct {
io.ReadCloser
once sync.Once
closed bool
release func()
}

func (r *readCloser) Close() error {
r.closed = true
r.close()
return r.ReadCloser.Close()
}

func (r *readCloser) close() {
r.once.Do(r.release)
}

func FetchHandler(ingester content.Ingester, fetcher remotes.Fetcher, ref string) images.HandlerFunc {
return remotes.FetchHandler(ingester, Default.WrapFetcher(fetcher, ref))
}

func PushHandler(pusher remotes.Pusher, provider content.Provider, ref string) images.HandlerFunc {
return remotes.PushHandler(Default.WrapPusher(pusher, ref), provider)
}

func domain(ref string) string {
if ref != "" {
if named, err := reference.ParseNormalizedNamed(ref); err == nil {
return reference.Domain(named)
}
}
return ref
}
28 changes: 1 addition & 27 deletions util/resolver/retryhandler/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,43 +5,17 @@ import (
"fmt"
"io"
"net"
"sync"
"syscall"
"time"

"github.com/containerd/containerd/images"
remoteserrors "github.com/containerd/containerd/remotes/errors"
"github.com/docker/distribution/reference"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"golang.org/x/sync/semaphore"
)

var mu sync.Mutex
var sem = map[string]*semaphore.Weighted{}

const connsPerHost = 4

func New(f images.HandlerFunc, ref string, logger func([]byte)) images.HandlerFunc {
if ref != "" {
if named, err := reference.ParseNormalizedNamed(ref); err == nil {
ref = reference.Domain(named)
}
}

func New(f images.HandlerFunc, logger func([]byte)) images.HandlerFunc {
return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
mu.Lock()
s, ok := sem[ref]
if !ok {
s = semaphore.NewWeighted(connsPerHost)
sem[ref] = s
}
mu.Unlock()
if err := s.Acquire(ctx, 1); err != nil {
return nil, err
}
defer s.Release(1)

backoff := time.Second
for {
descs, err := f(ctx, desc)
Expand Down