From 04fcbd721cd3e72eb9375275891e9bc3e28c3341 Mon Sep 17 00:00:00 2001
From: Michal Wozniak
Date: Tue, 2 Aug 2022 07:58:08 +0200
Subject: [PATCH] Introduction of a pod condition type indicating disruption.

Its `reason` field indicates the reason:
- PreemptionByKubeScheduler (Pod preempted by kube-scheduler)
- DeletionByTaintManager (Pod deleted by taint manager due to NoExecute taint)
- EvictionByEvictionAPI (Pod evicted by Eviction API)
- DeletionByPodGC (an orphaned Pod deleted by PodGC)
---
 pkg/apis/core/types.go | 4 +
 .../nodelifecycle/scheduler/taint_manager.go | 27 +-
 .../scheduler/taint_manager_test.go | 253 +++++++-------
 pkg/controller/podgc/gc_controller.go | 53 ++-
 pkg/controller/podgc/gc_controller_test.go | 265 ++++++++++-----
 pkg/features/kube_features.go | 10 +
 pkg/kubelet/status/status_manager.go | 2 +-
 pkg/registry/core/pod/storage/eviction.go | 44 ++-
 pkg/registry/core/pod/storage/storage.go | 2 +-
 .../framework/preemption/preemption.go | 26 +-
 pkg/util/pod/pod.go | 4 +-
 pkg/util/pod/pod_test.go | 2 +-
 .../rbac/bootstrappolicy/controller_policy.go | 48 ++-
 staging/src/k8s.io/api/core/v1/types.go | 4 +
 test/integration/evictions/evictions_test.go | 114 +++++--
 test/integration/node/lifecycle_test.go | 139 ++++++++
 test/integration/podgc/podgc_test.go | 315 +++++++++++-------
 .../scheduler/preemption/preemption_test.go | 53 ++-
 18 files changed, 974 insertions(+), 391 deletions(-)

diff --git a/pkg/apis/core/types.go b/pkg/apis/core/types.go
index 8d49b3bd3a45..a140753fb61b 100644
--- a/pkg/apis/core/types.go
+++ b/pkg/apis/core/types.go
@@ -2430,6 +2430,10 @@ const (
 	PodReasonUnschedulable = "Unschedulable"
 	// ContainersReady indicates whether all containers in the pod are ready.
 	ContainersReady PodConditionType = "ContainersReady"
+	// AlphaNoCompatGuaranteeDisruptionTarget indicates the pod is about to be deleted due to a
+	// disruption (such as preemption, eviction API or garbage-collection).
+	// The constant is to be renamed once the name is accepted within the KEP-3329.
+ AlphaNoCompatGuaranteeDisruptionTarget PodConditionType = "DisruptionTarget" ) // PodCondition represents pod's condition diff --git a/pkg/controller/nodelifecycle/scheduler/taint_manager.go b/pkg/controller/nodelifecycle/scheduler/taint_manager.go index 243a1a52eff0..b79435d5081d 100644 --- a/pkg/controller/nodelifecycle/scheduler/taint_manager.go +++ b/pkg/controller/nodelifecycle/scheduler/taint_manager.go @@ -30,14 +30,18 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" v1core "k8s.io/client-go/kubernetes/typed/core/v1" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" + apipod "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/apis/core/helper" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" + "k8s.io/kubernetes/pkg/features" + utilpod "k8s.io/kubernetes/pkg/util/pod" "k8s.io/klog/v2" ) @@ -105,7 +109,7 @@ func deletePodHandler(c clientset.Interface, emitEventFunc func(types.Namespaced } var err error for i := 0; i < retries; i++ { - err = c.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{}) + err = addConditionAndDeletePod(ctx, c, name, ns) if err == nil { break } @@ -115,6 +119,27 @@ func deletePodHandler(c clientset.Interface, emitEventFunc func(types.Namespaced } } +func addConditionAndDeletePod(ctx context.Context, c clientset.Interface, name, ns string) (err error) { + if feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) { + pod, err := c.CoreV1().Pods(ns).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + return err + } + newStatus := pod.Status.DeepCopy() + if apipod.UpdatePodCondition(newStatus, &v1.PodCondition{ + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionTrue, + Reason: "DeletionByTaintManager", + Message: "Taint manager: deleting due to NoExecute taint", + }) { + if _, _, _, err = utilpod.PatchPodStatus(ctx, c, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus); err != nil { + return err + } + } + } + return c.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{}) +} + func getNoExecuteTaints(taints []v1.Taint) []v1.Taint { result := []v1.Taint{} for i := range taints { diff --git a/pkg/controller/nodelifecycle/scheduler/taint_manager_test.go b/pkg/controller/nodelifecycle/scheduler/taint_manager_test.go index 71803915090a..741f0b3b3ae1 100644 --- a/pkg/controller/nodelifecycle/scheduler/taint_manager_test.go +++ b/pkg/controller/nodelifecycle/scheduler/taint_manager_test.go @@ -27,11 +27,14 @@ import ( "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" clienttesting "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" + featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/kubernetes/pkg/controller/testutil" + "k8s.io/kubernetes/pkg/features" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -128,10 +131,12 @@ func TestFilterNoExecuteTaints(t *testing.T) { func TestCreatePod(t *testing.T) { testCases := []struct { - description string - pod *v1.Pod - taintedNodes map[string][]v1.Taint - expectDelete bool + description string + pod *v1.Pod + taintedNodes map[string][]v1.Taint + expectPatch bool + expectDelete bool + enablePodDisruptionConditions bool }{ 
{ description: "not scheduled - ignore", @@ -153,6 +158,16 @@ func TestCreatePod(t *testing.T) { }, expectDelete: true, }, + { + description: "schedule on tainted Node; PodDisruptionConditions enabled", + pod: testutil.NewPod("pod1", "node1"), + taintedNodes: map[string][]v1.Taint{ + "node1": {createNoExecuteTaint(1)}, + }, + expectPatch: true, + expectDelete: true, + enablePodDisruptionConditions: true, + }, { description: "schedule on tainted Node with finite toleration", pod: addToleration(testutil.NewPod("pod1", "node1"), 1, 100), @@ -180,28 +195,24 @@ func TestCreatePod(t *testing.T) { } for _, item := range testCases { - ctx, cancel := context.WithCancel(context.Background()) - fakeClientset := fake.NewSimpleClientset() - controller, podIndexer, _ := setupNewNoExecuteTaintManager(ctx, fakeClientset) - controller.recorder = testutil.NewFakeRecorder() - go controller.Run(ctx) - controller.taintedNodes = item.taintedNodes - - podIndexer.Add(item.pod) - controller.PodUpdated(nil, item.pod) - // wait a bit - time.Sleep(timeForControllerToProgress) - - podDeleted := false - for _, action := range fakeClientset.Actions() { - if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" { - podDeleted = true - } - } - if podDeleted != item.expectDelete { - t.Errorf("%v: Unexpected test result. Expected delete %v, got %v", item.description, item.expectDelete, podDeleted) - } - cancel() + t.Run(item.description, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, item.enablePodDisruptionConditions)() + ctx, cancel := context.WithCancel(context.Background()) + fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*item.pod}}) + controller, podIndexer, _ := setupNewNoExecuteTaintManager(ctx, fakeClientset) + controller.recorder = testutil.NewFakeRecorder() + go controller.Run(ctx) + controller.taintedNodes = item.taintedNodes + + podIndexer.Add(item.pod) + controller.PodUpdated(nil, item.pod) + // wait a bit + time.Sleep(timeForControllerToProgress) + + verifyPodActions(t, item.description, fakeClientset, item.expectPatch, item.expectDelete) + + cancel() + }) } } @@ -222,13 +233,26 @@ func TestDeletePod(t *testing.T) { func TestUpdatePod(t *testing.T) { testCases := []struct { - description string - prevPod *v1.Pod - newPod *v1.Pod - taintedNodes map[string][]v1.Taint - expectDelete bool - additionalSleep time.Duration + description string + prevPod *v1.Pod + newPod *v1.Pod + taintedNodes map[string][]v1.Taint + expectPatch bool + expectDelete bool + additionalSleep time.Duration + enablePodDisruptionConditions bool }{ + { + description: "scheduling onto tainted Node results in patch and delete when PodDisruptionConditions enabled", + prevPod: testutil.NewPod("pod1", ""), + newPod: testutil.NewPod("pod1", "node1"), + taintedNodes: map[string][]v1.Taint{ + "node1": {createNoExecuteTaint(1)}, + }, + expectPatch: true, + expectDelete: true, + enablePodDisruptionConditions: true, + }, { description: "scheduling onto tainted Node", prevPod: testutil.NewPod("pod1", ""), @@ -269,36 +293,31 @@ func TestUpdatePod(t *testing.T) { } for _, item := range testCases { - ctx, cancel := context.WithCancel(context.Background()) - fakeClientset := fake.NewSimpleClientset() - controller, podIndexer, _ := setupNewNoExecuteTaintManager(context.TODO(), fakeClientset) - controller.recorder = testutil.NewFakeRecorder() - go controller.Run(ctx) - controller.taintedNodes = item.taintedNodes - - 
podIndexer.Add(item.prevPod) - controller.PodUpdated(nil, item.prevPod) + t.Run(item.description, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, item.enablePodDisruptionConditions)() + ctx, cancel := context.WithCancel(context.Background()) + fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*item.prevPod}}) + controller, podIndexer, _ := setupNewNoExecuteTaintManager(context.TODO(), fakeClientset) + controller.recorder = testutil.NewFakeRecorder() + go controller.Run(ctx) + controller.taintedNodes = item.taintedNodes - fakeClientset.ClearActions() - time.Sleep(timeForControllerToProgress) - podIndexer.Update(item.newPod) - controller.PodUpdated(item.prevPod, item.newPod) - // wait a bit - time.Sleep(timeForControllerToProgress) - if item.additionalSleep > 0 { - time.Sleep(item.additionalSleep) - } + podIndexer.Add(item.prevPod) + controller.PodUpdated(nil, item.prevPod) - podDeleted := false - for _, action := range fakeClientset.Actions() { - if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" { - podDeleted = true + fakeClientset.ClearActions() + time.Sleep(timeForControllerToProgress) + podIndexer.Update(item.newPod) + controller.PodUpdated(item.prevPod, item.newPod) + // wait a bit + time.Sleep(timeForControllerToProgress) + if item.additionalSleep > 0 { + time.Sleep(item.additionalSleep) } - } - if podDeleted != item.expectDelete { - t.Errorf("%v: Unexpected test result. Expected delete %v, got %v", item.description, item.expectDelete, podDeleted) - } - cancel() + + verifyPodActions(t, item.description, fakeClientset, item.expectPatch, item.expectDelete) + cancel() + }) } } @@ -346,15 +365,8 @@ func TestCreateNode(t *testing.T) { // wait a bit time.Sleep(timeForControllerToProgress) - podDeleted := false - for _, action := range fakeClientset.Actions() { - if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" { - podDeleted = true - } - } - if podDeleted != item.expectDelete { - t.Errorf("%v: Unexpected test result. 
Expected delete %v, got %v", item.description, item.expectDelete, podDeleted) - } + verifyPodActions(t, item.description, fakeClientset, false, item.expectDelete) + cancel() } } @@ -381,13 +393,26 @@ func TestDeleteNode(t *testing.T) { func TestUpdateNode(t *testing.T) { testCases := []struct { - description string - pods []v1.Pod - oldNode *v1.Node - newNode *v1.Node - expectDelete bool - additionalSleep time.Duration + description string + pods []v1.Pod + oldNode *v1.Node + newNode *v1.Node + expectPatch bool + expectDelete bool + additionalSleep time.Duration + enablePodDisruptionConditions bool }{ + { + description: "Added taint, expect node patched and deleted when PodDisruptionConditions is enabled", + pods: []v1.Pod{ + *testutil.NewPod("pod1", "node1"), + }, + oldNode: testutil.NewNode("node1"), + newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}), + expectPatch: true, + expectDelete: true, + enablePodDisruptionConditions: true, + }, { description: "Added taint", pods: []v1.Pod{ @@ -458,29 +483,24 @@ func TestUpdateNode(t *testing.T) { } for _, item := range testCases { - stopCh := make(chan struct{}) - fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods}) - controller, _, nodeIndexer := setupNewNoExecuteTaintManager(context.TODO(), fakeClientset) - nodeIndexer.Add(item.newNode) - controller.recorder = testutil.NewFakeRecorder() - go controller.Run(context.TODO()) - controller.NodeUpdated(item.oldNode, item.newNode) - // wait a bit - time.Sleep(timeForControllerToProgress) - if item.additionalSleep > 0 { - time.Sleep(item.additionalSleep) - } - - podDeleted := false - for _, action := range fakeClientset.Actions() { - if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" { - podDeleted = true + t.Run(item.description, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, item.enablePodDisruptionConditions)() + stopCh := make(chan struct{}) + fakeClientset := fake.NewSimpleClientset(&v1.PodList{Items: item.pods}) + controller, _, nodeIndexer := setupNewNoExecuteTaintManager(context.TODO(), fakeClientset) + nodeIndexer.Add(item.newNode) + controller.recorder = testutil.NewFakeRecorder() + go controller.Run(context.TODO()) + controller.NodeUpdated(item.oldNode, item.newNode) + // wait a bit + time.Sleep(timeForControllerToProgress) + if item.additionalSleep > 0 { + time.Sleep(item.additionalSleep) } - } - if podDeleted != item.expectDelete { - t.Errorf("%v: Unexpected test result. 
Expected delete %v, got %v", item.description, item.expectDelete, podDeleted) - } - close(stopCh) + + verifyPodActions(t, item.description, fakeClientset, item.expectPatch, item.expectDelete) + close(stopCh) + }) } } @@ -765,7 +785,7 @@ func TestEventualConsistency(t *testing.T) { newPod: addToleration(testutil.NewPod("pod2", "node1"), 1, 100), oldNode: testutil.NewNode("node1"), newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}), - expectDelete: false, + expectDelete: true, }, { description: "new pod2 created on tainted Node", @@ -787,7 +807,7 @@ func TestEventualConsistency(t *testing.T) { newPod: addToleration(testutil.NewPod("pod2", "node1"), 1, 100), oldNode: testutil.NewNode("node1"), newNode: addTaintsToNode(testutil.NewNode("node1"), "testTaint1", "taint1", []int{1}), - expectDelete: false, + expectDelete: true, }, } @@ -809,15 +829,7 @@ func TestEventualConsistency(t *testing.T) { // TODO(mborsz): Remove this sleep and other sleeps in this file. time.Sleep(timeForControllerToProgress) - podDeleted := false - for _, action := range fakeClientset.Actions() { - if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" { - podDeleted = true - } - } - if !podDeleted { - t.Errorf("%v: Unexpected test result. Expected delete, got: %v", item.description, podDeleted) - } + verifyPodActions(t, item.description, fakeClientset, false, item.expectDelete) fakeClientset.ClearActions() // And now the delayed update of 'pod2' comes to the TaintManager. We should delete it as well. @@ -826,15 +838,26 @@ func TestEventualConsistency(t *testing.T) { // wait a bit time.Sleep(timeForControllerToProgress) - podDeleted = false - for _, action := range fakeClientset.Actions() { - if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" { - podDeleted = true - } + close(stopCh) + } +} + +func verifyPodActions(t *testing.T, description string, fakeClientset *fake.Clientset, expectPatch, expectDelete bool) { + t.Helper() + podPatched := false + podDeleted := false + for _, action := range fakeClientset.Actions() { + if action.GetVerb() == "patch" && action.GetResource().Resource == "pods" { + podPatched = true } - if podDeleted != item.expectDelete { - t.Errorf("%v: Unexpected test result. Expected delete %v, got %v", item.description, item.expectDelete, podDeleted) + if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" { + podDeleted = true } - close(stopCh) + } + if podPatched != expectPatch { + t.Errorf("[%v]Unexpected test result. Expected patch %v, got %v", description, expectPatch, podPatched) + } + if podDeleted != expectDelete { + t.Errorf("[%v]Unexpected test result. 
Expected delete %v, got %v", description, expectDelete, podDeleted) } } diff --git a/pkg/controller/podgc/gc_controller.go b/pkg/controller/podgc/gc_controller.go index a69740d5c0f6..447123cca579 100644 --- a/pkg/controller/podgc/gc_controller.go +++ b/pkg/controller/podgc/gc_controller.go @@ -37,8 +37,10 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/component-base/metrics/prometheus/ratelimiter" "k8s.io/klog/v2" + apipod "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/features" nodeutil "k8s.io/kubernetes/pkg/util/node" + utilpod "k8s.io/kubernetes/pkg/util/pod" "k8s.io/kubernetes/pkg/util/taints" ) @@ -171,13 +173,13 @@ func (gcc *PodGCController) gcTerminating(ctx context.Context, pods []*v1.Pod) { var wait sync.WaitGroup for i := 0; i < deleteCount; i++ { wait.Add(1) - go func(namespace string, name string) { + go func(pod *v1.Pod) { defer wait.Done() - if err := gcc.deletePod(ctx, namespace, name); err != nil { + if err := gcc.markFailedAndDeletePod(ctx, pod); err != nil { // ignore not founds utilruntime.HandleError(err) } - }(terminatingPods[i].Namespace, terminatingPods[i].Name) + }(terminatingPods[i]) } wait.Wait() } @@ -203,13 +205,13 @@ func (gcc *PodGCController) gcTerminated(ctx context.Context, pods []*v1.Pod) { var wait sync.WaitGroup for i := 0; i < deleteCount; i++ { wait.Add(1) - go func(namespace string, name string) { + go func(pod *v1.Pod) { defer wait.Done() - if err := gcc.deletePod(ctx, namespace, name); err != nil { + if err := gcc.markFailedAndDeletePod(ctx, pod); err != nil { // ignore not founds defer utilruntime.HandleError(err) } - }(terminatedPods[i].Namespace, terminatedPods[i].Name) + }(terminatedPods[i]) } wait.Wait() } @@ -238,7 +240,13 @@ func (gcc *PodGCController) gcOrphaned(ctx context.Context, pods []*v1.Pod, node continue } klog.V(2).InfoS("Found orphaned Pod assigned to the Node, deleting.", "pod", klog.KObj(pod), "node", pod.Spec.NodeName) - if err := gcc.deletePod(ctx, pod.Namespace, pod.Name); err != nil { + condition := &v1.PodCondition{ + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionTrue, + Reason: "DeletionByPodGC", + Message: "PodGC: node no longer exists", + } + if err := gcc.markFailedAndDeletePodWithCondition(ctx, pod, condition); err != nil { utilruntime.HandleError(err) } else { klog.V(0).InfoS("Forced deletion of orphaned Pod succeeded", "pod", klog.KObj(pod)) @@ -287,7 +295,7 @@ func (gcc *PodGCController) gcUnscheduledTerminating(ctx context.Context, pods [ } klog.V(2).InfoS("Found unscheduled terminating Pod not assigned to any Node, deleting.", "pod", klog.KObj(pod)) - if err := gcc.deletePod(ctx, pod.Namespace, pod.Name); err != nil { + if err := gcc.markFailedAndDeletePod(ctx, pod); err != nil { utilruntime.HandleError(err) } else { klog.V(0).InfoS("Forced deletion of unscheduled terminating Pod succeeded", "pod", klog.KObj(pod)) @@ -308,7 +316,30 @@ func (o byCreationTimestamp) Less(i, j int) bool { return o[i].CreationTimestamp.Before(&o[j].CreationTimestamp) } -func (gcc *PodGCController) deletePod(ctx context.Context, namespace, name string) error { - klog.InfoS("PodGC is force deleting Pod", "pod", klog.KRef(namespace, name)) - return gcc.kubeClient.CoreV1().Pods(namespace).Delete(ctx, name, *metav1.NewDeleteOptions(0)) +func (gcc *PodGCController) markFailedAndDeletePod(ctx context.Context, pod *v1.Pod) error { + return gcc.markFailedAndDeletePodWithCondition(ctx, pod, nil) +} + +func (gcc *PodGCController) markFailedAndDeletePodWithCondition(ctx context.Context, pod *v1.Pod, 
condition *v1.PodCondition) error { + klog.InfoS("PodGC is force deleting Pod", "pod", klog.KRef(pod.Namespace, pod.Name)) + if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) { + newStatus := pod.Status.DeepCopy() + updated := false + if condition != nil { + updated = apipod.UpdatePodCondition(newStatus, condition) + } + // Mark the pod as failed - this is especially important in case the pod + // is orphaned, in which case the pod would remain in the Running phase + // forever as there is no kubelet running to change the phase. + if pod.Status.Phase != v1.PodSucceeded && pod.Status.Phase != v1.PodFailed { + newStatus.Phase = v1.PodFailed + updated = true + } + if updated { + if _, _, _, err := utilpod.PatchPodStatus(ctx, gcc.kubeClient, pod.Namespace, pod.Name, pod.UID, pod.Status, *newStatus); err != nil { + return err + } + } + } + return gcc.kubeClient.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, *metav1.NewDeleteOptions(0)) } diff --git a/pkg/controller/podgc/gc_controller_test.go b/pkg/controller/podgc/gc_controller_test.go index 75906cf5bbd5..923e2a1a7b96 100644 --- a/pkg/controller/podgc/gc_controller_test.go +++ b/pkg/controller/podgc/gc_controller_test.go @@ -21,6 +21,7 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -51,29 +52,6 @@ func NewFromClient(kubeClient clientset.Interface, terminatedPodThreshold int) ( return controller, podInformer, nodeInformer } -func compareStringSetToList(set sets.String, list []string) bool { - for _, item := range list { - if !set.Has(item) { - return false - } - } - if len(list) != len(set) { - return false - } - return true -} - -func getDeletedPodNames(client *fake.Clientset) []string { - deletedPodNames := make([]string, 0) - for _, action := range client.Actions() { - if action.GetVerb() == "delete" && action.GetResource().Resource == "pods" { - deleteAction := action.(clienttesting.DeleteAction) - deletedPodNames = append(deletedPodNames, deleteAction.GetName()) - } - } - return deletedPodNames -} - func TestGCTerminated(t *testing.T) { type nameToPhase struct { name string @@ -81,11 +59,25 @@ func TestGCTerminated(t *testing.T) { } testCases := []struct { - name string - pods []nameToPhase - threshold int - deletedPodNames sets.String + name string + pods []nameToPhase + threshold int + deletedPodNames sets.String + patchedPodNames sets.String + enablePodDisruptionConditions bool }{ + { + name: "delete pod a which is PodFailed and pod b which is PodSucceeded; PodDisruptionConditions enabled", + pods: []nameToPhase{ + {name: "a", phase: v1.PodFailed}, + {name: "b", phase: v1.PodSucceeded}, + {name: "c", phase: v1.PodFailed}, + }, + threshold: 1, + patchedPodNames: sets.NewString(), + deletedPodNames: sets.NewString("a", "b"), + enablePodDisruptionConditions: true, + }, { name: "threshold = 0, disables terminated pod deletion", pods: []nameToPhase{ @@ -136,29 +128,30 @@ func TestGCTerminated(t *testing.T) { }, } - for i, test := range testCases { + for _, test := range testCases { t.Run(test.name, func(t *testing.T) { - client := fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*testutil.NewNode("node")}}) - gcc, podInformer, _ := NewFromClient(client, test.threshold) - + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodDisruptionConditions, test.enablePodDisruptionConditions)() creationTime := time.Unix(0, 0) + nodes := 
[]*v1.Node{testutil.NewNode("node")} + + pods := make([]*v1.Pod, 0, len(test.pods)) for _, pod := range test.pods { creationTime = creationTime.Add(1 * time.Hour) - podInformer.Informer().GetStore().Add(&v1.Pod{ + pods = append(pods, &v1.Pod{ ObjectMeta: metav1.ObjectMeta{Name: pod.name, CreationTimestamp: metav1.Time{Time: creationTime}}, Status: v1.PodStatus{Phase: pod.phase}, Spec: v1.PodSpec{NodeName: "node"}, }) } + client := setupNewSimpleClient(nodes, pods) + gcc, podInformer, _ := NewFromClient(client, test.threshold) + for _, pod := range pods { + podInformer.Informer().GetStore().Add(pod) + } gcc.gc(context.TODO()) - deletedPodNames := getDeletedPodNames(client) - - if pass := compareStringSetToList(test.deletedPodNames, deletedPodNames); !pass { - t.Errorf("[%v]pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v", - i, test.deletedPodNames.List(), deletedPodNames) - } + verifyDeletedAndPatchedPods(t, client, test.deletedPodNames, test.patchedPodNames) }) } } @@ -185,17 +178,19 @@ func waitForAdded(q workqueue.DelayingInterface, depth int) error { func TestGCOrphaned(t *testing.T) { testCases := []struct { - name string - initialClientNodes []*v1.Node - initialInformerNodes []*v1.Node - delay time.Duration - addedClientNodes []*v1.Node - deletedClientNodes []*v1.Node - addedInformerNodes []*v1.Node - deletedInformerNodes []*v1.Node - pods []*v1.Pod - itemsInQueue int - deletedPodNames sets.String + name string + initialClientNodes []*v1.Node + initialInformerNodes []*v1.Node + delay time.Duration + addedClientNodes []*v1.Node + deletedClientNodes []*v1.Node + addedInformerNodes []*v1.Node + deletedInformerNodes []*v1.Node + pods []*v1.Pod + itemsInQueue int + deletedPodNames sets.String + patchedPodNames sets.String + enablePodDisruptionConditions bool }{ { name: "nodes present in lister", @@ -237,6 +232,18 @@ func TestGCOrphaned(t *testing.T) { itemsInQueue: 1, deletedPodNames: sets.NewString("a", "b"), }, + { + name: "no nodes with PodDisruptionConditions enabled", + delay: 2 * quarantineTime, + pods: []*v1.Pod{ + makePod("a", "deleted", v1.PodFailed), + makePod("b", "deleted", v1.PodSucceeded), + }, + itemsInQueue: 1, + deletedPodNames: sets.NewString("a", "b"), + patchedPodNames: sets.NewString("a", "b"), + enablePodDisruptionConditions: true, + }, { name: "quarantine not finished", delay: quarantineTime / 2, @@ -317,11 +324,16 @@ func TestGCOrphaned(t *testing.T) { for _, test := range testCases { t.Run(test.name, func(t *testing.T) { - nodeList := &v1.NodeList{} + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodDisruptionConditions, test.enablePodDisruptionConditions)() + nodes := make([]*v1.Node, 0, len(test.initialClientNodes)) for _, node := range test.initialClientNodes { - nodeList.Items = append(nodeList.Items, *node) + nodes = append(nodes, node) } - client := fake.NewSimpleClientset(nodeList) + pods := make([]*v1.Pod, 0, len(test.pods)) + for _, pod := range test.pods { + pods = append(pods, pod) + } + client := setupNewSimpleClient(nodes, pods) gcc, podInformer, nodeInformer := NewFromClient(client, -1) for _, node := range test.initialInformerNodes { nodeInformer.Informer().GetStore().Add(node) @@ -369,12 +381,7 @@ func TestGCOrphaned(t *testing.T) { // Actual pod deletion gcc.gc(context.TODO()) - deletedPodNames = getDeletedPodNames(client) - - if pass := compareStringSetToList(test.deletedPodNames, deletedPodNames); !pass { - t.Errorf("pod's deleted expected and actual did not 
match.\n\texpected: %v\n\tactual: %v", - test.deletedPodNames.List(), deletedPodNames) - } + verifyDeletedAndPatchedPods(t, client, test.deletedPodNames, test.patchedPodNames) }) } } @@ -388,10 +395,23 @@ func TestGCUnscheduledTerminating(t *testing.T) { } testCases := []struct { - name string - pods []nameToPhase - deletedPodNames sets.String + name string + pods []nameToPhase + deletedPodNames sets.String + patchedPodNames sets.String + enablePodDisruptionConditions bool }{ + { + name: "Unscheduled pod in any phase must be deleted, the phase of the running pod is changed to Failed; PodDisruptionConditions enabled", + pods: []nameToPhase{ + {name: "a", phase: v1.PodFailed, deletionTimeStamp: &metav1.Time{}, nodeName: ""}, + {name: "b", phase: v1.PodSucceeded, deletionTimeStamp: &metav1.Time{}, nodeName: ""}, + {name: "c", phase: v1.PodRunning, deletionTimeStamp: &metav1.Time{}, nodeName: ""}, + }, + deletedPodNames: sets.NewString("a", "b", "c"), + patchedPodNames: sets.NewString("c"), + enablePodDisruptionConditions: true, + }, { name: "Unscheduled pod in any phase must be deleted", pods: []nameToPhase{ @@ -412,21 +432,28 @@ func TestGCUnscheduledTerminating(t *testing.T) { }, } - for i, test := range testCases { + for _, test := range testCases { t.Run(test.name, func(t *testing.T) { - client := fake.NewSimpleClientset() - gcc, podInformer, _ := NewFromClient(client, -1) - + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodDisruptionConditions, test.enablePodDisruptionConditions)() creationTime := time.Unix(0, 0) + + pods := make([]*v1.Pod, 0, len(test.pods)) for _, pod := range test.pods { creationTime = creationTime.Add(1 * time.Hour) - podInformer.Informer().GetStore().Add(&v1.Pod{ + pods = append(pods, &v1.Pod{ ObjectMeta: metav1.ObjectMeta{Name: pod.name, CreationTimestamp: metav1.Time{Time: creationTime}, DeletionTimestamp: pod.deletionTimeStamp}, Status: v1.PodStatus{Phase: pod.phase}, Spec: v1.PodSpec{NodeName: pod.nodeName}, }) } + nodes := []*v1.Node{} + client := setupNewSimpleClient(nodes, pods) + gcc, podInformer, _ := NewFromClient(client, -1) + + for _, pod := range pods { + podInformer.Informer().GetStore().Add(pod) + } pods, err := podInformer.Lister().List(labels.Everything()) if err != nil { @@ -434,12 +461,7 @@ func TestGCUnscheduledTerminating(t *testing.T) { return } gcc.gcUnscheduledTerminating(context.TODO(), pods) - deletedPodNames := getDeletedPodNames(client) - - if pass := compareStringSetToList(test.deletedPodNames, deletedPodNames); !pass { - t.Errorf("[%v]pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v, test: %v", - i, test.deletedPodNames.List(), deletedPodNames, test.name) - } + verifyDeletedAndPatchedPods(t, client, test.deletedPodNames, test.patchedPodNames) }) } } @@ -460,10 +482,12 @@ func TestGCTerminating(t *testing.T) { } testCases := []struct { - name string - pods []nameToPodConfig - nodes []node - deletedPodNames sets.String + name string + pods []nameToPodConfig + nodes []node + deletedPodNames sets.String + patchedPodNames sets.String + enablePodDisruptionConditions bool }{ { name: "pods have deletion timestamp set and the corresponding nodes are not ready", @@ -544,16 +568,31 @@ func TestGCTerminating(t *testing.T) { }, deletedPodNames: sets.NewString("b1", "b4", "b5", "b6"), }, + { + name: "pods deleted from node tained out-of-service; PodDisruptionConditions enabled", + nodes: []node{ + {name: "worker", readyCondition: v1.ConditionFalse, taints: []v1.Taint{{Key: 
v1.TaintNodeOutOfService, + Effect: v1.TaintEffectNoExecute}}}, + }, + pods: []nameToPodConfig{ + {name: "a", phase: v1.PodRunning, deletionTimeStamp: &metav1.Time{}, nodeName: "worker"}, + {name: "b", phase: v1.PodFailed, deletionTimeStamp: &metav1.Time{}, nodeName: "worker"}, + {name: "c", phase: v1.PodSucceeded, deletionTimeStamp: &metav1.Time{}, nodeName: "worker"}, + }, + deletedPodNames: sets.NewString("a", "b", "c"), + patchedPodNames: sets.NewString("a"), + enablePodDisruptionConditions: true, + }, } - for i, test := range testCases { + for _, test := range testCases { t.Run(test.name, func(t *testing.T) { - client := fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*testutil.NewNode("node-a")}}) - gcc, podInformer, nodeInformer := NewFromClient(client, -1) + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodDisruptionConditions, test.enablePodDisruptionConditions)() creationTime := time.Unix(0, 0) + nodes := make([]*v1.Node, 0, len(test.nodes)) for _, node := range test.nodes { creationTime = creationTime.Add(2 * time.Hour) - nodeInformer.Informer().GetStore().Add(&v1.Node{ + nodes = append(nodes, &v1.Node{ ObjectMeta: metav1.ObjectMeta{Name: node.name, CreationTimestamp: metav1.Time{Time: creationTime}}, Spec: v1.NodeSpec{ Taints: node.taints, @@ -568,24 +607,74 @@ func TestGCTerminating(t *testing.T) { }, }) } - + pods := make([]*v1.Pod, 0, len(test.pods)) for _, pod := range test.pods { creationTime = creationTime.Add(1 * time.Hour) - podInformer.Informer().GetStore().Add(&v1.Pod{ + pods = append(pods, &v1.Pod{ ObjectMeta: metav1.ObjectMeta{Name: pod.name, CreationTimestamp: metav1.Time{Time: creationTime}, DeletionTimestamp: pod.deletionTimeStamp}, Status: v1.PodStatus{Phase: pod.phase}, Spec: v1.PodSpec{NodeName: pod.nodeName}, }) } + client := setupNewSimpleClient(nodes, pods) + gcc, podInformer, nodeInformer := NewFromClient(client, -1) - gcc.gc(context.TODO()) - deletedPodNames := getDeletedPodNames(client) - - if pass := compareStringSetToList(test.deletedPodNames, deletedPodNames); !pass { - t.Errorf("[%v]pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v", - i, test.deletedPodNames.List(), deletedPodNames) + for _, pod := range pods { + podInformer.Informer().GetStore().Add(pod) } + for _, node := range nodes { + nodeInformer.Informer().GetStore().Add(node) + } + + gcc.gc(context.TODO()) + verifyDeletedAndPatchedPods(t, client, test.deletedPodNames, test.patchedPodNames) }) } } + +func verifyDeletedAndPatchedPods(t *testing.T, client *fake.Clientset, wantDeletedPodNames, wantPatchedPodNames sets.String) { + t.Helper() + deletedPodNames := getDeletedPodNames(client) + if diff := cmp.Diff(wantDeletedPodNames, deletedPodNames); diff != "" { + t.Errorf("Deleted pod names (-want,+got):\n%s", diff) + } + patchedPodNames := getPatchedPodNames(client) + if diff := cmp.Diff(wantPatchedPodNames, patchedPodNames); diff != "" { + t.Errorf("Patched pod names (-want,+got):\n%s", diff) + } +} + +func setupNewSimpleClient(nodes []*v1.Node, pods []*v1.Pod) *fake.Clientset { + podList := &v1.PodList{} + for _, podItem := range pods { + podList.Items = append(podList.Items, *podItem) + } + nodeList := &v1.NodeList{} + for _, nodeItem := range nodes { + nodeList.Items = append(nodeList.Items, *nodeItem) + } + return fake.NewSimpleClientset(nodeList, podList) +} + +func getDeletedPodNames(client *fake.Clientset) sets.String { + deletedPodNames := sets.NewString() + for _, action := range client.Actions() { + if 
action.GetVerb() == "delete" && action.GetResource().Resource == "pods" { + deleteAction := action.(clienttesting.DeleteAction) + deletedPodNames.Insert(deleteAction.GetName()) + } + } + return deletedPodNames +} + +func getPatchedPodNames(client *fake.Clientset) sets.String { + patchedPodNames := sets.NewString() + for _, action := range client.Actions() { + if action.GetVerb() == "patch" && action.GetResource().Resource == "pods" { + patchAction := action.(clienttesting.PatchAction) + patchedPodNames.Insert(patchAction.GetName()) + } + } + return patchedPodNames +} diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 351e9fa5afef..5680afd02f84 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -629,6 +629,14 @@ const ( // Enables controlling pod ranking on replicaset scale-down. PodDeletionCost featuregate.Feature = "PodDeletionCost" + // owner: @mimowo + // kep: http://kep.k8s.io/3329 + // alpha: v1.25 + // + // Enables support for appending a dedicated pod condition indicating that + // the pod is being deleted due to a disruption. + PodDisruptionConditions featuregate.Feature = "PodDisruptionConditions" + // owner: @ddebroy // alpha: v1.25 // @@ -1005,6 +1013,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS PodDeletionCost: {Default: true, PreRelease: featuregate.Beta}, + PodDisruptionConditions: {Default: false, PreRelease: featuregate.Alpha}, + PodHasNetworkCondition: {Default: false, PreRelease: featuregate.Alpha}, PodOverhead: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.26 diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go index 9275a1c79302..c767e7dc41bf 100644 --- a/pkg/kubelet/status/status_manager.go +++ b/pkg/kubelet/status/status_manager.go @@ -684,7 +684,7 @@ func (m *manager) syncPod(uid types.UID, status versionedPodStatus) { mergedStatus := mergePodStatus(pod.Status, status.status, m.podDeletionSafety.PodCouldHaveRunningContainers(pod)) - newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(m.kubeClient, pod.Namespace, pod.Name, pod.UID, pod.Status, mergedStatus) + newPod, patchBytes, unchanged, err := statusutil.PatchPodStatus(context.TODO(), m.kubeClient, pod.Namespace, pod.Name, pod.UID, pod.Status, mergedStatus) klog.V(3).InfoS("Patch status for pod", "pod", klog.KObj(pod), "patch", string(patchBytes)) if err != nil { diff --git a/pkg/registry/core/pod/storage/eviction.go b/pkg/registry/core/pod/storage/eviction.go index 0fb7ac03eeba..844c27e475c4 100644 --- a/pkg/registry/core/pod/storage/eviction.go +++ b/pkg/registry/core/pod/storage/eviction.go @@ -32,12 +32,14 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/util/dryrun" + "k8s.io/apiserver/pkg/util/feature" policyclient "k8s.io/client-go/kubernetes/typed/policy/v1" "k8s.io/client-go/util/retry" pdbhelper "k8s.io/component-helpers/apps/poddisruptionbudget" podutil "k8s.io/kubernetes/pkg/api/pod" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/apis/policy" + "k8s.io/kubernetes/pkg/features" ) const ( @@ -153,11 +155,10 @@ func (r *EvictionREST) Create(ctx context.Context, name string, obj runtime.Obje } err = retry.OnError(EvictionsRetry, shouldRetry, func() error { - obj, err = r.store.Get(ctx, eviction.Name, &metav1.GetOptions{}) + pod, err = getPod(r, ctx, eviction.Name) if err != nil { return err } - pod = obj.(*api.Pod) // Evicting a terminal pod should result 
in direct deletion of pod as it already caused disruption by the time we are evicting. // There is no need to check for pdb. @@ -178,7 +179,7 @@ func (r *EvictionREST) Create(ctx context.Context, name string, obj runtime.Obje deleteOptions = deleteOptions.DeepCopy() setPreconditionsResourceVersion(deleteOptions, &pod.ResourceVersion) } - _, _, err = r.store.Delete(ctx, eviction.Name, rest.ValidateAllObjectFunc, deleteOptions) + err = addConditionAndDeletePod(r, ctx, eviction.Name, rest.ValidateAllObjectFunc, deleteOptions) if err != nil { return err } @@ -276,7 +277,7 @@ func (r *EvictionREST) Create(ctx context.Context, name string, obj runtime.Obje } // Try the delete - _, _, err = r.store.Delete(ctx, eviction.Name, rest.ValidateAllObjectFunc, deleteOptions) + err = addConditionAndDeletePod(r, ctx, eviction.Name, rest.ValidateAllObjectFunc, deleteOptions) if err != nil { if errors.IsConflict(err) && updateDeletionOptions && (originalDeleteOptions.Preconditions == nil || originalDeleteOptions.Preconditions.ResourceVersion == nil) { @@ -292,6 +293,41 @@ func (r *EvictionREST) Create(ctx context.Context, name string, obj runtime.Obje return &metav1.Status{Status: metav1.StatusSuccess}, nil } +func addConditionAndDeletePod(r *EvictionREST, ctx context.Context, name string, validation rest.ValidateObjectFunc, options *metav1.DeleteOptions) error { + if feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) { + pod, err := getPod(r, ctx, name) + if err != nil { + return err + } + conditionAppender := func(_ context.Context, newObj, _ runtime.Object) (runtime.Object, error) { + podObj := newObj.(*api.Pod) + podutil.UpdatePodCondition(&podObj.Status, &api.PodCondition{ + Type: api.AlphaNoCompatGuaranteeDisruptionTarget, + Status: api.ConditionTrue, + Reason: "EvictionByEvictionAPI", + Message: "Eviction API: evicting", + }) + return podObj, nil + } + + podCopyUpdated := rest.DefaultUpdatedObjectInfo(pod, conditionAppender) + + if _, _, err = r.store.Update(ctx, name, podCopyUpdated, rest.ValidateAllObjectFunc, rest.ValidateAllObjectUpdateFunc, false, &metav1.UpdateOptions{}); err != nil { + return err + } + } + _, _, err := r.store.Delete(ctx, name, rest.ValidateAllObjectFunc, options) + return err +} + +func getPod(r *EvictionREST, ctx context.Context, name string) (*api.Pod, error) { + obj, err := r.store.Get(ctx, name, &metav1.GetOptions{}) + if err != nil { + return nil, err + } + return obj.(*api.Pod), nil +} + func setPreconditionsResourceVersion(deleteOptions *metav1.DeleteOptions, resourceVersion *string) { if deleteOptions.Preconditions == nil { deleteOptions.Preconditions = &metav1.Preconditions{} diff --git a/pkg/registry/core/pod/storage/storage.go b/pkg/registry/core/pod/storage/storage.go index 27af1f4d9ed3..db21b59da4da 100644 --- a/pkg/registry/core/pod/storage/storage.go +++ b/pkg/registry/core/pod/storage/storage.go @@ -105,7 +105,7 @@ func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGet Pod: &REST{store, proxyTransport}, Binding: &BindingREST{store: store}, LegacyBinding: &LegacyBindingREST{bindingREST}, - Eviction: newEvictionStorage(store, podDisruptionBudgetClient), + Eviction: newEvictionStorage(&statusStore, podDisruptionBudgetClient), Status: &StatusREST{store: &statusStore}, EphemeralContainers: &EphemeralContainersREST{store: &ephemeralContainersStore}, Log: &podrest.LogREST{Store: store, KubeletConn: k}, diff --git a/pkg/scheduler/framework/preemption/preemption.go b/pkg/scheduler/framework/preemption/preemption.go index 
30bd8ac1aba0..c447297a846e 100644 --- a/pkg/scheduler/framework/preemption/preemption.go +++ b/pkg/scheduler/framework/preemption/preemption.go @@ -28,11 +28,14 @@ import ( policy "k8s.io/api/policy/v1" "k8s.io/apimachinery/pkg/labels" utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apiserver/pkg/util/feature" corelisters "k8s.io/client-go/listers/core/v1" policylisters "k8s.io/client-go/listers/policy/v1" corev1helpers "k8s.io/component-helpers/scheduling/corev1" "k8s.io/klog/v2" extenderv1 "k8s.io/kube-scheduler/extender/v1" + apipod "k8s.io/kubernetes/pkg/api/v1/pod" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler/framework" "k8s.io/kubernetes/pkg/scheduler/metrics" "k8s.io/kubernetes/pkg/scheduler/util" @@ -340,9 +343,26 @@ func (ev *Evaluator) prepareCandidate(ctx context.Context, c Candidate, pod *v1. // Otherwise we should delete the victim. if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil { waitingPod.Reject(pluginName, "preempted") - } else if err := util.DeletePod(ctx, cs, victim); err != nil { - klog.ErrorS(err, "Preempting pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod)) - return framework.AsStatus(err) + } else { + if feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) { + condition := &v1.PodCondition{ + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionTrue, + Reason: "PreemptionByKubeScheduler", + Message: "Kube-scheduler: preempting", + } + newStatus := pod.Status.DeepCopy() + if apipod.UpdatePodCondition(newStatus, condition) { + if err := util.PatchPodStatus(ctx, cs, victim, newStatus); err != nil { + klog.ErrorS(err, "Preparing pod preemption", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod)) + return framework.AsStatus(err) + } + } + } + if err := util.DeletePod(ctx, cs, victim); err != nil { + klog.ErrorS(err, "Preempting pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod)) + return framework.AsStatus(err) + } } fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", pod.Namespace, pod.Name, c.Name()) diff --git a/pkg/util/pod/pod.go b/pkg/util/pod/pod.go index a40f39b3b53f..d972e93d6eef 100644 --- a/pkg/util/pod/pod.go +++ b/pkg/util/pod/pod.go @@ -30,7 +30,7 @@ import ( ) // PatchPodStatus patches pod status. It returns true and avoids an update if the patch contains no changes. 
-func PatchPodStatus(c clientset.Interface, namespace, name string, uid types.UID, oldPodStatus, newPodStatus v1.PodStatus) (*v1.Pod, []byte, bool, error) { +func PatchPodStatus(ctx context.Context, c clientset.Interface, namespace, name string, uid types.UID, oldPodStatus, newPodStatus v1.PodStatus) (*v1.Pod, []byte, bool, error) { patchBytes, unchanged, err := preparePatchBytesForPodStatus(namespace, name, uid, oldPodStatus, newPodStatus) if err != nil { return nil, nil, false, err @@ -39,7 +39,7 @@ func PatchPodStatus(c clientset.Interface, namespace, name string, uid types.UID return nil, patchBytes, true, nil } - updatedPod, err := c.CoreV1().Pods(namespace).Patch(context.TODO(), name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status") + updatedPod, err := c.CoreV1().Pods(namespace).Patch(ctx, name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status") if err != nil { return nil, nil, false, fmt.Errorf("failed to patch status %q for pod %q/%q: %v", patchBytes, namespace, name, err) } diff --git a/pkg/util/pod/pod_test.go b/pkg/util/pod/pod_test.go index c3f928c77e49..40792d2fe608 100644 --- a/pkg/util/pod/pod_test.go +++ b/pkg/util/pod/pod_test.go @@ -88,7 +88,7 @@ func TestPatchPodStatus(t *testing.T) { } for _, tc := range testCases { t.Run(tc.description, func(t *testing.T) { - _, patchBytes, unchanged, err := PatchPodStatus(client, ns, name, uid, getPodStatus(), tc.mutate(getPodStatus())) + _, patchBytes, unchanged, err := PatchPodStatus(context.TODO(), client, ns, name, uid, getPodStatus(), tc.mutate(getPodStatus())) if err != nil { t.Errorf("unexpected error: %v", err) } diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go index ed03ac7d5d7f..f55563350444 100644 --- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go +++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/controller_policy.go @@ -241,17 +241,23 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding) rbacv1helpers.NewRule("get", "list", "delete", "deletecollection").Groups("*").Resources("*").RuleOrDie(), }, }) - addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{ - ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "node-controller"}, - Rules: []rbacv1.PolicyRule{ - rbacv1helpers.NewRule("get", "list", "update", "delete", "patch").Groups(legacyGroup).Resources("nodes").RuleOrDie(), - rbacv1helpers.NewRule("patch", "update").Groups(legacyGroup).Resources("nodes/status").RuleOrDie(), - // used for pod eviction - rbacv1helpers.NewRule("update").Groups(legacyGroup).Resources("pods/status").RuleOrDie(), - rbacv1helpers.NewRule("list", "delete").Groups(legacyGroup).Resources("pods").RuleOrDie(), - eventsRule(), - }, - }) + addControllerRole(&controllerRoles, &controllerRoleBindings, func() rbacv1.ClusterRole { + role := rbacv1.ClusterRole{ + ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "node-controller"}, + Rules: []rbacv1.PolicyRule{ + rbacv1helpers.NewRule("get", "list", "update", "delete", "patch").Groups(legacyGroup).Resources("nodes").RuleOrDie(), + rbacv1helpers.NewRule("patch", "update").Groups(legacyGroup).Resources("nodes/status").RuleOrDie(), + // used for pod deletion + rbacv1helpers.NewRule("update").Groups(legacyGroup).Resources("pods/status").RuleOrDie(), + rbacv1helpers.NewRule("list", "delete").Groups(legacyGroup).Resources("pods").RuleOrDie(), + eventsRule(), + }, + } + if 
utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) { + role.Rules = append(role.Rules, rbacv1helpers.NewRule("patch").Groups(legacyGroup).Resources("pods/status").RuleOrDie()) + } + return role + }()) addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{ ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "persistent-volume-binder"}, Rules: []rbacv1.PolicyRule{ @@ -275,13 +281,19 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding) eventsRule(), }, }) - addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{ - ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "pod-garbage-collector"}, - Rules: []rbacv1.PolicyRule{ - rbacv1helpers.NewRule("list", "watch", "delete").Groups(legacyGroup).Resources("pods").RuleOrDie(), - rbacv1helpers.NewRule("get", "list").Groups(legacyGroup).Resources("nodes").RuleOrDie(), - }, - }) + addControllerRole(&controllerRoles, &controllerRoleBindings, func() rbacv1.ClusterRole { + role := rbacv1.ClusterRole{ + ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "pod-garbage-collector"}, + Rules: []rbacv1.PolicyRule{ + rbacv1helpers.NewRule("list", "watch", "delete").Groups(legacyGroup).Resources("pods").RuleOrDie(), + rbacv1helpers.NewRule("get", "list").Groups(legacyGroup).Resources("nodes").RuleOrDie(), + }, + } + if utilfeature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions) { + role.Rules = append(role.Rules, rbacv1helpers.NewRule("patch").Groups(legacyGroup).Resources("pods/status").RuleOrDie()) + } + return role + }()) addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{ ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "replicaset-controller"}, Rules: []rbacv1.PolicyRule{ diff --git a/staging/src/k8s.io/api/core/v1/types.go b/staging/src/k8s.io/api/core/v1/types.go index 77e9818c9ce3..f643bd80954a 100644 --- a/staging/src/k8s.io/api/core/v1/types.go +++ b/staging/src/k8s.io/api/core/v1/types.go @@ -2653,6 +2653,10 @@ const ( PodReady PodConditionType = "Ready" // PodScheduled represents status of the scheduling process for this pod. PodScheduled PodConditionType = "PodScheduled" + // AlphaNoCompatGuaranteeDisruptionTarget indicates the pod is about to be deleted due to a + // disruption (such as preemption, eviction API or garbage-collection). + // The constant is to be renamed once the name is accepted within the KEP-3329. + AlphaNoCompatGuaranteeDisruptionTarget PodConditionType = "DisruptionTarget" ) // These are reasons for a pod's transition to a condition. 
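The constant added to staging/src/k8s.io/api/core/v1/types.go above is the public API surface that consumers read. A minimal sketch of how a client could inspect the new condition once the PodDisruptionConditions gate is enabled follows; the package name disruptiondemo and the helper disruptionReason are illustrative and not part of this patch, while GetPodCondition from k8s.io/kubernetes/pkg/api/v1/pod is the same helper the integration tests below use:

package disruptiondemo

import (
	"context"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
)

// disruptionReason fetches the pod and returns the reason of its
// DisruptionTarget condition (for example "PreemptionByKubeScheduler",
// "DeletionByTaintManager", "EvictionByEvictionAPI" or "DeletionByPodGC"),
// or "" if the condition is absent or not True.
func disruptionReason(ctx context.Context, c clientset.Interface, namespace, name string) (string, error) {
	pod, err := c.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		return "", err
	}
	_, cond := podutil.GetPodCondition(&pod.Status, v1.AlphaNoCompatGuaranteeDisruptionTarget)
	if cond == nil || cond.Status != v1.ConditionTrue {
		return "", nil
	}
	return cond.Reason, nil
}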
diff --git a/test/integration/evictions/evictions_test.go b/test/integration/evictions/evictions_test.go index d38359d99a16..5392d461ead3 100644 --- a/test/integration/evictions/evictions_test.go +++ b/test/integration/evictions/evictions_test.go @@ -39,6 +39,7 @@ import ( utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/util/feature" cacheddiscovery "k8s.io/client-go/discovery/cached/memory" "k8s.io/client-go/dynamic" "k8s.io/client-go/informers" @@ -47,8 +48,12 @@ import ( "k8s.io/client-go/restmapper" "k8s.io/client-go/scale" "k8s.io/client-go/tools/cache" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/klog/v2" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" + podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller/disruption" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/test/integration/framework" ) @@ -85,7 +90,7 @@ func TestConcurrentEvictionRequests(t *testing.T) { if _, err := clientSet.CoreV1().Pods(ns.Name).Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil { t.Errorf("Failed to create pod: %v", err) } - + pod.Status.Phase = v1.PodRunning addPodConditionReady(pod) if _, err := clientSet.CoreV1().Pods(ns.Name).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{}); err != nil { t.Fatal(err) @@ -194,7 +199,8 @@ func TestTerminalPodEviction(t *testing.T) { t.Errorf("Failed to create pod: %v", err) } - addPodConditionSucceeded(pod) + pod.Status.Phase = v1.PodSucceeded + addPodConditionReady(pod) if _, err := clientSet.CoreV1().Pods(ns.Name).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{}); err != nil { t.Fatal(err) } @@ -334,6 +340,85 @@ func TestEvictionVersions(t *testing.T) { } } +// TestEvictionWithFinalizers tests eviction with the use of finalizers +func TestEvictionWithFinalizers(t *testing.T) { + cases := map[string]struct { + enablePodDisruptionConditions bool + phase v1.PodPhase + }{ + "terminal pod with PodDisruptionConditions enabled": { + enablePodDisruptionConditions: true, + phase: v1.PodSucceeded, + }, + "terminal pod with PodDisruptionConditions disabled": { + enablePodDisruptionConditions: false, + phase: v1.PodSucceeded, + }, + "running pod with PodDisruptionConditions enabled": { + enablePodDisruptionConditions: true, + phase: v1.PodRunning, + }, + "running pod with PodDisruptionConditions disabled": { + enablePodDisruptionConditions: false, + phase: v1.PodRunning, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + closeFn, rm, informers, _, clientSet := rmSetup(t) + defer closeFn() + + ns := framework.CreateNamespaceOrDie(clientSet, "eviction-with-finalizers", t) + defer framework.DeleteNamespaceOrDie(clientSet, ns, t) + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, tc.enablePodDisruptionConditions)() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + informers.Start(ctx.Done()) + go rm.Run(ctx) + + pod := newPod("pod") + pod.ObjectMeta.Finalizers = []string{"test.k8s.io/finalizer"} + if _, err := clientSet.CoreV1().Pods(ns.Name).Create(ctx, pod, metav1.CreateOptions{}); err != nil { + t.Errorf("Failed to create pod: %v", err) + } + + pod.Status.Phase = tc.phase + addPodConditionReady(pod) + if _, err := clientSet.CoreV1().Pods(ns.Name).UpdateStatus(ctx, pod, metav1.UpdateOptions{}); err != nil { + t.Fatal(err) + } + + waitToObservePods(t, 
informers.Core().V1().Pods().Informer(), 1, tc.phase) + deleteOption := metav1.DeleteOptions{} + + eviction := newV1Eviction(ns.Name, pod.Name, deleteOption) + + err := wait.PollImmediate(5*time.Second, 60*time.Second, func() (bool, error) { + e := clientSet.PolicyV1().Evictions(ns.Name).Evict(ctx, eviction) + if e != nil { + return false, e + } + return true, nil + }) + if err != nil { + t.Fatalf("Eviction of pod failed %v", err) + } + + updatedPod, e := clientSet.CoreV1().Pods(ns.Name).Get(ctx, pod.Name, metav1.GetOptions{}) + if e != nil { + t.Fatalf("Failed to get the pod %q with error: %q", klog.KObj(pod), e) + } + _, cond := podutil.GetPodCondition(&updatedPod.Status, v1.PodConditionType(v1.AlphaNoCompatGuaranteeDisruptionTarget)) + if tc.enablePodDisruptionConditions == true && cond == nil { + t.Errorf("Pod %q does not have the expected condition: %q", klog.KObj(updatedPod), v1.AlphaNoCompatGuaranteeDisruptionTarget) + } else if tc.enablePodDisruptionConditions == false && cond != nil { + t.Errorf("Pod %q has an unexpected condition: %q", klog.KObj(updatedPod), v1.AlphaNoCompatGuaranteeDisruptionTarget) + } + }) + } +} + func newPod(podName string) *v1.Pod { return &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ @@ -351,28 +436,11 @@ func newPod(podName string) *v1.Pod { } } -func addPodConditionSucceeded(pod *v1.Pod) { - pod.Status = v1.PodStatus{ - Phase: v1.PodSucceeded, - Conditions: []v1.PodCondition{ - { - Type: v1.PodReady, - Status: v1.ConditionTrue, - }, - }, - } -} - func addPodConditionReady(pod *v1.Pod) { - pod.Status = v1.PodStatus{ - Phase: v1.PodRunning, - Conditions: []v1.PodCondition{ - { - Type: v1.PodReady, - Status: v1.ConditionTrue, - }, - }, - } + pod.Status.Conditions = append(pod.Status.Conditions, v1.PodCondition{ + Type: v1.PodReady, + Status: v1.ConditionTrue, + }) } func newPDB() *policyv1.PodDisruptionBudget { diff --git a/test/integration/node/lifecycle_test.go b/test/integration/node/lifecycle_test.go index e3e8ff8cbbf6..c274d691b7f5 100644 --- a/test/integration/node/lifecycle_test.go +++ b/test/integration/node/lifecycle_test.go @@ -26,12 +26,17 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/admission" + "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/klog/v2" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller/nodelifecycle" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/plugin/pkg/admission/defaulttolerationseconds" "k8s.io/kubernetes/plugin/pkg/admission/podtolerationrestriction" pluginapi "k8s.io/kubernetes/plugin/pkg/admission/podtolerationrestriction/apis/podtolerationrestriction" @@ -44,6 +49,140 @@ const poll = 2 * time.Second type podCondition func(pod *v1.Pod) (bool, error) +// TestEvictionForNoExecuteTaintAddedByUser tests taint-based eviction for a node tainted NoExecute +func TestEvictionForNoExecuteTaintAddedByUser(t *testing.T) { + tests := map[string]struct { + enablePodDisruptionConditions bool + }{ + "Test eviciton for NoExecute taint added by user; pod condition added when PodDisruptionConditions enabled": { + enablePodDisruptionConditions: true, + }, + "Test eviciton for NoExecute taint added by user; no pod condition added when PodDisruptionConditions disabled": { + 
enablePodDisruptionConditions: false, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + nodeIndex := 1 + nodeCount := 3 + var nodes []*v1.Node + for i := 0; i < nodeCount; i++ { + node := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("testnode-%d", i), + Labels: map[string]string{"node.kubernetes.io/exclude-disruption": "true"}, + }, + Spec: v1.NodeSpec{}, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: v1.ConditionTrue, + }, + }, + }, + } + nodes = append(nodes, node) + } + testPod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testpod", + }, + Spec: v1.PodSpec{ + NodeName: nodes[nodeIndex].Name, + Containers: []v1.Container{ + {Name: "container", Image: imageutils.GetPauseImageName()}, + }, + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionTrue, + }, + }, + }, + } + + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, test.enablePodDisruptionConditions)() + testCtx := testutils.InitTestAPIServer(t, "taint-no-execute", nil) + + // Build clientset and informers for controllers. + defer testutils.CleanupTest(t, testCtx) + cs := testCtx.ClientSet + + // Build clientset and informers for controllers. + externalClientConfig := restclient.CopyConfig(testCtx.KubeConfig) + externalClientConfig.QPS = -1 + externalClientset := clientset.NewForConfigOrDie(externalClientConfig) + externalInformers := informers.NewSharedInformerFactory(externalClientset, time.Second) + + // Start NodeLifecycleController for taint. + nc, err := nodelifecycle.NewNodeLifecycleController( + testCtx.Ctx, + externalInformers.Coordination().V1().Leases(), + externalInformers.Core().V1().Pods(), + externalInformers.Core().V1().Nodes(), + externalInformers.Apps().V1().DaemonSets(), + cs, + 1*time.Second, // Node monitor grace period + time.Minute, // Node startup grace period + time.Millisecond, // Node monitor period + 1, // Pod eviction timeout + 100, // Eviction limiter QPS + 100, // Secondary eviction limiter QPS + 50, // Large cluster threshold + 0.55, // Unhealthy zone threshold + true, // Run taint manager + ) + if err != nil { + t.Errorf("Failed to create node controller: %v", err) + return + } + + // Waiting for all controllers to sync + externalInformers.Start(testCtx.Ctx.Done()) + externalInformers.WaitForCacheSync(testCtx.Ctx.Done()) + + // Run all controllers + go nc.Run(testCtx.Ctx) + + for index := range nodes { + nodes[index], err = cs.CoreV1().Nodes().Create(testCtx.Ctx, nodes[index], metav1.CreateOptions{}) + if err != nil { + t.Errorf("Failed to create node, err: %v", err) + } + } + + testPod, err = cs.CoreV1().Pods(testCtx.NS.Name).Create(testCtx.Ctx, testPod, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Test Failed: error: %v, while creating pod", err) + } + + if err := testutils.AddTaintToNode(cs, nodes[nodeIndex].Name, v1.Taint{Key: "CustomTaintByUser", Effect: v1.TaintEffectNoExecute}); err != nil { + t.Errorf("Failed to taint node in test %s <%s>, err: %v", name, nodes[nodeIndex].Name, err) + } + + err = wait.PollImmediate(time.Second, time.Second*20, testutils.PodIsGettingEvicted(cs, testPod.Namespace, testPod.Name)) + if err != nil { + t.Fatalf("Error %q in test %q when waiting for terminating pod: %q", err, name, klog.KObj(testPod)) + } + testPod, err = cs.CoreV1().Pods(testCtx.NS.Name).Get(testCtx.Ctx, testPod.Name, metav1.GetOptions{}) + if 
err != nil { + t.Fatalf("Test Failed: error: %q, while getting updated pod", err) + } + _, cond := podutil.GetPodCondition(&testPod.Status, v1.AlphaNoCompatGuaranteeDisruptionTarget) + if test.enablePodDisruptionConditions == true && cond == nil { + t.Errorf("Pod %q does not have the expected condition: %q", klog.KObj(testPod), v1.AlphaNoCompatGuaranteeDisruptionTarget) + } else if test.enablePodDisruptionConditions == false && cond != nil { + t.Errorf("Pod %q has an unexpected condition: %q", klog.KObj(testPod), v1.AlphaNoCompatGuaranteeDisruptionTarget) + } + }) + } +} + // TestTaintBasedEvictions tests related cases for the TaintBasedEvictions feature func TestTaintBasedEvictions(t *testing.T) { // we need at least 2 nodes to prevent lifecycle manager from entering "fully-disrupted" mode diff --git a/test/integration/podgc/podgc_test.go b/test/integration/podgc/podgc_test.go index 5c2ee0c65297..da9858ac5aa5 100644 --- a/test/integration/podgc/podgc_test.go +++ b/test/integration/podgc/podgc_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/client-go/informers" featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/klog/v2" + podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller/podgc" "k8s.io/kubernetes/pkg/features" testutils "k8s.io/kubernetes/test/integration/util" @@ -36,144 +37,224 @@ import ( // TestPodGcOrphanedPodsWithFinalizer tests deletion of orphaned pods func TestPodGcOrphanedPodsWithFinalizer(t *testing.T) { - testCtx := setup(t, "podgc-orphaned") - defer testutils.CleanupTest(t, testCtx) - cs := testCtx.ClientSet - - node := &v1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "node", + tests := map[string]struct { + enablePodDisruptionConditions bool + wantPhase v1.PodPhase + }{ + "PodDisruptionConditions enabled": { + enablePodDisruptionConditions: true, + wantPhase: v1.PodFailed, }, - Spec: v1.NodeSpec{}, - Status: v1.NodeStatus{ - Conditions: []v1.NodeCondition{ - { - Type: v1.NodeReady, - Status: v1.ConditionTrue, - }, - }, + "PodDisruptionConditions disabled": { + enablePodDisruptionConditions: false, + wantPhase: v1.PodPending, }, } - node, err := cs.CoreV1().Nodes().Create(testCtx.Ctx, node, metav1.CreateOptions{}) - if err != nil { - t.Fatalf("Failed to create node '%v', err: %v", node.Name, err) - } - pod := &v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "testpod", - Namespace: testCtx.NS.Name, - Finalizers: []string{"test.k8s.io/finalizer"}, - }, - Spec: v1.PodSpec{ - NodeName: node.Name, - Containers: []v1.Container{ - {Name: "foo", Image: "bar"}, - }, - }, - } + for name, test := range tests { + t.Run(name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodDisruptionConditions, test.enablePodDisruptionConditions)() + testCtx := setup(t, "podgc-orphaned") + defer testutils.CleanupTest(t, testCtx) + cs := testCtx.ClientSet - pod, err = cs.CoreV1().Pods(testCtx.NS.Name).Create(testCtx.Ctx, pod, metav1.CreateOptions{}) - if err != nil { - t.Fatalf("Error %v, while creating pod: %v", err, klog.KObj(pod)) - } - defer testutils.RemovePodFinalizers(testCtx.ClientSet, t, []*v1.Pod{pod}) - pod, err = cs.CoreV1().Pods(testCtx.NS.Name).Get(testCtx.Ctx, pod.Name, metav1.GetOptions{}) - if err != nil { - t.Fatalf("Error: '%v' while updating pod info: '%v'", err, klog.KObj(pod)) - } + node := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node", + }, + Spec: v1.NodeSpec{}, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: 
v1.ConditionTrue, + }, + }, + }, + } + node, err := cs.CoreV1().Nodes().Create(testCtx.Ctx, node, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed to create node '%v', err: %v", node.Name, err) + } - // we delete the node to orphan the pod - err = cs.CoreV1().Nodes().Delete(testCtx.Ctx, pod.Spec.NodeName, metav1.DeleteOptions{}) - if err != nil { - t.Fatalf("Failed to delete node: %v, err: %v", pod.Spec.NodeName, err) - } + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "testpod", + Namespace: testCtx.NS.Name, + Finalizers: []string{"test.k8s.io/finalizer"}, + }, + Spec: v1.PodSpec{ + NodeName: node.Name, + Containers: []v1.Container{ + {Name: "foo", Image: "bar"}, + }, + }, + } + + pod, err = cs.CoreV1().Pods(testCtx.NS.Name).Create(testCtx.Ctx, pod, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Error %v, while creating pod: %v", err, klog.KObj(pod)) + } + defer testutils.RemovePodFinalizers(testCtx.ClientSet, t, []*v1.Pod{pod}) - err = wait.PollImmediate(time.Second, time.Second*15, func() (bool, error) { - updatedPod, err := cs.CoreV1().Pods(testCtx.NS.Name).Get(testCtx.Ctx, pod.Name, metav1.GetOptions{}) - if err != nil { - return true, err - } - if updatedPod.ObjectMeta.DeletionTimestamp != nil { - return true, nil - } - return false, nil - }) - if err != nil { - t.Fatalf("Error '%v' while waiting for the pod '%v' to be deleted", err, klog.KObj(pod)) + // we delete the node to orphan the pod + err = cs.CoreV1().Nodes().Delete(testCtx.Ctx, pod.Spec.NodeName, metav1.DeleteOptions{}) + if err != nil { + t.Fatalf("Failed to delete node: %v, err: %v", pod.Spec.NodeName, err) + } + err = wait.PollImmediate(time.Second, time.Second*15, testutils.PodIsGettingEvicted(cs, pod.Namespace, pod.Name)) + if err != nil { + t.Fatalf("Error '%v' while waiting for the pod '%v' to be terminating", err, klog.KObj(pod)) + } + pod, err = cs.CoreV1().Pods(testCtx.NS.Name).Get(testCtx.Ctx, pod.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Error: '%v' while updating pod info: '%v'", err, klog.KObj(pod)) + } + _, cond := podutil.GetPodCondition(&pod.Status, v1.AlphaNoCompatGuaranteeDisruptionTarget) + if test.enablePodDisruptionConditions == true && cond == nil { + t.Errorf("Pod %q does not have the expected condition: %q", klog.KObj(pod), v1.AlphaNoCompatGuaranteeDisruptionTarget) + } else if test.enablePodDisruptionConditions == false && cond != nil { + t.Errorf("Pod %q has an unexpected condition: %q", klog.KObj(pod), v1.AlphaNoCompatGuaranteeDisruptionTarget) + } + if pod.Status.Phase != test.wantPhase { + t.Errorf("Unexpected phase for pod %q. 
Got: %q, want: %q", klog.KObj(pod), pod.Status.Phase, test.wantPhase)
+			}
+		})
 	}
 }
 
 // TestTerminatingOnOutOfServiceNode tests deletion of pods terminating on out-of-service nodes
 func TestTerminatingOnOutOfServiceNode(t *testing.T) {
-	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.NodeOutOfServiceVolumeDetach, true)()
-	testCtx := setup(t, "podgc-out-of-service")
-	defer testutils.CleanupTest(t, testCtx)
-	cs := testCtx.ClientSet
-
-	node := &v1.Node{
-		ObjectMeta: metav1.ObjectMeta{
-			Name: "node",
+	tests := map[string]struct {
+		enablePodDisruptionConditions bool
+		withFinalizer                 bool
+		wantPhase                     v1.PodPhase
+	}{
+		"pod has phase changed to Failed when PodDisruptionConditions enabled": {
+			enablePodDisruptionConditions: true,
+			withFinalizer:                 true,
+			wantPhase:                     v1.PodFailed,
 		},
-		Spec: v1.NodeSpec{},
-		Status: v1.NodeStatus{
-			Conditions: []v1.NodeCondition{
-				{
-					Type:   v1.NodeReady,
-					Status: v1.ConditionFalse,
-				},
-			},
+		"pod has phase unchanged when PodDisruptionConditions disabled": {
+			enablePodDisruptionConditions: false,
+			withFinalizer:                 true,
+			wantPhase:                     v1.PodPending,
 		},
-	}
-	node, err := cs.CoreV1().Nodes().Create(testCtx.Ctx, node, metav1.CreateOptions{})
-	if err != nil {
-		t.Fatalf("Failed to create node '%v', err: %v", node.Name, err)
-	}
-
-	pod := &v1.Pod{
-		ObjectMeta: metav1.ObjectMeta{
-			Name:      "testpod",
-			Namespace: testCtx.NS.Name,
+		"pod is getting deleted when no finalizer and PodDisruptionConditions enabled": {
+			enablePodDisruptionConditions: true,
+			withFinalizer:                 false,
 		},
-		Spec: v1.PodSpec{
-			NodeName: node.Name,
-			Containers: []v1.Container{
-				{Name: "foo", Image: "bar"},
-			},
+		"pod is getting deleted when no finalizer and PodDisruptionConditions disabled": {
+			enablePodDisruptionConditions: false,
+			withFinalizer:                 false,
 		},
 	}
 
-	pod, err = cs.CoreV1().Pods(testCtx.NS.Name).Create(testCtx.Ctx, pod, metav1.CreateOptions{})
-	if err != nil {
-		t.Fatalf("Error %v, while creating pod: %v", err, klog.KObj(pod))
-	}
+	for name, test := range tests {
+		t.Run(name, func(t *testing.T) {
+			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodDisruptionConditions, test.enablePodDisruptionConditions)()
+			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.NodeOutOfServiceVolumeDetach, true)()
+			testCtx := setup(t, "podgc-out-of-service")
+			defer testutils.CleanupTest(t, testCtx)
+			cs := testCtx.ClientSet
 
-	// trigger termination of the pod, but with long grace period so that it is not removed immediately
-	err = cs.CoreV1().Pods(testCtx.NS.Name).Delete(testCtx.Ctx, pod.Name, metav1.DeleteOptions{GracePeriodSeconds: pointer.Int64(300)})
-	if err != nil {
-		t.Fatalf("Error: '%v' while deleting pod: '%v'", err, klog.KObj(pod))
-	}
+			node := &v1.Node{
+				ObjectMeta: metav1.ObjectMeta{
+					Name: "node",
+				},
+				Spec: v1.NodeSpec{},
+				Status: v1.NodeStatus{
+					Conditions: []v1.NodeCondition{
+						{
+							Type:   v1.NodeReady,
+							Status: v1.ConditionFalse,
+						},
+					},
+				},
+			}
+			node, err := cs.CoreV1().Nodes().Create(testCtx.Ctx, node, metav1.CreateOptions{})
+			if err != nil {
+				t.Fatalf("Failed to create node '%v', err: %v", node.Name, err)
+			}
 
-	// taint the node with the out-of-service taint
-	err = testutils.AddTaintToNode(cs, pod.Spec.NodeName, v1.Taint{Key: v1.TaintNodeOutOfService, Value: "", Effect: v1.TaintEffectNoExecute})
-	if err != nil {
-		t.Fatalf("Failed to taint node: %v, err: %v", pod.Spec.NodeName, err)
-	}
+			pod := &v1.Pod{
+				ObjectMeta: 
metav1.ObjectMeta{ + Name: "testpod", + Namespace: testCtx.NS.Name, + }, + Spec: v1.PodSpec{ + NodeName: node.Name, + Containers: []v1.Container{ + {Name: "foo", Image: "bar"}, + }, + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + }, + } + if test.withFinalizer { + pod.ObjectMeta.Finalizers = []string{"test.k8s.io/finalizer"} + } + + pod, err = cs.CoreV1().Pods(testCtx.NS.Name).Create(testCtx.Ctx, pod, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Error %v, while creating pod: %v", err, klog.KObj(pod)) + } + if test.withFinalizer { + defer testutils.RemovePodFinalizers(testCtx.ClientSet, t, []*v1.Pod{pod}) + } - // wait until the pod is deleted - err = wait.PollImmediate(time.Second, time.Second*15, func() (bool, error) { - updatedPod, err := cs.CoreV1().Pods(pod.Namespace).Get(testCtx.Ctx, pod.Name, metav1.GetOptions{}) - if err == nil { - return updatedPod == nil, nil - } - // there was an error - if apierrors.IsNotFound(err) { - return true, nil - } - return false, err - }) - if err != nil { - t.Fatalf("Error '%v' while waiting for the pod '%v' to be deleted", err, klog.KObj(pod)) + // trigger termination of the pod, but with long grace period so that it is not removed immediately + err = cs.CoreV1().Pods(testCtx.NS.Name).Delete(testCtx.Ctx, pod.Name, metav1.DeleteOptions{GracePeriodSeconds: pointer.Int64(300)}) + if err != nil { + t.Fatalf("Error: '%v' while deleting pod: '%v'", err, klog.KObj(pod)) + } + // wait until the pod is terminating + err = wait.PollImmediate(time.Second, time.Second*15, testutils.PodIsGettingEvicted(cs, pod.Namespace, pod.Name)) + if err != nil { + t.Fatalf("Error '%v' while waiting for the pod '%v' to be terminating", err, klog.KObj(pod)) + } + // taint the node with the out-of-service taint + err = testutils.AddTaintToNode(cs, pod.Spec.NodeName, v1.Taint{Key: v1.TaintNodeOutOfService, Value: "", Effect: v1.TaintEffectNoExecute}) + if err != nil { + t.Fatalf("Failed to taint node: %v, err: %v", pod.Spec.NodeName, err) + } + if test.withFinalizer { + // wait until the pod phase is set as expected + err = wait.PollImmediate(time.Second, time.Second*15, func() (bool, error) { + var e error + pod, e = cs.CoreV1().Pods(pod.Namespace).Get(testCtx.Ctx, pod.Name, metav1.GetOptions{}) + if e != nil { + return true, e + } + return test.wantPhase == pod.Status.Phase, nil + }) + if err != nil { + t.Errorf("Error %q while waiting for the pod %q to be in expected phase", err, klog.KObj(pod)) + } + _, cond := podutil.GetPodCondition(&pod.Status, v1.AlphaNoCompatGuaranteeDisruptionTarget) + if cond != nil { + t.Errorf("Pod %q has an unexpected condition: %q", klog.KObj(pod), v1.AlphaNoCompatGuaranteeDisruptionTarget) + } + } else { + // wait until the pod is deleted + err = wait.PollImmediate(time.Second, time.Second*15, func() (bool, error) { + var e error + pod, e = cs.CoreV1().Pods(pod.Namespace).Get(testCtx.Ctx, pod.Name, metav1.GetOptions{}) + if e == nil { + return pod == nil, nil + } + // there was an error + if apierrors.IsNotFound(e) { + return true, nil + } + return false, e + }) + if err != nil { + t.Errorf("Error %q while waiting for the pod %q to be deleted", err, klog.KObj(pod)) + } + } + }) } } diff --git a/test/integration/scheduler/preemption/preemption_test.go b/test/integration/scheduler/preemption/preemption_test.go index f71498d40e51..fd87b15419a0 100644 --- a/test/integration/scheduler/preemption/preemption_test.go +++ b/test/integration/scheduler/preemption/preemption_test.go @@ -33,13 +33,16 @@ import ( "k8s.io/apimachinery/pkg/types" 
"k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" + featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/klog/v2" configv1 "k8s.io/kube-scheduler/config/v1" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/apis/scheduling" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/scheduler" configtesting "k8s.io/kubernetes/pkg/scheduler/apis/config/testing" "k8s.io/kubernetes/pkg/scheduler/framework" @@ -186,13 +189,40 @@ func TestPreemption(t *testing.T) { maxTokens := 1000 tests := []struct { - name string - existingPods []*v1.Pod - pod *v1.Pod - initTokens int - unresolvable bool - preemptedPodIndexes map[int]struct{} + name string + existingPods []*v1.Pod + pod *v1.Pod + initTokens int + unresolvable bool + preemptedPodIndexes map[int]struct{} + enablePodDisruptionConditions bool }{ + { + name: "basic pod preemption with PodDisruptionConditions enabled", + initTokens: maxTokens, + existingPods: []*v1.Pod{ + initPausePod(&testutils.PausePodConfig{ + Name: "victim-pod", + Namespace: testCtx.NS.Name, + Priority: &lowPriority, + Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(400, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)}, + }, + }), + }, + pod: initPausePod(&testutils.PausePodConfig{ + Name: "preemptor-pod", + Namespace: testCtx.NS.Name, + Priority: &highPriority, + Resources: &v1.ResourceRequirements{Requests: v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(300, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI)}, + }, + }), + preemptedPodIndexes: map[int]struct{}{0: {}}, + enablePodDisruptionConditions: true, + }, { name: "basic pod preemption", initTokens: maxTokens, @@ -412,6 +442,7 @@ func TestPreemption(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, test.enablePodDisruptionConditions)() filter.Tokens = test.initTokens filter.Unresolvable = test.unresolvable pods := make([]*v1.Pod, len(test.existingPods)) @@ -433,6 +464,16 @@ func TestPreemption(t *testing.T) { if err = wait.Poll(time.Second, wait.ForeverTestTimeout, podIsGettingEvicted(cs, p.Namespace, p.Name)); err != nil { t.Errorf("Pod %v/%v is not getting evicted.", p.Namespace, p.Name) } + pod, err := cs.CoreV1().Pods(p.Namespace).Get(testCtx.Ctx, p.Name, metav1.GetOptions{}) + if err != nil { + t.Errorf("Error %v when getting the updated status for pod %v/%v ", err, p.Namespace, p.Name) + } + _, cond := podutil.GetPodCondition(&pod.Status, v1.AlphaNoCompatGuaranteeDisruptionTarget) + if test.enablePodDisruptionConditions == true && cond == nil { + t.Errorf("Pod %q does not have the expected condition: %q", klog.KObj(pod), v1.AlphaNoCompatGuaranteeDisruptionTarget) + } else if test.enablePodDisruptionConditions == false && cond != nil { + t.Errorf("Pod %q has an unexpected condition: %q", klog.KObj(pod), v1.AlphaNoCompatGuaranteeDisruptionTarget) + } } else { if p.DeletionTimestamp != nil { t.Errorf("Didn't expect pod %v to get preempted.", p.Name)