DaemonSet controller respects MaxSurge during update #96441

Merged
merged 5 commits into from Mar 6, 2021
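For context: this PR is the controller-side half of DaemonSet surge support. With MaxSurge set, the controller may create a node's replacement pod before deleting the old one, instead of the delete-then-create behavior of MaxUnavailable. A minimal sketch of a spec that exercises the new code path, assuming the MaxSurge field on apps/v1 RollingUpdateDaemonSet and the alpha DaemonSetUpdateSurge feature gate that guarded it at the time:

package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	maxSurge := intstr.FromInt(1)       // at most one extra pod per node during the update
	maxUnavailable := intstr.FromInt(0) // never delete a node's only pod first
	strategy := appsv1.DaemonSetUpdateStrategy{
		Type: appsv1.RollingUpdateDaemonSetStrategyType,
		RollingUpdate: &appsv1.RollingUpdateDaemonSet{
			MaxSurge:       &maxSurge,
			MaxUnavailable: &maxUnavailable,
		},
	}
	fmt.Printf("%+v\n", strategy)
}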
Changes from all commits
2 changes: 1 addition & 1 deletion pkg/api/v1/pod/util.go
@@ -290,7 +290,7 @@ func IsPodAvailable(pod *v1.Pod, minReadySeconds int32, now metav1.Time) bool {

c := GetPodReadyCondition(pod.Status)
minReadySecondsDuration := time.Duration(minReadySeconds) * time.Second
if minReadySeconds == 0 || !c.LastTransitionTime.IsZero() && c.LastTransitionTime.Add(minReadySecondsDuration).Before(now.Time) {
if minReadySeconds == 0 || (!c.LastTransitionTime.IsZero() && c.LastTransitionTime.Add(minReadySecondsDuration).Before(now.Time)) {
return true
}
return false
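Worth noting: the util.go change above is purely cosmetic. In Go, && binds more tightly than ||, so the added parentheses only make the existing grouping explicit. A quick self-contained check:

package main

import "fmt"

func main() {
	a, b, c := true, true, false
	// && binds tighter than ||: the expression groups as a || (!b && c).
	fmt.Println(a || !b && c)   // true
	fmt.Println(a || (!b && c)) // true
	fmt.Println((a || !b) && c) // false: the left-to-right grouping the parentheses rule out
}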
117 changes: 72 additions & 45 deletions pkg/controller/daemon/daemon_controller.go
@@ -642,11 +642,7 @@ func (dsc *DaemonSetsController) addNode(obj interface{}) {
}
node := obj.(*v1.Node)
for _, ds := range dsList {
shouldRun, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
if err != nil {
continue
}
if shouldRun {
if shouldRun, _ := dsc.nodeShouldRunDaemonPod(node, ds); shouldRun {
dsc.enqueueDaemonSet(ds)
}
}
@@ -704,14 +700,8 @@ func (dsc *DaemonSetsController) updateNode(old, cur interface{}) {
}
// TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
for _, ds := range dsList {
oldShouldRun, oldShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(oldNode, ds)
if err != nil {
continue
}
currentShouldRun, currentShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(curNode, ds)
if err != nil {
continue
}
oldShouldRun, oldShouldContinueRunning := dsc.nodeShouldRunDaemonPod(oldNode, ds)
currentShouldRun, currentShouldContinueRunning := dsc.nodeShouldRunDaemonPod(curNode, ds)
if (oldShouldRun != currentShouldRun) || (oldShouldContinueRunning != currentShouldContinueRunning) {
dsc.enqueueDaemonSet(ds)
}
@@ -806,13 +796,10 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode(
node *v1.Node,
nodeToDaemonPods map[string][]*v1.Pod,
ds *apps.DaemonSet,
) (nodesNeedingDaemonPods, podsToDelete []string, err error) {

shouldRun, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds)
if err != nil {
return
}
hash string,
) (nodesNeedingDaemonPods, podsToDelete []string) {

shouldRun, shouldContinueRunning := dsc.nodeShouldRunDaemonPod(node, ds)
daemonPods, exists := nodeToDaemonPods[node.Name]

switch {
@@ -853,14 +840,60 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode(
daemonPodsRunning = append(daemonPodsRunning, pod)
}
}
// If daemon pod is supposed to be running on node, but more than 1 daemon pod is running, delete the excess daemon pods.
// Sort the daemon pods by creation time, so the oldest is preserved.
if len(daemonPodsRunning) > 1 {

// When surge is not enabled, if there is more than 1 running pod on a node delete all but the oldest
if !util.AllowsSurge(ds) {
if len(daemonPodsRunning) <= 1 {
// There are no excess pods to be pruned, and no pods to create
break
}

sort.Sort(podByCreationTimestampAndPhase(daemonPodsRunning))
for i := 1; i < len(daemonPodsRunning); i++ {
podsToDelete = append(podsToDelete, daemonPodsRunning[i].Name)
}
break
}

if len(daemonPodsRunning) <= 1 {
// There are no excess pods to be pruned
if len(daemonPodsRunning) == 0 && shouldRun {
// We are surging so we need to have at least one non-deleted pod on the node
nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name)
}
break
}

// When surge is enabled, we allow 2 pods if and only if the oldest pod matching the current hash state
// is not ready AND the oldest pod that doesn't match the current hash state is ready. All other pods are
// deleted. If neither pod is ready, only the one matching the current hash revision is kept.
var oldestNewPod, oldestOldPod *v1.Pod
sort.Sort(podByCreationTimestampAndPhase(daemonPodsRunning))
for _, pod := range daemonPodsRunning {
if pod.Labels[apps.ControllerRevisionHashLabelKey] == hash {
if oldestNewPod == nil {
oldestNewPod = pod
continue
}
} else {
if oldestOldPod == nil {
oldestOldPod = pod
continue
}
}
podsToDelete = append(podsToDelete, pod.Name)
}
if oldestNewPod != nil && oldestOldPod != nil {
switch {
case !podutil.IsPodReady(oldestOldPod):
klog.V(5).Infof("Pod %s/%s from daemonset %s is no longer ready and will be replaced with newer pod %s", oldestOldPod.Namespace, oldestOldPod.Name, ds.Name, oldestNewPod.Name)
podsToDelete = append(podsToDelete, oldestOldPod.Name)
case podutil.IsPodAvailable(oldestNewPod, ds.Spec.MinReadySeconds, metav1.Time{Time: dsc.failedPodsBackoff.Clock.Now()}):
klog.V(5).Infof("Pod %s/%s from daemonset %s is now ready and will replace older pod %s", oldestNewPod.Namespace, oldestNewPod.Name, ds.Name, oldestOldPod.Name)
podsToDelete = append(podsToDelete, oldestOldPod.Name)
}
}

case !shouldContinueRunning && exists:
// If daemon pod isn't supposed to run on node, but it is, delete all daemon pods on node.
for _, pod := range daemonPods {
@@ -871,7 +904,7 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode(
}
}

return nodesNeedingDaemonPods, podsToDelete, nil
return nodesNeedingDaemonPods, podsToDelete
}
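The heart of the surge branch above is a pairwise decision between the oldest pod of the new revision and the oldest pod of the old revision on a node. A self-contained illustration of that decision table (surgeDecision is a hypothetical helper that mirrors the switch; "available" means ready for at least MinReadySeconds):

package main

import "fmt"

// surgeDecision mirrors the controller's switch: given the readiness of the
// oldest old-revision pod and the availability of the oldest new-revision pod
// on one node, report what happens to the old pod.
func surgeDecision(oldPodReady, newPodAvailable bool) string {
	switch {
	case !oldPodReady:
		return "delete old pod: no longer ready, the new pod replaces it"
	case newPodAvailable:
		return "delete old pod: the new pod is available and takes over"
	default:
		return "keep both: surge until the new pod becomes available"
	}
}

func main() {
	fmt.Println(surgeDecision(true, false))  // keep both (the surge state)
	fmt.Println(surgeDecision(false, false)) // old pod deleted
	fmt.Println(surgeDecision(true, true))   // old pod deleted
}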

// manage manages the scheduling and running of Pods of ds on nodes.
@@ -889,12 +922,8 @@ func (dsc *DaemonSetsController) manage(ds *apps.DaemonSet, nodeList []*v1.Node,
// pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
var nodesNeedingDaemonPods, podsToDelete []string
for _, node := range nodeList {
nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode, err := dsc.podsShouldBeOnNode(
node, nodeToDaemonPods, ds)

if err != nil {
continue
}
nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode := dsc.podsShouldBeOnNode(
node, nodeToDaemonPods, ds, hash)

nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
@@ -1074,12 +1103,9 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *apps.DaemonSet, nodeL
}

var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable int
now := dsc.failedPodsBackoff.Clock.Now()
for _, node := range nodeList {
shouldRun, _, err := dsc.nodeShouldRunDaemonPod(node, ds)
if err != nil {
return err
}

shouldRun, _ := dsc.nodeShouldRunDaemonPod(node, ds)
scheduled := len(nodeToDaemonPods[node.Name]) > 0

if shouldRun {
@@ -1092,7 +1118,7 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *apps.DaemonSet, nodeL
pod := daemonPods[0]
if podutil.IsPodReady(pod) {
numberReady++
if podutil.IsPodAvailable(pod, ds.Spec.MinReadySeconds, metav1.Now()) {
if podutil.IsPodAvailable(pod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) {
numberAvailable++
}
}
@@ -1127,9 +1153,10 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *apps.DaemonSet, nodeL
}

func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
startTime := time.Now()
startTime := dsc.failedPodsBackoff.Clock.Now()

defer func() {
klog.V(4).Infof("Finished syncing daemon set %q (%v)", key, time.Since(startTime))
klog.V(4).Infof("Finished syncing daemon set %q (%v)", key, dsc.failedPodsBackoff.Clock.Now().Sub(startTime))
}()

namespace, name, err := cache.SplitMetaNamespaceKey(key)
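Several call sites in this file also switch from time.Now() and metav1.Now() to dsc.failedPodsBackoff.Clock.Now(). Reading time through an injected clock makes duration logic deterministic under test. A minimal sketch of the pattern, assuming the k8s.io/apimachinery/pkg/util/clock package the controller used at the time:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/clock"
)

// elapsed measures work using whatever Clock it is given: clock.RealClock{}
// in production, a fake clock in tests.
func elapsed(c clock.Clock, work func()) time.Duration {
	start := c.Now()
	work()
	return c.Now().Sub(start)
}

func main() {
	fake := clock.NewFakeClock(time.Now())
	// Time only advances when the test says so, eliminating wall-clock flakes.
	d := elapsed(fake, func() { fake.Step(5 * time.Second) })
	fmt.Println(d) // 5s
}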
@@ -1222,39 +1249,39 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
// * shouldContinueRunning:
// Returns true when a daemonset should continue running on a node if a daemonset pod is already
// running on that node.
func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *apps.DaemonSet) (bool, bool, error) {
func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *apps.DaemonSet) (bool, bool) {
pod := NewPod(ds, node.Name)

// If the daemon set specifies a node name, check that it matches with node.Name.
if !(ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name) {
return false, false, nil
return false, false
}

taints := node.Spec.Taints
fitsNodeName, fitsNodeAffinity, fitsTaints := Predicates(pod, node, taints)
if !fitsNodeName || !fitsNodeAffinity {
return false, false, nil
return false, false
}

if !fitsTaints {
// Scheduled daemon pods should continue running if they tolerate NoExecute taint.
_, untolerated := v1helper.FindMatchingUntoleratedTaint(taints, pod.Spec.Tolerations, func(t *v1.Taint) bool {
_, hasUntoleratedTaint := v1helper.FindMatchingUntoleratedTaint(taints, pod.Spec.Tolerations, func(t *v1.Taint) bool {
return t.Effect == v1.TaintEffectNoExecute
})
return false, !untolerated, nil
return false, !hasUntoleratedTaint
}

return true, true, nil
return true, true
}

// Predicates checks if a DaemonSet's pod can run on a node.
func Predicates(pod *v1.Pod, node *v1.Node, taints []v1.Taint) (fitsNodeName, fitsNodeAffinity, fitsTaints bool) {
fitsNodeName = len(pod.Spec.NodeName) == 0 || pod.Spec.NodeName == node.Name
fitsNodeAffinity = pluginhelper.PodMatchesNodeSelectorAndAffinityTerms(pod, node)
_, untolerated := v1helper.FindMatchingUntoleratedTaint(taints, pod.Spec.Tolerations, func(t *v1.Taint) bool {
_, hasUntoleratedTaint := v1helper.FindMatchingUntoleratedTaint(taints, pod.Spec.Tolerations, func(t *v1.Taint) bool {
return t.Effect == v1.TaintEffectNoExecute || t.Effect == v1.TaintEffectNoSchedule
})
fitsTaints = !untolerated
fitsTaints = !hasUntoleratedTaint
return
}
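A hypothetical call site for Predicates, matching how nodeShouldRunDaemonPod consumes it above (sketch only: ds and node are assumed in scope as *apps.DaemonSet and *v1.Node, with the surrounding package's imports):

pod := NewPod(ds, node.Name)
fitsNodeName, fitsNodeAffinity, fitsTaints := Predicates(pod, node, node.Spec.Taints)
if fitsNodeName && fitsNodeAffinity && fitsTaints {
	// The daemon pod can be scheduled onto this node.
}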
