Skip to content

Commit

Permalink
Handle Non-graceful Node Shutdown (#108486)
Browse files Browse the repository at this point in the history
Signed-off-by: Ashutosh Kumar <sonasingh46@gmail.com>

Co-authored-by: Ashutosh Kumar <sonasingh46@gmail.com>

Co-authored-by: xing-yang <xingyang105@gmail.com>
  • Loading branch information
sonasingh46 and xing-yang committed Mar 26, 2022
1 parent 78889cd commit c009753
Show file tree
Hide file tree
Showing 9 changed files with 475 additions and 18 deletions.
55 changes: 54 additions & 1 deletion pkg/controller/podgc/gc_controller.go
Expand Up @@ -29,14 +29,17 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/component-base/metrics/prometheus/ratelimiter"

"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
nodeutil "k8s.io/kubernetes/pkg/util/node"
"k8s.io/kubernetes/pkg/util/taints"
)

const (
Expand Down Expand Up @@ -113,6 +116,9 @@ func (gcc *PodGCController) gc(ctx context.Context) {
if gcc.terminatedPodThreshold > 0 {
gcc.gcTerminated(pods)
}
if utilfeature.DefaultFeatureGate.Enabled(features.NodeOutOfServiceVolumeDetach) {
gcc.gcTerminating(pods)
}
gcc.gcOrphaned(ctx, pods, nodes)
gcc.gcUnscheduledTerminating(pods)
}
Expand All @@ -124,6 +130,53 @@ func isPodTerminated(pod *v1.Pod) bool {
return false
}

// isPodTerminating reports whether the pod has been marked for deletion,
// i.e. its deletion timestamp has been set by the API server.
func isPodTerminating(pod *v1.Pod) bool {
	return pod.DeletionTimestamp != nil
}

// gcTerminating force-deletes pods that are already terminating (deletion
// timestamp set) but are stuck on nodes that are both NotReady and tainted
// with `node.kubernetes.io/out-of-service`. This allows workloads on a
// non-gracefully shut down node to be recovered on another node.
func (gcc *PodGCController) gcTerminating(pods []*v1.Pod) {
	klog.V(4).Info("GC'ing terminating pods that are on out-of-service nodes")
	terminatingPods := []*v1.Pod{}
	for _, pod := range pods {
		if !isPodTerminating(pod) {
			continue
		}
		// Unscheduled terminating pods are handled by gcUnscheduledTerminating;
		// skipping them here avoids a spurious lister error on every GC pass.
		if pod.Spec.NodeName == "" {
			continue
		}
		node, err := gcc.nodeLister.Get(pod.Spec.NodeName)
		if err != nil {
			klog.Errorf("failed to get node %s : %s", pod.Spec.NodeName, err)
			continue
		}
		// Add this pod to terminatingPods list only if the following conditions are met:
		// 1. Node is not ready.
		// 2. Node has `node.kubernetes.io/out-of-service` taint.
		if !nodeutil.IsNodeReady(node) && taints.TaintKeyExists(node.Spec.Taints, v1.TaintNodeOutOfService) {
			klog.V(4).Infof("garbage collecting pod %s that is terminating. Phase [%v]", pod.Name, pod.Status.Phase)
			terminatingPods = append(terminatingPods, pod)
		}
	}

	deleteCount := len(terminatingPods)
	if deleteCount == 0 {
		return
	}

	klog.V(4).Infof("Garbage collecting %v pods that are terminating on node tainted with node.kubernetes.io/out-of-service", deleteCount)
	// sort only when necessary
	sort.Sort(byCreationTimestamp(terminatingPods))
	// Renamed from `wait` so the local no longer shadows the imported
	// k8s.io/apimachinery/pkg/util/wait package.
	var wg sync.WaitGroup
	for i := 0; i < deleteCount; i++ {
		wg.Add(1)
		go func(namespace string, name string) {
			defer wg.Done()
			if err := gcc.deletePod(namespace, name); err != nil {
				// ignore not founds
				utilruntime.HandleError(err)
			}
		}(terminatingPods[i].Namespace, terminatingPods[i].Name)
	}
	wg.Wait()
}

func (gcc *PodGCController) gcTerminated(pods []*v1.Pod) {
terminatedPods := []*v1.Pod{}
for _, pod := range pods {
Expand Down
154 changes: 154 additions & 0 deletions pkg/controller/podgc/gc_controller_test.go
Expand Up @@ -27,13 +27,16 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/util/workqueue"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/testutil"
"k8s.io/kubernetes/pkg/features"
testingclock "k8s.io/utils/clock/testing"
)

Expand Down Expand Up @@ -448,3 +451,154 @@ func TestGCUnscheduledTerminating(t *testing.T) {
})
}
}

