Skip to content

Commit

Permalink
Merge pull request #2259 from tonistiigi/conn-limit2
Browse files Browse the repository at this point in the history
new implementation for limiting tcp connections
  • Loading branch information
tonistiigi committed Jul 16, 2021
2 parents 4b01f8c + 3018834 commit 3790ea3
Show file tree
Hide file tree
Showing 7 changed files with 236 additions and 38 deletions.
3 changes: 2 additions & 1 deletion cache/remotecache/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/util/contentutil"
"github.com/moby/buildkit/util/resolver"
"github.com/moby/buildkit/util/resolver/limited"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
specs "github.com/opencontainers/image-spec/specs-go/v1"
Expand Down Expand Up @@ -72,7 +73,7 @@ func ResolveCacheImporterFunc(sm *session.Manager, cs content.Store, hosts docke
return nil, specs.Descriptor{}, err
}
src := &withDistributionSourceLabel{
Provider: contentutil.FromFetcher(fetcher),
Provider: contentutil.FromFetcher(limited.Default.WrapFetcher(fetcher, ref)),
ref: ref,
source: cs,
}
Expand Down
24 changes: 18 additions & 6 deletions util/contentutil/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ import (

"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/remotes"
"github.com/moby/buildkit/util/resolver/limited"
"github.com/moby/buildkit/util/resolver/retryhandler"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
)

