
Commit

Merge pull request #6816 from dims/RFC-enhance-pull-performance-1.6
fuweid committed Apr 19, 2022
2 parents 64d2cf4 + 1764ea9 commit ec44f6b
Showing 7 changed files with 429 additions and 25 deletions.
13 changes: 13 additions & 0 deletions image.go
@@ -28,6 +28,7 @@ import (
"github.com/containerd/containerd/diff"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/pkg/kmutex"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/rootfs"
"github.com/containerd/containerd/snapshots"
@@ -287,6 +288,10 @@ type UnpackConfig struct {
// CheckPlatformSupported is whether to validate that a snapshotter
// supports an image's platform before unpacking
CheckPlatformSupported bool
// DuplicationSuppressor is used to make sure that there is only one
// in-flight fetch request or unpack handler for a given descriptor's
// digest or chain ID.
DuplicationSuppressor kmutex.KeyedLocker
}

// UnpackOpt provides configuration for unpack
@@ -300,6 +305,14 @@ func WithSnapshotterPlatformCheck() UnpackOpt {
}
}

// WithUnpackDuplicationSuppressor sets `DuplicationSuppressor` on the UnpackConfig.
func WithUnpackDuplicationSuppressor(suppressor kmutex.KeyedLocker) UnpackOpt {
return func(ctx context.Context, uc *UnpackConfig) error {
uc.DuplicationSuppressor = suppressor
return nil
}
}

func (i *image) Unpack(ctx context.Context, snapshotterName string, opts ...UnpackOpt) error {
ctx, done, err := i.client.WithLease(ctx)
if err != nil {
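Taken together, the new UnpackConfig field and option are consumed through the regular pull path. Below is a minimal client-side sketch, not part of this commit: the helper name and arguments are illustrative, and it assumes imports of "github.com/containerd/containerd" and "github.com/containerd/containerd/pkg/kmutex". It shows how one suppressor can be shared so that concurrent pulls of images with overlapping layers fetch and unpack each layer only once.

func pullWithSuppressor(ctx context.Context, client *containerd.Client, ref string, suppressor kmutex.KeyedLocker) (containerd.Image, error) {
	// Sharing one suppressor across calls means concurrent pulls of images
	// with overlapping layers perform each fetch/unpack only once.
	return client.Pull(ctx, ref,
		containerd.WithPullUnpack,
		containerd.WithUnpackOpts([]containerd.UnpackOpt{
			containerd.WithUnpackDuplicationSuppressor(suppressor),
		}),
	)
}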
3 changes: 3 additions & 0 deletions pkg/cri/server/image_pull.go
@@ -121,6 +121,9 @@ func (c *criService) PullImage(ctx context.Context, r *runtime.PullImageRequest)
containerd.WithPullLabel(imageLabelKey, imageLabelValue),
containerd.WithMaxConcurrentDownloads(c.config.MaxConcurrentDownloads),
containerd.WithImageHandler(imageHandler),
containerd.WithUnpackOpts([]containerd.UnpackOpt{
containerd.WithUnpackDuplicationSuppressor(c.unpackDuplicationSuppressor),
}),
}

pullOpts = append(pullOpts, c.encryptedImagesPullOpts()...)
28 changes: 17 additions & 11 deletions pkg/cri/server/service.go
@@ -29,6 +29,7 @@ import (
"github.com/containerd/containerd"
"github.com/containerd/containerd/oci"
"github.com/containerd/containerd/pkg/cri/streaming"
"github.com/containerd/containerd/pkg/kmutex"
"github.com/containerd/containerd/plugin"
cni "github.com/containerd/go-cni"
"github.com/sirupsen/logrus"
@@ -113,24 +114,29 @@ type criService struct {
// allCaps is the list of the capabilities.
// When nil, parsed from CapEff of /proc/self/status.
allCaps []string // nolint
// unpackDuplicationSuppressor is used to make sure that there is only
// one in-flight fetch request or unpack handler for a given descriptor's
// digest or chain ID.
unpackDuplicationSuppressor kmutex.KeyedLocker
}

// NewCRIService returns a new instance of CRIService
func NewCRIService(config criconfig.Config, client *containerd.Client) (CRIService, error) {
var err error
labels := label.NewStore()
c := &criService{
config: config,
client: client,
os: osinterface.RealOS{},
sandboxStore: sandboxstore.NewStore(labels),
containerStore: containerstore.NewStore(labels),
imageStore: imagestore.NewStore(client),
snapshotStore: snapshotstore.NewStore(),
sandboxNameIndex: registrar.NewRegistrar(),
containerNameIndex: registrar.NewRegistrar(),
initialized: atomic.NewBool(false),
netPlugin: make(map[string]cni.CNI),
config: config,
client: client,
os: osinterface.RealOS{},
sandboxStore: sandboxstore.NewStore(labels),
containerStore: containerstore.NewStore(labels),
imageStore: imagestore.NewStore(client),
snapshotStore: snapshotstore.NewStore(),
sandboxNameIndex: registrar.NewRegistrar(),
containerNameIndex: registrar.NewRegistrar(),
initialized: atomic.NewBool(false),
netPlugin: make(map[string]cni.CNI),
unpackDuplicationSuppressor: kmutex.New(),
}

if client.SnapshotService(c.config.ContainerdConfig.Snapshotter) == nil {
105 changes: 105 additions & 0 deletions pkg/kmutex/kmutex.go
@@ -0,0 +1,105 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package kmutex provides synchronization primitives to lock/unlock resource by unique key.
package kmutex

import (
"context"
"fmt"
"sync"

"golang.org/x/sync/semaphore"
)

// KeyedLocker is the interface for acquiring locks based on a string key.
type KeyedLocker interface {
Lock(ctx context.Context, key string) error
Unlock(key string)
}

// New returns a KeyedLocker backed by reference-counted, per-key semaphores.
func New() KeyedLocker {
return newKeyMutex()
}

func newKeyMutex() *keyMutex {
return &keyMutex{
locks: make(map[string]*klock),
}
}

// keyMutex maps each key to its lock and drops the entry once no goroutine
// holds or waits on it.
type keyMutex struct {
mu sync.Mutex

locks map[string]*klock
}

// klock couples a weighted semaphore of capacity one with a count of the
// goroutines currently holding or waiting on it.
type klock struct {
*semaphore.Weighted
ref int
}

func (km *keyMutex) Lock(ctx context.Context, key string) error {
km.mu.Lock()

l, ok := km.locks[key]
if !ok {
km.locks[key] = &klock{
Weighted: semaphore.NewWeighted(1),
}
l = km.locks[key]
}
l.ref++
km.mu.Unlock()

if err := l.Acquire(ctx, 1); err != nil {
km.mu.Lock()
defer km.mu.Unlock()

l.ref--

if l.ref < 0 {
panic(fmt.Errorf("kmutex: release of unlocked key %v", key))
}

if l.ref == 0 {
delete(km.locks, key)
}
return err
}
return nil
}

func (km *keyMutex) Unlock(key string) {
km.mu.Lock()
defer km.mu.Unlock()

l, ok := km.locks[key]
if !ok {
panic(fmt.Errorf("kmutex: unlock of unlocked key %v", key))
}
l.Release(1)

l.ref--

if l.ref < 0 {
panic(fmt.Errorf("kmutex: released of unlocked key %v", key))
}

if l.ref == 0 {
delete(km.locks, key)
}
}
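From a caller's perspective the pattern this change builds on is simple: key the lock on a descriptor digest or chain ID, do the expensive work while holding it, and release. The helper below is a hypothetical sketch, not part of this commit; it assumes imports of "context" and "github.com/containerd/containerd/pkg/kmutex", and the doOnce and work names are made up for illustration.

func doOnce(ctx context.Context, lock kmutex.KeyedLocker, key string, work func(context.Context) error) error {
	// Lock blocks until the key is free or the context is cancelled.
	if err := lock.Lock(ctx, key); err != nil {
		return err
	}
	defer lock.Unlock(key)

	// Only one goroutine runs work for a given key at a time; callers that
	// were waiting typically find the content already fetched or unpacked.
	return work(ctx)
}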
175 changes: 175 additions & 0 deletions pkg/kmutex/kmutex_test.go
@@ -0,0 +1,175 @@
/*
Copyright The containerd Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kmutex

import (
"context"
"math/rand"
"runtime"
"strconv"
"sync"
"testing"
"time"

"github.com/containerd/containerd/pkg/seed"
"github.com/stretchr/testify/assert"
)

func init() {
seed.WithTimeAndRand()
}

func TestBasic(t *testing.T) {
t.Parallel()

km := newKeyMutex()
ctx := context.Background()

km.Lock(ctx, "c1")
km.Lock(ctx, "c2")

assert.Equal(t, len(km.locks), 2)
assert.Equal(t, km.locks["c1"].ref, 1)
assert.Equal(t, km.locks["c2"].ref, 1)

checkWaitFn := func(key string, num int) {
retries := 100
waitLock := false

for i := 0; i < retries; i++ {
// read ref under the mutex to prevent a data race
km.mu.Lock()
ref := km.locks[key].ref
km.mu.Unlock()

if ref == num {
waitLock = true
break
}
time.Sleep(time.Duration(rand.Int63n(100)) * time.Millisecond)
}
assert.Equal(t, waitLock, true)
}

// should acquire successfully after release
{
waitCh := make(chan struct{})
go func() {
defer close(waitCh)

km.Lock(ctx, "c1")
}()
checkWaitFn("c1", 2)

km.Unlock("c1")

<-waitCh
assert.Equal(t, km.locks["c1"].ref, 1)
}

// should fail to acquire if the context is cancelled
{
var errCh = make(chan error, 1)

ctx, cancel := context.WithCancel(context.Background())
go func() {
errCh <- km.Lock(ctx, "c1")
}()

checkWaitFn("c1", 2)

cancel()
assert.Equal(t, <-errCh, context.Canceled)
assert.Equal(t, km.locks["c1"].ref, 1)
}
}

func TestReleasePanic(t *testing.T) {
t.Parallel()

km := newKeyMutex()

defer func() {
if recover() == nil {
t.Fatal("release of unlocked key did not panic")
}
}()

km.Unlock(t.Name())
}

func TestMultipleAcquireOnKeys(t *testing.T) {
t.Parallel()

km := newKeyMutex()
nloops := 10000
nproc := runtime.GOMAXPROCS(0)
ctx := context.Background()

var wg sync.WaitGroup
for i := 0; i < nproc; i++ {
wg.Add(1)

go func(key string) {
defer wg.Done()

for i := 0; i < nloops; i++ {
km.Lock(ctx, key)

time.Sleep(time.Duration(rand.Int63n(100)) * time.Nanosecond)

km.Unlock(key)
}
}("key-" + strconv.Itoa(i))
}
wg.Wait()
}

func TestMultiAcquireOnSameKey(t *testing.T) {
t.Parallel()

km := newKeyMutex()
key := "c1"
ctx := context.Background()

assert.Nil(t, km.Lock(ctx, key))

nproc := runtime.GOMAXPROCS(0)
nloops := 10000

var wg sync.WaitGroup
for i := 0; i < nproc; i++ {
wg.Add(1)

go func() {
defer wg.Done()

for i := 0; i < nloops; i++ {
km.Lock(ctx, key)

time.Sleep(time.Duration(rand.Int63n(100)) * time.Nanosecond)

km.Unlock(key)
}
}()
}
km.Unlock(key)
wg.Wait()

// the c1 key has been released, so no klock should remain for it.
assert.Equal(t, len(km.locks), 0)
}