// TestGCTerminating verifies that terminating pods are force-deleted only
// when their node is NotReady AND carries the node.kubernetes.io/out-of-service
// taint (any effect, with or without a value), per the
// NodeOutOfServiceVolumeDetach feature gate.
func TestGCTerminating(t *testing.T) {
	defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.NodeOutOfServiceVolumeDetach, true)()
	type node struct {
		name           string
		readyCondition v1.ConditionStatus
		taints         []v1.Taint
	}

	type nameToPodConfig struct {
		name              string
		phase             v1.PodPhase
		deletionTimeStamp *metav1.Time
		nodeName          string
	}

	testCases := []struct {
		name            string
		pods            []nameToPodConfig
		nodes           []node
		deletedPodNames sets.String
	}{
		{
			name: "pods have deletion timestamp set and the corresponding nodes are not ready",
			nodes: []node{
				{name: "worker-0", readyCondition: v1.ConditionFalse},
				{name: "worker-1", readyCondition: v1.ConditionFalse},
			},
			pods: []nameToPodConfig{
				{name: "a", deletionTimeStamp: &metav1.Time{}, nodeName: "worker-0"},
				{name: "b", deletionTimeStamp: &metav1.Time{}, nodeName: "worker-1"},
			},
			deletedPodNames: sets.NewString(),
		},

		{
			// NOTE: the concatenation previously read "have an"+"outofservice",
			// producing the garbled name "anoutofservice"; fixed with a space.
			name: "some pods have deletion timestamp and/or phase set and some of the corresponding nodes have an" +
				" out-of-service taint that are not ready",
			nodes: []node{
				// terminated pods on this node should be force deleted
				{name: "worker-0", readyCondition: v1.ConditionFalse, taints: []v1.Taint{{Key: v1.TaintNodeOutOfService,
					Effect: v1.TaintEffectNoExecute}}},
				// terminated pods on this node should not be force deleted
				{name: "worker-1", readyCondition: v1.ConditionFalse},
				// terminated pods on this node should not be force deleted
				{name: "worker-2", readyCondition: v1.ConditionTrue},
				// terminated pods on this node should be force deleted
				{name: "worker-3", readyCondition: v1.ConditionFalse, taints: []v1.Taint{{Key: v1.TaintNodeOutOfService,
					Effect: v1.TaintEffectNoSchedule}}},
				// terminated pods on this node should be force deleted
				{name: "worker-4", readyCondition: v1.ConditionFalse, taints: []v1.Taint{{Key: v1.TaintNodeOutOfService,
					Effect: v1.TaintEffectPreferNoSchedule}}},
				// terminated pods on this node should be force deleted
				{name: "worker-5", readyCondition: v1.ConditionFalse, taints: []v1.Taint{{Key: v1.TaintNodeOutOfService,
					Value: "any-value", Effect: v1.TaintEffectNoExecute}}},
			},
			pods: []nameToPodConfig{
				// pods a1, b1, c1, d1 and e1 are on node worker-0
				{name: "a1", nodeName: "worker-0"},
				{name: "b1", deletionTimeStamp: &metav1.Time{}, nodeName: "worker-0"},
				{name: "c1", phase: v1.PodPending, nodeName: "worker-0"},
				{name: "d1", phase: v1.PodRunning, nodeName: "worker-0"},
				{name: "e1", phase: v1.PodUnknown, nodeName: "worker-0"},

				// pods a2, b2, c2, d2 and e2 are on node worker-1
				{name: "a2", nodeName: "worker-1"},
				{name: "b2", deletionTimeStamp: &metav1.Time{}, nodeName: "worker-1"},
				{name: "c2", phase: v1.PodPending, nodeName: "worker-1"},
				{name: "d2", phase: v1.PodRunning, nodeName: "worker-1"},
				{name: "e2", phase: v1.PodUnknown, nodeName: "worker-1"},

				// pods a3, b3, c3, d3 and e3 are on node worker-2
				{name: "a3", nodeName: "worker-2"},
				{name: "b3", deletionTimeStamp: &metav1.Time{}, nodeName: "worker-2"},
				{name: "c3", phase: v1.PodPending, nodeName: "worker-2"},
				{name: "d3", phase: v1.PodRunning, nodeName: "worker-2"},
				{name: "e3", phase: v1.PodUnknown, nodeName: "worker-2"},

				// pods a4, b4, c4, d4 and e4 are on node worker-3
				{name: "a4", nodeName: "worker-3"},
				{name: "b4", deletionTimeStamp: &metav1.Time{}, nodeName: "worker-3"},
				{name: "c4", phase: v1.PodPending, nodeName: "worker-3"},
				{name: "d4", phase: v1.PodRunning, nodeName: "worker-3"},
				{name: "e4", phase: v1.PodUnknown, nodeName: "worker-3"},

				// pods a5, b5, c5, d5 and e5 are on node worker-4
				// (a5 previously pointed at worker-3, contradicting this comment;
				// the expected result is unchanged since a5 has no deletion timestamp)
				{name: "a5", nodeName: "worker-4"},
				{name: "b5", deletionTimeStamp: &metav1.Time{}, nodeName: "worker-4"},
				{name: "c5", phase: v1.PodPending, nodeName: "worker-4"},
				{name: "d5", phase: v1.PodRunning, nodeName: "worker-4"},
				{name: "e5", phase: v1.PodUnknown, nodeName: "worker-4"},

				// pods a6, b6, c6, d6 and e6 are on node worker-5
				{name: "a6", nodeName: "worker-5"},
				{name: "b6", deletionTimeStamp: &metav1.Time{}, nodeName: "worker-5"},
				{name: "c6", phase: v1.PodPending, nodeName: "worker-5"},
				{name: "d6", phase: v1.PodRunning, nodeName: "worker-5"},
				{name: "e6", phase: v1.PodUnknown, nodeName: "worker-5"},
			},
			deletedPodNames: sets.NewString("b1", "b4", "b5", "b6"),
		},
	}
	for i, test := range testCases {
		t.Run(test.name, func(t *testing.T) {
			client := fake.NewSimpleClientset(&v1.NodeList{Items: []v1.Node{*testutil.NewNode("node-a")}})
			gcc, podInformer, nodeInformer := NewFromClient(client, -1)
			deletedPodNames := make([]string, 0)
			var lock sync.Mutex
			// Intercept deletions so the test records names instead of
			// issuing real API calls.
			gcc.deletePod = func(_, name string) error {
				lock.Lock()
				defer lock.Unlock()
				deletedPodNames = append(deletedPodNames, name)
				return nil
			}
			creationTime := time.Unix(0, 0)
			for _, node := range test.nodes {
				creationTime = creationTime.Add(2 * time.Hour)
				nodeInformer.Informer().GetStore().Add(&v1.Node{
					ObjectMeta: metav1.ObjectMeta{Name: node.name, CreationTimestamp: metav1.Time{Time: creationTime}},
					Spec: v1.NodeSpec{
						Taints: node.taints,
					},
					Status: v1.NodeStatus{
						Conditions: []v1.NodeCondition{
							{
								Type:   v1.NodeReady,
								Status: node.readyCondition,
							},
						},
					},
				})
			}

			for _, pod := range test.pods {
				creationTime = creationTime.Add(1 * time.Hour)
				podInformer.Informer().GetStore().Add(&v1.Pod{
					ObjectMeta: metav1.ObjectMeta{Name: pod.name, CreationTimestamp: metav1.Time{Time: creationTime},
						DeletionTimestamp: pod.deletionTimeStamp},
					Status: v1.PodStatus{Phase: pod.phase},
					Spec:   v1.PodSpec{NodeName: pod.nodeName},
				})
			}

			gcc.gc(context.TODO())
			if pass := compareStringSetToList(test.deletedPodNames, deletedPodNames); !pass {
				t.Errorf("[%v]pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v",
					i, test.deletedPodNames.List(), deletedPodNames)
			}
		})
	}
}
Expand Up @@ -180,6 +180,7 @@ func NewAttachDetachController(
adc.actualStateOfWorld,
adc.attacherDetacher,
adc.nodeStatusUpdater,
adc.nodeLister,
recorder)

