Skip to content

Commit

Permalink
Merge pull request #103979 from cynepco3hahue/save_admitted_pods
Browse files Browse the repository at this point in the history
Do not clear state of pods pending admission for CPU/Memory/Device manager
  • Loading branch information
k8s-ci-robot committed Aug 9, 2021
2 parents 4023eb7 + 73a5cce commit 6a04333
Show file tree
Hide file tree
Showing 7 changed files with 217 additions and 11 deletions.
27 changes: 25 additions & 2 deletions pkg/kubelet/cm/cpumanager/cpu_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ type manager struct {

// allocatableCPUs is the set of online CPUs as reported by the system
allocatableCPUs cpuset.CPUSet

// pendingAdmissionPod contain the pod during the admission phase
pendingAdmissionPod *v1.Pod
}

var _ Manager = &manager{}
Expand Down Expand Up @@ -236,6 +239,10 @@ func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesRe
}

func (m *manager) Allocate(p *v1.Pod, c *v1.Container) error {
// The pod is during the admission phase. We need to save the pod to avoid it
// being cleaned before the admission ended
m.setPodPendingAdmission(p)

// Garbage collect any stranded resources before allocating CPUs.
m.removeStaleState()

Expand Down Expand Up @@ -304,13 +311,19 @@ func (m *manager) State() state.Reader {
}

func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
// The pod is during the admission phase. We need to save the pod to avoid it
// being cleaned before the admission ended
m.setPodPendingAdmission(pod)
// Garbage collect any stranded resources before providing TopologyHints
m.removeStaleState()
// Delegate to active policy
return m.policy.GetTopologyHints(m.state, pod, container)
}