func Copy(ctx context.Context, ingester content.Ingester, provider content.Provider, desc ocispec.Descriptor, ref string, logger func([]byte)) error {
if _, err := retryhandler.New(remotes.FetchHandler(ingester, &localFetcher{provider}), ref, logger)(ctx, desc); err != nil {
if _, err := retryhandler.New(limited.FetchHandler(ingester, &localFetcher{provider}, ref), logger)(ctx, desc); err != nil {
return err
}
return nil
Expand All @@ -34,18 +34,30 @@ func (f *localFetcher) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.R

type rc struct {
content.ReaderAt
offset int
offset int64
}

func (r *rc) Read(b []byte) (int, error) {
n, err := r.ReadAt(b, int64(r.offset))
r.offset += n
n, err := r.ReadAt(b, r.offset)
r.offset += int64(n)
if n > 0 && err == io.EOF {
err = nil
}
return n, err
}

func (r *rc) Seek(offset int64, whence int) (int64, error) {
switch whence {
case io.SeekStart:
r.offset = offset
case io.SeekCurrent:
r.offset += offset
case io.SeekEnd:
r.offset = r.Size() - offset
}
return r.offset, nil
}

func CopyChain(ctx context.Context, ingester content.Ingester, provider content.Provider, desc ocispec.Descriptor) error {
var m sync.Mutex
manifestStack := []ocispec.Descriptor{}
Expand All @@ -65,7 +77,7 @@ func CopyChain(ctx context.Context, ingester content.Ingester, provider content.
handlers := []images.Handler{
images.ChildrenHandler(provider),
filterHandler,
retryhandler.New(remotes.FetchHandler(ingester, &localFetcher{provider}), "", func(_ []byte) {}),
retryhandler.New(limited.FetchHandler(ingester, &localFetcher{provider}, ""), func(_ []byte) {}),
}

if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion util/imageutil/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/moby/buildkit/util/leaseutil"
"github.com/moby/buildkit/util/resolver/limited"
"github.com/moby/buildkit/util/resolver/retryhandler"
digest "github.com/opencontainers/go-digest"
specs "github.com/opencontainers/image-spec/specs-go/v1"
Expand Down Expand Up @@ -101,7 +102,7 @@ func Config(ctx context.Context, str string, resolver remotes.Resolver, cache Co
children := childrenConfigHandler(cache, platform)

handlers := []images.Handler{
retryhandler.New(remotes.FetchHandler(cache, fetcher), str, func(_ []byte) {}),
retryhandler.New(limited.FetchHandler(cache, fetcher, str), func(_ []byte) {}),
children,
}
if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion util/pull/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/moby/buildkit/util/progress/logs"
"github.com/moby/buildkit/util/pull/pullprogress"
"github.com/moby/buildkit/util/resolver"
"github.com/moby/buildkit/util/resolver/limited"
"github.com/moby/buildkit/util/resolver/retryhandler"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
Expand Down Expand Up @@ -148,7 +149,7 @@ func (p *Puller) PullManifests(ctx context.Context) (*PulledManifests, error) {
}
handlers = append(handlers,
filterLayerBlobs(metadata, &mu),
retryhandler.New(remotes.FetchHandler(p.ContentStore, fetcher), p.ref, logs.LoggerFromContext(ctx)),
retryhandler.New(limited.FetchHandler(p.ContentStore, fetcher, p.ref), logs.LoggerFromContext(ctx)),
childrenHandler,
dslHandler,
)
Expand Down
4 changes: 2 additions & 2 deletions util/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/remotes"
"github.com/containerd/containerd/remotes/docker"
"github.com/docker/distribution/reference"
"github.com/moby/buildkit/session"
Expand All @@ -20,6 +19,7 @@ import (
"github.com/moby/buildkit/util/progress"
"github.com/moby/buildkit/util/progress/logs"
"github.com/moby/buildkit/util/resolver"
"github.com/moby/buildkit/util/resolver/limited"
"github.com/moby/buildkit/util/resolver/retryhandler"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
Expand Down Expand Up @@ -86,7 +86,7 @@ func Push(ctx context.Context, sm *session.Manager, sid string, provider content
}
})

pushHandler := retryhandler.New(remotes.PushHandler(pusher, provider), ref, logs.LoggerFromContext(ctx))
pushHandler := retryhandler.New(limited.PushHandler(pusher, provider, ref), logs.LoggerFromContext(ctx))
pushUpdateSourceHandler, err := updateDistributionSourceHandler(manager, pushHandler, ref)
if err != nil {
return err
Expand Down
209 changes: 209 additions & 0 deletions util/resolver/limited/group.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
package limited

import (
"context"
"io"
"runtime"
"strings"
"sync"

"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/remotes"
"github.com/docker/distribution/reference"
"github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/sirupsen/logrus"
"golang.org/x/sync/semaphore"
)

var Default = New(4)

type Group struct {
mu sync.Mutex
size int
sem map[string][2]*semaphore.Weighted
}

type req struct {
g *Group
ref string
}

func (r *req) acquire(ctx context.Context, desc ocispec.Descriptor) (func(), error) {
// json request get one additional connection
highPriority := strings.HasSuffix(desc.MediaType, "+json")

r.g.mu.Lock()
s, ok := r.g.sem[r.ref]
if !ok {
s = [2]*semaphore.Weighted{
semaphore.NewWeighted(int64(r.g.size)),
semaphore.NewWeighted(int64(r.g.size + 1)),
}
r.g.sem[r.ref] = s
}
r.g.mu.Unlock()
if !highPriority {
if err := s[0].Acquire(ctx, 1); err != nil {
return nil, err
}
}
if err := s[1].Acquire(ctx, 1); err != nil {
if !highPriority {
s[0].Release(1)
}
return nil, err
}
return func() {
s[1].Release(1)
if !highPriority {
s[0].Release(1)
}
}, nil
}

func New(size int) *Group {
return &Group{
size: size,
sem: make(map[string][2]*semaphore.Weighted),
}
}

func (g *Group) req(ref string) *req {
return &req{g: g, ref: domain(ref)}
}

func (g *Group) WrapFetcher(f remotes.Fetcher, ref string) remotes.Fetcher {
return &fetcher{Fetcher: f, req: g.req(ref)}
}

func (g *Group) WrapPusher(p remotes.Pusher, ref string) remotes.Pusher {
return &pusher{Pusher: p, req: g.req(ref)}
}

type pusher struct {
remotes.Pusher
req *req
}

func (p *pusher) Push(ctx context.Context, desc ocispec.Descriptor) (content.Writer, error) {
release, err := p.req.acquire(ctx, desc)
if err != nil {
return nil, err
}
w, err := p.Pusher.Push(ctx, desc)
if err != nil {
release()
return nil, err
}
ww := &writer{Writer: w}
closer := func() {
if !ww.closed {
logrus.Warnf("writer not closed cleanly: %s", desc.Digest)
}
release()
}
ww.release = closer
runtime.SetFinalizer(ww, func(rc *writer) {
rc.close()
})
return ww, nil
}

type writer struct {
content.Writer
once sync.Once
release func()
closed bool
}

func (w *writer) Close() error {
w.closed = true
w.close()
return w.Writer.Close()
}

func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
w.closed = true
w.close()
return w.Writer.Commit(ctx, size, expected, opts...)
}

func (w *writer) close() {
w.once.Do(w.release)
}

type fetcher struct {
remotes.Fetcher
req *req
}

func (f *fetcher) Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error) {
release, err := f.req.acquire(ctx, desc)
if err != nil {
return nil, err
}
rc, err := f.Fetcher.Fetch(ctx, desc)
if err != nil {
release()
return nil, err
}

rcw := &readCloser{ReadCloser: rc}
closer := func() {
if !rcw.closed {
logrus.Warnf("fetcher not closed cleanly: %s", desc.Digest)
}
release()
}
rcw.release = closer
runtime.SetFinalizer(rcw, func(rc *readCloser) {
rc.close()
})

if s, ok := rc.(io.Seeker); ok {
return &readCloserSeeker{rcw, s}, nil
}

return rcw, nil
}

type readCloserSeeker struct {
*readCloser
io.Seeker
}

type readCloser struct {
io.ReadCloser
once sync.Once
closed bool
release func()
}

func (r *readCloser) Close() error {
r.closed = true
r.close()
return r.ReadCloser.Close()
}

func (r *readCloser) close() {
r.once.Do(r.release)
}

func FetchHandler(ingester content.Ingester, fetcher remotes.Fetcher, ref string) images.HandlerFunc {
return remotes.FetchHandler(ingester, Default.WrapFetcher(fetcher, ref))
}

func PushHandler(pusher remotes.Pusher, provider content.Provider, ref string) images.HandlerFunc {
return remotes.PushHandler(Default.WrapPusher(pusher, ref), provider)
}

func domain(ref string) string {
if ref != "" {
if named, err := reference.ParseNormalizedNamed(ref); err == nil {
return reference.Domain(named)
}
}
return ref
}
28 changes: 1 addition & 27 deletions util/resolver/retryhandler/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,43 +5,17 @@ import (
"fmt"
"io"
"net"
"sync"
"syscall"
"time"

"github.com/containerd/containerd/images"
remoteserrors "github.com/containerd/containerd/remotes/errors"
"github.com/docker/distribution/reference"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
"golang.org/x/sync/semaphore"
)

var mu sync.Mutex
var sem = map[string]*semaphore.Weighted{}

const connsPerHost = 4

func New(f images.HandlerFunc, ref string, logger func([]byte)) images.HandlerFunc {
if ref != "" {
if named, err := reference.ParseNormalizedNamed(ref); err == nil {
ref = reference.Domain(named)
}
}

func New(f images.HandlerFunc, logger func([]byte)) images.HandlerFunc {
return func(ctx context.Context, desc ocispec.Descriptor) ([]ocispec.Descriptor, error) {
mu.Lock()
s, ok := sem[ref]
if !ok {
s = semaphore.NewWeighted(connsPerHost)
sem[ref] = s
}
mu.Unlock()
if err := s.Acquire(ctx, 1); err != nil {
return nil, err
}
defer s.Release(1)

backoff := time.Second
for {
descs, err := f(ctx, desc)
Expand Down

0 comments on commit 3790ea3

Please sign in to comment.