From a8ba015ec51185d802777e60b7c4088e8670d52a Mon Sep 17 00:00:00 2001 From: xing-yang Date: Fri, 12 Feb 2021 04:54:48 -0800 Subject: [PATCH] Handle Non-graceful Node Shutdown Signed-off-by: Ashutosh Kumar Co-authored-by: Ashutosh Kumar --- pkg/controller/podgc/gc_controller.go | 55 ++++- pkg/controller/podgc/gc_controller_test.go | 154 +++++++++++++ .../attachdetach/attach_detach_controller.go | 1 + .../attachdetach/reconciler/reconciler.go | 35 ++- .../reconciler/reconciler_test.go | 214 ++++++++++++++++-- pkg/features/kube_features.go | 10 +- pkg/util/node/node.go | 10 + pkg/util/taints/taints.go | 10 + .../k8s.io/api/core/v1/well_known_taints.go | 4 + 9 files changed, 475 insertions(+), 18 deletions(-) diff --git a/pkg/controller/podgc/gc_controller.go b/pkg/controller/podgc/gc_controller.go index 644074900051..bf64bd2208e7 100644 --- a/pkg/controller/podgc/gc_controller.go +++ b/pkg/controller/podgc/gc_controller.go @@ -29,14 +29,17 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/component-base/metrics/prometheus/ratelimiter" - "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/features" + nodeutil "k8s.io/kubernetes/pkg/util/node" + "k8s.io/kubernetes/pkg/util/taints" ) const ( @@ -113,6 +116,9 @@ func (gcc *PodGCController) gc(ctx context.Context) { if gcc.terminatedPodThreshold > 0 { gcc.gcTerminated(pods) } + if utilfeature.DefaultFeatureGate.Enabled(features.NodeOutOfServiceVolumeDetach) { + gcc.gcTerminating(pods) + } gcc.gcOrphaned(ctx, pods, nodes) gcc.gcUnscheduledTerminating(pods) } @@ -124,6 +130,53 @@ func isPodTerminated(pod *v1.Pod) bool { return false } +// isPodTerminating returns true if the pod is terminating. +func isPodTerminating(pod *v1.Pod) bool { + return pod.ObjectMeta.DeletionTimestamp != nil +} + +func (gcc *PodGCController) gcTerminating(pods []*v1.Pod) { + klog.V(4).Info("GC'ing terminating pods that are on out-of-service nodes") + terminatingPods := []*v1.Pod{} + for _, pod := range pods { + if isPodTerminating(pod) { + node, err := gcc.nodeLister.Get(string(pod.Spec.NodeName)) + if err != nil { + klog.Errorf("failed to get node %s : %s", string(pod.Spec.NodeName), err) + continue + } + // Add this pod to terminatingPods list only if the following conditions are met: + // 1. Node is not ready. + // 2. Node has `node.kubernetes.io/out-of-service` taint. + if !nodeutil.IsNodeReady(node) && taints.TaintKeyExists(node.Spec.Taints, v1.TaintNodeOutOfService) { + klog.V(4).Infof("garbage collecting pod %s that is terminating. 
Phase [%v]", pod.Name, pod.Status.Phase) + terminatingPods = append(terminatingPods, pod) + } + } + } + + deleteCount := len(terminatingPods) + if deleteCount == 0 { + return + } + + klog.V(4).Infof("Garbage collecting %v pods that are terminating on node tainted with node.kubernetes.io/out-of-service", deleteCount) + // sort only when necessary + sort.Sort(byCreationTimestamp(terminatingPods)) + var wait sync.WaitGroup + for i := 0; i < deleteCount; i++ { + wait.Add(1) + go func(namespace string, name string) { + defer wait.Done() + if err := gcc.deletePod(namespace, name); err != nil { + // ignore not founds + utilruntime.HandleError(err) + } + }(terminatingPods[i].Namespace, terminatingPods[i].Name) + } + wait.Wait() +} + func (gcc *PodGCController) gcTerminated(pods []*v1.Pod) { terminatedPods := []*v1.Pod{} for _, pod := range pods { diff --git a/pkg/controller/podgc/gc_controller_test.go b/pkg/controller/podgc/gc_controller_test.go index 9693d60f1744..4805f36293f1 100644 --- a/pkg/controller/podgc/gc_controller_test.go +++ b/pkg/controller/podgc/gc_controller_test.go @@ -27,13 +27,16 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/util/workqueue" + featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/testutil" + "k8s.io/kubernetes/pkg/features" testingclock "k8s.io/utils/clock/testing" ) @@ -448,3 +451,154 @@ func TestGCUnscheduledTerminating(t *testing.T) { }) } } + +func TestGCTerminating(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.NodeOutOfServiceVolumeDetach, true)() + type node struct { + name string + readyCondition v1.ConditionStatus + taints []v1.Taint + } + + type nameToPodConfig struct { + name string + phase v1.PodPhase + deletionTimeStamp *metav1.Time + nodeName string + } + + testCases := []struct { + name string + pods []nameToPodConfig + nodes []node + deletedPodNames sets.String + }{ + { + name: "pods have deletion timestamp set and the corresponding nodes are not ready", + nodes: []node{ + {name: "worker-0", readyCondition: v1.ConditionFalse}, + {name: "worker-1", readyCondition: v1.ConditionFalse}, + }, + pods: []nameToPodConfig{ + {name: "a", deletionTimeStamp: &metav1.Time{}, nodeName: "worker-0"}, + {name: "b", deletionTimeStamp: &metav1.Time{}, nodeName: "worker-1"}, + }, + deletedPodNames: sets.NewString(), + }, + + { + name: "some pods have deletion timestamp and/or phase set and some of the corresponding nodes have an" + + "outofservice taint that are not ready", + nodes: []node{ + // terminated pods on this node should be force deleted + {name: "worker-0", readyCondition: v1.ConditionFalse, taints: []v1.Taint{{Key: v1.TaintNodeOutOfService, + Effect: v1.TaintEffectNoExecute}}}, + // terminated pods on this node should not be force deleted + {name: "worker-1", readyCondition: v1.ConditionFalse}, + // terminated pods on this node should not be force deleted + {name: "worker-2", readyCondition: v1.ConditionTrue}, + // terminated pods on this node should be force deleted + {name: "worker-3", readyCondition: v1.ConditionFalse, taints: []v1.Taint{{Key: v1.TaintNodeOutOfService, + Effect: v1.TaintEffectNoSchedule}}}, + // terminated 
pods on this node should be force deleted + {name: "worker-4", readyCondition: v1.ConditionFalse, taints: []v1.Taint{{Key: v1.TaintNodeOutOfService, + Effect: v1.TaintEffectPreferNoSchedule}}}, + // terminated pods on this node should be force deleted + {name: "worker-5", readyCondition: v1.ConditionFalse, taints: []v1.Taint{{Key: v1.TaintNodeOutOfService, + Value: "any-value", Effect: v1.TaintEffectNoExecute}}}, + }, + pods: []nameToPodConfig{ + // pods a1, b1, c1, d1 and e1 are on node worker-0 + {name: "a1", nodeName: "worker-0"}, + {name: "b1", deletionTimeStamp: &metav1.Time{}, nodeName: "worker-0"}, + {name: "c1", phase: v1.PodPending, nodeName: "worker-0"}, + {name: "d1", phase: v1.PodRunning, nodeName: "worker-0"}, + {name: "e1", phase: v1.PodUnknown, nodeName: "worker-0"}, + + // pods a2, b2, c2, d2 and e2 are on node worker-1 + {name: "a2", nodeName: "worker-1"}, + {name: "b2", deletionTimeStamp: &metav1.Time{}, nodeName: "worker-1"}, + {name: "c2", phase: v1.PodPending, nodeName: "worker-1"}, + {name: "d2", phase: v1.PodRunning, nodeName: "worker-1"}, + {name: "e2", phase: v1.PodUnknown, nodeName: "worker-1"}, + + // pods a3, b3, c3, d3 and e3 are on node worker-2 + {name: "a3", nodeName: "worker-2"}, + {name: "b3", deletionTimeStamp: &metav1.Time{}, nodeName: "worker-2"}, + {name: "c3", phase: v1.PodPending, nodeName: "worker-2"}, + {name: "d3", phase: v1.PodRunning, nodeName: "worker-2"}, + {name: "e3", phase: v1.PodUnknown, nodeName: "worker-2"}, + + // pods a4, b4, c4, d4 and e4 are on node worker-3 + {name: "a4", nodeName: "worker-3"}, + {name: "b4", deletionTimeStamp: &metav1.Time{}, nodeName: "worker-3"}, + {name: "c4", phase: v1.PodPending, nodeName: "worker-3"}, + {name: "d4", phase: v1.PodRunning, nodeName: "worker-3"}, + {name: "e4", phase: v1.PodUnknown, nodeName: "worker-3"}, + + // pods a5, b5, c5, d5 and e5 are on node worker-4 + {name: "a5", nodeName: "worker-3"}, + {name: "b5", deletionTimeStamp: &metav1.Time{}, nodeName: "worker-4"}, + {name: "c5", phase: v1.PodPending, nodeName: "worker-4"}, + {name: "d5", phase: v1.PodRunning, nodeName: "worker-4"}, + {name: "e5", phase: v1.PodUnknown, nodeName: "worker-4"}, + + // pods a6, b6, c6, d6 and e6 are on node worker-5 + {name: "a6", nodeName: "worker-5"}, + {name: "b6", deletionTimeStamp: &metav1.Time{}, nodeName: "worker-5"}, + {name: "c6", phase: v1.PodPending, nodeName: "worker-5"}, + {name: "d6", phase: v1.PodRunning, nodeName: "worker-5"}, + {name: "e6", phase: v1.PodUnknown, nodeName: "worker-5"}, + }, + deletedPodNames: sets.NewString("b1", "b4", "b5", "b6"), + }, + } + for i, test := range testCases { + t.Run(test.name, func(t *testing.T) { + client := fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*testutil.NewNode("node-a")}}) + gcc, podInformer, nodeInformer := NewFromClient(client, -1) + deletedPodNames := make([]string, 0) + var lock sync.Mutex + gcc.deletePod = func(_, name string) error { + lock.Lock() + defer lock.Unlock() + deletedPodNames = append(deletedPodNames, name) + return nil + } + creationTime := time.Unix(0, 0) + for _, node := range test.nodes { + creationTime = creationTime.Add(2 * time.Hour) + nodeInformer.Informer().GetStore().Add(&v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: node.name, CreationTimestamp: metav1.Time{Time: creationTime}}, + Spec: v1.NodeSpec{ + Taints: node.taints, + }, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + { + Type: v1.NodeReady, + Status: node.readyCondition, + }, + }, + }, + }) + } + + for _, pod := range test.pods { + creationTime = 
creationTime.Add(1 * time.Hour) + podInformer.Informer().GetStore().Add(&v1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: pod.name, CreationTimestamp: metav1.Time{Time: creationTime}, + DeletionTimestamp: pod.deletionTimeStamp}, + Status: v1.PodStatus{Phase: pod.phase}, + Spec: v1.PodSpec{NodeName: pod.nodeName}, + }) + } + + gcc.gc(context.TODO()) + if pass := compareStringSetToList(test.deletedPodNames, deletedPodNames); !pass { + t.Errorf("[%v]pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v", + i, test.deletedPodNames.List(), deletedPodNames) + } + }) + } +} diff --git a/pkg/controller/volume/attachdetach/attach_detach_controller.go b/pkg/controller/volume/attachdetach/attach_detach_controller.go index f84194934e8a..56f404f1e1f4 100644 --- a/pkg/controller/volume/attachdetach/attach_detach_controller.go +++ b/pkg/controller/volume/attachdetach/attach_detach_controller.go @@ -180,6 +180,7 @@ func NewAttachDetachController( adc.actualStateOfWorld, adc.attacherDetacher, adc.nodeStatusUpdater, + adc.nodeLister, recorder) csiTranslator := csitrans.New() diff --git a/pkg/controller/volume/attachdetach/reconciler/reconciler.go b/pkg/controller/volume/attachdetach/reconciler/reconciler.go index e09443902cd0..a2708f0ee107 100644 --- a/pkg/controller/volume/attachdetach/reconciler/reconciler.go +++ b/pkg/controller/volume/attachdetach/reconciler/reconciler.go @@ -27,13 +27,17 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" + corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/metrics" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater" + "k8s.io/kubernetes/pkg/features" kevents "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff" + "k8s.io/kubernetes/pkg/util/taints" "k8s.io/kubernetes/pkg/volume/util" "k8s.io/kubernetes/pkg/volume/util/operationexecutor" ) @@ -69,6 +73,7 @@ func NewReconciler( actualStateOfWorld cache.ActualStateOfWorld, attacherDetacher operationexecutor.OperationExecutor, nodeStatusUpdater statusupdater.NodeStatusUpdater, + nodeLister corelisters.NodeLister, recorder record.EventRecorder) Reconciler { return &reconciler{ loopPeriod: loopPeriod, @@ -79,6 +84,7 @@ func NewReconciler( actualStateOfWorld: actualStateOfWorld, attacherDetacher: attacherDetacher, nodeStatusUpdater: nodeStatusUpdater, + nodeLister: nodeLister, timeOfLastSync: time.Now(), recorder: recorder, } @@ -92,6 +98,7 @@ type reconciler struct { actualStateOfWorld cache.ActualStateOfWorld attacherDetacher operationexecutor.OperationExecutor nodeStatusUpdater statusupdater.NodeStatusUpdater + nodeLister corelisters.NodeLister timeOfLastSync time.Time disableReconciliationSync bool recorder record.EventRecorder @@ -134,6 +141,19 @@ func (rc *reconciler) syncStates() { rc.attacherDetacher.VerifyVolumesAreAttached(volumesPerNode, rc.actualStateOfWorld) } +// hasOutOfServiceTaint returns true if the node has out-of-service taint present +// and `NodeOutOfServiceVolumeDetach` feature gate is enabled. 
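+// When the feature gate is disabled it returns false without consulting the node lister;
+// any lister error (for example, the node no longer exists) is returned to the caller.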
+func (rc *reconciler) hasOutOfServiceTaint(nodeName types.NodeName) (bool, error) { + if utilfeature.DefaultFeatureGate.Enabled(features.NodeOutOfServiceVolumeDetach) { + node, err := rc.nodeLister.Get(string(nodeName)) + if err != nil { + return false, err + } + return taints.TaintKeyExists(node.Spec.Taints, v1.TaintNodeOutOfService), nil + } + return false, nil +} + func (rc *reconciler) reconcile() { // Detaches are triggered before attaches so that volumes referenced by // pods that are rescheduled to a different node are detached first. @@ -183,8 +203,15 @@ func (rc *reconciler) reconcile() { } // Check whether timeout has reached the maximum waiting time timeout := elapsedTime > rc.maxWaitForUnmountDuration + + hasOutOfServiceTaint, err := rc.hasOutOfServiceTaint(attachedVolume.NodeName) + if err != nil { + klog.Errorf("failed to get taint specs for node %s: %s", attachedVolume.NodeName, err.Error()) + } + // Check whether volume is still mounted. Skip detach if it is still mounted unless timeout - if attachedVolume.MountedByNode && !timeout { + // or the node has `node.kubernetes.io/out-of-service` taint. + if attachedVolume.MountedByNode && !timeout && !hasOutOfServiceTaint { klog.V(5).InfoS("Cannot detach volume because it is still mounted", "volume", attachedVolume) continue } @@ -211,8 +238,12 @@ func (rc *reconciler) reconcile() { // Trigger detach volume which requires verifying safe to detach step // If timeout is true, skip verifySafeToDetach check + // If the node has node.kubernetes.io/out-of-service taint with NoExecute effect, skip verifySafeToDetach check klog.V(5).InfoS("Starting attacherDetacher.DetachVolume", "volume", attachedVolume) - verifySafeToDetach := !timeout + if hasOutOfServiceTaint { + klog.V(4).Infof("node %q has out-of-service taint", attachedVolume.NodeName) + } + verifySafeToDetach := !(timeout || hasOutOfServiceTaint) err = rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, verifySafeToDetach, rc.actualStateOfWorld) if err == nil { if !timeout { diff --git a/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go b/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go index 413181a49c83..4dbd2808837b 100644 --- a/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go +++ b/pkg/controller/volume/attachdetach/reconciler/reconciler_test.go @@ -21,14 +21,18 @@ import ( "time" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" k8stypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" "k8s.io/client-go/tools/record" + featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache" "k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater" controllervolumetesting "k8s.io/kubernetes/pkg/controller/volume/attachdetach/testing" + "k8s.io/kubernetes/pkg/features" volumetesting "k8s.io/kubernetes/pkg/volume/testing" "k8s.io/kubernetes/pkg/volume/util/operationexecutor" "k8s.io/kubernetes/pkg/volume/util/types" @@ -36,9 +40,10 @@ import ( ) const ( - reconcilerLoopPeriod time.Duration = 10 * time.Millisecond - syncLoopPeriod time.Duration = 100 * time.Minute - maxWaitForUnmountDuration time.Duration = 50 * time.Millisecond + reconcilerLoopPeriod time.Duration = 10 * time.Millisecond + syncLoopPeriod time.Duration = 100 * time.Minute + maxWaitForUnmountDuration time.Duration = 50 * time.Millisecond + 
maxLongWaitForUnmountDuration time.Duration = 4200 * time.Second ) // Calls Run() @@ -48,6 +53,7 @@ func Test_Run_Positive_DoNothing(t *testing.T) { volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t) dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) asw := cache.NewActualStateOfWorld(volumePluginMgr) + fakeKubeClient := controllervolumetesting.CreateTestClient() fakeRecorder := &record.FakeRecorder{} fakeHandler := volumetesting.NewBlockVolumePathHandler() @@ -59,8 +65,9 @@ func Test_Run_Positive_DoNothing(t *testing.T) { informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) nsu := statusupdater.NewNodeStatusUpdater( fakeKubeClient, informerFactory.Core().V1().Nodes().Lister(), asw) + nodeLister := informerFactory.Core().V1().Nodes().Lister() reconciler := NewReconciler( - reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder) + reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, nodeLister, fakeRecorder) // Act ch := make(chan struct{}) @@ -91,9 +98,11 @@ func Test_Run_Positive_OneDesiredVolumeAttach(t *testing.T) { volumePluginMgr, fakeRecorder, fakeHandler)) + informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */) + nodeLister := informerFactory.Core().V1().Nodes().Lister() reconciler := NewReconciler( - reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder) + reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, nodeLister, fakeRecorder) podName := "pod-uid" volumeName := v1.UniqueVolumeName("volume-name") volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) @@ -142,9 +151,11 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithUnmountedVolume(t *te volumePluginMgr, fakeRecorder, fakeHandler)) + informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */) + nodeLister := informerFactory.Core().V1().Nodes().Lister() reconciler := NewReconciler( - reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder) + reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, nodeLister, fakeRecorder) podName := "pod-uid" volumeName := v1.UniqueVolumeName("volume-name") volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) @@ -214,9 +225,11 @@ func Test_Run_Positive_OneDesiredVolumeAttachThenDetachWithMountedVolume(t *test volumePluginMgr, fakeRecorder, fakeHandler)) + informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) + nodeLister := informerFactory.Core().V1().Nodes().Lister() nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */) reconciler := NewReconciler( - reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder) + reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, nodeLister, fakeRecorder) podName := "pod-uid" volumeName := v1.UniqueVolumeName("volume-name") volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) @@ -286,9 +299,11 @@ func 
Test_Run_Negative_OneDesiredVolumeAttachThenDetachWithUnmountedVolumeUpdate volumePluginMgr, fakeRecorder, fakeHandler)) + informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) + nodeLister := informerFactory.Core().V1().Nodes().Lister() nsu := statusupdater.NewFakeNodeStatusUpdater(true /* returnError */) reconciler := NewReconciler( - reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder) + reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, nodeLister, fakeRecorder) podName := "pod-uid" volumeName := v1.UniqueVolumeName("volume-name") volumeSpec := controllervolumetesting.GetTestVolumeSpec(string(volumeName), volumeName) @@ -362,8 +377,10 @@ func Test_Run_OneVolumeAttachAndDetachMultipleNodesWithReadWriteMany(t *testing. fakeRecorder, fakeHandler)) nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */) + informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) + nodeLister := informerFactory.Core().V1().Nodes().Lister() reconciler := NewReconciler( - reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder) + reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, nodeLister, fakeRecorder) podName1 := "pod-uid1" podName2 := "pod-uid2" volumeName := v1.UniqueVolumeName("volume-name") @@ -453,9 +470,11 @@ func Test_Run_OneVolumeAttachAndDetachMultipleNodesWithReadWriteOnce(t *testing. volumePluginMgr, fakeRecorder, fakeHandler)) + informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) + nodeLister := informerFactory.Core().V1().Nodes().Lister() nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */) reconciler := NewReconciler( - reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder) + reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, nodeLister, fakeRecorder) podName1 := "pod-uid1" podName2 := "pod-uid2" volumeName := v1.UniqueVolumeName("volume-name") @@ -543,9 +562,11 @@ func Test_Run_OneVolumeAttachAndDetachUncertainNodesWithReadWriteOnce(t *testing volumePluginMgr, fakeRecorder, fakeHandler)) + informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) + nodeLister := informerFactory.Core().V1().Nodes().Lister() nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */) reconciler := NewReconciler( - reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder) + reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, nodeLister, fakeRecorder) podName1 := "pod-uid1" podName2 := "pod-uid2" volumeName := v1.UniqueVolumeName("volume-name") @@ -604,9 +625,11 @@ func Test_Run_OneVolumeDetachFailNodeWithReadWriteOnce(t *testing.T) { volumePluginMgr, fakeRecorder, fakeHandler)) + informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) + nodeLister := informerFactory.Core().V1().Nodes().Lister() nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */) reconciler := NewReconciler( - reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder) + reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, 
nsu, nodeLister, fakeRecorder) podName1 := "pod-uid1" podName2 := "pod-uid2" podName3 := "pod-uid3" @@ -705,9 +728,11 @@ func Test_Run_OneVolumeAttachAndDetachTimeoutNodesWithReadWriteOnce(t *testing.T volumePluginMgr, fakeRecorder, fakeHandler)) + informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) + nodeLister := informerFactory.Core().V1().Nodes().Lister() nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */) reconciler := NewReconciler( - reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder) + reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, nodeLister, fakeRecorder) podName1 := "pod-uid1" podName2 := "pod-uid2" volumeName := v1.UniqueVolumeName("volume-name") @@ -752,6 +777,165 @@ func Test_Run_OneVolumeAttachAndDetachTimeoutNodesWithReadWriteOnce(t *testing.T } +// Populates desiredStateOfWorld cache with one node/volume/pod tuple. +// The node has node.kubernetes.io/out-of-service taint present. +// +// The maxWaitForUnmountDuration is longer (in this case it is 4200 * time.Second so that detach does not happen +// immediately due to timeout. +// +// Calls Run() +// Verifies there is one attach call and no detach calls. +// Deletes the pod from desiredStateOfWorld cache without first marking the node/volume as unmounted. +// Verifies there is one detach call and no (new) attach calls. +func Test_Run_OneVolumeDetachOnOutOfServiceTaintedNode(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.NodeOutOfServiceVolumeDetach, true)() + // Arrange + volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t) + dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) + asw := cache.NewActualStateOfWorld(volumePluginMgr) + fakeKubeClient := controllervolumetesting.CreateTestClient() + fakeRecorder := &record.FakeRecorder{} + fakeHandler := volumetesting.NewBlockVolumePathHandler() + ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator( + fakeKubeClient, + volumePluginMgr, + fakeRecorder, + fakeHandler)) + informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) + nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */) + nodeLister := informerFactory.Core().V1().Nodes().Lister() + reconciler := NewReconciler( + reconcilerLoopPeriod, maxLongWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, + nsu, nodeLister, fakeRecorder) + podName1 := "pod-uid1" + volumeName1 := v1.UniqueVolumeName("volume-name1") + volumeSpec1 := controllervolumetesting.GetTestVolumeSpec(string(volumeName1), volumeName1) + nodeName1 := k8stypes.NodeName("worker-0") + node1 := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: string(nodeName1)}, + Spec: v1.NodeSpec{ + Taints: []v1.Taint{{Key: v1.TaintNodeOutOfService, Effect: v1.TaintEffectNoExecute}}, + }, + } + informerFactory.Core().V1().Nodes().Informer().GetStore().Add(node1) + dsw.AddNode(nodeName1, false /*keepTerminatedPodVolumes*/) + volumeExists := dsw.VolumeExists(volumeName1, nodeName1) + if volumeExists { + t.Fatalf( + "Volume %q/node %q should not exist, but it does.", + volumeName1, + nodeName1) + } + + generatedVolumeName, podErr := dsw.AddPod(types.UniquePodName(podName1), controllervolumetesting.NewPod(podName1, + podName1), volumeSpec1, nodeName1) + if podErr != nil { + t.Fatalf("AddPod failed. 
Expected: <no error> Actual: <%v>", podErr)
+	}
+
+	// Act
+	ch := make(chan struct{})
+	go reconciler.Run(ch)
+	defer close(ch)
+
+	// Assert
+	waitForNewAttacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
+	verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
+	waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
+	verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin)
+	waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin)
+
+	// Delete the pod. The volume would normally be detached only after maxLongWaitForUnmountDuration expires,
+	// because the volume is not unmounted; maxLongWaitForUnmountDuration is used here to mimic a node that is out of service.
+	// But in this case the node has the node.kubernetes.io/out-of-service taint, so the reconciler does not wait for
+	// maxLongWaitForUnmountDuration and proceeds to detach immediately.
+	dsw.DeletePod(types.UniquePodName(podName1), generatedVolumeName, nodeName1)
+	// Assert -- Detach will be triggered because the node has the out-of-service taint
+	waitForNewDetacherCallCount(t, 1 /* expectedCallCount */, fakePlugin)
+	verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin)
+	waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin)
+	verifyNewDetacherCallCount(t, false /* expectZeroNewDetacherCallCount */, fakePlugin)
+	waitForDetachCallCount(t, 1 /* expectedDetachCallCount */, fakePlugin)
+}
+
+// Populates desiredStateOfWorld cache with one node/volume/pod tuple.
+// The node does not have the node.kubernetes.io/out-of-service taint present.
+//
+// The maxWaitForUnmountDuration is longer (in this case it is 4200 * time.Second) so that detach does not happen
+// immediately due to timeout.
+//
+// Calls Run()
+// Verifies there is one attach call and no detach calls.
+// Deletes the pod from desiredStateOfWorld cache without first marking the node/volume as unmounted.
+// Verifies there is no detach call and no (new) attach calls.
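+// This is the control case for Test_Run_OneVolumeDetachOnOutOfServiceTaintedNode: with no
+// out-of-service taint on the node, the reconciler must keep waiting for the volume to be
+// unmounted instead of detaching immediately.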
+func Test_Run_OneVolumeDetachOnNoOutOfServiceTaintedNode(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.NodeOutOfServiceVolumeDetach, true)() + // Arrange + volumePluginMgr, fakePlugin := volumetesting.GetTestVolumePluginMgr(t) + dsw := cache.NewDesiredStateOfWorld(volumePluginMgr) + asw := cache.NewActualStateOfWorld(volumePluginMgr) + fakeKubeClient := controllervolumetesting.CreateTestClient() + fakeRecorder := &record.FakeRecorder{} + fakeHandler := volumetesting.NewBlockVolumePathHandler() + ad := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator( + fakeKubeClient, + volumePluginMgr, + fakeRecorder, + fakeHandler)) + informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) + nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */) + nodeLister := informerFactory.Core().V1().Nodes().Lister() + reconciler := NewReconciler( + reconcilerLoopPeriod, maxLongWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, + nsu, nodeLister, fakeRecorder) + podName1 := "pod-uid1" + volumeName1 := v1.UniqueVolumeName("volume-name1") + volumeSpec1 := controllervolumetesting.GetTestVolumeSpec(string(volumeName1), volumeName1) + nodeName1 := k8stypes.NodeName("worker-0") + node1 := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{Name: string(nodeName1)}, + } + informerFactory.Core().V1().Nodes().Informer().GetStore().Add(node1) + dsw.AddNode(nodeName1, false /*keepTerminatedPodVolumes*/) + volumeExists := dsw.VolumeExists(volumeName1, nodeName1) + if volumeExists { + t.Fatalf( + "Volume %q/node %q should not exist, but it does.", + volumeName1, + nodeName1) + } + + generatedVolumeName, podErr := dsw.AddPod(types.UniquePodName(podName1), controllervolumetesting.NewPod(podName1, + podName1), volumeSpec1, nodeName1) + if podErr != nil { + t.Fatalf("AddPod failed. Expected: Actual: <%v>", podErr) + } + + // Act + ch := make(chan struct{}) + go reconciler.Run(ch) + defer close(ch) + + // Assert + waitForNewAttacherCallCount(t, 1 /* expectedCallCount */, fakePlugin) + verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin) + waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin) + verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin) + waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin) + + // Delete the pod and the volume will be detached only after the maxLongWaitForUnmountDuration expires as volume is + // not unmounted. Here maxLongWaitForUnmountDuration is used to mimic that node is out of service. + // But in this case the node does not have the node.kubernetes.io/out-of-service taint and hence it will wait for + // maxLongWaitForUnmountDuration and will not be detached immediately. 
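+	// No detach is expected within the test window; the assertions below verify that the
+	// detacher is never invoked while the volume is still considered mounted.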
+ dsw.DeletePod(types.UniquePodName(podName1), generatedVolumeName, nodeName1) + // Assert -- Detach will be triggered only after maxLongWaitForUnmountDuration expires + waitForNewDetacherCallCount(t, 0 /* expectedCallCount */, fakePlugin) + verifyNewAttacherCallCount(t, false /* expectZeroNewAttacherCallCount */, fakePlugin) + waitForAttachCallCount(t, 1 /* expectedAttachCallCount */, fakePlugin) + verifyNewDetacherCallCount(t, true /* expectZeroNewDetacherCallCount */, fakePlugin) + waitForDetachCallCount(t, 0 /* expectedDetachCallCount */, fakePlugin) +} + func Test_ReportMultiAttachError(t *testing.T) { type nodeWithPods struct { name k8stypes.NodeName @@ -810,9 +994,11 @@ func Test_ReportMultiAttachError(t *testing.T) { volumePluginMgr, fakeRecorder, fakeHandler)) + informerFactory := informers.NewSharedInformerFactory(fakeKubeClient, controller.NoResyncPeriodFunc()) + nodeLister := informerFactory.Core().V1().Nodes().Lister() nsu := statusupdater.NewFakeNodeStatusUpdater(false /* returnError */) rc := NewReconciler( - reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, fakeRecorder) + reconcilerLoopPeriod, maxWaitForUnmountDuration, syncLoopPeriod, false, dsw, asw, ad, nsu, nodeLister, fakeRecorder) nodes := []k8stypes.NodeName{} for _, n := range test.nodes { diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 90d296734c18..5745fa4a656d 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -844,12 +844,20 @@ const ( // // Enable MinDomains in Pod Topology Spread. MinDomainsInPodTopologySpread featuregate.Feature = "MinDomainsInPodTopologySpread" + // owner: @aojea // kep: http://kep.k8s.io/3070 // alpha: v1.24 // // Subdivide the ClusterIP range for dynamic and static IP allocation. ServiceIPStaticSubrange featuregate.Feature = "ServiceIPStaticSubrange" + + // owner: @xing-yang @sonasingh46 + // kep: http://kep.k8s.io/2268 + // alpha: v1.24 + // + // Allow pods to failover to a different node in case of non graceful node shutdown + NodeOutOfServiceVolumeDetach featuregate.Feature = "NodeOutOfServiceVolumeDetach" ) func init() { @@ -973,7 +981,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS LegacyServiceAccountTokenNoAutoGeneration: {Default: true, PreRelease: featuregate.Beta}, MinDomainsInPodTopologySpread: {Default: false, PreRelease: featuregate.Alpha}, ServiceIPStaticSubrange: {Default: false, PreRelease: featuregate.Alpha}, - + NodeOutOfServiceVolumeDetach: {Default: false, PreRelease: featuregate.Alpha}, // inherited features from generic apiserver, relisted here to get a conflict if it is changed // unintentionally on either side: genericfeatures.AdvancedAuditing: {Default: true, PreRelease: featuregate.GA}, diff --git a/pkg/util/node/node.go b/pkg/util/node/node.go index aac738c200c9..5646207cbbc1 100644 --- a/pkg/util/node/node.go +++ b/pkg/util/node/node.go @@ -164,3 +164,13 @@ func GetNodeIP(client clientset.Interface, name string) net.IP { } return nodeIP } + +// IsNodeReady returns true if a node is ready; false otherwise. 
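+// A node whose status carries no NodeReady condition is treated as not ready.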
+func IsNodeReady(node *v1.Node) bool {
+	for _, c := range node.Status.Conditions {
+		if c.Type == v1.NodeReady {
+			return c.Status == v1.ConditionTrue
+		}
+	}
+	return false
+}
diff --git a/pkg/util/taints/taints.go b/pkg/util/taints/taints.go
index 4cfe227f8b82..abc3bcf50d14 100644
--- a/pkg/util/taints/taints.go
+++ b/pkg/util/taints/taints.go
@@ -275,6 +275,16 @@ func TaintExists(taints []v1.Taint, taintToFind *v1.Taint) bool {
 	return false
 }
 
+// TaintKeyExists checks if the given taint key exists in the list of taints. Returns true if it exists, false otherwise.
+func TaintKeyExists(taints []v1.Taint, taintKeyToMatch string) bool {
+	for _, taint := range taints {
+		if taint.Key == taintKeyToMatch {
+			return true
+		}
+	}
+	return false
+}
+
 func TaintSetDiff(t1, t2 []v1.Taint) (taintsToAdd []*v1.Taint, taintsToRemove []*v1.Taint) {
 	for _, taint := range t1 {
 		if !TaintExists(t2, &taint) {
diff --git a/staging/src/k8s.io/api/core/v1/well_known_taints.go b/staging/src/k8s.io/api/core/v1/well_known_taints.go
index 84d268197c6a..a6d8c272b25a 100644
--- a/staging/src/k8s.io/api/core/v1/well_known_taints.go
+++ b/staging/src/k8s.io/api/core/v1/well_known_taints.go
@@ -45,4 +45,8 @@ const (
 	// TaintNodePIDPressure will be added when node has pid pressure
 	// and removed when node has enough pid.
 	TaintNodePIDPressure = "node.kubernetes.io/pid-pressure"
+
+	// TaintNodeOutOfService can be added when a node is out of service in case of
+	// a non-graceful shutdown.
+	TaintNodeOutOfService = "node.kubernetes.io/out-of-service"
 )
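
Illustrative usage note (not part of the patch): the minimal sketch below shows how the new helpers combine into the condition that PodGC's gcTerminating uses once an operator marks a dead node, for example with `kubectl taint nodes <node-name> node.kubernetes.io/out-of-service=nodeshutdown:NoExecute`. It assumes the snippet is built inside the kubernetes repo tree so that pkg/util/node and pkg/util/taints are importable; the taint value is a placeholder, since TaintKeyExists matches on the key only.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	nodeutil "k8s.io/kubernetes/pkg/util/node"
	"k8s.io/kubernetes/pkg/util/taints"
)

func main() {
	// A node that shut down non-gracefully: NodeReady is false and the operator
	// has applied the out-of-service taint (any value, NoExecute effect).
	node := &v1.Node{
		Spec: v1.NodeSpec{
			Taints: []v1.Taint{{
				Key:    v1.TaintNodeOutOfService,
				Value:  "nodeshutdown", // placeholder value; only the key is checked
				Effect: v1.TaintEffectNoExecute,
			}},
		},
		Status: v1.NodeStatus{
			Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionFalse}},
		},
	}

	// The same check gates force-deletion of terminating pods in PodGC's gcTerminating
	// when the NodeOutOfServiceVolumeDetach feature gate is enabled.
	forceDelete := !nodeutil.IsNodeReady(node) && taints.TaintKeyExists(node.Spec.Taints, v1.TaintNodeOutOfService)
	fmt.Println("force-delete terminating pods on this node:", forceDelete) // true
}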