csiTranslator := csitrans.New()
Expand Down
35 changes: 33 additions & 2 deletions pkg/controller/volume/attachdetach/reconciler/reconciler.go
Expand Up @@ -27,13 +27,17 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/metrics"
"k8s.io/kubernetes/pkg/controller/volume/attachdetach/statusupdater"
"k8s.io/kubernetes/pkg/features"
kevents "k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
"k8s.io/kubernetes/pkg/util/taints"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/operationexecutor"
)
Expand Down Expand Up @@ -69,6 +73,7 @@ func NewReconciler(
actualStateOfWorld cache.ActualStateOfWorld,
attacherDetacher operationexecutor.OperationExecutor,
nodeStatusUpdater statusupdater.NodeStatusUpdater,
nodeLister corelisters.NodeLister,
recorder record.EventRecorder) Reconciler {
return &reconciler{
loopPeriod: loopPeriod,
Expand All @@ -79,6 +84,7 @@ func NewReconciler(
actualStateOfWorld: actualStateOfWorld,
attacherDetacher: attacherDetacher,
nodeStatusUpdater: nodeStatusUpdater,
nodeLister: nodeLister,
timeOfLastSync: time.Now(),
recorder: recorder,
}
Expand All @@ -92,6 +98,7 @@ type reconciler struct {
actualStateOfWorld cache.ActualStateOfWorld
attacherDetacher operationexecutor.OperationExecutor
nodeStatusUpdater statusupdater.NodeStatusUpdater
nodeLister corelisters.NodeLister
timeOfLastSync time.Time
disableReconciliationSync bool
recorder record.EventRecorder
Expand Down Expand Up @@ -134,6 +141,19 @@ func (rc *reconciler) syncStates() {
rc.attacherDetacher.VerifyVolumesAreAttached(volumesPerNode, rc.actualStateOfWorld)
}

// hasOutOfServiceTaint returns true if the node has out-of-service taint present
// and `NodeOutOfServiceVolumeDetach` feature gate is enabled.
// hasOutOfServiceTaint returns true if the node has out-of-service taint present
// and `NodeOutOfServiceVolumeDetach` feature gate is enabled.
func (rc *reconciler) hasOutOfServiceTaint(nodeName types.NodeName) (bool, error) {
	// With the feature gate off, the taint is never consulted.
	if !utilfeature.DefaultFeatureGate.Enabled(features.NodeOutOfServiceVolumeDetach) {
		return false, nil
	}
	node, err := rc.nodeLister.Get(string(nodeName))
	if err != nil {
		return false, err
	}
	return taints.TaintKeyExists(node.Spec.Taints, v1.TaintNodeOutOfService), nil
}

func (rc *reconciler) reconcile() {
// Detaches are triggered before attaches so that volumes referenced by
// pods that are rescheduled to a different node are detached first.
Expand Down Expand Up @@ -183,8 +203,15 @@ func (rc *reconciler) reconcile() {
}
// Check whether timeout has reached the maximum waiting time
timeout := elapsedTime > rc.maxWaitForUnmountDuration

hasOutOfServiceTaint, err := rc.hasOutOfServiceTaint(attachedVolume.NodeName)
if err != nil {
klog.Errorf("failed to get taint specs for node %s: %s", attachedVolume.NodeName, err.Error())
}

// Check whether volume is still mounted. Skip detach if it is still mounted unless timeout
if attachedVolume.MountedByNode && !timeout {
// or the node has `node.kubernetes.io/out-of-service` taint.
if attachedVolume.MountedByNode && !timeout && !hasOutOfServiceTaint {
klog.V(5).InfoS("Cannot detach volume because it is still mounted", "volume", attachedVolume)
continue
}
Expand All @@ -211,8 +238,12 @@ func (rc *reconciler) reconcile() {

// Trigger detach volume which requires verifying safe to detach step
// If timeout is true, skip verifySafeToDetach check
// If the node has node.kubernetes.io/out-of-service taint with NoExecute effect, skip verifySafeToDetach check
klog.V(5).InfoS("Starting attacherDetacher.DetachVolume", "volume", attachedVolume)
verifySafeToDetach := !timeout
if hasOutOfServiceTaint {
klog.V(4).Infof("node %q has out-of-service taint", attachedVolume.NodeName)
}
verifySafeToDetach := !(timeout || hasOutOfServiceTaint)
err = rc.attacherDetacher.DetachVolume(attachedVolume.AttachedVolume, verifySafeToDetach, rc.actualStateOfWorld)
if err == nil {
if !timeout {
Expand Down

0 comments on commit c009753

Please sign in to comment.