diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 5b2ce6fa433e..42243edf0356 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -724,6 +724,12 @@ const ( // Allows jobs to be created in the suspended state. SuspendJob featuregate.Feature = "SuspendJob" + // owner: @fromanirh + // alpha: v1.21 + // + // Enable Pod resources API to return allocatable resources + KubeletPodResourcesGetAllocatable featuregate.Feature = "KubeletPodResourcesGetAllocatable" + // owner: @jayunit100 @abhiraut @rikatz // beta: v1.21 // @@ -838,6 +844,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS IngressClassNamespacedParams: {Default: false, PreRelease: featuregate.Alpha}, ServiceInternalTrafficPolicy: {Default: false, PreRelease: featuregate.Alpha}, SuspendJob: {Default: false, PreRelease: featuregate.Alpha}, + KubeletPodResourcesGetAllocatable: {Default: false, PreRelease: featuregate.Alpha}, NamespaceDefaultLabelName: {Default: true, PreRelease: featuregate.Beta}, // graduate to GA and lock to default in 1.22, remove in 1.24 // inherited features from generic apiserver, relisted here to get a conflict if it is changed diff --git a/pkg/kubelet/apis/podresources/server_v1.go b/pkg/kubelet/apis/podresources/server_v1.go index 8482f51a19db..bd29ed9fc53e 100644 --- a/pkg/kubelet/apis/podresources/server_v1.go +++ b/pkg/kubelet/apis/podresources/server_v1.go @@ -18,7 +18,10 @@ package podresources import ( "context" + "fmt" + utilfeature "k8s.io/apiserver/pkg/util/feature" + kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubelet/pkg/apis/podresources/v1" @@ -44,6 +47,7 @@ func NewV1PodResourcesServer(podsProvider PodsProvider, devicesProvider DevicesP // List returns information about the resources assigned to pods on the node func (p *v1PodResourcesServer) List(ctx context.Context, req *v1.ListPodResourcesRequest) (*v1.ListPodResourcesResponse, error) { metrics.PodResourcesEndpointRequestsTotalCount.WithLabelValues("v1").Inc() + metrics.PodResourcesEndpointRequestsListCount.WithLabelValues("v1").Inc() pods := p.podsProvider.GetPods() podResources := make([]*v1.PodResources, len(pods)) @@ -70,3 +74,21 @@ func (p *v1PodResourcesServer) List(ctx context.Context, req *v1.ListPodResource PodResources: podResources, }, nil } + +// GetAllocatableResources returns information about all the resources known by the server - this is more like the capacity, not the current amount of free resources. 
+func (p *v1PodResourcesServer) GetAllocatableResources(ctx context.Context, req *v1.AllocatableResourcesRequest) (*v1.AllocatableResourcesResponse, error) { + metrics.PodResourcesEndpointRequestsTotalCount.WithLabelValues("v1").Inc() + metrics.PodResourcesEndpointRequestsGetAllocatableCount.WithLabelValues("v1").Inc() + + if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.KubeletPodResourcesGetAllocatable) { + metrics.PodResourcesEndpointErrorsGetAllocatableCount.WithLabelValues("v1").Inc() + return nil, fmt.Errorf("Pod Resources API GetAllocatableResources disabled") + } + + metrics.PodResourcesEndpointRequestsTotalCount.WithLabelValues("v1").Inc() + + return &v1.AllocatableResourcesResponse{ + Devices: p.devicesProvider.GetAllocatableDevices(), + CpuIds: p.cpusProvider.GetAllocatableCPUs(), + }, nil +} diff --git a/pkg/kubelet/apis/podresources/server_v1_test.go b/pkg/kubelet/apis/podresources/server_v1_test.go index c25912ee3ba3..65b9bd8d60f6 100644 --- a/pkg/kubelet/apis/podresources/server_v1_test.go +++ b/pkg/kubelet/apis/podresources/server_v1_test.go @@ -18,12 +18,19 @@ package podresources import ( "context" + "reflect" + "sort" "testing" "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" + pkgfeatures "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" + "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager" ) func TestListPodResourcesV1(t *testing.T) { @@ -135,14 +142,335 @@ func TestListPodResourcesV1(t *testing.T) { m.On("GetDevices", string(podUID), containerName).Return(tc.devices) m.On("GetCPUs", string(podUID), containerName).Return(tc.cpus) m.On("UpdateAllocatedDevices").Return() + m.On("GetAllocatableCPUs").Return(cpuset.CPUSet{}) + m.On("GetAllocatableDevices").Return(devicemanager.NewResourceDeviceInstances()) server := NewV1PodResourcesServer(m, m, m) resp, err := server.List(context.TODO(), &podresourcesapi.ListPodResourcesRequest{}) if err != nil { t.Errorf("want err = %v, got %q", nil, err) } - if tc.expectedResponse.String() != resp.String() { + if !equalListResponse(tc.expectedResponse, resp) { t.Errorf("want resp = %s, got %s", tc.expectedResponse.String(), resp.String()) } }) } } + +func TestAllocatableResources(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, pkgfeatures.KubeletPodResourcesGetAllocatable, true)() + + allDevs := []*podresourcesapi.ContainerDevices{ + { + ResourceName: "resource", + DeviceIds: []string{"dev0"}, + Topology: &podresourcesapi.TopologyInfo{ + Nodes: []*podresourcesapi.NUMANode{ + { + ID: 0, + }, + }, + }, + }, + { + ResourceName: "resource", + DeviceIds: []string{"dev1"}, + Topology: &podresourcesapi.TopologyInfo{ + Nodes: []*podresourcesapi.NUMANode{ + { + ID: 1, + }, + }, + }, + }, + { + ResourceName: "resource-nt", + DeviceIds: []string{"devA"}, + }, + { + ResourceName: "resource-mm", + DeviceIds: []string{"devM0"}, + Topology: &podresourcesapi.TopologyInfo{ + Nodes: []*podresourcesapi.NUMANode{ + { + ID: 0, + }, + }, + }, + }, + { + ResourceName: "resource-mm", + DeviceIds: []string{"devMM"}, + Topology: &podresourcesapi.TopologyInfo{ + Nodes: []*podresourcesapi.NUMANode{ + { + ID: 0, + }, + { + ID: 1, + }, + }, + }, + }, + } + + allCPUs := []int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} + + for _, tc := range 
[]struct { + desc string + allCPUs []int64 + allDevices []*podresourcesapi.ContainerDevices + expectedAllocatableResourcesResponse *podresourcesapi.AllocatableResourcesResponse + }{ + { + desc: "no devices, no CPUs", + allCPUs: []int64{}, + allDevices: []*podresourcesapi.ContainerDevices{}, + expectedAllocatableResourcesResponse: &podresourcesapi.AllocatableResourcesResponse{}, + }, + { + desc: "no devices, all CPUs", + allCPUs: allCPUs, + allDevices: []*podresourcesapi.ContainerDevices{}, + expectedAllocatableResourcesResponse: &podresourcesapi.AllocatableResourcesResponse{ + CpuIds: allCPUs, + }, + }, + { + desc: "with devices, all CPUs", + allCPUs: allCPUs, + allDevices: allDevs, + expectedAllocatableResourcesResponse: &podresourcesapi.AllocatableResourcesResponse{ + CpuIds: allCPUs, + Devices: []*podresourcesapi.ContainerDevices{ + { + ResourceName: "resource", + DeviceIds: []string{"dev0"}, + Topology: &podresourcesapi.TopologyInfo{ + Nodes: []*podresourcesapi.NUMANode{ + { + ID: 0, + }, + }, + }, + }, + { + ResourceName: "resource", + DeviceIds: []string{"dev1"}, + Topology: &podresourcesapi.TopologyInfo{ + Nodes: []*podresourcesapi.NUMANode{ + { + ID: 1, + }, + }, + }, + }, + { + ResourceName: "resource-nt", + DeviceIds: []string{"devA"}, + }, + { + ResourceName: "resource-mm", + DeviceIds: []string{"devM0"}, + Topology: &podresourcesapi.TopologyInfo{ + Nodes: []*podresourcesapi.NUMANode{ + { + ID: 0, + }, + }, + }, + }, + { + ResourceName: "resource-mm", + DeviceIds: []string{"devMM"}, + Topology: &podresourcesapi.TopologyInfo{ + Nodes: []*podresourcesapi.NUMANode{ + { + ID: 0, + }, + { + ID: 1, + }, + }, + }, + }, + }, + }, + }, + { + desc: "with devices, no CPUs", + allCPUs: []int64{}, + allDevices: allDevs, + expectedAllocatableResourcesResponse: &podresourcesapi.AllocatableResourcesResponse{ + Devices: []*podresourcesapi.ContainerDevices{ + { + ResourceName: "resource", + DeviceIds: []string{"dev0"}, + Topology: &podresourcesapi.TopologyInfo{ + Nodes: []*podresourcesapi.NUMANode{ + { + ID: 0, + }, + }, + }, + }, + { + ResourceName: "resource", + DeviceIds: []string{"dev1"}, + Topology: &podresourcesapi.TopologyInfo{ + Nodes: []*podresourcesapi.NUMANode{ + { + ID: 1, + }, + }, + }, + }, + { + ResourceName: "resource-nt", + DeviceIds: []string{"devA"}, + }, + { + ResourceName: "resource-mm", + DeviceIds: []string{"devM0"}, + Topology: &podresourcesapi.TopologyInfo{ + Nodes: []*podresourcesapi.NUMANode{ + { + ID: 0, + }, + }, + }, + }, + { + ResourceName: "resource-mm", + DeviceIds: []string{"devMM"}, + Topology: &podresourcesapi.TopologyInfo{ + Nodes: []*podresourcesapi.NUMANode{ + { + ID: 0, + }, + { + ID: 1, + }, + }, + }, + }, + }, + }, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + m := new(mockProvider) + m.On("GetDevices", "", "").Return([]*podresourcesapi.ContainerDevices{}) + m.On("GetCPUs", "", "").Return([]int64{}) + m.On("UpdateAllocatedDevices").Return() + m.On("GetAllocatableDevices").Return(tc.allDevices) + m.On("GetAllocatableCPUs").Return(tc.allCPUs) + server := NewV1PodResourcesServer(m, m, m) + + resp, err := server.GetAllocatableResources(context.TODO(), &podresourcesapi.AllocatableResourcesRequest{}) + if err != nil { + t.Errorf("want err = %v, got %q", nil, err) + } + + if !equalAllocatableResourcesResponse(tc.expectedAllocatableResourcesResponse, resp) { + t.Errorf("want resp = %s, got %s", tc.expectedAllocatableResourcesResponse.String(), resp.String()) + } + }) + } +} + +func equalListResponse(respA, respB *podresourcesapi.ListPodResourcesResponse) 
bool { + if len(respA.PodResources) != len(respB.PodResources) { + return false + } + for idx := 0; idx < len(respA.PodResources); idx++ { + podResA := respA.PodResources[idx] + podResB := respB.PodResources[idx] + if podResA.Name != podResB.Name { + return false + } + if podResA.Namespace != podResB.Namespace { + return false + } + if len(podResA.Containers) != len(podResB.Containers) { + return false + } + for jdx := 0; jdx < len(podResA.Containers); jdx++ { + cntA := podResA.Containers[jdx] + cntB := podResB.Containers[jdx] + + if cntA.Name != cntB.Name { + return false + } + if !equalInt64s(cntA.CpuIds, cntB.CpuIds) { + return false + } + + if !equalContainerDevices(cntA.Devices, cntB.Devices) { + return false + } + } + } + return true +} + +func equalContainerDevices(devA, devB []*podresourcesapi.ContainerDevices) bool { + if len(devA) != len(devB) { + return false + } + + for idx := 0; idx < len(devA); idx++ { + cntDevA := devA[idx] + cntDevB := devB[idx] + + if cntDevA.ResourceName != cntDevB.ResourceName { + return false + } + if !equalTopology(cntDevA.Topology, cntDevB.Topology) { + return false + } + if !equalStrings(cntDevA.DeviceIds, cntDevB.DeviceIds) { + return false + } + } + + return true +} + +func equalInt64s(a, b []int64) bool { + if len(a) != len(b) { + return false + } + aCopy := append([]int64{}, a...) + sort.Slice(aCopy, func(i, j int) bool { return aCopy[i] < aCopy[j] }) + bCopy := append([]int64{}, b...) + sort.Slice(bCopy, func(i, j int) bool { return bCopy[i] < bCopy[j] }) + return reflect.DeepEqual(aCopy, bCopy) +} + +func equalStrings(a, b []string) bool { + if len(a) != len(b) { + return false + } + aCopy := append([]string{}, a...) + sort.Strings(aCopy) + bCopy := append([]string{}, b...) + sort.Strings(bCopy) + return reflect.DeepEqual(aCopy, bCopy) +} + +func equalTopology(a, b *podresourcesapi.TopologyInfo) bool { + if a == nil && b != nil { + return false + } + if a != nil && b == nil { + return false + } + return reflect.DeepEqual(a, b) +} + +func equalAllocatableResourcesResponse(respA, respB *podresourcesapi.AllocatableResourcesResponse) bool { + if !equalInt64s(respA.CpuIds, respB.CpuIds) { + return false + } + return equalContainerDevices(respA.Devices, respB.Devices) +} diff --git a/pkg/kubelet/apis/podresources/server_v1alpha1_test.go b/pkg/kubelet/apis/podresources/server_v1alpha1_test.go index 5fe6f966e9da..4bd77130dcb7 100644 --- a/pkg/kubelet/apis/podresources/server_v1alpha1_test.go +++ b/pkg/kubelet/apis/podresources/server_v1alpha1_test.go @@ -52,6 +52,16 @@ func (m *mockProvider) UpdateAllocatedDevices() { m.Called() } +func (m *mockProvider) GetAllocatableDevices() []*podresourcesv1.ContainerDevices { + args := m.Called() + return args.Get(0).([]*podresourcesv1.ContainerDevices) +} + +func (m *mockProvider) GetAllocatableCPUs() []int64 { + args := m.Called() + return args.Get(0).([]int64) +} + func TestListPodResourcesV1alpha1(t *testing.T) { podName := "pod-name" podNamespace := "pod-namespace" diff --git a/pkg/kubelet/apis/podresources/types.go b/pkg/kubelet/apis/podresources/types.go index 433d92c59964..ebc068742c7f 100644 --- a/pkg/kubelet/apis/podresources/types.go +++ b/pkg/kubelet/apis/podresources/types.go @@ -23,8 +23,12 @@ import ( // DevicesProvider knows how to provide the devices used by the given container type DevicesProvider interface { - GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices + // UpdateAllocatedDevices frees any Devices that are bound to terminated pods. 
UpdateAllocatedDevices() + // GetDevices returns information about the devices assigned to pods and containers + GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices + // GetAllocatableDevices returns information about all the devices known to the manager + GetAllocatableDevices() []*podresourcesapi.ContainerDevices } // PodsProvider knows how to provide the pods admitted by the node @@ -34,5 +38,8 @@ type PodsProvider interface { // CPUsProvider knows how to provide the cpus used by the given container type CPUsProvider interface { + // GetCPUs returns information about the cpus assigned to pods and containers GetCPUs(podUID, containerName string) []int64 + // GetAllocatableCPUs returns the allocatable (not allocated) CPUs + GetAllocatableCPUs() []int64 } diff --git a/pkg/kubelet/cm/container_manager.go b/pkg/kubelet/cm/container_manager.go index e4a710947187..2296fe024e25 100644 --- a/pkg/kubelet/cm/container_manager.go +++ b/pkg/kubelet/cm/container_manager.go @@ -28,7 +28,9 @@ import ( internalapi "k8s.io/cri-api/pkg/apis" podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" + "k8s.io/kubernetes/pkg/kubelet/apis/podresources" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" + "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" @@ -103,12 +105,6 @@ type ContainerManager interface { // registration. GetPluginRegistrationHandler() cache.PluginHandler - // GetDevices returns information about the devices assigned to pods and containers - GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices - - // GetCPUs returns information about the cpus assigned to pods and containers - GetCPUs(podUID, containerName string) []int64 - // ShouldResetExtendedResourceCapacity returns whether or not the extended resources should be zeroed, // due to node recreation. ShouldResetExtendedResourceCapacity() bool @@ -116,8 +112,9 @@ type ContainerManager interface { // GetAllocateResourcesPodAdmitHandler returns an instance of a PodAdmitHandler responsible for allocating pod resources. GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler - // UpdateAllocatedDevices frees any Devices that are bound to terminated pods. - UpdateAllocatedDevices() + // Implements the podresources Provider API for CPUs and Devices + podresources.CPUsProvider + podresources.DevicesProvider } type NodeConfig struct { @@ -191,3 +188,39 @@ func ParseQOSReserved(m map[string]string) (*map[v1.ResourceName]int64, error) { } return &reservations, nil } + +func containerDevicesFromResourceDeviceInstances(devs devicemanager.ResourceDeviceInstances) []*podresourcesapi.ContainerDevices { + var respDevs []*podresourcesapi.ContainerDevices + + for resourceName, resourceDevs := range devs { + for devID, dev := range resourceDevs { + topo := dev.GetTopology() + if topo == nil { + // Some device plugin do not report the topology information. + // This is legal, so we report the devices anyway, + // let the client decide what to do. 
+ respDevs = append(respDevs, &podresourcesapi.ContainerDevices{ + ResourceName: resourceName, + DeviceIds: []string{devID}, + }) + continue + } + + for _, node := range topo.GetNodes() { + respDevs = append(respDevs, &podresourcesapi.ContainerDevices{ + ResourceName: resourceName, + DeviceIds: []string{devID}, + Topology: &podresourcesapi.TopologyInfo{ + Nodes: []*podresourcesapi.NUMANode{ + { + ID: node.GetID(), + }, + }, + }, + }) + } + } + } + + return respDevs +} diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index e6e16ace516f..4d3f434ca4a2 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -1069,13 +1069,21 @@ func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceLi } func (cm *containerManagerImpl) GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices { - return cm.deviceManager.GetDevices(podUID, containerName) + return containerDevicesFromResourceDeviceInstances(cm.deviceManager.GetDevices(podUID, containerName)) +} + +func (cm *containerManagerImpl) GetAllocatableDevices() []*podresourcesapi.ContainerDevices { + return containerDevicesFromResourceDeviceInstances(cm.deviceManager.GetAllocatableDevices()) } func (cm *containerManagerImpl) GetCPUs(podUID, containerName string) []int64 { return cm.cpuManager.GetCPUs(podUID, containerName).ToSliceNoSortInt64() } +func (cm *containerManagerImpl) GetAllocatableCPUs() []int64 { + return cm.cpuManager.GetAllocatableCPUs().ToSliceNoSortInt64() +} + func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool { return cm.deviceManager.ShouldResetExtendedResourceCapacity() } diff --git a/pkg/kubelet/cm/container_manager_stub.go b/pkg/kubelet/cm/container_manager_stub.go index ac4ceee2c56e..cda8c580d6bb 100644 --- a/pkg/kubelet/cm/container_manager_stub.go +++ b/pkg/kubelet/cm/container_manager_stub.go @@ -114,6 +114,10 @@ func (cm *containerManagerStub) GetDevices(_, _ string) []*podresourcesapi.Conta return nil } +func (cm *containerManagerStub) GetAllocatableDevices() []*podresourcesapi.ContainerDevices { + return nil +} + func (cm *containerManagerStub) ShouldResetExtendedResourceCapacity() bool { return cm.shouldResetExtendedResourceCapacity } @@ -130,6 +134,10 @@ func (cm *containerManagerStub) GetCPUs(_, _ string) []int64 { return nil } +func (cm *containerManagerStub) GetAllocatableCPUs() []int64 { + return nil +} + func NewStubContainerManager() ContainerManager { return &containerManagerStub{shouldResetExtendedResourceCapacity: false} } diff --git a/pkg/kubelet/cm/container_manager_windows.go b/pkg/kubelet/cm/container_manager_windows.go index c3d07f270c30..041d90f365ae 100644 --- a/pkg/kubelet/cm/container_manager_windows.go +++ b/pkg/kubelet/cm/container_manager_windows.go @@ -217,7 +217,11 @@ func (cm *containerManagerImpl) GetPodCgroupRoot() string { } func (cm *containerManagerImpl) GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices { - return cm.deviceManager.GetDevices(podUID, containerName) + return containerDevicesFromResourceDeviceInstances(cm.deviceManager.GetDevices(podUID, containerName)) +} + +func (cm *containerManagerImpl) GetAllocatableDevices() []*podresourcesapi.ContainerDevices { + return nil } func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool { @@ -235,3 +239,7 @@ func (cm *containerManagerImpl) UpdateAllocatedDevices() { func (cm *containerManagerImpl) GetCPUs(_, _ string) []int64 { return nil } 
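// Illustrative sketch, not part of this patch: because the ContainerManager interface now
// embeds podresources.CPUsProvider and podresources.DevicesProvider, the kubelet can hand
// its container manager to the v1 pod resources server as both providers. The variable
// names below are placeholders, not identifiers taken from this diff.
//
//	server := podresources.NewV1PodResourcesServer(podsProvider, containerManager, containerManager)
//	// argument order per this patch: PodsProvider, DevicesProvider, CPUsProvider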
+ +func (cm *containerManagerImpl) GetAllocatableCPUs() []int64 { + return nil +} diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go index c0d0f1aa69b9..673e627db080 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go @@ -85,6 +85,9 @@ type Manager interface { // and is consulted to achieve NUMA aware resource alignment per Pod // among this and other resource controllers. GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint + + // GetAllocatableCPUs returns the assignable (not allocated) CPUs + GetAllocatableCPUs() cpuset.CPUSet } type manager struct { @@ -124,6 +127,9 @@ type manager struct { // stateFileDirectory holds the directory where the state file for checkpoints is held. stateFileDirectory string + + // allocatableCPUs is the set of online CPUs as reported by the system + allocatableCPUs cpuset.CPUSet } var _ Manager = &manager{} @@ -150,6 +156,7 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo return nil, err } klog.Infof("[cpumanager] detected CPU topology: %v", topo) + reservedCPUs, ok := nodeAllocatableReservation[v1.ResourceCPU] if !ok { // The static policy cannot initialize without this information. @@ -210,6 +217,8 @@ func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesRe return err } + m.allocatableCPUs = m.policy.GetAllocatableCPUs(m.state) + if m.policy.Name() == string(PolicyNone) { return nil } @@ -296,6 +305,10 @@ func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager. return m.policy.GetPodTopologyHints(m.state, pod) } +func (m *manager) GetAllocatableCPUs() cpuset.CPUSet { + return m.allocatableCPUs.Clone() +} + type reconciledContainer struct { podName string containerName string diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go index 82f38262c39c..51c6ad99251b 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go @@ -120,6 +120,10 @@ func (p *mockPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string] return nil } +func (p *mockPolicy) GetAllocatableCPUs(m state.State) cpuset.CPUSet { + return cpuset.NewCPUSet() +} + type mockRuntimeService struct { err error } diff --git a/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go index 478534b1c1c0..1c7fa8b54984 100644 --- a/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go @@ -74,6 +74,11 @@ func (m *fakeManager) GetCPUs(podUID, containerName string) cpuset.CPUSet { return cpuset.CPUSet{} } +func (m *fakeManager) GetAllocatableCPUs() cpuset.CPUSet { + klog.Infof("[fake cpumanager] Get Allocatable Cpus") + return cpuset.CPUSet{} +} + // NewFakeManager creates empty/fake cpu manager func NewFakeManager() Manager { return &fakeManager{ diff --git a/pkg/kubelet/cm/cpumanager/policy.go b/pkg/kubelet/cm/cpumanager/policy.go index 54565e5023c0..dd5d977a1201 100644 --- a/pkg/kubelet/cm/cpumanager/policy.go +++ b/pkg/kubelet/cm/cpumanager/policy.go @@ -19,6 +19,7 @@ package cpumanager import ( "k8s.io/api/core/v1" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" ) @@ -38,4 +39,6 @@ type Policy interface { // and is consulted to achieve NUMA aware resource alignment per Pod // among this and other resource controllers. 
GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint + // GetAllocatableCPUs returns the assignable (not allocated) CPUs + GetAllocatableCPUs(m state.State) cpuset.CPUSet } diff --git a/pkg/kubelet/cm/cpumanager/policy_none.go b/pkg/kubelet/cm/cpumanager/policy_none.go index abc1c0632b14..5b8f094d2d66 100644 --- a/pkg/kubelet/cm/cpumanager/policy_none.go +++ b/pkg/kubelet/cm/cpumanager/policy_none.go @@ -20,6 +20,7 @@ import ( "k8s.io/api/core/v1" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" ) @@ -30,7 +31,7 @@ var _ Policy = &nonePolicy{} // PolicyNone name of none policy const PolicyNone policyName = "none" -// NewNonePolicy returns a cupset manager policy that does nothing +// NewNonePolicy returns a cpuset manager policy that does nothing func NewNonePolicy() Policy { return &nonePolicy{} } @@ -59,3 +60,12 @@ func (p *nonePolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v1. func (p *nonePolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[string][]topologymanager.TopologyHint { return nil } + +// Assignable CPUs are the ones that can be exclusively allocated to pods that meet the exclusivity requirement +// (ie guaranteed QoS class and integral CPU request). +// Assignability of CPUs as a concept is only applicable in case of static policy i.e. scenarios where workloads +// CAN get exclusive access to core(s). +// Hence, we return empty set here: no cpus are assignable according to above definition with this policy. +func (p *nonePolicy) GetAllocatableCPUs(m state.State) cpuset.CPUSet { + return cpuset.NewCPUSet() +} diff --git a/pkg/kubelet/cm/cpumanager/policy_none_test.go b/pkg/kubelet/cm/cpumanager/policy_none_test.go index 732bce2a4722..971279710962 100644 --- a/pkg/kubelet/cm/cpumanager/policy_none_test.go +++ b/pkg/kubelet/cm/cpumanager/policy_none_test.go @@ -65,3 +65,24 @@ func TestNonePolicyRemove(t *testing.T) { t.Errorf("NonePolicy RemoveContainer() error. expected no error but got %v", err) } } + +func TestNonePolicyGetAllocatableCPUs(t *testing.T) { + // any random topology is fine + + var cpuIDs []int + for cpuID := range topoSingleSocketHT.CPUDetails { + cpuIDs = append(cpuIDs, cpuID) + } + + policy := &nonePolicy{} + + st := &mockState{ + assignments: state.ContainerCPUAssignments{}, + defaultCPUSet: cpuset.NewCPUSet(cpuIDs...), + } + + cpus := policy.GetAllocatableCPUs(st) + if cpus.Size() != 0 { + t.Errorf("NonePolicy GetAllocatableCPUs() error. expected empty set, returned: %v", cpus) + } +} diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go index c3309ef72805..f699c0d5c2c3 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static.go +++ b/pkg/kubelet/cm/cpumanager/policy_static.go @@ -187,8 +187,8 @@ func (p *staticPolicy) validateState(s state.State) error { return nil } -// assignableCPUs returns the set of unassigned CPUs minus the reserved set. -func (p *staticPolicy) assignableCPUs(s state.State) cpuset.CPUSet { +// GetAllocatableCPUs returns the set of unassigned CPUs minus the reserved set. 
+func (p *staticPolicy) GetAllocatableCPUs(s state.State) cpuset.CPUSet { return s.GetDefaultCPUSet().Difference(p.reserved) } @@ -258,14 +258,14 @@ func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerNa func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bitmask.BitMask, reusableCPUs cpuset.CPUSet) (cpuset.CPUSet, error) { klog.Infof("[cpumanager] allocateCpus: (numCPUs: %d, socket: %v)", numCPUs, numaAffinity) - assignableCPUs := p.assignableCPUs(s).Union(reusableCPUs) + allocatableCPUs := p.GetAllocatableCPUs(s).Union(reusableCPUs) // If there are aligned CPUs in numaAffinity, attempt to take those first. result := cpuset.NewCPUSet() if numaAffinity != nil { alignedCPUs := cpuset.NewCPUSet() for _, numaNodeID := range numaAffinity.GetBits() { - alignedCPUs = alignedCPUs.Union(assignableCPUs.Intersection(p.topology.CPUDetails.CPUsInNUMANodes(numaNodeID))) + alignedCPUs = alignedCPUs.Union(allocatableCPUs.Intersection(p.topology.CPUDetails.CPUsInNUMANodes(numaNodeID))) } numAlignedToAlloc := alignedCPUs.Size() @@ -282,7 +282,7 @@ func (p *staticPolicy) allocateCPUs(s state.State, numCPUs int, numaAffinity bit } // Get any remaining CPUs from what's leftover after attempting to grab aligned ones. - remainingCPUs, err := takeByTopology(p.topology, assignableCPUs.Difference(result), numCPUs-result.Size()) + remainingCPUs, err := takeByTopology(p.topology, allocatableCPUs.Difference(result), numCPUs-result.Size()) if err != nil { return cpuset.NewCPUSet(), err } @@ -368,7 +368,7 @@ func (p *staticPolicy) GetTopologyHints(s state.State, pod *v1.Pod, container *v } // Get a list of available CPUs. - available := p.assignableCPUs(s) + available := p.GetAllocatableCPUs(s) // Get a list of reusable CPUs (e.g. CPUs reused from initContainers). // It should be an empty CPUSet for a newly created pod. @@ -423,7 +423,7 @@ func (p *staticPolicy) GetPodTopologyHints(s state.State, pod *v1.Pod) map[strin } // Get a list of available CPUs. - available := p.assignableCPUs(s) + available := p.GetAllocatableCPUs(s) // Get a list of reusable CPUs (e.g. CPUs reused from initContainers). // It should be an empty CPUSet for a newly created pod. diff --git a/pkg/kubelet/cm/cpuset/cpuset.go b/pkg/kubelet/cm/cpuset/cpuset.go index ad7ea27afa43..7d892f85c512 100644 --- a/pkg/kubelet/cm/cpuset/cpuset.go +++ b/pkg/kubelet/cm/cpuset/cpuset.go @@ -75,6 +75,15 @@ func NewCPUSet(cpus ...int) CPUSet { return b.Result() } +// NewCPUSet returns a new CPUSet containing the supplied elements, as slice of int64. +func NewCPUSetInt64(cpus ...int64) CPUSet { + b := NewBuilder() + for _, c := range cpus { + b.Add(int(c)) + } + return b.Result() +} + // Size returns the number of elements in this set. func (s CPUSet) Size() int { return len(s.elems) diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index 445cf34ddb8e..a39763b9bda0 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -37,7 +37,6 @@ import ( "k8s.io/apimachinery/pkg/util/sets" utilfeature "k8s.io/apiserver/pkg/util/feature" pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" - podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" @@ -85,8 +84,8 @@ type ManagerImpl struct { // e.g. a new device is advertised, two old devices are deleted and a running device fails. 
callback monitorCallback - // allDevices is a map by resource name of all the devices currently registered to the device manager - allDevices map[string]map[string]pluginapi.Device + // allDevices holds all the devices currently registered to the device manager + allDevices ResourceDeviceInstances // healthyDevices contains all of the registered healthy resourceNames and their exported device IDs. healthyDevices map[string]sets.String @@ -152,7 +151,7 @@ func newManagerImpl(socketPath string, topology []cadvisorapi.Node, topologyAffi socketname: file, socketdir: dir, - allDevices: make(map[string]map[string]pluginapi.Device), + allDevices: NewResourceDeviceInstances(), healthyDevices: make(map[string]sets.String), unhealthyDevices: make(map[string]sets.String), allocatedDevices: make(map[string]sets.String), @@ -1068,8 +1067,17 @@ func (m *ManagerImpl) isDevicePluginResource(resource string) bool { return false } +// GetAllocatableDevices returns information about all the devices known to the manager +func (m *ManagerImpl) GetAllocatableDevices() ResourceDeviceInstances { + m.mutex.Lock() + resp := m.allDevices.Clone() + m.mutex.Unlock() + klog.V(4).Infof("known devices: %d", len(resp)) + return resp +} + // GetDevices returns the devices used by the specified container -func (m *ManagerImpl) GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices { +func (m *ManagerImpl) GetDevices(podUID, containerName string) ResourceDeviceInstances { return m.podDevices.getContainerDevices(podUID, containerName) } diff --git a/pkg/kubelet/cm/devicemanager/manager_stub.go b/pkg/kubelet/cm/devicemanager/manager_stub.go index ea04f5a16c02..e6874f88d8ad 100644 --- a/pkg/kubelet/cm/devicemanager/manager_stub.go +++ b/pkg/kubelet/cm/devicemanager/manager_stub.go @@ -18,7 +18,6 @@ package devicemanager import ( v1 "k8s.io/api/core/v1" - podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/lifecycle" @@ -80,7 +79,12 @@ func (h *ManagerStub) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymana } // GetDevices returns nil -func (h *ManagerStub) GetDevices(_, _ string) []*podresourcesapi.ContainerDevices { +func (h *ManagerStub) GetDevices(_, _ string) ResourceDeviceInstances { + return nil +} + +// GetAllocatableDevices returns nothing +func (h *ManagerStub) GetAllocatableDevices() ResourceDeviceInstances { return nil } diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index a607f964689d..86f39eaa0169 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -622,7 +622,7 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso activePods: activePods, sourcesReady: &sourcesReadyStub{}, checkpointManager: ckm, - allDevices: make(map[string]map[string]pluginapi.Device), + allDevices: NewResourceDeviceInstances(), } for _, res := range testRes { diff --git a/pkg/kubelet/cm/devicemanager/pod_devices.go b/pkg/kubelet/cm/devicemanager/pod_devices.go index 8e20eb7bb7bd..059684e9fa82 100644 --- a/pkg/kubelet/cm/devicemanager/pod_devices.go +++ b/pkg/kubelet/cm/devicemanager/pod_devices.go @@ -23,7 +23,6 @@ import ( "k8s.io/apimachinery/pkg/util/sets" pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" - podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint" 
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" ) @@ -324,7 +323,7 @@ func (pdev *podDevices) deviceRunContainerOptions(podUID, contName string) *Devi } // getContainerDevices returns the devices assigned to the provided container for all ResourceNames -func (pdev *podDevices) getContainerDevices(podUID, contName string) []*podresourcesapi.ContainerDevices { +func (pdev *podDevices) getContainerDevices(podUID, contName string) ResourceDeviceInstances { pdev.RLock() defer pdev.RUnlock() @@ -334,15 +333,51 @@ func (pdev *podDevices) getContainerDevices(podUID, contName string) []*podresou if _, contExists := pdev.devs[podUID][contName]; !contExists { return nil } - cDev := []*podresourcesapi.ContainerDevices{} + resDev := NewResourceDeviceInstances() for resource, allocateInfo := range pdev.devs[podUID][contName] { + if len(allocateInfo.deviceIds) == 0 { + continue + } + devicePluginMap := make(map[string]pluginapi.Device) for numaid, devlist := range allocateInfo.deviceIds { - cDev = append(cDev, &podresourcesapi.ContainerDevices{ - ResourceName: resource, - DeviceIds: devlist, - Topology: &podresourcesapi.TopologyInfo{Nodes: []*podresourcesapi.NUMANode{{ID: numaid}}}, - }) + for _, devId := range devlist { + NUMANodes := []*pluginapi.NUMANode{{ID: numaid}} + if pDev, ok := devicePluginMap[devId]; ok && pDev.Topology != nil { + if nodes := pDev.Topology.GetNodes(); nodes != nil { + NUMANodes = append(NUMANodes, nodes...) + } + } + + devicePluginMap[devId] = pluginapi.Device{ + // ID and Healthy are not relevant here. + Topology: &pluginapi.TopologyInfo{ + Nodes: NUMANodes, + }, + } + } + } + resDev[resource] = devicePluginMap + } + return resDev +} + +// DeviceInstances is a mapping device name -> plugin device data +type DeviceInstances map[string]pluginapi.Device + +// ResourceDeviceInstances is a mapping resource name -> DeviceInstances +type ResourceDeviceInstances map[string]DeviceInstances + +func NewResourceDeviceInstances() ResourceDeviceInstances { + return make(ResourceDeviceInstances) +} + +func (rdev ResourceDeviceInstances) Clone() ResourceDeviceInstances { + clone := NewResourceDeviceInstances() + for resourceName, resourceDevs := range rdev { + clone[resourceName] = make(map[string]pluginapi.Device) + for devID, dev := range resourceDevs { + clone[resourceName][devID] = dev } } - return cDev + return clone } diff --git a/pkg/kubelet/cm/devicemanager/pod_devices_test.go b/pkg/kubelet/cm/devicemanager/pod_devices_test.go index b2ff8376a73b..72264c467f6e 100644 --- a/pkg/kubelet/cm/devicemanager/pod_devices_test.go +++ b/pkg/kubelet/cm/devicemanager/pod_devices_test.go @@ -35,13 +35,18 @@ func TestGetContainerDevices(t *testing.T) { devices, constructAllocResp(map[string]string{"/dev/r1dev1": "/dev/r1dev1", "/dev/r1dev2": "/dev/r1dev2"}, map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{})) - contDevices := podDevices.getContainerDevices(podID, contID) - require.Equal(t, len(devices), len(contDevices), "Incorrect container devices") - for _, contDev := range contDevices { - for _, node := range contDev.Topology.Nodes { + resContDevices := podDevices.getContainerDevices(podID, contID) + contDevices, ok := resContDevices[resourceName1] + require.True(t, ok, "resource %q not present", resourceName1) + + for devId, plugInfo := range contDevices { + nodes := plugInfo.GetTopology().GetNodes() + require.Equal(t, len(nodes), len(devices), "Incorrect container devices: %v - %v (nodes %v)", devices, contDevices, nodes) + + for _, node := range 
plugInfo.GetTopology().GetNodes() { dev, ok := devices[node.ID] require.True(t, ok, "NUMA id %v doesn't exist in result", node.ID) - require.Equal(t, contDev.DeviceIds[0], dev[0], "Can't find device %s in result", dev[0]) + require.Equal(t, devId, dev[0], "Can't find device %s in result", dev[0]) } } } diff --git a/pkg/kubelet/cm/devicemanager/topology_hints_test.go b/pkg/kubelet/cm/devicemanager/topology_hints_test.go index f0e43c1a9b9a..da9910fba874 100644 --- a/pkg/kubelet/cm/devicemanager/topology_hints_test.go +++ b/pkg/kubelet/cm/devicemanager/topology_hints_test.go @@ -56,7 +56,7 @@ func TestGetTopologyHints(t *testing.T) { for _, tc := range tcases { m := ManagerImpl{ - allDevices: make(map[string]map[string]pluginapi.Device), + allDevices: NewResourceDeviceInstances(), healthyDevices: make(map[string]sets.String), allocatedDevices: make(map[string]sets.String), podDevices: newPodDevices(), @@ -66,7 +66,7 @@ func TestGetTopologyHints(t *testing.T) { } for r := range tc.devices { - m.allDevices[r] = make(map[string]pluginapi.Device) + m.allDevices[r] = make(DeviceInstances) m.healthyDevices[r] = sets.NewString() for _, d := range tc.devices[r] { @@ -409,7 +409,7 @@ func TestTopologyAlignedAllocation(t *testing.T) { } for _, tc := range tcases { m := ManagerImpl{ - allDevices: make(map[string]map[string]pluginapi.Device), + allDevices: NewResourceDeviceInstances(), healthyDevices: make(map[string]sets.String), allocatedDevices: make(map[string]sets.String), endpoints: make(map[string]endpointInfo), @@ -419,7 +419,7 @@ func TestTopologyAlignedAllocation(t *testing.T) { topologyAffinityStore: &mockAffinityStore{tc.hint}, } - m.allDevices[tc.resource] = make(map[string]pluginapi.Device) + m.allDevices[tc.resource] = make(DeviceInstances) m.healthyDevices[tc.resource] = sets.NewString() m.endpoints[tc.resource] = endpointInfo{} @@ -598,7 +598,7 @@ func TestGetPreferredAllocationParameters(t *testing.T) { } for _, tc := range tcases { m := ManagerImpl{ - allDevices: make(map[string]map[string]pluginapi.Device), + allDevices: NewResourceDeviceInstances(), healthyDevices: make(map[string]sets.String), allocatedDevices: make(map[string]sets.String), endpoints: make(map[string]endpointInfo), @@ -608,7 +608,7 @@ func TestGetPreferredAllocationParameters(t *testing.T) { topologyAffinityStore: &mockAffinityStore{tc.hint}, } - m.allDevices[tc.resource] = make(map[string]pluginapi.Device) + m.allDevices[tc.resource] = make(DeviceInstances) m.healthyDevices[tc.resource] = sets.NewString() for _, d := range tc.allDevices { m.allDevices[tc.resource][d.ID] = d @@ -920,7 +920,7 @@ func TestGetPodTopologyHints(t *testing.T) { for _, tc := range tcases { m := ManagerImpl{ - allDevices: make(map[string]map[string]pluginapi.Device), + allDevices: NewResourceDeviceInstances(), healthyDevices: make(map[string]sets.String), allocatedDevices: make(map[string]sets.String), podDevices: newPodDevices(), @@ -930,7 +930,7 @@ func TestGetPodTopologyHints(t *testing.T) { } for r := range tc.devices { - m.allDevices[r] = make(map[string]pluginapi.Device) + m.allDevices[r] = make(DeviceInstances) m.healthyDevices[r] = sets.NewString() for _, d := range tc.devices[r] { diff --git a/pkg/kubelet/cm/devicemanager/types.go b/pkg/kubelet/cm/devicemanager/types.go index 779d91e3df12..b46c5e1cce9f 100644 --- a/pkg/kubelet/cm/devicemanager/types.go +++ b/pkg/kubelet/cm/devicemanager/types.go @@ -20,7 +20,6 @@ import ( "time" v1 "k8s.io/api/core/v1" - podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1" 
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" @@ -60,7 +59,10 @@ type Manager interface { GetWatcherHandler() cache.PluginHandler // GetDevices returns information about the devices assigned to pods and containers - GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices + GetDevices(podUID, containerName string) ResourceDeviceInstances + + // GetAllocatableDevices returns information about all the devices known to the manager + GetAllocatableDevices() ResourceDeviceInstances // ShouldResetExtendedResourceCapacity returns whether the extended resources should be reset or not, // depending on the checkpoint file availability. Absence of the checkpoint file strongly indicates diff --git a/pkg/kubelet/cm/fake_container_manager.go b/pkg/kubelet/cm/fake_container_manager.go index 027fffc7e72a..a05b5fce114f 100644 --- a/pkg/kubelet/cm/fake_container_manager.go +++ b/pkg/kubelet/cm/fake_container_manager.go @@ -174,6 +174,13 @@ func (cm *FakeContainerManager) GetDevices(_, _ string) []*podresourcesapi.Conta return nil } +func (cm *FakeContainerManager) GetAllocatableDevices() []*podresourcesapi.ContainerDevices { + cm.Lock() + defer cm.Unlock() + cm.CalledFunctions = append(cm.CalledFunctions, "GetAllocatableDevices") + return nil +} + func (cm *FakeContainerManager) ShouldResetExtendedResourceCapacity() bool { cm.Lock() defer cm.Unlock() @@ -201,3 +208,9 @@ func (cm *FakeContainerManager) GetCPUs(_, _ string) []int64 { cm.CalledFunctions = append(cm.CalledFunctions, "GetCPUs") return nil } + +func (cm *FakeContainerManager) GetAllocatableCPUs() []int64 { + cm.Lock() + defer cm.Unlock() + return nil +} diff --git a/pkg/kubelet/metrics/metrics.go b/pkg/kubelet/metrics/metrics.go index 61269fedd400..6fb40e1875ba 100644 --- a/pkg/kubelet/metrics/metrics.go +++ b/pkg/kubelet/metrics/metrics.go @@ -63,7 +63,11 @@ const ( DevicePluginRegistrationCountKey = "device_plugin_registration_total" DevicePluginAllocationDurationKey = "device_plugin_alloc_duration_seconds" // Metrics keys of pod resources operations - PodResourcesEndpointRequestsTotalKey = "pod_resources_endpoint_requests_total" + PodResourcesEndpointRequestsTotalKey = "pod_resources_endpoint_requests_total" + PodResourcesEndpointRequestsListKey = "pod_resources_endpoint_requests_list" + PodResourcesEndpointRequestsGetAllocatableKey = "pod_resources_endpoint_requests_get_allocatable" + PodResourcesEndpointErrorsListKey = "pod_resources_endpoint_errors_list" + PodResourcesEndpointErrorsGetAllocatableKey = "pod_resources_endpoint_errors_get_allocatable" // Metric keys for node config AssignedConfigKey = "node_config_assigned" @@ -293,6 +297,54 @@ var ( []string{"server_api_version"}, ) + // PodResourcesEndpointRequestsListCount is a Counter that tracks the number of requests to the PodResource List() endpoint. + // Broken down by server API version. + PodResourcesEndpointRequestsListCount = metrics.NewCounterVec( + &metrics.CounterOpts{ + Subsystem: KubeletSubsystem, + Name: PodResourcesEndpointRequestsListKey, + Help: "Number of requests to the PodResource List endpoint. Broken down by server api version.", + StabilityLevel: metrics.ALPHA, + }, + []string{"server_api_version"}, + ) + + // PodResourcesEndpointRequestsGetAllocatableCount is a Counter that tracks the number of requests to the PodResource GetAllocatableResources() endpoint. + // Broken down by server API version. 
+ PodResourcesEndpointRequestsGetAllocatableCount = metrics.NewCounterVec( + &metrics.CounterOpts{ + Subsystem: KubeletSubsystem, + Name: PodResourcesEndpointRequestsGetAllocatableKey, + Help: "Number of requests to the PodResource GetAllocatableResources endpoint. Broken down by server api version.", + StabilityLevel: metrics.ALPHA, + }, + []string{"server_api_version"}, + ) + + // PodResourcesEndpointErrorsListCount is a Counter that tracks the number of errors returned by he PodResource List() endpoint. + // Broken down by server API version. + PodResourcesEndpointErrorsListCount = metrics.NewCounterVec( + &metrics.CounterOpts{ + Subsystem: KubeletSubsystem, + Name: PodResourcesEndpointErrorsListKey, + Help: "Number of requests to the PodResource List endpoint which returned error. Broken down by server api version.", + StabilityLevel: metrics.ALPHA, + }, + []string{"server_api_version"}, + ) + + // PodResourcesEndpointErrorsGetAllocatableCount is a Counter that tracks the number of errors returned by the PodResource GetAllocatableResources() endpoint. + // Broken down by server API version. + PodResourcesEndpointErrorsGetAllocatableCount = metrics.NewCounterVec( + &metrics.CounterOpts{ + Subsystem: KubeletSubsystem, + Name: PodResourcesEndpointErrorsGetAllocatableKey, + Help: "Number of requests to the PodResource GetAllocatableResources endpoint which returned error. Broken down by server api version.", + StabilityLevel: metrics.ALPHA, + }, + []string{"server_api_version"}, + ) + // Metrics for node config // AssignedConfig is a Gauge that is set 1 if the Kubelet has a NodeConfig assigned. diff --git a/staging/src/k8s.io/kubelet/pkg/apis/podresources/v1/api.pb.go b/staging/src/k8s.io/kubelet/pkg/apis/podresources/v1/api.pb.go index fc3e707a7aae..d8d15eb2f726 100644 --- a/staging/src/k8s.io/kubelet/pkg/apis/podresources/v1/api.pb.go +++ b/staging/src/k8s.io/kubelet/pkg/apis/podresources/v1/api.pb.go @@ -45,6 +45,97 @@ var _ = math.Inf // proto package needs to be updated. 
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package +type AllocatableResourcesRequest struct { + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *AllocatableResourcesRequest) Reset() { *m = AllocatableResourcesRequest{} } +func (*AllocatableResourcesRequest) ProtoMessage() {} +func (*AllocatableResourcesRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_00212fb1f9d3bf1c, []int{0} +} +func (m *AllocatableResourcesRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *AllocatableResourcesRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_AllocatableResourcesRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *AllocatableResourcesRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_AllocatableResourcesRequest.Merge(m, src) +} +func (m *AllocatableResourcesRequest) XXX_Size() int { + return m.Size() +} +func (m *AllocatableResourcesRequest) XXX_DiscardUnknown() { + xxx_messageInfo_AllocatableResourcesRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_AllocatableResourcesRequest proto.InternalMessageInfo + +// AllocatableResourcesResponses contains informations about all the devices known by the kubelet +type AllocatableResourcesResponse struct { + Devices []*ContainerDevices `protobuf:"bytes,1,rep,name=devices,proto3" json:"devices,omitempty"` + CpuIds []int64 `protobuf:"varint,2,rep,packed,name=cpu_ids,json=cpuIds,proto3" json:"cpu_ids,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *AllocatableResourcesResponse) Reset() { *m = AllocatableResourcesResponse{} } +func (*AllocatableResourcesResponse) ProtoMessage() {} +func (*AllocatableResourcesResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_00212fb1f9d3bf1c, []int{1} +} +func (m *AllocatableResourcesResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *AllocatableResourcesResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_AllocatableResourcesResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *AllocatableResourcesResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_AllocatableResourcesResponse.Merge(m, src) +} +func (m *AllocatableResourcesResponse) XXX_Size() int { + return m.Size() +} +func (m *AllocatableResourcesResponse) XXX_DiscardUnknown() { + xxx_messageInfo_AllocatableResourcesResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_AllocatableResourcesResponse proto.InternalMessageInfo + +func (m *AllocatableResourcesResponse) GetDevices() []*ContainerDevices { + if m != nil { + return m.Devices + } + return nil +} + +func (m *AllocatableResourcesResponse) GetCpuIds() []int64 { + if m != nil { + return m.CpuIds + } + return nil +} + // ListPodResourcesRequest is the request made to the PodResourcesLister service type ListPodResourcesRequest struct { XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -54,7 +145,7 @@ type ListPodResourcesRequest struct { func (m *ListPodResourcesRequest) Reset() { *m = ListPodResourcesRequest{} } func (*ListPodResourcesRequest) ProtoMessage() {} func (*ListPodResourcesRequest) Descriptor() ([]byte, []int) { - return 
fileDescriptor_00212fb1f9d3bf1c, []int{0} + return fileDescriptor_00212fb1f9d3bf1c, []int{2} } func (m *ListPodResourcesRequest) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -93,7 +184,7 @@ type ListPodResourcesResponse struct { func (m *ListPodResourcesResponse) Reset() { *m = ListPodResourcesResponse{} } func (*ListPodResourcesResponse) ProtoMessage() {} func (*ListPodResourcesResponse) Descriptor() ([]byte, []int) { - return fileDescriptor_00212fb1f9d3bf1c, []int{1} + return fileDescriptor_00212fb1f9d3bf1c, []int{3} } func (m *ListPodResourcesResponse) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -141,7 +232,7 @@ type PodResources struct { func (m *PodResources) Reset() { *m = PodResources{} } func (*PodResources) ProtoMessage() {} func (*PodResources) Descriptor() ([]byte, []int) { - return fileDescriptor_00212fb1f9d3bf1c, []int{2} + return fileDescriptor_00212fb1f9d3bf1c, []int{4} } func (m *PodResources) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -203,7 +294,7 @@ type ContainerResources struct { func (m *ContainerResources) Reset() { *m = ContainerResources{} } func (*ContainerResources) ProtoMessage() {} func (*ContainerResources) Descriptor() ([]byte, []int) { - return fileDescriptor_00212fb1f9d3bf1c, []int{3} + return fileDescriptor_00212fb1f9d3bf1c, []int{5} } func (m *ContainerResources) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -265,7 +356,7 @@ type ContainerDevices struct { func (m *ContainerDevices) Reset() { *m = ContainerDevices{} } func (*ContainerDevices) ProtoMessage() {} func (*ContainerDevices) Descriptor() ([]byte, []int) { - return fileDescriptor_00212fb1f9d3bf1c, []int{4} + return fileDescriptor_00212fb1f9d3bf1c, []int{6} } func (m *ContainerDevices) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -325,7 +416,7 @@ type TopologyInfo struct { func (m *TopologyInfo) Reset() { *m = TopologyInfo{} } func (*TopologyInfo) ProtoMessage() {} func (*TopologyInfo) Descriptor() ([]byte, []int) { - return fileDescriptor_00212fb1f9d3bf1c, []int{5} + return fileDescriptor_00212fb1f9d3bf1c, []int{7} } func (m *TopologyInfo) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -371,7 +462,7 @@ type NUMANode struct { func (m *NUMANode) Reset() { *m = NUMANode{} } func (*NUMANode) ProtoMessage() {} func (*NUMANode) Descriptor() ([]byte, []int) { - return fileDescriptor_00212fb1f9d3bf1c, []int{6} + return fileDescriptor_00212fb1f9d3bf1c, []int{8} } func (m *NUMANode) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -408,6 +499,8 @@ func (m *NUMANode) GetID() int64 { } func init() { + proto.RegisterType((*AllocatableResourcesRequest)(nil), "v1.AllocatableResourcesRequest") + proto.RegisterType((*AllocatableResourcesResponse)(nil), "v1.AllocatableResourcesResponse") proto.RegisterType((*ListPodResourcesRequest)(nil), "v1.ListPodResourcesRequest") proto.RegisterType((*ListPodResourcesResponse)(nil), "v1.ListPodResourcesResponse") proto.RegisterType((*PodResources)(nil), "v1.PodResources") @@ -420,34 +513,37 @@ func init() { func init() { proto.RegisterFile("api.proto", fileDescriptor_00212fb1f9d3bf1c) } var fileDescriptor_00212fb1f9d3bf1c = []byte{ - // 424 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x52, 0xc1, 0x6e, 0xd3, 0x40, - 0x10, 0xcd, 0xda, 0xa5, 0xad, 0x07, 0x17, 0x55, 0x2b, 0x44, 0x4d, 0x08, 0x56, 0xb4, 0x5c, 0x7a, - 0x00, 0x57, 0x0d, 0x82, 0x3b, 0x34, 0x17, 0x4b, 0x10, 0xc1, 0x0a, 0x0e, 0x9c, 0x22, 0xc7, 0xbb, - 0x35, 0x96, 0xa8, 0x67, 0xeb, 
0xb5, 0x23, 0xb8, 0x71, 0xe0, 0x03, 0xf8, 0xac, 0x1e, 0x39, 0x72, - 0xa4, 0xe6, 0x47, 0xd0, 0xae, 0x71, 0xe3, 0x90, 0x70, 0xf2, 0xcc, 0x7b, 0x33, 0xef, 0x8d, 0x77, - 0x06, 0xbc, 0x44, 0xe5, 0x91, 0x2a, 0xb1, 0x42, 0xea, 0x2c, 0x4f, 0x87, 0x4f, 0xb2, 0xbc, 0xfa, - 0x58, 0x2f, 0xa2, 0x14, 0x2f, 0x4e, 0x32, 0xcc, 0xf0, 0xc4, 0x52, 0x8b, 0xfa, 0xdc, 0x66, 0x36, - 0xb1, 0x51, 0xdb, 0xc2, 0xee, 0xc3, 0xd1, 0xab, 0x5c, 0x57, 0x6f, 0x50, 0x70, 0xa9, 0xb1, 0x2e, - 0x53, 0xa9, 0xb9, 0xbc, 0xac, 0xa5, 0xae, 0xd8, 0x5b, 0x08, 0x36, 0x29, 0xad, 0xb0, 0xd0, 0x92, - 0x3e, 0x83, 0x03, 0x85, 0x62, 0x5e, 0x76, 0x44, 0x40, 0xc6, 0xee, 0xf1, 0xed, 0xc9, 0x61, 0xb4, - 0x3c, 0x8d, 0xd6, 0x1a, 0x7c, 0xd5, 0xcb, 0xd8, 0x67, 0xf0, 0xfb, 0x2c, 0xa5, 0xb0, 0x53, 0x24, - 0x17, 0x32, 0x20, 0x63, 0x72, 0xec, 0x71, 0x1b, 0xd3, 0x11, 0x78, 0xe6, 0xab, 0x55, 0x92, 0xca, - 0xc0, 0xb1, 0xc4, 0x0a, 0xa0, 0xcf, 0x01, 0x52, 0x2c, 0xaa, 0x24, 0x2f, 0x64, 0xa9, 0x03, 0xd7, - 0xba, 0xde, 0x33, 0xae, 0x67, 0x1d, 0xba, 0xf2, 0xee, 0x55, 0xb2, 0x4b, 0xa0, 0x9b, 0x15, 0x5b, - 0xfd, 0x23, 0xd8, 0x13, 0x72, 0x99, 0x9b, 0x9f, 0x72, 0xac, 0xfc, 0xdd, 0x35, 0xf9, 0x69, 0xcb, - 0xf1, 0xae, 0x88, 0x1e, 0xc1, 0x5e, 0xaa, 0xea, 0x79, 0x2e, 0xda, 0x71, 0x5c, 0xbe, 0x9b, 0xaa, - 0x3a, 0x16, 0x9a, 0x7d, 0x23, 0x70, 0xf8, 0x6f, 0x1b, 0x7d, 0x04, 0x07, 0xdd, 0xa3, 0xcd, 0x7b, - 0xd6, 0x7e, 0x07, 0xce, 0xcc, 0x08, 0x0f, 0x01, 0x5a, 0x75, 0xab, 0x6a, 0xa6, 0xf0, 0xb8, 0xd7, - 0x22, 0xb1, 0xd0, 0xf4, 0x31, 0xec, 0x57, 0xa8, 0xf0, 0x13, 0x66, 0x5f, 0x02, 0x77, 0x4c, 0xba, - 0x77, 0x7f, 0xf7, 0x17, 0x8b, 0x8b, 0x73, 0xe4, 0x37, 0x15, 0x6c, 0x02, 0x7e, 0x9f, 0xa1, 0x0c, - 0x6e, 0x15, 0x28, 0x6e, 0x56, 0xe6, 0x9b, 0xd6, 0xd9, 0xfb, 0xd7, 0x2f, 0x66, 0x28, 0x24, 0x6f, - 0x29, 0x36, 0x84, 0xfd, 0x0e, 0xa2, 0x77, 0xc0, 0x89, 0xa7, 0x76, 0x4c, 0x97, 0x3b, 0xf1, 0x74, - 0xf2, 0x01, 0x68, 0x7f, 0x87, 0xe6, 0x44, 0x64, 0x49, 0xcf, 0x60, 0xc7, 0x44, 0xf4, 0x81, 0x91, - 0xfb, 0xcf, 0x45, 0x0d, 0x47, 0xdb, 0xc9, 0xf6, 0xa6, 0xd8, 0xe0, 0xe5, 0xe8, 0xea, 0x3a, 0x24, - 0x3f, 0xaf, 0xc3, 0xc1, 0xd7, 0x26, 0x24, 0x57, 0x4d, 0x48, 0x7e, 0x34, 0x21, 0xf9, 0xd5, 0x84, - 0xe4, 0xfb, 0xef, 0x70, 0xb0, 0xd8, 0xb5, 0x17, 0xfb, 0xf4, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, - 0x1f, 0x52, 0x67, 0x23, 0xf1, 0x02, 0x00, 0x00, + // 480 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x93, 0xcd, 0x6e, 0xd3, 0x40, + 0x10, 0x80, 0xb3, 0x76, 0x69, 0x9b, 0xc1, 0x45, 0xd5, 0x0a, 0x11, 0x93, 0xa6, 0xc6, 0x5a, 0x2e, + 0x39, 0x80, 0xab, 0x06, 0xc1, 0xbd, 0x34, 0x12, 0xb2, 0x04, 0x11, 0xac, 0xe0, 0x4a, 0xe4, 0xd8, + 0x5b, 0x63, 0x29, 0xf5, 0x6c, 0xbd, 0x76, 0x04, 0x37, 0x0e, 0x3c, 0x00, 0xaf, 0xc3, 0x1b, 0xf4, + 0xc8, 0x91, 0x23, 0x0d, 0x2f, 0x82, 0xbc, 0x8e, 0x53, 0x87, 0xa4, 0x48, 0x3d, 0x79, 0x66, 0xbe, + 0xf9, 0xf3, 0xcc, 0x2c, 0xb4, 0x03, 0x99, 0x78, 0x32, 0xc3, 0x1c, 0xa9, 0x31, 0x3b, 0xee, 0x3e, + 0x8d, 0x93, 0xfc, 0x53, 0x31, 0xf1, 0x42, 0x3c, 0x3f, 0x8a, 0x31, 0xc6, 0x23, 0x8d, 0x26, 0xc5, + 0x99, 0xd6, 0xb4, 0xa2, 0xa5, 0x2a, 0x84, 0x1d, 0xc2, 0xc1, 0xc9, 0x74, 0x8a, 0x61, 0x90, 0x07, + 0x93, 0xa9, 0xe0, 0x42, 0x61, 0x91, 0x85, 0x42, 0x71, 0x71, 0x51, 0x08, 0x95, 0xb3, 0x18, 0x7a, + 0x9b, 0xb1, 0x92, 0x98, 0x2a, 0x41, 0x3d, 0xd8, 0x89, 0xc4, 0x2c, 0x09, 0x85, 0xb2, 0x89, 0x6b, + 0xf6, 0xef, 0x0e, 0xee, 0x7b, 0xb3, 0x63, 0xef, 0x14, 0xd3, 0x3c, 0x48, 0x52, 0x91, 0x0d, 0x2b, + 0xc6, 0x6b, 0x27, 0xda, 0x81, 0x9d, 0x50, 0x16, 0xe3, 0x24, 0x52, 0xb6, 0xe1, 0x9a, 0x7d, 0x93, + 0x6f, 0x87, 0xb2, 0xf0, 0x23, 0xc5, 0x1e, 0x42, 0xe7, 
0x75, 0xa2, 0xf2, 0xb7, 0x18, 0xad, 0xf5, + 0xf0, 0x0e, 0xec, 0x75, 0xb4, 0xa8, 0xff, 0x1c, 0xf6, 0x24, 0x46, 0xe3, 0xac, 0x06, 0x8b, 0x2e, + 0xf6, 0xcb, 0x2e, 0x56, 0x02, 0x2c, 0xd9, 0xd0, 0xd8, 0x67, 0xb0, 0x9a, 0x94, 0x52, 0xd8, 0x4a, + 0x83, 0x73, 0x61, 0x13, 0x97, 0xf4, 0xdb, 0x5c, 0xcb, 0xb4, 0x07, 0xed, 0xf2, 0xab, 0x64, 0x10, + 0x0a, 0xdb, 0xd0, 0xe0, 0xda, 0x40, 0x5f, 0x00, 0x84, 0xf5, 0x5f, 0x2a, 0xdb, 0xd4, 0x55, 0x1f, + 0xac, 0xfc, 0xfb, 0x75, 0xed, 0x86, 0x27, 0xbb, 0x00, 0xba, 0xee, 0xb1, 0xb1, 0x7e, 0x63, 0xb4, + 0xc6, 0x2d, 0x47, 0x6b, 0xae, 0x8c, 0xf6, 0x1b, 0x81, 0xfd, 0x7f, 0xc3, 0xe8, 0x63, 0xd8, 0xab, + 0x87, 0x36, 0x6e, 0x94, 0xb6, 0x6a, 0xe3, 0xa8, 0x6c, 0xe1, 0x10, 0xa0, 0xca, 0xbe, 0x5c, 0x58, + 0x9b, 0xb7, 0x2b, 0x8b, 0x1f, 0x29, 0xfa, 0x04, 0x76, 0x73, 0x94, 0x38, 0xc5, 0xf8, 0x8b, 0x6d, + 0xba, 0xa4, 0x9e, 0xfb, 0xfb, 0x85, 0xcd, 0x4f, 0xcf, 0x90, 0x2f, 0x3d, 0xd8, 0x00, 0xac, 0x26, + 0xa1, 0x0c, 0xee, 0xa4, 0x18, 0x2d, 0x57, 0x66, 0x95, 0xa1, 0xa3, 0x0f, 0x6f, 0x4e, 0x46, 0x18, + 0x09, 0x5e, 0x21, 0xd6, 0x85, 0xdd, 0xda, 0x44, 0xef, 0x81, 0xe1, 0x0f, 0x75, 0x9b, 0x26, 0x37, + 0xfc, 0xe1, 0xe0, 0x07, 0x01, 0xda, 0x5c, 0x62, 0x79, 0x23, 0x22, 0xa3, 0xa7, 0xb0, 0x55, 0x4a, + 0xf4, 0xa0, 0xcc, 0x77, 0xc3, 0x49, 0x75, 0x7b, 0x9b, 0x61, 0x75, 0x54, 0xac, 0x45, 0x3f, 0x42, + 0xe7, 0x95, 0xc8, 0x37, 0x5d, 0x3e, 0x7d, 0x54, 0x86, 0xfe, 0xe7, 0xc9, 0x74, 0xdd, 0x9b, 0x1d, + 0xea, 0xfc, 0x2f, 0x7b, 0x97, 0x57, 0x0e, 0xf9, 0x75, 0xe5, 0xb4, 0xbe, 0xce, 0x1d, 0x72, 0x39, + 0x77, 0xc8, 0xcf, 0xb9, 0x43, 0x7e, 0xcf, 0x1d, 0xf2, 0xfd, 0x8f, 0xd3, 0x9a, 0x6c, 0xeb, 0xa7, + 0xf9, 0xec, 0x6f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x6f, 0x70, 0xd4, 0x4f, 0xda, 0x03, 0x00, 0x00, } // Reference imports to suppress errors if they are not otherwise used. @@ -463,6 +559,7 @@ const _ = grpc.SupportPackageIsVersion4 // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. type PodResourcesListerClient interface { List(ctx context.Context, in *ListPodResourcesRequest, opts ...grpc.CallOption) (*ListPodResourcesResponse, error) + GetAllocatableResources(ctx context.Context, in *AllocatableResourcesRequest, opts ...grpc.CallOption) (*AllocatableResourcesResponse, error) } type podResourcesListerClient struct { @@ -482,9 +579,19 @@ func (c *podResourcesListerClient) List(ctx context.Context, in *ListPodResource return out, nil } +func (c *podResourcesListerClient) GetAllocatableResources(ctx context.Context, in *AllocatableResourcesRequest, opts ...grpc.CallOption) (*AllocatableResourcesResponse, error) { + out := new(AllocatableResourcesResponse) + err := c.cc.Invoke(ctx, "/v1.PodResourcesLister/GetAllocatableResources", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + // PodResourcesListerServer is the server API for PodResourcesLister service. type PodResourcesListerServer interface { List(context.Context, *ListPodResourcesRequest) (*ListPodResourcesResponse, error) + GetAllocatableResources(context.Context, *AllocatableResourcesRequest) (*AllocatableResourcesResponse, error) } // UnimplementedPodResourcesListerServer can be embedded to have forward compatible implementations. 
@@ -494,6 +601,9 @@ type UnimplementedPodResourcesListerServer struct { func (*UnimplementedPodResourcesListerServer) List(ctx context.Context, req *ListPodResourcesRequest) (*ListPodResourcesResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method List not implemented") } +func (*UnimplementedPodResourcesListerServer) GetAllocatableResources(ctx context.Context, req *AllocatableResourcesRequest) (*AllocatableResourcesResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetAllocatableResources not implemented") +} func RegisterPodResourcesListerServer(s *grpc.Server, srv PodResourcesListerServer) { s.RegisterService(&_PodResourcesLister_serviceDesc, srv) @@ -517,6 +627,24 @@ func _PodResourcesLister_List_Handler(srv interface{}, ctx context.Context, dec return interceptor(ctx, in, info, handler) } +func _PodResourcesLister_GetAllocatableResources_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(AllocatableResourcesRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(PodResourcesListerServer).GetAllocatableResources(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/v1.PodResourcesLister/GetAllocatableResources", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(PodResourcesListerServer).GetAllocatableResources(ctx, req.(*AllocatableResourcesRequest)) + } + return interceptor(ctx, in, info, handler) +} + var _PodResourcesLister_serviceDesc = grpc.ServiceDesc{ ServiceName: "v1.PodResourcesLister", HandlerType: (*PodResourcesListerServer)(nil), @@ -525,11 +653,94 @@ var _PodResourcesLister_serviceDesc = grpc.ServiceDesc{ MethodName: "List", Handler: _PodResourcesLister_List_Handler, }, + { + MethodName: "GetAllocatableResources", + Handler: _PodResourcesLister_GetAllocatableResources_Handler, + }, }, Streams: []grpc.StreamDesc{}, Metadata: "api.proto", } +func (m *AllocatableResourcesRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *AllocatableResourcesRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *AllocatableResourcesRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + return len(dAtA) - i, nil +} + +func (m *AllocatableResourcesResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *AllocatableResourcesResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *AllocatableResourcesResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if len(m.CpuIds) > 0 { + dAtA2 := make([]byte, len(m.CpuIds)*10) + var j1 int + for _, num1 := range m.CpuIds { + num := uint64(num1) + for num >= 1<<7 { + dAtA2[j1] = uint8(uint64(num)&0x7f | 0x80) + num >>= 7 + j1++ + } + dAtA2[j1] = uint8(num) + j1++ + } + i -= j1 + copy(dAtA[i:], dAtA2[:j1]) + i = encodeVarintApi(dAtA, i, uint64(j1)) + i-- + dAtA[i] = 0x12 + } + if len(m.Devices) > 0 { + for iNdEx := len(m.Devices) - 1; 
iNdEx >= 0; iNdEx-- { + { + size, err := m.Devices[iNdEx].MarshalToSizedBuffer(dAtA[:i]) + if err != nil { + return 0, err + } + i -= size + i = encodeVarintApi(dAtA, i, uint64(size)) + } + i-- + dAtA[i] = 0xa + } + } + return len(dAtA) - i, nil +} + func (m *ListPodResourcesRequest) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -662,21 +873,21 @@ func (m *ContainerResources) MarshalToSizedBuffer(dAtA []byte) (int, error) { var l int _ = l if len(m.CpuIds) > 0 { - dAtA2 := make([]byte, len(m.CpuIds)*10) - var j1 int + dAtA4 := make([]byte, len(m.CpuIds)*10) + var j3 int for _, num1 := range m.CpuIds { num := uint64(num1) for num >= 1<<7 { - dAtA2[j1] = uint8(uint64(num)&0x7f | 0x80) + dAtA4[j3] = uint8(uint64(num)&0x7f | 0x80) num >>= 7 - j1++ + j3++ } - dAtA2[j1] = uint8(num) - j1++ + dAtA4[j3] = uint8(num) + j3++ } - i -= j1 - copy(dAtA[i:], dAtA2[:j1]) - i = encodeVarintApi(dAtA, i, uint64(j1)) + i -= j3 + copy(dAtA[i:], dAtA4[:j3]) + i = encodeVarintApi(dAtA, i, uint64(j3)) i-- dAtA[i] = 0x1a } @@ -831,6 +1042,37 @@ func encodeVarintApi(dAtA []byte, offset int, v uint64) int { dAtA[offset] = uint8(v) return base } +func (m *AllocatableResourcesRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + return n +} + +func (m *AllocatableResourcesResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if len(m.Devices) > 0 { + for _, e := range m.Devices { + l = e.Size() + n += 1 + l + sovApi(uint64(l)) + } + } + if len(m.CpuIds) > 0 { + l = 0 + for _, e := range m.CpuIds { + l += sovApi(uint64(e)) + } + n += 1 + sovApi(uint64(l)) + l + } + return n +} + func (m *ListPodResourcesRequest) Size() (n int) { if m == nil { return 0 @@ -960,6 +1202,31 @@ func sovApi(x uint64) (n int) { func sozApi(x uint64) (n int) { return sovApi(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } +func (this *AllocatableResourcesRequest) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&AllocatableResourcesRequest{`, + `}`, + }, "") + return s +} +func (this *AllocatableResourcesResponse) String() string { + if this == nil { + return "nil" + } + repeatedStringForDevices := "[]*ContainerDevices{" + for _, f := range this.Devices { + repeatedStringForDevices += strings.Replace(f.String(), "ContainerDevices", "ContainerDevices", 1) + "," + } + repeatedStringForDevices += "}" + s := strings.Join([]string{`&AllocatableResourcesResponse{`, + `Devices:` + repeatedStringForDevices + `,`, + `CpuIds:` + fmt.Sprintf("%v", this.CpuIds) + `,`, + `}`, + }, "") + return s +} func (this *ListPodResourcesRequest) String() string { if this == nil { return "nil" @@ -1063,6 +1330,216 @@ func valueToStringApi(v interface{}) string { pv := reflect.Indirect(rv).Interface() return fmt.Sprintf("*%v", pv) } +func (m *AllocatableResourcesRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: AllocatableResourcesRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: AllocatableResourcesRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + default: 
+ iNdEx = preIndex + skippy, err := skipApi(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthApi + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *AllocatableResourcesResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: AllocatableResourcesResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: AllocatableResourcesResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Devices", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthApi + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthApi + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Devices = append(m.Devices, &ContainerDevices{}) + if err := m.Devices[len(m.Devices)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType == 0 { + var v int64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.CpuIds = append(m.CpuIds, v) + } else if wireType == 2 { + var packedLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + packedLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if packedLen < 0 { + return ErrInvalidLengthApi + } + postIndex := iNdEx + packedLen + if postIndex < 0 { + return ErrInvalidLengthApi + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + var elementCount int + var count int + for _, integer := range dAtA[iNdEx:postIndex] { + if integer < 128 { + count++ + } + } + elementCount = count + if elementCount != 0 && len(m.CpuIds) == 0 { + m.CpuIds = make([]int64, 0, elementCount) + } + for iNdEx < postIndex { + var v int64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowApi + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.CpuIds = append(m.CpuIds, v) + } + } else { + return fmt.Errorf("proto: wrong wireType = %d for field CpuIds", wireType) + } + default: + iNdEx = preIndex + skippy, err := skipApi(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthApi + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m 
*ListPodResourcesRequest) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 diff --git a/staging/src/k8s.io/kubelet/pkg/apis/podresources/v1/api.proto b/staging/src/k8s.io/kubelet/pkg/apis/podresources/v1/api.proto index cd8dbeaf9579..a05c5fd5605a 100644 --- a/staging/src/k8s.io/kubelet/pkg/apis/podresources/v1/api.proto +++ b/staging/src/k8s.io/kubelet/pkg/apis/podresources/v1/api.proto @@ -18,6 +18,15 @@ option (gogoproto.goproto_unrecognized_all) = false; // node resources consumed by pods and containers on the node service PodResourcesLister { rpc List(ListPodResourcesRequest) returns (ListPodResourcesResponse) {} + rpc GetAllocatableResources(AllocatableResourcesRequest) returns (AllocatableResourcesResponse) {} +} + +message AllocatableResourcesRequest {} + +// AllocatableResourcesResponse contains information about all the devices known by the kubelet +message AllocatableResourcesResponse { + repeated ContainerDevices devices = 1; + repeated int64 cpu_ids = 2; } // ListPodResourcesRequest is the request made to the PodResourcesLister service diff --git a/test/e2e_node/cpu_manager_test.go b/test/e2e_node/cpu_manager_test.go index e21aee4ef099..379cbe63a5d6 100644 --- a/test/e2e_node/cpu_manager_test.go +++ b/test/e2e_node/cpu_manager_test.go @@ -81,13 +81,17 @@ func makeCPUManagerPod(podName string, ctnAttributes []ctnAttribute) *v1.Pod { } } +func deletePodSyncByName(f *framework.Framework, podName string) { + gp := int64(0) + delOpts := metav1.DeleteOptions{ + GracePeriodSeconds: &gp, + } + f.PodClient().DeleteSync(podName, delOpts, framework.DefaultPodDeletionTimeout) +} + func deletePods(f *framework.Framework, podNames []string) { for _, podName := range podNames { - gp := int64(0) - delOpts := metav1.DeleteOptions{ - GracePeriodSeconds: &gp, - } - f.PodClient().DeleteSync(podName, delOpts, framework.DefaultPodDeletionTimeout) + deletePodSyncByName(f, podName) } } @@ -206,6 +210,10 @@ func disableCPUManagerInKubelet(f *framework.Framework) (oldCfg *kubeletconfig.K } func enableCPUManagerInKubelet(f *framework.Framework, cleanStateFile bool) (oldCfg *kubeletconfig.KubeletConfiguration) { + return configureCPUManagerInKubelet(f, cleanStateFile, cpuset.CPUSet{}) +} + +func configureCPUManagerInKubelet(f *framework.Framework, cleanStateFile bool, reservedSystemCPUs cpuset.CPUSet) (oldCfg *kubeletconfig.KubeletConfiguration) { // Enable CPU Manager in Kubelet with static policy. oldCfg, err := getCurrentKubeletConfig() framework.ExpectNoError(err) @@ -235,15 +243,21 @@ func enableCPUManagerInKubelet(f *framework.Framework, cleanStateFile bool) (old // Set the CPU Manager reconcile period to 1 second. newCfg.CPUManagerReconcilePeriod = metav1.Duration{Duration: 1 * time.Second} - // The Kubelet panics if either kube-reserved or system-reserved is not set - // when CPU Manager is enabled. Set cpu in kube-reserved > 0 so that - // kubelet doesn't panic. - if newCfg.KubeReserved == nil { - newCfg.KubeReserved = map[string]string{} - } + if reservedSystemCPUs.Size() > 0 { + cpus := reservedSystemCPUs.String() + framework.Logf("configureCPUManagerInKubelet: using reservedSystemCPUs=%q", cpus) + newCfg.ReservedSystemCPUs = cpus + } else { + // The Kubelet panics if either kube-reserved or system-reserved is not set + // when CPU Manager is enabled. Set cpu in kube-reserved > 0 so that + // kubelet doesn't panic.
+ if newCfg.KubeReserved == nil { + newCfg.KubeReserved = map[string]string{} + } - if _, ok := newCfg.KubeReserved["cpu"]; !ok { - newCfg.KubeReserved["cpu"] = "200m" + if _, ok := newCfg.KubeReserved["cpu"]; !ok { + newCfg.KubeReserved["cpu"] = "200m" + } } // Update the Kubelet configuration. framework.ExpectNoError(setKubeletConfiguration(f, newCfg)) diff --git a/test/e2e_node/podresources_test.go b/test/e2e_node/podresources_test.go new file mode 100644 index 000000000000..b4190d75e848 --- /dev/null +++ b/test/e2e_node/podresources_test.go @@ -0,0 +1,754 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2enode + +import ( + "context" + "fmt" + "io/ioutil" + "strings" + "sync" + "time" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilfeature "k8s.io/apiserver/pkg/util/feature" + kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1" + kubefeatures "k8s.io/kubernetes/pkg/features" + kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config" + "k8s.io/kubernetes/pkg/kubelet/apis/podresources" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" + "k8s.io/kubernetes/pkg/kubelet/util" + + "k8s.io/kubernetes/test/e2e/framework" + e2enode "k8s.io/kubernetes/test/e2e/framework/node" + e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper" + + "github.com/onsi/ginkgo" + "github.com/onsi/gomega" +) + +type podDesc struct { + podName string + cntName string + resourceName string + resourceAmount int + cpuCount int +} + +func makePodResourcesTestPod(desc podDesc) *v1.Pod { + cnt := v1.Container{ + Name: desc.cntName, + Image: busyboxImage, + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{}, + Limits: v1.ResourceList{}, + }, + Command: []string{"sh", "-c", "sleep 1d"}, + } + if desc.cpuCount > 0 { + cnt.Resources.Requests[v1.ResourceCPU] = resource.MustParse(fmt.Sprintf("%d", desc.cpuCount)) + cnt.Resources.Limits[v1.ResourceCPU] = resource.MustParse(fmt.Sprintf("%d", desc.cpuCount)) + // we don't really care, we only need to be in guaranteed QoS + cnt.Resources.Requests[v1.ResourceMemory] = resource.MustParse("100Mi") + cnt.Resources.Limits[v1.ResourceMemory] = resource.MustParse("100Mi") + } + if desc.resourceName != "" && desc.resourceAmount > 0 { + cnt.Resources.Requests[v1.ResourceName(desc.resourceName)] = resource.MustParse(fmt.Sprintf("%d", desc.resourceAmount)) + cnt.Resources.Limits[v1.ResourceName(desc.resourceName)] = resource.MustParse(fmt.Sprintf("%d", desc.resourceAmount)) + } + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: desc.podName, + }, + Spec: v1.PodSpec{ + RestartPolicy: v1.RestartPolicyNever, + Containers: []v1.Container{ + cnt, + }, + }, + } +} + +func logPodResources(podIdx int, pr *kubeletpodresourcesv1.PodResources) { + ns := pr.GetNamespace() + cnts := pr.GetContainers() + if len(cnts) == 0 { + framework.Logf("#%02d/%02d/%02d - %s/%s/%s No containers", podIdx, 0, 0, ns, pr.GetName(), 
"_") + return + } + + for cntIdx, cnt := range cnts { + if len(cnt.Devices) == 0 { + framework.Logf("#%02d/%02d/%02d - %s/%s/%s cpus -> %v resources -> none", podIdx, cntIdx, 0, ns, pr.GetName(), cnt.Name, cnt.CpuIds) + continue + } + + for devIdx, dev := range cnt.Devices { + framework.Logf("#%02d/%02d/%02d - %s/%s/%s cpus -> %v %s -> %s", podIdx, cntIdx, devIdx, ns, pr.GetName(), cnt.Name, cnt.CpuIds, dev.ResourceName, strings.Join(dev.DeviceIds, ", ")) + } + } +} + +type podResMap map[string]map[string]kubeletpodresourcesv1.ContainerResources + +func getPodResources(cli kubeletpodresourcesv1.PodResourcesListerClient) podResMap { + resp, err := cli.List(context.TODO(), &kubeletpodresourcesv1.ListPodResourcesRequest{}) + framework.ExpectNoError(err) + + res := make(map[string]map[string]kubeletpodresourcesv1.ContainerResources) + for idx, podResource := range resp.GetPodResources() { + // to make troubleshooting easier + logPodResources(idx, podResource) + + cnts := make(map[string]kubeletpodresourcesv1.ContainerResources) + for _, cnt := range podResource.GetContainers() { + cnts[cnt.GetName()] = *cnt + } + res[podResource.GetName()] = cnts + } + return res +} + +type testPodData struct { + PodMap map[string]*v1.Pod +} + +func newTestPodData() *testPodData { + return &testPodData{ + PodMap: make(map[string]*v1.Pod), + } +} + +func (tpd *testPodData) createPodsForTest(f *framework.Framework, podReqs []podDesc) { + for _, podReq := range podReqs { + pod := makePodResourcesTestPod(podReq) + pod = f.PodClient().CreateSync(pod) + + framework.Logf("created pod %s", podReq.podName) + tpd.PodMap[podReq.podName] = pod + } +} + +/* deletePodsForTest clean up all the pods run for a testcase. Must ensure proper cleanup */ +func (tpd *testPodData) deletePodsForTest(f *framework.Framework) { + podNS := f.Namespace.Name + var wg sync.WaitGroup + for podName := range tpd.PodMap { + wg.Add(1) + go func(podName string) { + defer ginkgo.GinkgoRecover() + defer wg.Done() + + deletePodSyncByName(f, podName) + waitForAllContainerRemoval(podName, podNS) + }(podName) + } + wg.Wait() +} + +/* deletePod removes pod during a test. 
Should do a best-effort clean up */ +func (tpd *testPodData) deletePod(f *framework.Framework, podName string) { + _, ok := tpd.PodMap[podName] + if !ok { + return + } + deletePodSyncByName(f, podName) + delete(tpd.PodMap, podName) +} + +func findContainerDeviceByName(devs []*kubeletpodresourcesv1.ContainerDevices, resourceName string) *kubeletpodresourcesv1.ContainerDevices { + for _, dev := range devs { + if dev.ResourceName == resourceName { + return dev + } + } + return nil +} + +func matchPodDescWithResources(expected []podDesc, found podResMap) error { + for _, podReq := range expected { + framework.Logf("matching: %#v", podReq) + + podInfo, ok := found[podReq.podName] + if !ok { + return fmt.Errorf("no pod resources for pod %q", podReq.podName) + } + cntInfo, ok := podInfo[podReq.cntName] + if !ok { + return fmt.Errorf("no container resources for pod %q container %q", podReq.podName, podReq.cntName) + } + + if podReq.cpuCount > 0 { + if len(cntInfo.CpuIds) != podReq.cpuCount { + return fmt.Errorf("pod %q container %q expected %d cpus got %v", podReq.podName, podReq.cntName, podReq.cpuCount, cntInfo.CpuIds) + } + } + + if podReq.resourceName != "" && podReq.resourceAmount > 0 { + dev := findContainerDeviceByName(cntInfo.GetDevices(), podReq.resourceName) + if dev == nil { + return fmt.Errorf("pod %q container %q expected data for resource %q not found", podReq.podName, podReq.cntName, podReq.resourceName) + } + if len(dev.DeviceIds) != podReq.resourceAmount { + return fmt.Errorf("pod %q container %q resource %q expected %d items got %v", podReq.podName, podReq.cntName, podReq.resourceName, podReq.resourceAmount, dev.DeviceIds) + } + } else { + devs := cntInfo.GetDevices() + if len(devs) > 0 { + return fmt.Errorf("pod %q container %q expected no resources, got %v", podReq.podName, podReq.cntName, devs) + } + } + } + return nil +} + +func expectPodResources(offset int, cli kubeletpodresourcesv1.PodResourcesListerClient, expected []podDesc) { + gomega.EventuallyWithOffset(1+offset, func() error { + found := getPodResources(cli) + return matchPodDescWithResources(expected, found) + }, time.Minute, 10*time.Second).Should(gomega.BeNil()) +} + +func filterOutDesc(descs []podDesc, name string) []podDesc { + var ret []podDesc + for _, desc := range descs { + if desc.podName == name { + continue + } + ret = append(ret, desc) + } + return ret +} + +func podresourcesListTests(f *framework.Framework, cli kubeletpodresourcesv1.PodResourcesListerClient, sd *sriovData) { + var tpd *testPodData + + var found podResMap + var expected []podDesc + var extra podDesc + + expectedBasePods := 0 /* nothing but pods we create */ + if sd != nil { + expectedBasePods = 1 // sriovdp + } + + ginkgo.By("checking the output when no pods are present") + found = getPodResources(cli) + gomega.ExpectWithOffset(1, found).To(gomega.HaveLen(expectedBasePods), "base pod expectation mismatch") + + tpd = newTestPodData() + ginkgo.By("checking the output when only pods which don't require resources are present") + expected = []podDesc{ + { + podName: "pod-00", + cntName: "cnt-00", + }, + { + podName: "pod-01", + cntName: "cnt-00", + }, + } + tpd.createPodsForTest(f, expected) + expectPodResources(1, cli, expected) + tpd.deletePodsForTest(f) + + tpd = newTestPodData() + ginkgo.By("checking the output when only a subset of pods require resources") + if sd != nil { + expected = []podDesc{ + { + podName: "pod-00", + cntName: "cnt-00", + }, + { + podName: "pod-01", + cntName: "cnt-00", + resourceName: sd.resourceName, + 
resourceAmount: 1, + cpuCount: 2, + }, + { + podName: "pod-02", + cntName: "cnt-00", + cpuCount: 2, + }, + { + podName: "pod-03", + cntName: "cnt-00", + resourceName: sd.resourceName, + resourceAmount: 1, + cpuCount: 1, + }, + } + } else { + expected = []podDesc{ + { + podName: "pod-00", + cntName: "cnt-00", + }, + { + podName: "pod-01", + cntName: "cnt-00", + cpuCount: 2, + }, + { + podName: "pod-02", + cntName: "cnt-00", + cpuCount: 2, + }, + { + podName: "pod-03", + cntName: "cnt-00", + cpuCount: 1, + }, + } + + } + tpd.createPodsForTest(f, expected) + expectPodResources(1, cli, expected) + tpd.deletePodsForTest(f) + + tpd = newTestPodData() + ginkgo.By("checking the output when creating pods which require resources between calls") + if sd != nil { + expected = []podDesc{ + { + podName: "pod-00", + cntName: "cnt-00", + }, + { + podName: "pod-01", + cntName: "cnt-00", + resourceName: sd.resourceName, + resourceAmount: 1, + cpuCount: 2, + }, + { + podName: "pod-02", + cntName: "cnt-00", + cpuCount: 2, + }, + } + } else { + expected = []podDesc{ + { + podName: "pod-00", + cntName: "cnt-00", + }, + { + podName: "pod-01", + cntName: "cnt-00", + cpuCount: 2, + }, + { + podName: "pod-02", + cntName: "cnt-00", + cpuCount: 2, + }, + } + } + + tpd.createPodsForTest(f, expected) + expectPodResources(1, cli, expected) + + if sd != nil { + extra = podDesc{ + podName: "pod-03", + cntName: "cnt-00", + resourceName: sd.resourceName, + resourceAmount: 1, + cpuCount: 1, + } + } else { + extra = podDesc{ + podName: "pod-03", + cntName: "cnt-00", + cpuCount: 1, + } + + } + + tpd.createPodsForTest(f, []podDesc{ + extra, + }) + + expected = append(expected, extra) + expectPodResources(1, cli, expected) + tpd.deletePodsForTest(f) + + tpd = newTestPodData() + ginkgo.By("checking the output when deleting pods which require resources between calls") + + if sd != nil { + expected = []podDesc{ + { + podName: "pod-00", + cntName: "cnt-00", + cpuCount: 1, + }, + { + podName: "pod-01", + cntName: "cnt-00", + resourceName: sd.resourceName, + resourceAmount: 1, + cpuCount: 2, + }, + { + podName: "pod-02", + cntName: "cnt-00", + }, + { + podName: "pod-03", + cntName: "cnt-00", + resourceName: sd.resourceName, + resourceAmount: 1, + cpuCount: 1, + }, + } + } else { + expected = []podDesc{ + { + podName: "pod-00", + cntName: "cnt-00", + cpuCount: 1, + }, + { + podName: "pod-01", + cntName: "cnt-00", + cpuCount: 2, + }, + { + podName: "pod-02", + cntName: "cnt-00", + }, + { + podName: "pod-03", + cntName: "cnt-00", + cpuCount: 1, + }, + } + } + tpd.createPodsForTest(f, expected) + expectPodResources(1, cli, expected) + + tpd.deletePod(f, "pod-01") + expectedPostDelete := filterOutDesc(expected, "pod-01") + expectPodResources(1, cli, expectedPostDelete) + tpd.deletePodsForTest(f) +} + +func podresourcesGetAllocatableResourcesTests(f *framework.Framework, cli kubeletpodresourcesv1.PodResourcesListerClient, sd *sriovData, onlineCPUs, reservedSystemCPUs cpuset.CPUSet) { + ginkgo.By("checking the devices known to the kubelet") + resp, err := cli.GetAllocatableResources(context.TODO(), &kubeletpodresourcesv1.AllocatableResourcesRequest{}) + framework.ExpectNoErrorWithOffset(1, err) + devs := resp.GetDevices() + allocatableCPUs := cpuset.NewCPUSetInt64(resp.GetCpuIds()...) 
+ + if onlineCPUs.Size() == 0 { + ginkgo.By("expecting no CPUs reported") + gomega.ExpectWithOffset(1, onlineCPUs.Size()).To(gomega.Equal(reservedSystemCPUs.Size()), "with no online CPUs, no CPUs should be reserved") + } else { + ginkgo.By(fmt.Sprintf("expecting online CPUs reported - online=%v (%d) reserved=%v (%d)", onlineCPUs, onlineCPUs.Size(), reservedSystemCPUs, reservedSystemCPUs.Size())) + if reservedSystemCPUs.Size() > onlineCPUs.Size() { + ginkgo.Fail("more reserved CPUs than online") + } + expectedCPUs := onlineCPUs.Difference(reservedSystemCPUs) + + ginkgo.By(fmt.Sprintf("expecting CPUs '%v'='%v'", allocatableCPUs, expectedCPUs)) + gomega.ExpectWithOffset(1, allocatableCPUs.Equals(expectedCPUs)).To(gomega.BeTrue(), "mismatch expecting CPUs") + } + + if sd == nil { // no devices in the environment, so expect no devices + ginkgo.By("expecting no devices reported") + gomega.ExpectWithOffset(1, devs).To(gomega.BeEmpty(), fmt.Sprintf("got unexpected devices %#v", devs)) + return + } + + ginkgo.By(fmt.Sprintf("expecting some %q devices reported", sd.resourceName)) + gomega.ExpectWithOffset(1, devs).ToNot(gomega.BeEmpty()) + for _, dev := range devs { + framework.ExpectEqual(dev.ResourceName, sd.resourceName) + gomega.ExpectWithOffset(1, dev.DeviceIds).ToNot(gomega.BeEmpty()) + } +} + +// Serial because the test updates kubelet configuration. +var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:PodResources]", func() { + f := framework.NewDefaultFramework("podresources-test") + + reservedSystemCPUs := cpuset.MustParse("1") + + ginkgo.Context("With SRIOV devices in the system", func() { + ginkgo.It("should return the expected responses with cpumanager static policy enabled", func() { + // this is a very rough check. We just want to rule out system that does NOT have enough resources + _, cpuAlloc, _ := getLocalNodeCPUDetails(f) + + if cpuAlloc < minCoreCount { + e2eskipper.Skipf("Skipping CPU Manager tests since the CPU allocatable < %d", minCoreCount) + } + if sriovdevCount, err := countSRIOVDevices(); err != nil || sriovdevCount == 0 { + e2eskipper.Skipf("this test is meant to run on a system with at least one configured VF from SRIOV device") + } + + onlineCPUs, err := getOnlineCPUs() + framework.ExpectNoError(err) + + // Make sure all the feature gates and the right settings are in place. 
+ oldCfg := configurePodResourcesInKubelet(f, true, reservedSystemCPUs) + defer func() { + // restore kubelet config + setOldKubeletConfig(f, oldCfg) + + // Delete state file to allow repeated runs + deleteStateFile() + }() + + configMap := getSRIOVDevicePluginConfigMap(framework.TestContext.SriovdpConfigMapFile) + sd := setupSRIOVConfigOrFail(f, configMap) + defer teardownSRIOVConfigOrFail(f, sd) + + waitForSRIOVResources(f, sd) + + endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) + framework.ExpectNoError(err) + + cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) + framework.ExpectNoError(err) + defer conn.Close() + + waitForSRIOVResources(f, sd) + + ginkgo.By("checking List()") + podresourcesListTests(f, cli, sd) + ginkgo.By("checking GetAllocatableResources()") + podresourcesGetAllocatableResourcesTests(f, cli, sd, onlineCPUs, reservedSystemCPUs) + + }) + + ginkgo.It("should return the expected responses with cpumanager none policy", func() { + // current default is "none" policy - no need to restart the kubelet + + if sriovdevCount, err := countSRIOVDevices(); err != nil || sriovdevCount == 0 { + e2eskipper.Skipf("this test is meant to run on a system with at least one configured VF from SRIOV device") + } + + oldCfg := enablePodResourcesFeatureGateInKubelet(f) + defer func() { + // restore kubelet config + setOldKubeletConfig(f, oldCfg) + + // Delete state file to allow repeated runs + deleteStateFile() + }() + + configMap := getSRIOVDevicePluginConfigMap(framework.TestContext.SriovdpConfigMapFile) + sd := setupSRIOVConfigOrFail(f, configMap) + defer teardownSRIOVConfigOrFail(f, sd) + + waitForSRIOVResources(f, sd) + + endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) + framework.ExpectNoError(err) + + cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) + framework.ExpectNoError(err) + defer conn.Close() + + waitForSRIOVResources(f, sd) + + // intentionally passing empty cpuset instead of onlineCPUs because with none policy + // we should get no allocatable cpus - no exclusively allocatable CPUs, depends on policy static + podresourcesGetAllocatableResourcesTests(f, cli, sd, cpuset.CPUSet{}, cpuset.CPUSet{}) + }) + + }) + + ginkgo.Context("Without SRIOV devices in the system", func() { + ginkgo.It("should return the expected responses with cpumanager static policy enabled", func() { + // this is a very rough check. We just want to rule out system that does NOT have enough resources + _, cpuAlloc, _ := getLocalNodeCPUDetails(f) + + if cpuAlloc < minCoreCount { + e2eskipper.Skipf("Skipping CPU Manager tests since the CPU allocatable < %d", minCoreCount) + } + if sriovdevCount, err := countSRIOVDevices(); err != nil || sriovdevCount > 0 { + e2eskipper.Skipf("this test is meant to run on a system with no configured VF from SRIOV device") + } + + onlineCPUs, err := getOnlineCPUs() + framework.ExpectNoError(err) + + // Make sure all the feature gates and the right settings are in place. 
+ oldCfg := configurePodResourcesInKubelet(f, true, reservedSystemCPUs) + defer func() { + // restore kubelet config + setOldKubeletConfig(f, oldCfg) + + // Delete state file to allow repeated runs + deleteStateFile() + }() + + endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) + framework.ExpectNoError(err) + + cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) + framework.ExpectNoError(err) + defer conn.Close() + + podresourcesListTests(f, cli, nil) + podresourcesGetAllocatableResourcesTests(f, cli, nil, onlineCPUs, reservedSystemCPUs) + }) + + ginkgo.It("should return the expected responses with cpumanager none policy", func() { + // current default is "none" policy - no need to restart the kubelet + + if sriovdevCount, err := countSRIOVDevices(); err != nil || sriovdevCount > 0 { + e2eskipper.Skipf("this test is meant to run on a system with no configured VF from SRIOV device") + } + + oldCfg := enablePodResourcesFeatureGateInKubelet(f) + defer func() { + // restore kubelet config + setOldKubeletConfig(f, oldCfg) + + // Delete state file to allow repeated runs + deleteStateFile() + }() + + endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) + framework.ExpectNoError(err) + + cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) + framework.ExpectNoError(err) + defer conn.Close() + + // intentionally passing empty cpuset instead of onlineCPUs because with none policy + // we should get no allocatable cpus - no exclusively allocatable CPUs, depends on policy static + podresourcesGetAllocatableResourcesTests(f, cli, nil, cpuset.CPUSet{}, cpuset.CPUSet{}) + }) + + ginkgo.It("should return the expected error with the feature gate disabled", func() { + if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.KubeletPodResourcesGetAllocatable) { + e2eskipper.Skipf("this test is meant to run with the POD Resources Extensions feature gate disabled") + } + + endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket) + framework.ExpectNoError(err) + + cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize) + framework.ExpectNoError(err) + defer conn.Close() + + ginkgo.By("checking GetAllocatableResources fail if the feature gate is not enabled") + _, err = cli.GetAllocatableResources(context.TODO(), &kubeletpodresourcesv1.AllocatableResourcesRequest{}) + framework.ExpectError(err, "With feature gate disabled, the call must fail") + }) + + }) +}) + +func getOnlineCPUs() (cpuset.CPUSet, error) { + onlineCPUList, err := ioutil.ReadFile("/sys/devices/system/cpu/online") + if err != nil { + return cpuset.CPUSet{}, err + } + return cpuset.Parse(strings.TrimSpace(string(onlineCPUList))) +} + +func configurePodResourcesInKubelet(f *framework.Framework, cleanStateFile bool, reservedSystemCPUs cpuset.CPUSet) (oldCfg *kubeletconfig.KubeletConfiguration) { + // we also need CPUManager with static policy to be able to do meaningful testing + oldCfg, err := getCurrentKubeletConfig() + framework.ExpectNoError(err) + newCfg := oldCfg.DeepCopy() + if newCfg.FeatureGates == nil { + newCfg.FeatureGates = make(map[string]bool) + } + newCfg.FeatureGates["CPUManager"] = true + newCfg.FeatureGates["KubeletPodResourcesGetAllocatable"] = true + + // After graduation of the CPU Manager feature to Beta, the CPU Manager + // "none" policy is ON by default. 
But when we set the CPU Manager policy to + // "static" in this test and the Kubelet is restarted so that "static" + // policy can take effect, there will always be a conflict with the state + // checkpointed in the disk (i.e., the policy checkpointed in the disk will + // be "none" whereas we are trying to restart Kubelet with "static" + // policy). Therefore, we delete the state file so that we can proceed + // with the tests. + // Only delete the state file at the begin of the tests. + if cleanStateFile { + deleteStateFile() + } + + // Set the CPU Manager policy to static. + newCfg.CPUManagerPolicy = string(cpumanager.PolicyStatic) + + // Set the CPU Manager reconcile period to 1 second. + newCfg.CPUManagerReconcilePeriod = metav1.Duration{Duration: 1 * time.Second} + + if reservedSystemCPUs.Size() > 0 { + cpus := reservedSystemCPUs.String() + framework.Logf("configurePodResourcesInKubelet: using reservedSystemCPUs=%q", cpus) + newCfg.ReservedSystemCPUs = cpus + } else { + // The Kubelet panics if either kube-reserved or system-reserved is not set + // when CPU Manager is enabled. Set cpu in kube-reserved > 0 so that + // kubelet doesn't panic. + if newCfg.KubeReserved == nil { + newCfg.KubeReserved = map[string]string{} + } + + if _, ok := newCfg.KubeReserved["cpu"]; !ok { + newCfg.KubeReserved["cpu"] = "200m" + } + } + // Update the Kubelet configuration. + framework.ExpectNoError(setKubeletConfiguration(f, newCfg)) + + // Wait for the Kubelet to be ready. + gomega.Eventually(func() bool { + nodes, err := e2enode.TotalReady(f.ClientSet) + framework.ExpectNoError(err) + return nodes == 1 + }, time.Minute, time.Second).Should(gomega.BeTrue()) + + return oldCfg +} + +func enablePodResourcesFeatureGateInKubelet(f *framework.Framework) (oldCfg *kubeletconfig.KubeletConfiguration) { + oldCfg, err := getCurrentKubeletConfig() + framework.ExpectNoError(err) + newCfg := oldCfg.DeepCopy() + if newCfg.FeatureGates == nil { + newCfg.FeatureGates = make(map[string]bool) + } + newCfg.FeatureGates["KubeletPodResourcesGetAllocatable"] = true + + // Update the Kubelet configuration. + framework.ExpectNoError(setKubeletConfiguration(f, newCfg)) + + // Wait for the Kubelet to be ready. + gomega.Eventually(func() bool { + nodes, err := e2enode.TotalReady(f.ClientSet) + framework.ExpectNoError(err) + return nodes == 1 + }, time.Minute, time.Second).Should(gomega.BeTrue()) + + return oldCfg +} diff --git a/test/e2e_node/topology_manager_test.go b/test/e2e_node/topology_manager_test.go index 0a4953f74266..21a80d7c0286 100644 --- a/test/e2e_node/topology_manager_test.go +++ b/test/e2e_node/topology_manager_test.go @@ -89,13 +89,17 @@ func detectCoresPerSocket() int { return coreCount } -func detectSRIOVDevices() int { +func countSRIOVDevices() (int, error) { outData, err := exec.Command("/bin/sh", "-c", "ls /sys/bus/pci/devices/*/physfn | wc -w").Output() - framework.ExpectNoError(err) + if err != nil { + return -1, err + } + return strconv.Atoi(strings.TrimSpace(string(outData))) +} - devCount, err := strconv.Atoi(strings.TrimSpace(string(outData))) +func detectSRIOVDevices() int { + devCount, err := countSRIOVDevices() framework.ExpectNoError(err) - return devCount } @@ -387,6 +391,11 @@ func runTopologyManagerPolicySuiteTests(f *framework.Framework) { runMultipleGuPods(f) } +// waitForAllContainerRemoval waits until all the containers on a given pod are really gone. +// This is needed by the e2e tests which involve exclusive resource allocation (cpu, topology manager; podresources; etc.) 
+// In these cases, we need to make sure the tests clean up after themselves to make sure each test runs in +// a pristine environment. The only way known so far to do that is to introduce this wait. +// Worth noting, however, that this makes the test runtime much bigger. func waitForAllContainerRemoval(podName, podNS string) { rs, _, err := getCRIClient() framework.ExpectNoError(err) @@ -434,7 +443,7 @@ func runTopologyManagerPositiveTest(f *framework.Framework, numPods int, ctnAttr pod := pods[podID] framework.Logf("deleting the pod %s/%s and waiting for container removal", pod.Namespace, pod.Name) - deletePods(f, []string{pod.Name}) + deletePodSyncByName(f, pod.Name) waitForAllContainerRemoval(pod.Name, pod.Namespace) } } @@ -462,7 +471,7 @@ func runTopologyManagerNegativeTest(f *framework.Framework, ctnAttrs, initCtnAtt framework.Failf("pod %s failed for wrong reason: %q", pod.Name, pod.Status.Reason) } - deletePods(f, []string{pod.Name}) + deletePodSyncByName(f, pod.Name) } func isTopologyAffinityError(pod *v1.Pod) bool { @@ -565,7 +574,7 @@ func teardownSRIOVConfigOrFail(f *framework.Framework, sd *sriovData) { ginkgo.By(fmt.Sprintf("Delete SRIOV device plugin pod %s/%s", sd.pod.Namespace, sd.pod.Name)) err = f.ClientSet.CoreV1().Pods(sd.pod.Namespace).Delete(context.TODO(), sd.pod.Name, deleteOptions) framework.ExpectNoError(err) - waitForContainerRemoval(sd.pod.Spec.Containers[0].Name, sd.pod.Name, sd.pod.Namespace) + waitForAllContainerRemoval(sd.pod.Name, sd.pod.Namespace) ginkgo.By(fmt.Sprintf("Deleting configMap %v/%v", metav1.NamespaceSystem, sd.configMap.Name)) err = f.ClientSet.CoreV1().ConfigMaps(metav1.NamespaceSystem).Delete(context.TODO(), sd.configMap.Name, deleteOptions)
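For reference, below is a minimal sketch (not part of the patch) of how a node-local agent might consume the new PodResourcesLister.GetAllocatableResources RPC added above. The socket path, timeout, and max message size are illustrative assumptions; the e2e tests resolve the same endpoint through the defaultPodResources* constants, and the call only succeeds when the KubeletPodResourcesGetAllocatable feature gate is enabled on the kubelet.

package main

import (
	"context"
	"log"
	"time"

	kubeletpodresourcesv1 "k8s.io/kubelet/pkg/apis/podresources/v1"
	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
	"k8s.io/kubernetes/pkg/kubelet/util"
)

func main() {
	// Resolve the kubelet pod-resources socket; the path is an assumption (the usual kubelet default).
	endpoint, err := util.LocalEndpoint("/var/lib/kubelet/pod-resources", podresources.Socket)
	if err != nil {
		log.Fatalf("cannot resolve the pod resources endpoint: %v", err)
	}

	// Timeout and max gRPC message size are illustrative values.
	cli, conn, err := podresources.GetV1Client(endpoint, 10*time.Second, 1024*1024*16)
	if err != nil {
		log.Fatalf("cannot connect to the pod resources API: %v", err)
	}
	defer conn.Close()

	// Returns an error if the KubeletPodResourcesGetAllocatable feature gate is disabled.
	resp, err := cli.GetAllocatableResources(context.TODO(), &kubeletpodresourcesv1.AllocatableResourcesRequest{})
	if err != nil {
		log.Fatalf("GetAllocatableResources failed: %v", err)
	}

	// With the static CPU manager policy the reported CPU ids are expected to be the online CPUs
	// minus ReservedSystemCPUs; with the none policy no exclusively allocatable CPUs are reported.
	log.Printf("allocatable CPU ids: %v", resp.GetCpuIds())
	for _, dev := range resp.GetDevices() {
		log.Printf("allocatable %q devices: %v", dev.ResourceName, dev.DeviceIds)
	}
}

This mirrors what podresourcesGetAllocatableResourcesTests asserts in the e2e tests above, just packaged as a standalone client for illustration.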