Skip to content

Commit

Permalink
Merge pull request #104907 from adrianreber/2021-09-10-checkpoint
Browse files Browse the repository at this point in the history
Minimal checkpointing support
  • Loading branch information
k8s-ci-robot committed Jul 14, 2022
2 parents 27110bd + 92ea6e3 commit a655368
Show file tree
Hide file tree
Showing 20 changed files with 1,444 additions and 356 deletions.
9 changes: 9 additions & 0 deletions pkg/features/kube_features.go
Expand Up @@ -190,6 +190,13 @@ const (
// Allows clients to request a duration for certificates issued via the Kubernetes CSR API.
CSRDuration featuregate.Feature = "CSRDuration"

// owner: @adrianreber
// kep: http://kep.k8s.io/2008
// alpha: v1.25
//
// Enables container Checkpoint support in the kubelet
ContainerCheckpoint featuregate.Feature = "ContainerCheckpoint"

// owner: @jiahuif
// alpha: v1.21
// beta: v1.22
Expand Down Expand Up @@ -846,6 +853,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS

CSRDuration: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.26

ContainerCheckpoint: {Default: false, PreRelease: featuregate.Alpha},

ControllerManagerLeaderMigration: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.26

CronJobTimeZone: {Default: false, PreRelease: featuregate.Alpha},
Expand Down
1 change: 1 addition & 0 deletions pkg/kubelet/config/defaults.go
Expand Up @@ -28,4 +28,5 @@ const (
DefaultKubeletPluginContainersDirName = "plugin-containers"
DefaultKubeletPodResourcesDirName = "pod-resources"
KubeletPluginsDirSELinuxLabel = "system_u:object_r:container_file_t:s0"
DefaultKubeletCheckpointsDirName = "checkpoints"
)
3 changes: 3 additions & 0 deletions pkg/kubelet/container/runtime.go
Expand Up @@ -119,6 +119,9 @@ type Runtime interface {
// This method just proxies a new runtimeConfig with the updated
// CIDR value down to the runtime shim.
UpdatePodCIDR(podCIDR string) error
// CheckpointContainer tells the runtime to checkpoint a container
// and store the resulting archive to the checkpoint directory.
CheckpointContainer(options *runtimeapi.CheckpointContainerRequest) error
}

// StreamingRuntime is the interface implemented by runtimes that handle the serving of the
Expand Down
8 changes: 8 additions & 0 deletions pkg/kubelet/container/testing/fake_runtime.go
Expand Up @@ -362,6 +362,14 @@ func (f *FakeRuntime) DeleteContainer(containerID kubecontainer.ContainerID) err
return f.Err
}

func (f *FakeRuntime) CheckpointContainer(options *runtimeapi.CheckpointContainerRequest) error {
f.Lock()
defer f.Unlock()

f.CalledFunctions = append(f.CalledFunctions, "CheckpointContainer")
return f.Err
}

