Skip to content

Commit

Permalink
Update container with sandbox metadata after NetNS is created
Browse files Browse the repository at this point in the history
Signed-off-by: Qiutong Song <songqt01@gmail.com>
(cherry picked from commit b41d6f4)
Signed-off-by: Qiutong Song <songqt01@gmail.com>
(cherry picked from commit f2376e6)
Signed-off-by: Qiutong Song <songqt01@gmail.com>
  • Loading branch information
qiutongs committed Oct 26, 2022
1 parent a9e1c3b commit 5fe264a
Show file tree
Hide file tree
Showing 7 changed files with 248 additions and 52 deletions.
99 changes: 95 additions & 4 deletions integration/main_test.go
Expand Up @@ -20,18 +20,24 @@
package integration

import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
goruntime "runtime"
"strconv"
"strings"
"syscall"
"testing"
"time"

"github.com/containerd/containerd"
"github.com/containerd/containerd/containers"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -328,8 +334,15 @@ func Randomize(str string) string {
}

// KillProcess kills the process by name. pkill is used.
func KillProcess(name string) error {
output, err := exec.Command("pkill", "-x", fmt.Sprintf("^%s$", name)).CombinedOutput()
func KillProcess(name string, signal syscall.Signal) error {
var command []string
if goruntime.GOOS == "windows" {
command = []string{"taskkill", "/IM", name, "/F"}
} else {
command = []string{"pkill", "-" + strconv.Itoa(int(signal)), "-x", fmt.Sprintf("^%s$", name)}
}

output, err := exec.Command(command[0], command[1:]...).CombinedOutput()
if err != nil {
return errors.Errorf("failed to kill %q - error: %v, output: %q", name, err, output)
}
Expand Down Expand Up @@ -358,6 +371,80 @@ func PidOf(name string) (int, error) {
return strconv.Atoi(output)
}

// PidsOf returns pid(s) of a process by name
func PidsOf(name string) ([]int, error) {
if len(name) == 0 {
return []int{}, fmt.Errorf("name is required")
}

procDirFD, err := os.Open("/proc")
if err != nil {
return nil, fmt.Errorf("failed to open /proc: %w", err)
}
defer procDirFD.Close()

res := []int{}
for {
fileInfos, err := procDirFD.Readdir(100)
if err != nil {
if err == io.EOF {
break
}
return nil, fmt.Errorf("failed to readdir: %w", err)
}

for _, fileInfo := range fileInfos {
if !fileInfo.IsDir() {
continue
}

pid, err := strconv.Atoi(fileInfo.Name())
if err != nil {
continue
}

exePath, err := os.Readlink(filepath.Join("/proc", fileInfo.Name(), "exe"))
if err != nil {
continue
}

if strings.HasSuffix(exePath, name) {
res = append(res, pid)
}
}
}
return res, nil
}

// PidEnvs returns the environ of pid in key-value pairs.
func PidEnvs(pid int) (map[string]string, error) {
envPath := filepath.Join("/proc", strconv.Itoa(pid), "environ")

b, err := os.ReadFile(envPath)
if err != nil {
return nil, fmt.Errorf("failed to read %s: %w", envPath, err)
}

values := bytes.Split(b, []byte{0})
if len(values) == 0 {
return nil, nil
}

res := make(map[string]string)
for _, value := range values {
value := strings.TrimSpace(string(value))
if len(value) == 0 {
continue
}

parts := strings.SplitN(value, "=", 2)
if len(parts) == 2 {
res[parts[0]] = parts[1]
}
}
return res, nil
}

// RawRuntimeClient returns a raw grpc runtime service client.
func RawRuntimeClient() (runtime.RuntimeServiceClient, error) {
addr, dialer, err := dialer.GetAddressAndDialer(*criEndpoint)
Expand Down Expand Up @@ -411,8 +498,8 @@ func SandboxInfo(id string) (*runtime.PodSandboxStatus, *server.SandboxInfo, err
return status, &info, nil
}

func RestartContainerd(t *testing.T) {
require.NoError(t, KillProcess(*containerdBin))
func RestartContainerd(t *testing.T, signal syscall.Signal) {
require.NoError(t, KillProcess(*containerdBin, signal))

// Use assert so that the 3rd wait always runs, this makes sure
// containerd is running before this function returns.
Expand All @@ -428,3 +515,7 @@ func RestartContainerd(t *testing.T) {
return ConnectDaemons() == nil, nil
}, time.Second, 30*time.Second), "wait for containerd to be restarted")
}

func GetContainer(id string) (containers.Container, error) {
return containerdClient.ContainerService().Get(context.Background(), id)
}
3 changes: 2 additions & 1 deletion integration/restart_test.go
Expand Up @@ -21,6 +21,7 @@ package integration

import (
"sort"
"syscall"
"testing"

"github.com/containerd/containerd"
Expand Down Expand Up @@ -146,7 +147,7 @@ func TestContainerdRestart(t *testing.T) {
assert.NoError(t, err)

t.Logf("Restart containerd")
RestartContainerd(t)
RestartContainerd(t, syscall.SIGTERM)

t.Logf("Check sandbox and container state after restart")
loadedSandboxes, err := runtimeService.ListPodSandbox(&runtime.PodSandboxFilter{})
Expand Down
143 changes: 141 additions & 2 deletions integration/sandbox_run_rollback_linux_test.go
Expand Up @@ -25,19 +25,26 @@ import (
"path/filepath"
"runtime"
"strings"
"sync"
"syscall"
"testing"
"time"

"github.com/containerd/containerd/pkg/cri/store/sandbox"
"github.com/containerd/containerd/pkg/failpoint"
"github.com/containerd/continuity"
"github.com/containerd/go-cni"
"github.com/containerd/typeurl"
runtimespec "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
criapiv1alpha2 "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
)

const (
failpointRuntimeHandler = "runc-fp"
failpointCNIBinary = "cni-bridge-fp"

failpointShimPrefixKey = "io.containerd.runtime.v2.shim.failpoint."

Expand Down Expand Up @@ -134,7 +141,7 @@ func TestRunPodSandboxWithShimDeleteFailure(t *testing.T) {

if restart {
t.Log("Restart containerd")
RestartContainerd(t)
RestartContainerd(t, syscall.SIGTERM)

t.Log("ListPodSandbox with the specific label")
l, err = runtimeService.ListPodSandbox(&criapiv1alpha2.PodSandboxFilter{Id: sb.Id})
Expand All @@ -150,6 +157,8 @@ func TestRunPodSandboxWithShimDeleteFailure(t *testing.T) {
}

t.Log("Cleanup leaky sandbox")
err = runtimeService.StopPodSandbox(sb.Id)
require.NoError(t, err)
err = runtimeService.RemovePodSandbox(sb.Id)
require.NoError(t, err)
}
Expand Down Expand Up @@ -209,7 +218,7 @@ func TestRunPodSandboxWithShimStartAndTeardownCNIFailure(t *testing.T) {

if restart {
t.Log("Restart containerd")
RestartContainerd(t)
RestartContainerd(t, syscall.SIGTERM)

t.Log("ListPodSandbox with the specific label")
l, err = runtimeService.ListPodSandbox(&criapiv1alpha2.PodSandboxFilter{Id: sb.Id})
Expand All @@ -219,6 +228,8 @@ func TestRunPodSandboxWithShimStartAndTeardownCNIFailure(t *testing.T) {
}

t.Log("Cleanup leaky sandbox")
err = runtimeService.StopPodSandbox(sb.Id)
require.NoError(t, err)
err = runtimeService.RemovePodSandbox(sb.Id)
require.NoError(t, err)
}
Expand All @@ -227,6 +238,134 @@ func TestRunPodSandboxWithShimStartAndTeardownCNIFailure(t *testing.T) {
t.Run("JustCleanup", testCase(false))
}

// TestRunPodSandboxWithShimStartAndTeardownCNISlow should keep the sandbox
// record if failed to rollback CNI API.
func TestRunPodSandboxAndTeardownCNISlow(t *testing.T) {
if runtime.GOOS != "linux" {
t.Skip()
}
if os.Getenv("ENABLE_CRI_SANDBOXES") != "" {
t.Skip()
}

defer prepareFailpointCNI(t)()

t.Log("Init PodSandboxConfig with specific key")
sbName := t.Name()
labels := map[string]string{
sbName: "true",
}
sbConfig := PodSandboxConfig(sbName, "failpoint", WithPodLabels(labels))

t.Log("Inject CNI failpoint")
conf := &failpointConf{
// Delay 1 day
Add: "1*delay(86400000)",
}
injectCNIFailpoint(t, sbConfig, conf)

var wg sync.WaitGroup
wg.Add(1)

go func() {
defer wg.Done()
t.Log("Create a sandbox")
_, err := runtimeService.RunPodSandbox(sbConfig, failpointRuntimeHandler)
require.Error(t, err)
require.Contains(t, err.Error(), "transport is closing")
}()

assert.NoError(t, ensureCNIAddRunning(t, sbName), "check that failpoint CNI.Add is running")

// Use SIGKILL to prevent containerd server gracefulshutdown which may cause indeterministic invocation of defer functions
t.Log("Restart containerd")
RestartContainerd(t, syscall.SIGKILL)

wg.Wait()

t.Log("ListPodSandbox with the specific label")
l, err := runtimeService.ListPodSandbox(&criapiv1alpha2.PodSandboxFilter{LabelSelector: labels})
require.NoError(t, err)
require.Len(t, l, 1)

sb := l[0]

defer func() {
t.Log("Cleanup leaky sandbox")
err := runtimeService.StopPodSandbox(sb.Id)
assert.NoError(t, err)
err = runtimeService.RemovePodSandbox(sb.Id)
require.NoError(t, err)
}()

assert.Equal(t, sb.State, criapiv1alpha2.PodSandboxState_SANDBOX_NOTREADY)
assert.Equal(t, sb.Metadata.Name, sbConfig.Metadata.Name)
assert.Equal(t, sb.Metadata.Namespace, sbConfig.Metadata.Namespace)
assert.Equal(t, sb.Metadata.Uid, sbConfig.Metadata.Uid)
assert.Equal(t, sb.Metadata.Attempt, sbConfig.Metadata.Attempt)

t.Log("Get sandbox info")
_, info, err := SandboxInfo(sb.Id)
require.NoError(t, err)
require.False(t, info.NetNSClosed)

var netNS string
for _, n := range info.RuntimeSpec.Linux.Namespaces {
if n.Type == runtimespec.NetworkNamespace {
netNS = n.Path
}
}
assert.NotEmpty(t, netNS, "network namespace should be set")

t.Log("Get sandbox container")
c, err := GetContainer(sb.Id)
require.NoError(t, err)
any, ok := c.Extensions["io.cri-containerd.sandbox.metadata"]
require.True(t, ok, "sandbox metadata should exist in extension")
i, err := typeurl.UnmarshalAny(&any)
require.NoError(t, err)
require.IsType(t, &sandbox.Metadata{}, i)
metadata, ok := i.(*sandbox.Metadata)
require.True(t, ok)
assert.NotEmpty(t, metadata.NetNSPath)
assert.Equal(t, netNS, metadata.NetNSPath, "network namespace path should be the same in runtime spec and sandbox metadata")
}

func ensureCNIAddRunning(t *testing.T, sbName string) error {
return Eventually(func() (bool, error) {
pids, err := PidsOf(failpointCNIBinary)
if err != nil || len(pids) == 0 {
return false, err
}

for _, pid := range pids {
envs, err := PidEnvs(pid)
if err != nil {
t.Logf("failed to read environ of pid %v: %v: skip it", pid, err)
continue
}

args, ok := envs["CNI_ARGS"]
if !ok {
t.Logf("expected CNI_ARGS env but got nothing, skip pid=%v", pid)
continue
}

for _, arg := range strings.Split(args, ";") {
kv := strings.SplitN(arg, "=", 2)
if len(kv) != 2 {
continue
}

if kv[0] == "K8S_POD_NAME" && kv[1] == sbName {
return true, nil
}
}
}
return false, nil
}, time.Second, 30*time.Second)
}

// failpointConf is used to describe cmdAdd/cmdDel/cmdCheck command's failpoint.
type failpointConf struct {
Add string `json:"cmdAdd"`
Expand Down
24 changes: 1 addition & 23 deletions pkg/cri/server/container_update_resources_other.go
Expand Up @@ -20,13 +20,8 @@
package server

import (
"context"

"github.com/containerd/containerd"
"github.com/containerd/containerd/containers"
"github.com/containerd/typeurl"
runtimespec "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"golang.org/x/net/context"
runtime "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"

containerstore "github.com/containerd/containerd/pkg/cri/store/container"
Expand All @@ -48,20 +43,3 @@ func (c *criService) UpdateContainerResources(ctx context.Context, r *runtime.Up
}
return &runtime.UpdateContainerResourcesResponse{}, nil
}

// updateContainerSpec updates container spec.
// Copied from container_update_resources_linux.go because it only builds on Linux
// updateContainerSpec updates container spec.
func updateContainerSpec(ctx context.Context, cntr containerd.Container, spec *runtimespec.Spec) error {
any, err := typeurl.MarshalAny(spec)
if err != nil {
return errors.Wrapf(err, "failed to marshal spec %+v", spec)
}
if err := cntr.Update(ctx, func(ctx context.Context, client *containerd.Client, c *containers.Container) error {
c.Spec = any
return nil
}); err != nil {
return errors.Wrap(err, "failed to update container spec")
}
return nil
}

0 comments on commit 5fe264a

Please sign in to comment.