func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
// The pod is during the admission phase. We need to save the pod to avoid it
// being cleaned before the admission ended
m.setPodPendingAdmission(pod)
// Garbage collect any stranded resources before providing TopologyHints
m.removeStaleState()
// Delegate to active policy
Expand Down Expand Up @@ -343,11 +356,14 @@ func (m *manager) removeStaleState() {
defer m.Unlock()

// Get the list of active pods.
activePods := m.activePods()
activeAndAdmittedPods := m.activePods()
if m.pendingAdmissionPod != nil {
activeAndAdmittedPods = append(activeAndAdmittedPods, m.pendingAdmissionPod)
}

// Build a list of (podUID, containerName) pairs for all containers in all active Pods.
activeContainers := make(map[string]map[string]struct{})
for _, pod := range activePods {
for _, pod := range activeAndAdmittedPods {
activeContainers[string(pod.UID)] = make(map[string]struct{})
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
activeContainers[string(pod.UID)][container.Name] = struct{}{}
Expand Down Expand Up @@ -493,3 +509,10 @@ func (m *manager) updateContainerCPUSet(containerID string, cpus cpuset.CPUSet)
func (m *manager) GetCPUs(podUID, containerName string) cpuset.CPUSet {
return m.state.GetCPUSetOrDefault(podUID, containerName)
}

func (m *manager) setPodPendingAdmission(pod *v1.Pod) {
m.Lock()
defer m.Unlock()

m.pendingAdmissionPod = pod
}
4 changes: 2 additions & 2 deletions pkg/kubelet/cm/cpumanager/cpu_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func TestCPUManagerAdd(t *testing.T) {

pod := makePod("fakePod", "fakeContainer", "2", "2")
container := &pod.Spec.Containers[0]
mgr.activePods = func() []*v1.Pod { return []*v1.Pod{pod} }
mgr.activePods = func() []*v1.Pod { return nil }

err := mgr.Allocate(pod, container)
if !reflect.DeepEqual(err, testCase.expAllocateErr) {
Expand Down Expand Up @@ -1043,7 +1043,7 @@ func TestCPUManagerAddWithResvList(t *testing.T) {

pod := makePod("fakePod", "fakeContainer", "2", "2")
container := &pod.Spec.Containers[0]
mgr.activePods = func() []*v1.Pod { return []*v1.Pod{pod} }
mgr.activePods = func() []*v1.Pod { return nil }

err := mgr.Allocate(pod, container)
if !reflect.DeepEqual(err, testCase.expAllocateErr) {
Expand Down
6 changes: 6 additions & 0 deletions pkg/kubelet/cm/cpumanager/topology_hints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,11 @@ func TestGetTopologyHints(t *testing.T) {
if len(tc.expectedHints) == 0 && len(hints) == 0 {
continue
}

if m.pendingAdmissionPod == nil {
t.Errorf("The pendingAdmissionPod should point to the current pod after the call to GetTopologyHints()")
}

sort.SliceStable(hints, func(i, j int) bool {
return hints[i].LessThan(hints[j])
})
Expand Down Expand Up @@ -236,6 +241,7 @@ func TestGetPodTopologyHints(t *testing.T) {
if len(tc.expectedHints) == 0 && len(podHints) == 0 {
continue
}

sort.SliceStable(podHints, func(i, j int) bool {
return podHints[i].LessThan(podHints[j])
})
Expand Down
24 changes: 22 additions & 2 deletions pkg/kubelet/cm/devicemanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ type ManagerImpl struct {
// devicesToReuse contains devices that can be reused as they have been allocated to
// init containers.
devicesToReuse PodReusableDevices

// pendingAdmissionPod contain the pod during the admission phase
pendingAdmissionPod *v1.Pod
}

type endpointInfo struct {
Expand Down Expand Up @@ -367,6 +370,10 @@ func (m *ManagerImpl) isVersionCompatibleWithPlugin(versions []string) bool {
// Allocate is the call that you can use to allocate a set of devices
// from the registered device plugins.
func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error {
// The pod is during the admission phase. We need to save the pod to avoid it
// being cleaned before the admission ended
m.setPodPendingAdmission(pod)

if _, ok := m.devicesToReuse[string(pod.UID)]; !ok {
m.devicesToReuse[string(pod.UID)] = make(map[string]sets.String)
}
Expand Down Expand Up @@ -619,14 +626,20 @@ func (m *ManagerImpl) readCheckpoint() error {

// UpdateAllocatedDevices frees any Devices that are bound to terminated pods.
func (m *ManagerImpl) UpdateAllocatedDevices() {
activePods := m.activePods()
if !m.sourcesReady.AllReady() {
return
}

m.mutex.Lock()
defer m.mutex.Unlock()

activeAndAdmittedPods := m.activePods()
if m.pendingAdmissionPod != nil {
activeAndAdmittedPods = append(activeAndAdmittedPods, m.pendingAdmissionPod)
}

podsToBeRemoved := m.podDevices.pods()
for _, pod := range activePods {
for _, pod := range activeAndAdmittedPods {
podsToBeRemoved.Delete(string(pod.UID))
}
if len(podsToBeRemoved) <= 0 {
Expand Down Expand Up @@ -1117,3 +1130,10 @@ func (m *ManagerImpl) ShouldResetExtendedResourceCapacity() bool {
}
return false
}

func (m *ManagerImpl) setPodPendingAdmission(pod *v1.Pod) {
m.mutex.Lock()
defer m.mutex.Unlock()

m.pendingAdmissionPod = pod
}
8 changes: 8 additions & 0 deletions pkg/kubelet/cm/devicemanager/topology_hints.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ import (
// ensures the Device Manager is consulted when Topology Aware Hints for each
// container are created.
func (m *ManagerImpl) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
// The pod is during the admission phase. We need to save the pod to avoid it
// being cleaned before the admission ended
m.setPodPendingAdmission(pod)

// Garbage collect any stranded device resources before providing TopologyHints
m.UpdateAllocatedDevices()

Expand Down Expand Up @@ -83,6 +87,10 @@ func (m *ManagerImpl) GetTopologyHints(pod *v1.Pod, container *v1.Container) map
// GetPodTopologyHints implements the topologymanager.HintProvider Interface which
// ensures the Device Manager is consulted when Topology Aware Hints for Pod are created.
func (m *ManagerImpl) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
// The pod is during the admission phase. We need to save the pod to avoid it
// being cleaned before the admission ended
m.setPodPendingAdmission(pod)

// Garbage collect any stranded device resources before providing TopologyHints
m.UpdateAllocatedDevices()

Expand Down
31 changes: 28 additions & 3 deletions pkg/kubelet/cm/memorymanager/memory_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ type manager struct {

// allocatableMemory holds the allocatable memory for each NUMA node
allocatableMemory []state.Block

// pendingAdmissionPod contain the pod during the admission phase
pendingAdmissionPod *v1.Pod
}

var _ Manager = &manager{}
Expand Down Expand Up @@ -230,6 +233,10 @@ func (m *manager) GetMemoryNUMANodes(pod *v1.Pod, container *v1.Container) sets.

// Allocate is called to pre-allocate memory resources during Pod admission.
func (m *manager) Allocate(pod *v1.Pod, container *v1.Container) error {
// The pod is during the admission phase. We need to save the pod to avoid it
// being cleaned before the admission ended
m.setPodPendingAdmission(pod)

// Garbage collect any stranded resources before allocation
m.removeStaleState()

Expand Down Expand Up @@ -268,6 +275,10 @@ func (m *manager) State() state.Reader {

// GetPodTopologyHints returns the topology hints for the topology manager
func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.TopologyHint {
// The pod is during the admission phase. We need to save the pod to avoid it
// being cleaned before the admission ended
m.setPodPendingAdmission(pod)

// Garbage collect any stranded resources before providing TopologyHints
m.removeStaleState()
// Delegate to active policy
Expand All @@ -276,6 +287,10 @@ func (m *manager) GetPodTopologyHints(pod *v1.Pod) map[string][]topologymanager.

// GetTopologyHints returns the topology hints for the topology manager
func (m *manager) GetTopologyHints(pod *v1.Pod, container *v1.Container) map[string][]topologymanager.TopologyHint {
// The pod is during the admission phase. We need to save the pod to avoid it
// being cleaned before the admission ended
m.setPodPendingAdmission(pod)

// Garbage collect any stranded resources before providing TopologyHints
m.removeStaleState()
// Delegate to active policy
Expand All @@ -298,12 +313,15 @@ func (m *manager) removeStaleState() {
m.Lock()
defer m.Unlock()

// Get the list of active pods.
activePods := m.activePods()
// Get the list of admitted and active pods.
activeAndAdmittedPods := m.activePods()
if m.pendingAdmissionPod != nil {
activeAndAdmittedPods = append(activeAndAdmittedPods, m.pendingAdmissionPod)
}

// Build a list of (podUID, containerName) pairs for all containers in all active Pods.
activeContainers := make(map[string]map[string]struct{})
for _, pod := range activePods {
for _, pod := range activeAndAdmittedPods {
activeContainers[string(pod.UID)] = make(map[string]struct{})
for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
activeContainers[string(pod.UID)][container.Name] = struct{}{}
Expand Down Expand Up @@ -430,3 +448,10 @@ func (m *manager) GetAllocatableMemory() []state.Block {
func (m *manager) GetMemory(podUID, containerName string) []state.Block {
return m.state.GetMemoryBlocks(podUID, containerName)
}

func (m *manager) setPodPendingAdmission(pod *v1.Pod) {
m.Lock()
defer m.Unlock()

m.pendingAdmissionPod = pod
}

0 comments on commit 6a04333

Please sign in to comment.