func (f *FakeRuntime) ImageStats() (*kubecontainer.ImageStats, error) {
f.Lock()
defer f.Unlock()
Expand Down
14 changes: 14 additions & 0 deletions pkg/kubelet/container/testing/runtime_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions pkg/kubelet/cri/remote/fake/fake_runtime.go
Expand Up @@ -322,3 +322,13 @@ func (f *RemoteRuntime) ReopenContainerLog(ctx context.Context, req *kubeapi.Reo

return &kubeapi.ReopenContainerLogResponse{}, nil
}

// CheckpointContainer checkpoints the given container.
func (f *RemoteRuntime) CheckpointContainer(ctx context.Context, req *kubeapi.CheckpointContainerRequest) (*kubeapi.CheckpointContainerResponse, error) {
err := f.RuntimeService.CheckpointContainer(&kubeapi.CheckpointContainerRequest{})
if err != nil {
return nil, err
}

return &kubeapi.CheckpointContainerResponse{}, nil
}
55 changes: 55 additions & 0 deletions pkg/kubelet/cri/remote/remote_runtime.go
Expand Up @@ -1149,3 +1149,58 @@ func (r *remoteRuntimeService) ReopenContainerLog(containerID string) (err error
klog.V(10).InfoS("[RemoteRuntimeService] ReopenContainerLog Response", "containerID", containerID)
return nil
}

// CheckpointContainer triggers a checkpoint of the given CheckpointContainerRequest
func (r *remoteRuntimeService) CheckpointContainer(options *runtimeapi.CheckpointContainerRequest) error {
klog.V(10).InfoS(
"[RemoteRuntimeService] CheckpointContainer",
"options",
options,
)
if options == nil {
return errors.New("CheckpointContainer requires non-nil CheckpointRestoreOptions parameter")
}
if !r.useV1API() {
return errors.New("CheckpointContainer is only supported in the CRI v1 runtime API")
}

if options.Timeout < 0 {
return errors.New("CheckpointContainer requires the timeout value to be > 0")
}

ctx, cancel := func() (context.Context, context.CancelFunc) {
defaultTimeout := int64(r.timeout / time.Second)
if options.Timeout > defaultTimeout {
// The user requested a specific timeout, let's use that if it
// is larger than the CRI default.
return getContextWithTimeout(time.Duration(options.Timeout) * time.Second)
}
// If the user requested a timeout less than the
// CRI default, let's use the CRI default.
options.Timeout = defaultTimeout
return getContextWithTimeout(r.timeout)
}()
defer cancel()

_, err := r.runtimeClient.CheckpointContainer(
ctx,
options,
)

if err != nil {
klog.ErrorS(
err,
"CheckpointContainer from runtime service failed",
"containerID",
options.ContainerId,
)
return err
}
klog.V(10).InfoS(
"[RemoteRuntimeService] CheckpointContainer Response",
"containerID",
options.ContainerId,
)

return nil
}
46 changes: 46 additions & 0 deletions pkg/kubelet/kubelet.go
Expand Up @@ -25,6 +25,7 @@ import (
"net/http"
"os"
"path"
"path/filepath"
sysruntime "runtime"
"sort"
"strings"
Expand Down Expand Up @@ -60,6 +61,7 @@ import (
cloudprovider "k8s.io/cloud-provider"
"k8s.io/component-helpers/apimachinery/lease"
internalapi "k8s.io/cri-api/pkg/apis"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/klog/v2"
pluginwatcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
Expand Down Expand Up @@ -1229,6 +1231,7 @@ func (kl *Kubelet) RlimitStats() (*statsapi.RlimitStats, error) {
// 2. the pods directory
// 3. the plugins directory
// 4. the pod-resources directory
// 5. the checkpoint directory
func (kl *Kubelet) setupDataDirs() error {
kl.rootDirectory = path.Clean(kl.rootDirectory)
pluginRegistrationDir := kl.getPluginsRegistrationDir()
Expand All @@ -1251,6 +1254,11 @@ func (kl *Kubelet) setupDataDirs() error {
if err := os.MkdirAll(kl.getPodResourcesDir(), 0750); err != nil {
return fmt.Errorf("error creating podresources directory: %v", err)
}
if utilfeature.DefaultFeatureGate.Enabled(features.ContainerCheckpoint) {
if err := os.MkdirAll(kl.getCheckpointsDir(), 0700); err != nil {
return fmt.Errorf("error creating checkpoint directory: %v", err)
}
}
if selinux.GetEnabled() {
err := selinux.SetFileLabel(pluginRegistrationDir, config.KubeletPluginsDirSELinuxLabel)
if err != nil {
Expand Down Expand Up @@ -2439,6 +2447,44 @@ func (kl *Kubelet) fastStatusUpdateOnce() {
}
}

// CheckpointContainer tries to checkpoint a container. The parameters are used to
// look up the specified container. If the container specified by the given parameters
// cannot be found an error is returned. If the container is found the container
// engine will be asked to checkpoint the given container into the kubelet's default
// checkpoint directory.
func (kl *Kubelet) CheckpointContainer(
podUID types.UID,
podFullName,
containerName string,
options *runtimeapi.CheckpointContainerRequest,
) error {
container, err := kl.findContainer(podFullName, podUID, containerName)
if err != nil {
return err
}
if container == nil {
return fmt.Errorf("container %v not found", containerName)
}

options.Location = filepath.Join(
kl.getCheckpointsDir(),
fmt.Sprintf(
"checkpoint-%s-%s-%s.tar",
podFullName,
containerName,
time.Now().Format(time.RFC3339),
),
)

options.ContainerId = string(container.ID.ID)

if err := kl.containerRuntime.CheckpointContainer(options); err != nil {
return err
}

return nil
}

// isSyncPodWorthy filters out events that are not worthy of pod syncing
func isSyncPodWorthy(event *pleg.PodLifecycleEvent) bool {
// ContainerRemoved doesn't affect pod state
Expand Down
6 changes: 6 additions & 0 deletions pkg/kubelet/kubelet_getters.go
Expand Up @@ -77,6 +77,12 @@ func (kl *Kubelet) getPluginDir(pluginName string) string {
return filepath.Join(kl.getPluginsDir(), pluginName)
}

// getCheckpointsDir returns a data directory name for checkpoints.
// Checkpoints can be stored in this directory for further use.
func (kl *Kubelet) getCheckpointsDir() string {
return filepath.Join(kl.getRootDir(), config.DefaultKubeletCheckpointsDirName)
}

// getVolumeDevicePluginsDir returns the full path to the directory under which plugin
// directories are created. Plugins can use these directories for data that
// they need to persist. Plugins should create subdirectories under this named
Expand Down
116 changes: 116 additions & 0 deletions pkg/kubelet/kubelet_test.go
Expand Up @@ -21,10 +21,12 @@ import (
"fmt"
"io/ioutil"
"os"
"path/filepath"
"reflect"
goruntime "runtime"
"sort"
"strconv"
"strings"
"testing"
"time"

Expand All @@ -44,6 +46,7 @@ import (
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/klog/v2/ktesting"
cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing"
"k8s.io/kubernetes/pkg/kubelet/cm"
Expand Down Expand Up @@ -1583,6 +1586,119 @@ func TestFilterOutInactivePods(t *testing.T) {
assert.Equal(t, expected, actual)
}

func TestCheckpointContainer(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kubelet := testKubelet.kubelet

fakeRuntime := testKubelet.fakeRuntime
containerID := kubecontainer.ContainerID{
Type: "test",
ID: "abc1234",
}

fakePod := &containertest.FakePod{
Pod: &kubecontainer.Pod{
ID: "12345678",
Name: "podFoo",
Namespace: "nsFoo",
Containers: []*kubecontainer.Container{
{
Name: "containerFoo",
ID: containerID,
},
},
},
}

fakeRuntime.PodList = []*containertest.FakePod{fakePod}
wrongContainerName := "wrongContainerName"

tests := []struct {
name string
containerName string
checkpointLocation string
expectedStatus error
expectedLocation string
}{
{
name: "Checkpoint with wrong container name",
containerName: wrongContainerName,
checkpointLocation: "",
expectedStatus: fmt.Errorf("container %s not found", wrongContainerName),
expectedLocation: "",
},
{
name: "Checkpoint with default checkpoint location",
containerName: fakePod.Pod.Containers[0].Name,
checkpointLocation: "",
expectedStatus: nil,
expectedLocation: filepath.Join(
kubelet.getCheckpointsDir(),
fmt.Sprintf(
"checkpoint-%s_%s-%s",
fakePod.Pod.Name,
fakePod.Pod.Namespace,
fakePod.Pod.Containers[0].Name,
),
),
},
{
name: "Checkpoint with ignored location",
containerName: fakePod.Pod.Containers[0].Name,
checkpointLocation: "somethingThatWillBeIgnored",
expectedStatus: nil,
expectedLocation: filepath.Join(
kubelet.getCheckpointsDir(),
fmt.Sprintf(
"checkpoint-%s_%s-%s",
fakePod.Pod.Name,
fakePod.Pod.Namespace,
fakePod.Pod.Containers[0].Name,
),
),
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
options := &runtimeapi.CheckpointContainerRequest{}
if test.checkpointLocation != "" {
options.Location = test.checkpointLocation
}
status := kubelet.CheckpointContainer(
fakePod.Pod.ID,
fmt.Sprintf(
"%s_%s",
fakePod.Pod.Name,
fakePod.Pod.Namespace,
),
test.containerName,
options,
)
require.Equal(t, status, test.expectedStatus)

if status != nil {
return
}

require.True(
t,
strings.HasPrefix(
options.Location,
test.expectedLocation,
),
)
require.Equal(
t,
options.ContainerId,
containerID.ID,
)

})
}
}

func TestSyncPodsSetStatusToFailedForPodsThatRunTooLong(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
Expand Down
9 changes: 9 additions & 0 deletions pkg/kubelet/kuberuntime/instrumented_services.go
Expand Up @@ -324,3 +324,12 @@ func (in instrumentedImageManagerService) ImageFsInfo() ([]*runtimeapi.Filesyste
recordError(operation, err)
return fsInfo, nil
}

func (in instrumentedRuntimeService) CheckpointContainer(options *runtimeapi.CheckpointContainerRequest) error {
const operation = "checkpoint_container"
defer recordOperation(operation, time.Now())

err := in.service.CheckpointContainer(options)
recordError(operation, err)
return err
}

0 comments on commit a655368

Please sign in to comment.