Merge pull request #96441 from smarterclayton/daemonset_surge_impl
DaemonSet controller respects MaxSurge during update
k8s-ci-robot committed Mar 6, 2021
2 parents 4770211 + 8d8884a commit 377ed3c
Showing 7 changed files with 1,334 additions and 450 deletions.
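The functional core of this PR is that the DaemonSet rolling-update strategy's MaxSurge field is now honored: during an update the controller may create the replacement pod on a node before deleting the old one, up to the configured surge budget (the diff below gates the new behavior behind util.AllowsSurge(ds)). As a rough, standalone sketch of how a MaxSurge value expressed as an integer or percentage resolves to a pod count: surgeBudget and its wiring are invented for illustration, while the apps/v1 types and the intstr helper are the published Kubernetes APIs.

// Illustrative sketch only: resolves a MaxSurge value (integer or percentage)
// against the number of nodes that should run the DaemonSet. surgeBudget is an
// invented helper, not code from this PR.
package main

import (
	"fmt"

	apps "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

func surgeBudget(ds *apps.DaemonSet, desiredNodes int) (int, error) {
	ru := ds.Spec.UpdateStrategy.RollingUpdate
	if ds.Spec.UpdateStrategy.Type != apps.RollingUpdateDaemonSetStrategyType || ru == nil || ru.MaxSurge == nil {
		return 0, nil // no surge requested; API defaulting is ignored in this sketch
	}
	// Round percentages up so any non-zero MaxSurge allows at least one extra pod.
	return intstr.GetScaledValueFromIntOrPercent(ru.MaxSurge, desiredNodes, true)
}

func main() {
	maxSurge := intstr.FromString("10%")
	ds := &apps.DaemonSet{}
	ds.Spec.UpdateStrategy = apps.DaemonSetUpdateStrategy{
		Type:          apps.RollingUpdateDaemonSetStrategyType,
		RollingUpdate: &apps.RollingUpdateDaemonSet{MaxSurge: &maxSurge},
	}

	budget, err := surgeBudget(ds, 25)
	fmt.Println(budget, err) // 3 <nil>: 10% of 25 nodes, rounded up
}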
pkg/api/v1/pod/util.go (2 changes: 1 addition & 1 deletion)
@@ -290,7 +290,7 @@ func IsPodAvailable(pod *v1.Pod, minReadySeconds int32, now metav1.Time) bool {

c := GetPodReadyCondition(pod.Status)
minReadySecondsDuration := time.Duration(minReadySeconds) * time.Second
-	if minReadySeconds == 0 || !c.LastTransitionTime.IsZero() && c.LastTransitionTime.Add(minReadySecondsDuration).Before(now.Time) {
+	if minReadySeconds == 0 || (!c.LastTransitionTime.IsZero() && c.LastTransitionTime.Add(minReadySecondsDuration).Before(now.Time)) {
return true
}
return false
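The util.go change above is purely cosmetic: Go binds && tighter than ||, so the new parentheses spell out the grouping the expression already had rather than changing behavior. A small self-contained sketch (variable names invented) showing the two forms are equivalent:

// Illustrative only: demonstrates that a || !b && c parses as a || ((!b) && c) in Go,
// which is exactly the grouping the added parentheses make explicit in IsPodAvailable.
package main

import "fmt"

func main() {
	minReadyIsZero := false   // stands in for minReadySeconds == 0
	transitionIsZero := false // stands in for c.LastTransitionTime.IsZero()
	waitedLongEnough := true  // stands in for the Add(minReadySeconds).Before(now) check

	implicit := minReadyIsZero || !transitionIsZero && waitedLongEnough
	explicit := minReadyIsZero || (!transitionIsZero && waitedLongEnough)
	fmt.Println(implicit, explicit) // true true: identical for every combination of inputs
}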
pkg/controller/daemon/daemon_controller.go (117 changes: 72 additions & 45 deletions)
@@ -642,11 +642,7 @@ func (dsc *DaemonSetsController) addNode(obj interface{}) {
}
node := obj.(*v1.Node)
for _, ds := range dsList {
-		shouldRun, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
-		if err != nil {
-			continue
-		}
-		if shouldRun {
+		if shouldRun, _ := dsc.nodeShouldRunDaemonPod(node, ds); shouldRun {
dsc.enqueueDaemonSet(ds)
}
}
@@ -704,14 +700,8 @@ func (dsc *DaemonSetsController) updateNode(old, cur interface{}) {
}
// TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
for _, ds := range dsList {
-		oldShouldRun, oldShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(oldNode, ds)
-		if err != nil {
-			continue
-		}
-		currentShouldRun, currentShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(curNode, ds)
-		if err != nil {
-			continue
-		}
+		oldShouldRun, oldShouldContinueRunning := dsc.nodeShouldRunDaemonPod(oldNode, ds)
+		currentShouldRun, currentShouldContinueRunning := dsc.nodeShouldRunDaemonPod(curNode, ds)
if (oldShouldRun != currentShouldRun) || (oldShouldContinueRunning != currentShouldContinueRunning) {
dsc.enqueueDaemonSet(ds)
}
@@ -806,13 +796,10 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode(
node *v1.Node,
nodeToDaemonPods map[string][]*v1.Pod,
ds *apps.DaemonSet,
-) (nodesNeedingDaemonPods, podsToDelete []string, err error) {
-
-	shouldRun, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds)
-	if err != nil {
-		return
-	}
+	hash string,
+) (nodesNeedingDaemonPods, podsToDelete []string) {

+	shouldRun, shouldContinueRunning := dsc.nodeShouldRunDaemonPod(node, ds)
daemonPods, exists := nodeToDaemonPods[node.Name]

switch {
@@ -853,14 +840,60 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode(
daemonPodsRunning = append(daemonPodsRunning, pod)
}
}
-		// If daemon pod is supposed to be running on node, but more than 1 daemon pod is running, delete the excess daemon pods.
-		// Sort the daemon pods by creation time, so the oldest is preserved.
-		if len(daemonPodsRunning) > 1 {
+
+		// When surge is not enabled, if there is more than 1 running pod on a node delete all but the oldest
+		if !util.AllowsSurge(ds) {
+			if len(daemonPodsRunning) <= 1 {
+				// There are no excess pods to be pruned, and no pods to create
+				break
+			}
+
sort.Sort(podByCreationTimestampAndPhase(daemonPodsRunning))
for i := 1; i < len(daemonPodsRunning); i++ {
podsToDelete = append(podsToDelete, daemonPodsRunning[i].Name)
}
break
}

if len(daemonPodsRunning) <= 1 {
			// There are no excess pods to be pruned
if len(daemonPodsRunning) == 0 && shouldRun {
// We are surging so we need to have at least one non-deleted pod on the node
nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
}
break
}

// When surge is enabled, we allow 2 pods if and only if the oldest pod matching the current hash state
// is not ready AND the oldest pod that doesn't match the current hash state is ready. All other pods are
// deleted. If neither pod is ready, only the one matching the current hash revision is kept.
var oldestNewPod, oldestOldPod *v1.Pod
sort.Sort(podByCreationTimestampAndPhase(daemonPodsRunning))
for _, pod := range daemonPodsRunning {
if pod.Labels[apps.ControllerRevisionHashLabelKey] == hash {
if oldestNewPod == nil {
oldestNewPod = pod
continue
}
} else {
if oldestOldPod == nil {
oldestOldPod = pod
continue
}
}
podsToDelete = append(podsToDelete, pod.Name)
}
if oldestNewPod != nil && oldestOldPod != nil {
switch {
case !podutil.IsPodReady(oldestOldPod):
klog.V(5).Infof("Pod %s/%s from daemonset %s is no longer ready and will be replaced with newer pod %s", oldestOldPod.Namespace, oldestOldPod.Name, ds.Name, oldestNewPod.Name)
podsToDelete = append(podsToDelete, oldestOldPod.Name)
case podutil.IsPodAvailable(oldestNewPod, ds.Spec.MinReadySeconds, metav1.Time{Time: dsc.failedPodsBackoff.Clock.Now()}):
klog.V(5).Infof("Pod %s/%s from daemonset %s is now ready and will replace older pod %s", oldestNewPod.Namespace, oldestNewPod.Name, ds.Name, oldestOldPod.Name)
podsToDelete = append(podsToDelete, oldestOldPod.Name)
}
}

case !shouldContinueRunning && exists:
// If daemon pod isn't supposed to run on node, but it is, delete all daemon pods on node.
for _, pod := range daemonPods {
@@ -871,7 +904,7 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode(
}
}

-	return nodesNeedingDaemonPods, podsToDelete, nil
+	return nodesNeedingDaemonPods, podsToDelete
}

// manage manages the scheduling and running of Pods of ds on nodes.
Expand All @@ -889,12 +922,8 @@ func (dsc *DaemonSetsController) manage(ds *apps.DaemonSet, nodeList []*v1.Node,
// pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
var nodesNeedingDaemonPods, podsToDelete []string
for _, node := range nodeList {
-		nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode, err := dsc.podsShouldBeOnNode(
-			node, nodeToDaemonPods, ds)
-
-		if err != nil {
-			continue
-		}
+		nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode := dsc.podsShouldBeOnNode(
+			node, nodeToDaemonPods, ds, hash)

nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
@@ -1074,12 +1103,9 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *apps.DaemonSet, nodeL
}

var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable int
+	now := dsc.failedPodsBackoff.Clock.Now()
for _, node := range nodeList {
-		shouldRun, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
-		if err != nil {
-			return err
-		}
-
+		shouldRun, _ := dsc.nodeShouldRunDaemonPod(node, ds)
scheduled := len(nodeToDaemonPods[node.Name]) > 0

if shouldRun {
@@ -1092,7 +1118,7 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *apps.DaemonSet, nodeL
pod := daemonPods[0]
if podutil.IsPodReady(pod) {
numberReady++
-				if podutil.IsPodAvailable(pod, ds.Spec.MinReadySeconds, metav1.Now()) {
+				if podutil.IsPodAvailable(pod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
numberAvailable++
}
}
@@ -1127,9 +1153,10 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *apps.DaemonSet, nodeL
}

func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
-	startTime := time.Now()
+	startTime := dsc.failedPodsBackoff.Clock.Now()

defer func() {
-		klog.V(4).Infof("Finished syncing daemon set %q (%v)", key, time.Since(startTime))
+		klog.V(4).Infof("Finished syncing daemon set %q (%v)", key, dsc.failedPodsBackoff.Clock.Now().Sub(startTime))
}()

namespace, name, err := cache.SplitMetaNamespaceKey(key)
@@ -1222,39 +1249,39 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
// * shouldContinueRunning:
// Returns true when a daemonset should continue running on a node if a daemonset pod is already
// running on that node.
-func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *apps.DaemonSet) (bool, bool, error) {
+func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *apps.DaemonSet) (bool, bool) {
pod := NewPod(ds, node.Name)

// If the daemon set specifies a node name, check that it matches with node.Name.
if !(ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name) {
-		return false, false, nil
+		return false, false
}

taints := node.Spec.Taints
fitsNodeName, fitsNodeAffinity, fitsTaints := Predicates(pod, node, taints)
if !fitsNodeName || !fitsNodeAffinity {
-		return false, false, nil
+		return false, false
}

if !fitsTaints {
// Scheduled daemon pods should continue running if they tolerate NoExecute taint.
-		_, untolerated := v1helper.FindMatchingUntoleratedTaint(taints, pod.Spec.Tolerations, func(t *v1.Taint) bool {
+		_, hasUntoleratedTaint := v1helper.FindMatchingUntoleratedTaint(taints, pod.Spec.Tolerations, func(t *v1.Taint) bool {
return t.Effect == v1.TaintEffectNoExecute
})
-		return false, !untolerated, nil
+		return false, !hasUntoleratedTaint
}

-	return true, true, nil
+	return true, true
}

// Predicates checks if a DaemonSet's pod can run on a node.
func Predicates(pod *v1.Pod, node *v1.Node, taints []v1.Taint) (fitsNodeName, fitsNodeAffinity, fitsTaints bool) {
fitsNodeName = len(pod.Spec.NodeName) == 0 || pod.Spec.NodeName == node.Name
fitsNodeAffinity = pluginhelper.PodMatchesNodeSelectorAndAffinityTerms(pod, node)
-	_, untolerated := v1helper.FindMatchingUntoleratedTaint(taints, pod.Spec.Tolerations, func(t *v1.Taint) bool {
+	_, hasUntoleratedTaint := v1helper.FindMatchingUntoleratedTaint(taints, pod.Spec.Tolerations, func(t *v1.Taint) bool {
return t.Effect == v1.TaintEffectNoExecute || t.Effect == v1.TaintEffectNoSchedule
})
-	fitsTaints = !untolerated
+	fitsTaints = !hasUntoleratedTaint
return
}

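For readers tracing the new surge branch in podsShouldBeOnNode above: per node, at most one pod from the current revision and one pod from an older revision are kept; the older pod is deleted once it is no longer ready, or once the newer pod has been available for MinReadySeconds, and every other pod is pruned. A simplified standalone sketch of that rule follows; podInfo and surgePruneDecision are invented stand-ins, while the real controller operates on *v1.Pod objects and the controller-revision-hash label.

// Illustrative sketch of the surge keep/delete rule; not the controller's code.
package main

import "fmt"

type podInfo struct {
	name      string
	hash      string // controller-revision-hash label value
	ready     bool
	available bool // ready for at least MinReadySeconds
}

// surgePruneDecision returns the names of pods to delete on one node, assuming
// the slice is already sorted oldest first (the controller sorts by creation time).
func surgePruneDecision(running []podInfo, currentHash string) (toDelete []string) {
	var oldestNew, oldestOld *podInfo
	for i := range running {
		p := &running[i]
		switch {
		case p.hash == currentHash && oldestNew == nil:
			oldestNew = p // keep the oldest pod from the current revision
		case p.hash != currentHash && oldestOld == nil:
			oldestOld = p // provisionally keep the oldest pod from an older revision
		default:
			toDelete = append(toDelete, p.name) // everything else is excess
		}
	}
	if oldestNew != nil && oldestOld != nil {
		// The old pod only survives while it is still ready and the new pod
		// has not yet become available.
		if !oldestOld.ready || oldestNew.available {
			toDelete = append(toDelete, oldestOld.name)
		}
	}
	return toDelete
}

func main() {
	pods := []podInfo{
		{name: "ds-old", hash: "v1", ready: true},
		{name: "ds-new", hash: "v2", ready: true, available: false},
	}
	fmt.Println(surgePruneDecision(pods, "v2")) // []: both kept until ds-new is available
	pods[1].available = true
	fmt.Println(surgePruneDecision(pods, "v2")) // [ds-old]
}

In the controller itself the same pass also feeds the per-node creation list, since a node left with zero non-deleted pods while surging still needs a new pod created.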
