diff --git a/pkg/api/v1/pod/util.go b/pkg/api/v1/pod/util.go index f382509aeefe..9560121bbe5f 100644 --- a/pkg/api/v1/pod/util.go +++ b/pkg/api/v1/pod/util.go @@ -290,7 +290,7 @@ func IsPodAvailable(pod *v1.Pod, minReadySeconds int32, now metav1.Time) bool { c := GetPodReadyCondition(pod.Status) minReadySecondsDuration := time.Duration(minReadySeconds) * time.Second - if minReadySeconds == 0 || !c.LastTransitionTime.IsZero() && c.LastTransitionTime.Add(minReadySecondsDuration).Before(now.Time) { + if minReadySeconds == 0 || (!c.LastTransitionTime.IsZero() && c.LastTransitionTime.Add(minReadySecondsDuration).Before(now.Time)) { return true } return false diff --git a/pkg/controller/daemon/daemon_controller.go b/pkg/controller/daemon/daemon_controller.go index 90c4ea1aec81..a383bb382b0a 100644 --- a/pkg/controller/daemon/daemon_controller.go +++ b/pkg/controller/daemon/daemon_controller.go @@ -642,11 +642,7 @@ func (dsc *DaemonSetsController) addNode(obj interface{}) { } node := obj.(*v1.Node) for _, ds := range dsList { - shouldRun, _, err := dsc.nodeShouldRunDaemonPod(node, ds) - if err != nil { - continue - } - if shouldRun { + if shouldRun, _ := dsc.nodeShouldRunDaemonPod(node, ds); shouldRun { dsc.enqueueDaemonSet(ds) } } @@ -704,14 +700,8 @@ func (dsc *DaemonSetsController) updateNode(old, cur interface{}) { } // TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too). 
for _, ds := range dsList { - oldShouldRun, oldShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(oldNode, ds) - if err != nil { - continue - } - currentShouldRun, currentShouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(curNode, ds) - if err != nil { - continue - } + oldShouldRun, oldShouldContinueRunning := dsc.nodeShouldRunDaemonPod(oldNode, ds) + currentShouldRun, currentShouldContinueRunning := dsc.nodeShouldRunDaemonPod(curNode, ds) if (oldShouldRun != currentShouldRun) || (oldShouldContinueRunning != currentShouldContinueRunning) { dsc.enqueueDaemonSet(ds) } @@ -806,13 +796,10 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode( node *v1.Node, nodeToDaemonPods map[string][]*v1.Pod, ds *apps.DaemonSet, -) (nodesNeedingDaemonPods, podsToDelete []string, err error) { - - shouldRun, shouldContinueRunning, err := dsc.nodeShouldRunDaemonPod(node, ds) - if err != nil { - return - } + hash string, +) (nodesNeedingDaemonPods, podsToDelete []string) { + shouldRun, shouldContinueRunning := dsc.nodeShouldRunDaemonPod(node, ds) daemonPods, exists := nodeToDaemonPods[node.Name] switch { @@ -853,14 +840,60 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode( daemonPodsRunning = append(daemonPodsRunning, pod) } } - // If daemon pod is supposed to be running on node, but more than 1 daemon pod is running, delete the excess daemon pods. - // Sort the daemon pods by creation time, so the oldest is preserved. 
- if len(daemonPodsRunning) > 1 { + + // When surge is not enabled, if there is more than 1 running pod on a node delete all but the oldest + if !util.AllowsSurge(ds) { + if len(daemonPodsRunning) <= 1 { + // There are no excess pods to be pruned, and no pods to create + break + } + sort.Sort(podByCreationTimestampAndPhase(daemonPodsRunning)) for i := 1; i < len(daemonPodsRunning); i++ { podsToDelete = append(podsToDelete, daemonPodsRunning[i].Name) } + break + } + + if len(daemonPodsRunning) <= 1 { + // There are no excess pods to be pruned + if len(daemonPodsRunning) == 0 && shouldRun { + // We are surging so we need to have at least one non-deleted pod on the node + nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, node.Name) + } + break } + + // When surge is enabled, we allow 2 pods if and only if the oldest pod matching the current hash state + // is not yet available AND the oldest pod that doesn't match the current hash state is ready. All other pods are + // deleted. If neither pod is ready, only the one matching the current hash revision is kept. 
+ var oldestNewPod, oldestOldPod *v1.Pod + sort.Sort(podByCreationTimestampAndPhase(daemonPodsRunning)) + for _, pod := range daemonPodsRunning { + if pod.Labels[apps.ControllerRevisionHashLabelKey] == hash { + if oldestNewPod == nil { + oldestNewPod = pod + continue + } + } else { + if oldestOldPod == nil { + oldestOldPod = pod + continue + } + } + podsToDelete = append(podsToDelete, pod.Name) + } + if oldestNewPod != nil && oldestOldPod != nil { + switch { + case !podutil.IsPodReady(oldestOldPod): + klog.V(5).Infof("Pod %s/%s from daemonset %s is no longer ready and will be replaced with newer pod %s", oldestOldPod.Namespace, oldestOldPod.Name, ds.Name, oldestNewPod.Name) + podsToDelete = append(podsToDelete, oldestOldPod.Name) + case podutil.IsPodAvailable(oldestNewPod, ds.Spec.MinReadySeconds, metav1.Time{Time: dsc.failedPodsBackoff.Clock.Now()}): + klog.V(5).Infof("Pod %s/%s from daemonset %s is now ready and will replace older pod %s", oldestNewPod.Namespace, oldestNewPod.Name, ds.Name, oldestOldPod.Name) + podsToDelete = append(podsToDelete, oldestOldPod.Name) + } + } + case !shouldContinueRunning && exists: // If daemon pod isn't supposed to run on node, but it is, delete all daemon pods on node. for _, pod := range daemonPods { @@ -871,7 +904,7 @@ func (dsc *DaemonSetsController) podsShouldBeOnNode( } } - return nodesNeedingDaemonPods, podsToDelete, nil + return nodesNeedingDaemonPods, podsToDelete } // manage manages the scheduling and running of Pods of ds on nodes. @@ -889,12 +922,8 @@ func (dsc *DaemonSetsController) manage(ds *apps.DaemonSet, nodeList []*v1.Node, // pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node. 
var nodesNeedingDaemonPods, podsToDelete []string for _, node := range nodeList { - nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode, err := dsc.podsShouldBeOnNode( - node, nodeToDaemonPods, ds) - - if err != nil { - continue - } + nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode := dsc.podsShouldBeOnNode( + node, nodeToDaemonPods, ds, hash) nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...) podsToDelete = append(podsToDelete, podsToDeleteOnNode...) @@ -1074,12 +1103,9 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *apps.DaemonSet, nodeL } var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled, numberReady, updatedNumberScheduled, numberAvailable int + now := dsc.failedPodsBackoff.Clock.Now() for _, node := range nodeList { - shouldRun, _, err := dsc.nodeShouldRunDaemonPod(node, ds) - if err != nil { - return err - } - + shouldRun, _ := dsc.nodeShouldRunDaemonPod(node, ds) scheduled := len(nodeToDaemonPods[node.Name]) > 0 if shouldRun { @@ -1092,7 +1118,7 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *apps.DaemonSet, nodeL pod := daemonPods[0] if podutil.IsPodReady(pod) { numberReady++ - if podutil.IsPodAvailable(pod, ds.Spec.MinReadySeconds, metav1.Now()) { + if podutil.IsPodAvailable(pod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) { numberAvailable++ } } @@ -1127,9 +1153,10 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *apps.DaemonSet, nodeL } func (dsc *DaemonSetsController) syncDaemonSet(key string) error { - startTime := time.Now() + startTime := dsc.failedPodsBackoff.Clock.Now() + defer func() { - klog.V(4).Infof("Finished syncing daemon set %q (%v)", key, time.Since(startTime)) + klog.V(4).Infof("Finished syncing daemon set %q (%v)", key, dsc.failedPodsBackoff.Clock.Now().Sub(startTime)) }() namespace, name, err := cache.SplitMetaNamespaceKey(key) @@ -1222,39 +1249,39 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error { // * 
shouldContinueRunning: // Returns true when a daemonset should continue running on a node if a daemonset pod is already // running on that node. -func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *apps.DaemonSet) (bool, bool, error) { +func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *v1.Node, ds *apps.DaemonSet) (bool, bool) { pod := NewPod(ds, node.Name) // If the daemon set specifies a node name, check that it matches with node.Name. if !(ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == node.Name) { - return false, false, nil + return false, false } taints := node.Spec.Taints fitsNodeName, fitsNodeAffinity, fitsTaints := Predicates(pod, node, taints) if !fitsNodeName || !fitsNodeAffinity { - return false, false, nil + return false, false } if !fitsTaints { // Scheduled daemon pods should continue running if they tolerate NoExecute taint. - _, untolerated := v1helper.FindMatchingUntoleratedTaint(taints, pod.Spec.Tolerations, func(t *v1.Taint) bool { + _, hasUntoleratedTaint := v1helper.FindMatchingUntoleratedTaint(taints, pod.Spec.Tolerations, func(t *v1.Taint) bool { return t.Effect == v1.TaintEffectNoExecute }) - return false, !untolerated, nil + return false, !hasUntoleratedTaint } - return true, true, nil + return true, true } // Predicates checks if a DaemonSet's pod can run on a node. 
func Predicates(pod *v1.Pod, node *v1.Node, taints []v1.Taint) (fitsNodeName, fitsNodeAffinity, fitsTaints bool) { fitsNodeName = len(pod.Spec.NodeName) == 0 || pod.Spec.NodeName == node.Name fitsNodeAffinity = pluginhelper.PodMatchesNodeSelectorAndAffinityTerms(pod, node) - _, untolerated := v1helper.FindMatchingUntoleratedTaint(taints, pod.Spec.Tolerations, func(t *v1.Taint) bool { + _, hasUntoleratedTaint := v1helper.FindMatchingUntoleratedTaint(taints, pod.Spec.Tolerations, func(t *v1.Taint) bool { return t.Effect == v1.TaintEffectNoExecute || t.Effect == v1.TaintEffectNoSchedule }) - fitsTaints = !untolerated + fitsTaints = !hasUntoleratedTaint return } diff --git a/pkg/controller/daemon/daemon_controller_test.go b/pkg/controller/daemon/daemon_controller_test.go index ee3f555ccb7b..1b5c27207bcc 100644 --- a/pkg/controller/daemon/daemon_controller_test.go +++ b/pkg/controller/daemon/daemon_controller_test.go @@ -33,9 +33,11 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/storage/names" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" core "k8s.io/client-go/testing" @@ -43,11 +45,14 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/flowcontrol" "k8s.io/client-go/util/workqueue" + featuregatetesting "k8s.io/component-base/featuregate/testing" "k8s.io/klog/v2" podutil "k8s.io/kubernetes/pkg/api/v1/pod" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/apis/scheduling" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/daemon/util" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/securitycontext" labelsutil "k8s.io/kubernetes/pkg/util/labels" ) @@ -120,7 +125,7 @@ func newDaemonSet(name string) *apps.DaemonSet { } } -func newRollbackStrategy() 
*apps.DaemonSetUpdateStrategy { +func newRollingUpdateStrategy() *apps.DaemonSetUpdateStrategy { one := intstr.FromInt(1) return &apps.DaemonSetUpdateStrategy{ Type: apps.RollingUpdateDaemonSetStrategyType, @@ -135,7 +140,7 @@ func newOnDeleteStrategy() *apps.DaemonSetUpdateStrategy { } func updateStrategies() []*apps.DaemonSetUpdateStrategy { - return []*apps.DaemonSetUpdateStrategy{newOnDeleteStrategy(), newRollbackStrategy()} + return []*apps.DaemonSetUpdateStrategy{newOnDeleteStrategy(), newRollingUpdateStrategy()} } func newNode(name string, label map[string]string) *v1.Node { @@ -378,7 +383,7 @@ func validateSyncDaemonSets(manager *daemonSetsController, fakePodControl *fakeP return fmt.Errorf("Unexpected number of creates. Expected %d, saw %d\n", expectedCreates, len(fakePodControl.Templates)) } if len(fakePodControl.DeletePodName) != expectedDeletes { - return fmt.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", expectedDeletes, len(fakePodControl.DeletePodName)) + return fmt.Errorf("Unexpected number of deletes. Expected %d, got %v\n", expectedDeletes, fakePodControl.DeletePodName) } if len(manager.fakeRecorder.Events) != expectedEvents { return fmt.Errorf("Unexpected number of events. 
Expected %d, saw %d\n", expectedEvents, len(manager.fakeRecorder.Events)) @@ -402,23 +407,22 @@ func validateSyncDaemonSets(manager *daemonSetsController, fakePodControl *fakeP return nil } -func syncAndValidateDaemonSets(manager *daemonSetsController, ds *apps.DaemonSet, podControl *fakePodControl, expectedCreates, expectedDeletes int, expectedEvents int) error { +func expectSyncDaemonSets(t *testing.T, manager *daemonSetsController, ds *apps.DaemonSet, podControl *fakePodControl, expectedCreates, expectedDeletes int, expectedEvents int) { + t.Helper() key, err := controller.KeyFunc(ds) if err != nil { - return fmt.Errorf("could not get key for daemon") + t.Fatal("could not get key for daemon") } err = manager.syncHandler(key) if err != nil { - klog.Warning(err) + t.Log(err) } err = validateSyncDaemonSets(manager, podControl, expectedCreates, expectedDeletes, expectedEvents) if err != nil { - return err + t.Fatal(err) } - - return nil } // clearExpectations copies the FakePodControl to PodStore and clears the create and delete expectations. 
@@ -431,6 +435,39 @@ func clearExpectations(t *testing.T, manager *daemonSetsController, ds *apps.Dae return } manager.expectations.DeleteExpectations(key) + + now := manager.failedPodsBackoff.Clock.Now() + hash, _ := currentDSHash(manager, ds) + // log all the pods in the store + var lines []string + for _, obj := range manager.podStore.List() { + pod := obj.(*v1.Pod) + if pod.CreationTimestamp.IsZero() { + pod.CreationTimestamp.Time = now + } + var readyLast time.Time + ready := podutil.IsPodReady(pod) + if ready { + if c := podutil.GetPodReadyCondition(pod.Status); c != nil { + readyLast = c.LastTransitionTime.Time.Add(time.Duration(ds.Spec.MinReadySeconds) * time.Second) + } + } + nodeName, _ := util.GetTargetNodeName(pod) + + lines = append(lines, fmt.Sprintf("node=%s current=%-5t ready=%-5t age=%-4d pod=%s now=%d available=%d", + nodeName, + hash == pod.Labels[apps.ControllerRevisionHashLabelKey], + ready, + now.Unix(), + pod.Name, + pod.CreationTimestamp.Unix(), + readyLast.Unix(), + )) + } + sort.Strings(lines) + for _, line := range lines { + klog.Info(line) + } } func TestDeleteFinalStateUnknown(t *testing.T) { @@ -669,10 +706,7 @@ func TestSimpleDaemonSetLaunchesPods(t *testing.T) { t.Error(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, 5, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 5, 0, 0) } } @@ -692,10 +726,7 @@ func TestSimpleDaemonSetScheduleDaemonSetPodsLaunchesPods(t *testing.T) { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, nodeNum, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, nodeNum, 0, 0) if len(podControl.podIDMap) != nodeNum { t.Fatalf("failed to create pods for DaemonSet") @@ -773,10 +804,7 @@ func TestSimpleDaemonSetPodCreateErrors(t *testing.T) { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, podControl.FakePodControl.CreateLimit, 0, 0) - if err != nil { - 
t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, podControl.FakePodControl.CreateLimit, 0, 0) expectedLimit := 0 for pass := uint8(0); expectedLimit <= podControl.FakePodControl.CreateLimit; pass++ { @@ -805,10 +833,7 @@ func TestDaemonSetPodCreateExpectationsError(t *testing.T) { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, podControl.FakePodControl.CreateLimit, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, podControl.FakePodControl.CreateLimit, 0, 0) dsKey, err := controller.KeyFunc(ds) if err != nil { @@ -843,10 +868,7 @@ func TestSimpleDaemonSetUpdatesStatusAfterLaunchingPods(t *testing.T) { manager.dsStore.Add(ds) addNodes(manager.nodeStore, 0, 5, nil) - err = syncAndValidateDaemonSets(manager, ds, podControl, 5, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 5, 0, 0) // Make sure the single sync() updated Status already for the change made // during the manage() phase. 
@@ -870,11 +892,7 @@ func TestNoNodesDoesNothing(t *testing.T) { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, 0, 0, 0) - if err != nil { - t.Error(err) - } - + expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0) } } @@ -897,10 +915,7 @@ func TestOneNodeDaemonLaunchesPod(t *testing.T) { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, 1, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 1, 0, 0) } } @@ -927,10 +942,7 @@ func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, 1, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 1, 0, 0) } } @@ -999,15 +1011,9 @@ func TestInsufficientCapacityNodeDaemonDoesNotUnscheduleRunningPod(t *testing.T) } switch strategy.Type { case apps.OnDeleteDaemonSetStrategyType: - err = syncAndValidateDaemonSets(manager, ds, podControl, 1, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 1, 0, 0) case apps.RollingUpdateDaemonSetStrategyType: - err = syncAndValidateDaemonSets(manager, ds, podControl, 1, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 1, 0, 0) default: t.Fatalf("unexpected UpdateStrategy %+v", strategy) } @@ -1040,10 +1046,7 @@ func TestInsufficientCapacityNodeSufficientCapacityWithNodeLabelDaemonLaunchPod( if err != nil { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, 1, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 1, 0, 0) // we do not expect any event for insufficient free resource if len(manager.fakeRecorder.Events) != 0 { t.Fatalf("unexpected events, got %v, expected %v: %+v", len(manager.fakeRecorder.Events), 0, manager.fakeRecorder.Events) @@ -1073,10 +1076,7 @@ func TestNetworkUnavailableNodeDaemonLaunchesPod(t *testing.T) { 
t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, 1, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 1, 0, 0) } } @@ -1109,10 +1109,7 @@ func TestDontDoAnythingIfBeingDeleted(t *testing.T) { if err != nil { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, 0, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0) } } @@ -1144,10 +1141,7 @@ func TestDontDoAnythingIfBeingDeletedRace(t *testing.T) { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, 0, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0) } } @@ -1186,10 +1180,7 @@ func TestPortConflictWithSameDaemonPodDoesNotDeletePod(t *testing.T) { if err != nil { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, 0, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0) } } @@ -1234,10 +1225,7 @@ func TestNoPortConflictNodeDaemonLaunchesPod(t *testing.T) { if err != nil { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, 1, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 1, 0, 0) } } @@ -1287,10 +1275,7 @@ func TestPodIsNotDeletedByDaemonsetWithEmptyLabelSelector(t *testing.T) { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, 0, 0, 1) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 1) } } @@ -1312,10 +1297,7 @@ func TestDealsWithExistingPods(t *testing.T) { addPods(manager.podStore, "node-2", simpleDaemonSetLabel, ds, 2) addPods(manager.podStore, "node-3", simpleDaemonSetLabel, ds, 5) addPods(manager.podStore, "node-4", simpleDaemonSetLabel2, ds, 2) - err = syncAndValidateDaemonSets(manager, ds, podControl, 2, 5, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, 
podControl, 2, 5, 0) } } @@ -1335,10 +1317,7 @@ func TestSelectorDaemonLaunchesPods(t *testing.T) { if err != nil { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, daemon, podControl, 3, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, daemon, podControl, 3, 0, 0) } } @@ -1362,10 +1341,7 @@ func TestSelectorDaemonDeletesUnselectedPods(t *testing.T) { addPods(manager.podStore, "node-1", simpleDaemonSetLabel, ds, 3) addPods(manager.podStore, "node-1", simpleDaemonSetLabel2, ds, 1) addPods(manager.podStore, "node-4", simpleDaemonSetLabel, ds, 1) - err = syncAndValidateDaemonSets(manager, ds, podControl, 5, 4, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 5, 4, 0) } } @@ -1393,10 +1369,7 @@ func TestSelectorDaemonDealsWithExistingPods(t *testing.T) { addPods(manager.podStore, "node-7", simpleDaemonSetLabel2, ds, 4) addPods(manager.podStore, "node-9", simpleDaemonSetLabel, ds, 1) addPods(manager.podStore, "node-9", simpleDaemonSetLabel2, ds, 1) - err = syncAndValidateDaemonSets(manager, ds, podControl, 3, 20, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 3, 20, 0) } } @@ -1416,10 +1389,7 @@ func TestBadSelectorDaemonDoesNothing(t *testing.T) { if err != nil { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, 0, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0) } } @@ -1438,10 +1408,7 @@ func TestNameDaemonSetLaunchesPods(t *testing.T) { if err != nil { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, 1, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 1, 0, 0) } } @@ -1460,10 +1427,7 @@ func TestBadNameDaemonSetDoesNothing(t *testing.T) { if err != nil { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, 0, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, 
manager, ds, podControl, 0, 0, 0) } } @@ -1484,10 +1448,7 @@ func TestNameAndSelectorDaemonSetLaunchesPods(t *testing.T) { if err != nil { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, 1, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 1, 0, 0) } } @@ -1508,10 +1469,7 @@ func TestInconsistentNameSelectorDaemonSetDoesNothing(t *testing.T) { if err != nil { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, 0, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0) } } @@ -1529,10 +1487,7 @@ func TestSelectorDaemonSetLaunchesPods(t *testing.T) { if err != nil { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, 3, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 3, 0, 0) } // Daemon with node affinity should launch pods on nodes matching affinity. @@ -1568,10 +1523,7 @@ func TestNodeAffinityDaemonLaunchesPods(t *testing.T) { if err != nil { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, daemon, podControl, 3, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, daemon, podControl, 3, 0, 0) } } @@ -1601,10 +1553,7 @@ func TestNumberReadyStatus(t *testing.T) { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, 0, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0) if updated.Status.NumberReady != 0 { t.Errorf("Wrong daemon %s status: %v", updated.Name, updated.Status) } @@ -1616,10 +1565,7 @@ func TestNumberReadyStatus(t *testing.T) { pod.Status.Conditions = append(pod.Status.Conditions, condition) } - err = syncAndValidateDaemonSets(manager, ds, podControl, 0, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0) if updated.Status.NumberReady != 2 { t.Errorf("Wrong daemon %s status: %v", updated.Name, 
updated.Status) } @@ -1653,10 +1599,7 @@ func TestObservedGeneration(t *testing.T) { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, 0, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0) if updated.Status.ObservedGeneration != ds.Generation { t.Errorf("Wrong ObservedGeneration for daemon %s in status. Expected %d, got %d", updated.Name, ds.Generation, updated.Status.ObservedGeneration) } @@ -1691,10 +1634,7 @@ func TestDaemonKillFailedPods(t *testing.T) { addNodes(manager.nodeStore, 0, 1, nil) addFailedPods(manager.podStore, "node-0", simpleDaemonSetLabel, ds, test.numFailedPods) addPods(manager.podStore, "node-0", simpleDaemonSetLabel, ds, test.numNormalPods) - err = syncAndValidateDaemonSets(manager, ds, podControl, test.expectedCreates, test.expectedDeletes, test.expectedEvents) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, test.expectedCreates, test.expectedDeletes, test.expectedEvents) } }) } @@ -1731,10 +1671,7 @@ func TestDaemonKillFailedPodsBackoff(t *testing.T) { backoffKey := failedPodsBackoffKey(ds, nodeName) // First sync will delete the pod, initializing backoff - err = syncAndValidateDaemonSets(manager, ds, podControl, 0, 1, 1) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 0, 1, 1) initialDelay := manager.failedPodsBackoff.Get(backoffKey) if initialDelay <= 0 { t.Fatal("Initial delay is expected to be set.") @@ -1743,10 +1680,7 @@ func TestDaemonKillFailedPodsBackoff(t *testing.T) { resetCounters(manager) // Immediate (second) sync gets limited by the backoff - err = syncAndValidateDaemonSets(manager, ds, podControl, 0, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0) delay := manager.failedPodsBackoff.Get(backoffKey) if delay != initialDelay { t.Fatal("Backoff delay shouldn't be raised while waiting.") @@ -1770,10 +1704,7 @@ func 
TestDaemonKillFailedPodsBackoff(t *testing.T) { } // After backoff time, it will delete the failed pod - err = syncAndValidateDaemonSets(manager, ds, podControl, 0, 1, 1) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 0, 1, 1) }) } } @@ -1804,10 +1735,7 @@ func TestNoScheduleTaintedDoesntEvicitRunningIntolerantPod(t *testing.T) { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, 0, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0) } } @@ -1837,10 +1765,7 @@ func TestNoExecuteTaintedDoesEvicitRunningIntolerantPod(t *testing.T) { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, 0, 1, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 0, 1, 0) } } @@ -1865,10 +1790,7 @@ func TestTaintedNodeDaemonDoesNotLaunchIntolerantPod(t *testing.T) { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, 0, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0) } } @@ -1894,10 +1816,7 @@ func TestTaintedNodeDaemonLaunchesToleratePod(t *testing.T) { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, 1, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 1, 0, 0) } } @@ -1925,10 +1844,7 @@ func TestNotReadyNodeDaemonLaunchesPod(t *testing.T) { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, 1, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 1, 0, 0) } } @@ -1956,10 +1872,7 @@ func TestUnreachableNodeDaemonLaunchesPod(t *testing.T) { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, 1, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 1, 0, 0) } } @@ -1979,10 +1892,7 @@ func TestNodeDaemonLaunchesToleratePod(t *testing.T) { t.Fatal(err) 
} - err = syncAndValidateDaemonSets(manager, ds, podControl, 1, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 1, 0, 0) } } @@ -2008,10 +1918,7 @@ func TestDaemonSetRespectsTermination(t *testing.T) { if err != nil { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, 0, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0) } } @@ -2053,10 +1960,7 @@ func TestTaintPressureNodeDaemonLaunchesPod(t *testing.T) { if err != nil { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, 1, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 1, 0, 0) } } @@ -2079,7 +1983,6 @@ func TestNodeShouldRunDaemonPod(t *testing.T) { nodeUnschedulable bool ds *apps.DaemonSet shouldRun, shouldContinueRunning bool - err error }{ { predicateName: "ShouldRunDaemonPod", @@ -2358,7 +2261,7 @@ func TestNodeShouldRunDaemonPod(t *testing.T) { manager.podNodeIndex.Add(p) } c.ds.Spec.UpdateStrategy = *strategy - shouldRun, shouldContinueRunning, err := manager.nodeShouldRunDaemonPod(node, c.ds) + shouldRun, shouldContinueRunning := manager.nodeShouldRunDaemonPod(node, c.ds) if shouldRun != c.shouldRun { t.Errorf("[%v] strategy: %v, predicateName: %v expected shouldRun: %v, got: %v", i, c.ds.Spec.UpdateStrategy.Type, c.predicateName, c.shouldRun, shouldRun) @@ -2366,9 +2269,6 @@ func TestNodeShouldRunDaemonPod(t *testing.T) { if shouldContinueRunning != c.shouldContinueRunning { t.Errorf("[%v] strategy: %v, predicateName: %v expected shouldContinueRunning: %v, got: %v", i, c.ds.Spec.UpdateStrategy.Type, c.predicateName, c.shouldContinueRunning, shouldContinueRunning) } - if err != c.err { - t.Errorf("[%v] strategy: %v, predicateName: %v expected err: %v, got: %v", i, c.predicateName, c.ds.Spec.UpdateStrategy.Type, c.err, err) - } } } } @@ -2475,10 +2375,7 @@ func TestUpdateNode(t *testing.T) { if c.expectedCreates != nil 
{ expectedCreates = c.expectedCreates() } - err = syncAndValidateDaemonSets(manager, c.ds, podControl, expectedCreates, 0, expectedEvents) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, c.ds, podControl, expectedCreates, 0, expectedEvents) manager.enqueueDaemonSet = func(ds *apps.DaemonSet) { if ds.Name == "ds" { @@ -2658,10 +2555,7 @@ func TestDeleteNoDaemonPod(t *testing.T) { } switch strategy.Type { case apps.OnDeleteDaemonSetStrategyType, apps.RollingUpdateDaemonSetStrategyType: - err = syncAndValidateDaemonSets(manager, c.ds, podControl, 1, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, c.ds, podControl, 1, 0, 0) default: t.Fatalf("unexpected UpdateStrategy %+v", strategy) } @@ -2713,10 +2607,7 @@ func TestDeleteUnscheduledPodForNotExistingNode(t *testing.T) { if err != nil { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, 0, 1, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 0, 1, 0) } } @@ -3186,3 +3077,237 @@ func getQueuedKeys(queue workqueue.RateLimitingInterface) []string { sort.Strings(keys) return keys } + +// Controller should not create pods on nodes which have daemon pods, and should remove excess pods from nodes that have extra pods. 
+func TestSurgeDealsWithExistingPods(t *testing.T) { + ds := newDaemonSet("foo") + ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(1)) + manager, podControl, _, err := newTestController(ds) + if err != nil { + t.Fatalf("error creating DaemonSets controller: %v", err) + } + manager.dsStore.Add(ds) + addNodes(manager.nodeStore, 0, 5, nil) + addPods(manager.podStore, "node-1", simpleDaemonSetLabel, ds, 1) + addPods(manager.podStore, "node-2", simpleDaemonSetLabel, ds, 2) + addPods(manager.podStore, "node-3", simpleDaemonSetLabel, ds, 5) + addPods(manager.podStore, "node-4", simpleDaemonSetLabel2, ds, 2) + expectSyncDaemonSets(t, manager, ds, podControl, 2, 5, 0) +} + +func TestSurgePreservesReadyOldPods(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DaemonSetUpdateSurge, true)() + ds := newDaemonSet("foo") + ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(1)) + manager, podControl, _, err := newTestController(ds) + if err != nil { + t.Fatalf("error creating DaemonSets controller: %v", err) + } + manager.dsStore.Add(ds) + addNodes(manager.nodeStore, 0, 5, nil) + + // will be preserved because it's the current hash + pod := newPod("node-1-", "node-1", simpleDaemonSetLabel, ds) + pod.CreationTimestamp.Time = time.Unix(100, 0) + manager.podStore.Add(pod) + + // will be preserved because it's the oldest AND it is ready + pod = newPod("node-1-old-", "node-1", simpleDaemonSetLabel, ds) + delete(pod.Labels, apps.ControllerRevisionHashLabelKey) + pod.CreationTimestamp.Time = time.Unix(50, 0) + pod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}} + manager.podStore.Add(pod) + + // will be deleted because it's not the oldest, even though it is ready + oldReadyPod := newPod("node-1-delete-", "node-1", simpleDaemonSetLabel, ds) + delete(oldReadyPod.Labels, apps.ControllerRevisionHashLabelKey) + oldReadyPod.CreationTimestamp.Time = time.Unix(60, 0) + 
oldReadyPod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}} + manager.podStore.Add(oldReadyPod) + + addPods(manager.podStore, "node-2", simpleDaemonSetLabel, ds, 1) + expectSyncDaemonSets(t, manager, ds, podControl, 3, 1, 0) + + actual := sets.NewString(podControl.DeletePodName...) + expected := sets.NewString(oldReadyPod.Name) + if !actual.Equal(expected) { + t.Errorf("unexpected deletes\nexpected: %v\n actual: %v", expected.List(), actual.List()) + } +} + +func TestSurgeCreatesNewPodWhenAtMaxSurgeAndOldPodDeleted(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DaemonSetUpdateSurge, true)() + ds := newDaemonSet("foo") + ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(1)) + manager, podControl, _, err := newTestController(ds) + if err != nil { + t.Fatalf("error creating DaemonSets controller: %v", err) + } + manager.dsStore.Add(ds) + addNodes(manager.nodeStore, 0, 5, nil) + + // will be preserved because it has the newest hash, and is also consuming the surge budget + pod := newPod("node-0-", "node-0", simpleDaemonSetLabel, ds) + pod.CreationTimestamp.Time = time.Unix(100, 0) + pod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionFalse}} + manager.podStore.Add(pod) + + // will be preserved because it is ready + oldPodReady := newPod("node-0-old-ready-", "node-0", simpleDaemonSetLabel, ds) + delete(oldPodReady.Labels, apps.ControllerRevisionHashLabelKey) + oldPodReady.CreationTimestamp.Time = time.Unix(50, 0) + oldPodReady.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}} + manager.podStore.Add(oldPodReady) + + // create old ready pods on all other nodes + for i := 1; i < 5; i++ { + oldPod := newPod(fmt.Sprintf("node-%d-preserve-", i), fmt.Sprintf("node-%d", i), simpleDaemonSetLabel, ds) + delete(oldPod.Labels, apps.ControllerRevisionHashLabelKey) + oldPod.CreationTimestamp.Time = time.Unix(1, 0) 
+ oldPod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}} + manager.podStore.Add(oldPod) + + // mark the last old pod as deleted, which should trigger a creation above surge + if i == 4 { + thirty := int64(30) + timestamp := metav1.Time{Time: time.Unix(1+thirty, 0)} + oldPod.DeletionGracePeriodSeconds = &thirty + oldPod.DeletionTimestamp = &timestamp + } + } + + // controller should detect that node-4 has only a deleted pod + clearExpectations(t, manager, ds, podControl) + expectSyncDaemonSets(t, manager, ds, podControl, 1, 0, 0) + clearExpectations(t, manager, ds, podControl) +} + +func TestSurgeDeletesUnreadyOldPods(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DaemonSetUpdateSurge, true)() + ds := newDaemonSet("foo") + ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(1)) + manager, podControl, _, err := newTestController(ds) + if err != nil { + t.Fatalf("error creating DaemonSets controller: %v", err) + } + manager.dsStore.Add(ds) + addNodes(manager.nodeStore, 0, 5, nil) + + // will be preserved because it has the newest hash + pod := newPod("node-1-", "node-1", simpleDaemonSetLabel, ds) + pod.CreationTimestamp.Time = time.Unix(100, 0) + manager.podStore.Add(pod) + + // will be deleted because it is unready + oldUnreadyPod := newPod("node-1-old-unready-", "node-1", simpleDaemonSetLabel, ds) + delete(oldUnreadyPod.Labels, apps.ControllerRevisionHashLabelKey) + oldUnreadyPod.CreationTimestamp.Time = time.Unix(50, 0) + oldUnreadyPod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionFalse}} + manager.podStore.Add(oldUnreadyPod) + + // will be deleted because it is not the oldest + oldReadyPod := newPod("node-1-delete-", "node-1", simpleDaemonSetLabel, ds) + delete(oldReadyPod.Labels, apps.ControllerRevisionHashLabelKey) + oldReadyPod.CreationTimestamp.Time = time.Unix(60, 0) + oldReadyPod.Status.Conditions = []v1.PodCondition{{Type: 
v1.PodReady, Status: v1.ConditionTrue}} + manager.podStore.Add(oldReadyPod) + + addPods(manager.podStore, "node-2", simpleDaemonSetLabel, ds, 1) + expectSyncDaemonSets(t, manager, ds, podControl, 3, 2, 0) + + actual := sets.NewString(podControl.DeletePodName...) + expected := sets.NewString(oldReadyPod.Name, oldUnreadyPod.Name) + if !actual.Equal(expected) { + t.Errorf("unexpected deletes\nexpected: %v\n actual: %v", expected.List(), actual.List()) + } +} + +func TestSurgePreservesOldReadyWithUnsatisfiedMinReady(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DaemonSetUpdateSurge, true)() + ds := newDaemonSet("foo") + ds.Spec.MinReadySeconds = 15 + ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(1)) + manager, podControl, _, err := newTestController(ds) + if err != nil { + t.Fatalf("error creating DaemonSets controller: %v", err) + } + manager.dsStore.Add(ds) + addNodes(manager.nodeStore, 0, 5, nil) + + // the clock will be set 10s after the newest pod on node-1 went ready, which is not long enough to be available + manager.DaemonSetsController.failedPodsBackoff.Clock = clock.NewFakeClock(time.Unix(50+10, 0)) + + // will be preserved because it has the newest hash + pod := newPod("node-1-", "node-1", simpleDaemonSetLabel, ds) + pod.CreationTimestamp.Time = time.Unix(100, 0) + pod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: metav1.Time{Time: time.Unix(50, 0)}}} + manager.podStore.Add(pod) + + // will be preserved because it is ready AND the newest pod is not yet available for long enough + oldReadyPod := newPod("node-1-old-ready-", "node-1", simpleDaemonSetLabel, ds) + delete(oldReadyPod.Labels, apps.ControllerRevisionHashLabelKey) + oldReadyPod.CreationTimestamp.Time = time.Unix(50, 0) + oldReadyPod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}} + manager.podStore.Add(oldReadyPod) + + // will be 
deleted because it is not the oldest + oldExcessReadyPod := newPod("node-1-delete-", "node-1", simpleDaemonSetLabel, ds) + delete(oldExcessReadyPod.Labels, apps.ControllerRevisionHashLabelKey) + oldExcessReadyPod.CreationTimestamp.Time = time.Unix(60, 0) + oldExcessReadyPod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}} + manager.podStore.Add(oldExcessReadyPod) + + addPods(manager.podStore, "node-2", simpleDaemonSetLabel, ds, 1) + expectSyncDaemonSets(t, manager, ds, podControl, 3, 1, 0) + + actual := sets.NewString(podControl.DeletePodName...) + expected := sets.NewString(oldExcessReadyPod.Name) + if !actual.Equal(expected) { + t.Errorf("unexpected deletes\nexpected: %v\n actual: %v", expected.List(), actual.List()) + } +} + +func TestSurgeDeletesOldReadyWithUnsatisfiedMinReady(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DaemonSetUpdateSurge, true)() + ds := newDaemonSet("foo") + ds.Spec.MinReadySeconds = 15 + ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(1)) + manager, podControl, _, err := newTestController(ds) + if err != nil { + t.Fatalf("error creating DaemonSets controller: %v", err) + } + manager.dsStore.Add(ds) + addNodes(manager.nodeStore, 0, 5, nil) + + // the clock will be set 20s after the newest pod on node-1 went ready, which is not long enough to be available + manager.DaemonSetsController.failedPodsBackoff.Clock = clock.NewFakeClock(time.Unix(50+20, 0)) + + // will be preserved because it has the newest hash + pod := newPod("node-1-", "node-1", simpleDaemonSetLabel, ds) + pod.CreationTimestamp.Time = time.Unix(100, 0) + pod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: metav1.Time{Time: time.Unix(50, 0)}}} + manager.podStore.Add(pod) + + // will be preserved because it is ready AND the newest pod is not yet available for long enough + oldReadyPod := newPod("node-1-old-ready-", 
"node-1", simpleDaemonSetLabel, ds) + delete(oldReadyPod.Labels, apps.ControllerRevisionHashLabelKey) + oldReadyPod.CreationTimestamp.Time = time.Unix(50, 0) + oldReadyPod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}} + manager.podStore.Add(oldReadyPod) + + // will be deleted because it is not the oldest + oldExcessReadyPod := newPod("node-1-delete-", "node-1", simpleDaemonSetLabel, ds) + delete(oldExcessReadyPod.Labels, apps.ControllerRevisionHashLabelKey) + oldExcessReadyPod.CreationTimestamp.Time = time.Unix(60, 0) + oldExcessReadyPod.Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}} + manager.podStore.Add(oldExcessReadyPod) + + addPods(manager.podStore, "node-2", simpleDaemonSetLabel, ds, 1) + expectSyncDaemonSets(t, manager, ds, podControl, 3, 2, 0) + + actual := sets.NewString(podControl.DeletePodName...) + expected := sets.NewString(oldExcessReadyPod.Name, oldReadyPod.Name) + if !actual.Equal(expected) { + t.Errorf("unexpected deletes\nexpected: %v\n actual: %v", expected.List(), actual.List()) + } +} diff --git a/pkg/controller/daemon/update.go b/pkg/controller/daemon/update.go index c8a67c35bcae..cd2ee0c72f1f 100644 --- a/pkg/controller/daemon/update.go +++ b/pkg/controller/daemon/update.go @@ -26,12 +26,11 @@ import ( "k8s.io/klog/v2" apps "k8s.io/api/apps/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" - intstrutil "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/json" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller" @@ -39,44 +38,197 @@ import ( labelsutil "k8s.io/kubernetes/pkg/util/labels" ) -// rollingUpdate deletes old daemon set pods making sure that no more than -// ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable pods are unavailable +// rollingUpdate identifies the set 
of old pods to delete, or additional pods to create on nodes, +// remaining within the constraints imposed by the update strategy. func (dsc *DaemonSetsController) rollingUpdate(ds *apps.DaemonSet, nodeList []*v1.Node, hash string) error { nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds) if err != nil { return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err) } - - _, oldPods := dsc.getAllDaemonSetPods(ds, nodeToDaemonPods, hash) - maxUnavailable, numUnavailable, err := dsc.getUnavailableNumbers(ds, nodeList, nodeToDaemonPods) + maxSurge, maxUnavailable, err := dsc.updatedDesiredNodeCounts(ds, nodeList, nodeToDaemonPods) if err != nil { return fmt.Errorf("couldn't get unavailable numbers: %v", err) } - oldAvailablePods, oldUnavailablePods := util.SplitByAvailablePods(ds.Spec.MinReadySeconds, oldPods) - // for oldPods delete all not running pods + now := dsc.failedPodsBackoff.Clock.Now() + + // When not surging, we delete just enough pods to stay under the maxUnavailable limit, if any + // are necessary, and let the core loop create new instances on those nodes. 
+ // + // Assumptions: + // * Expect manage loop to allow no more than one pod per node + // * Expect manage loop will create new pods + // * Expect manage loop will handle failed pods + // * Deleted pods do not count as unavailable so that updates make progress when nodes are down + // Invariants: + // * The number of new pods that are unavailable must be less than maxUnavailable + // * A node with an available old pod is a candidate for deletion if it does not violate other invariants + // + if maxSurge == 0 { + var numUnavailable int + var allowedReplacementPods []string + var candidatePodsToDelete []string + for nodeName, pods := range nodeToDaemonPods { + newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash) + if !ok { + // let the manage loop clean up this node, and treat it as an unavailable node + klog.V(3).Infof("DaemonSet %s/%s has excess pods on node %s, skipping to allow the core loop to process", ds.Namespace, ds.Name, nodeName) + numUnavailable++ + continue + } + switch { + case oldPod == nil && newPod == nil, oldPod != nil && newPod != nil: + // the manage loop will handle creating or deleting the appropriate pod, consider this unavailable + numUnavailable++ + case newPod != nil: + // this pod is up to date, check its availability + if !podutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) { + // an unavailable new pod is counted against maxUnavailable + numUnavailable++ + } + default: + // this pod is old, it is an update candidate + switch { + case !podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}): + // the old pod isn't available, so it needs to be replaced + klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is out of date and not available, allowing replacement", ds.Namespace, ds.Name, oldPod.Name, nodeName) + // record the replacement + if allowedReplacementPods == nil { + allowedReplacementPods = make([]string, 0, len(nodeToDaemonPods)) + } + allowedReplacementPods = 
append(allowedReplacementPods, oldPod.Name) + case numUnavailable >= maxUnavailable: + // no point considering any other candidates + continue + default: + klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is out of date, this is a candidate to replace", ds.Namespace, ds.Name, oldPod.Name, nodeName) + // record the candidate + if candidatePodsToDelete == nil { + candidatePodsToDelete = make([]string, 0, maxUnavailable) + } + candidatePodsToDelete = append(candidatePodsToDelete, oldPod.Name) + } + } + } + + // use any of the candidates we can, including the allowedReplacemnntPods + klog.V(5).Infof("DaemonSet %s/%s allowing %d replacements, up to %d unavailable, %d new are unavailable, %d candidates", ds.Namespace, ds.Name, len(allowedReplacementPods), maxUnavailable, numUnavailable, len(candidatePodsToDelete)) + remainingUnavailable := maxUnavailable - numUnavailable + if remainingUnavailable < 0 { + remainingUnavailable = 0 + } + if max := len(candidatePodsToDelete); remainingUnavailable > max { + remainingUnavailable = max + } + oldPodsToDelete := append(allowedReplacementPods, candidatePodsToDelete[:remainingUnavailable]...) 
+ + return dsc.syncNodes(ds, oldPodsToDelete, nil, hash) + } + + // When surging, we create new pods whenever an old pod is unavailable, and we can create up + // to maxSurge extra pods + // + // Assumptions: + // * Expect manage loop to allow no more than two pods per node, one old, one new + // * Expect manage loop will create new pods if there are no pods on node + // * Expect manage loop will handle failed pods + // * Deleted pods do not count as unavailable so that updates make progress when nodes are down + // Invariants: + // * A node with an unavailable old pod is a candidate for immediate new pod creation + // * An old available pod is deleted if a new pod is available + // * No more than maxSurge new pods are created for old available pods at any one time + // var oldPodsToDelete []string - klog.V(4).Infof("Marking all unavailable old pods for deletion") - for _, pod := range oldUnavailablePods { - // Skip terminating pods. We won't delete them again - if pod.DeletionTimestamp != nil { + var candidateNewNodes []string + var allowedNewNodes []string + var numSurge int + + for nodeName, pods := range nodeToDaemonPods { + newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash) + if !ok { + // let the manage loop clean up this node, and treat it as a surge node + klog.V(3).Infof("DaemonSet %s/%s has excess pods on node %s, skipping to allow the core loop to process", ds.Namespace, ds.Name, nodeName) + numSurge++ continue } - klog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name) - oldPodsToDelete = append(oldPodsToDelete, pod.Name) + switch { + case oldPod == nil: + // we don't need to do anything to this node, the manage loop will handle it + case newPod == nil: + // this is a surge candidate + switch { + case !podutil.IsPodAvailable(oldPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}): + // the old pod isn't available, allow it to become a replacement + klog.V(5).Infof("Pod %s on node %s is out of date and not available, allowing 
replacement", ds.Namespace, ds.Name, oldPod.Name, nodeName) + // record the replacement + if allowedNewNodes == nil { + allowedNewNodes = make([]string, 0, len(nodeToDaemonPods)) + } + allowedNewNodes = append(allowedNewNodes, nodeName) + case numSurge >= maxSurge: + // no point considering any other candidates + continue + default: + klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is out of date, this is a surge candidate", ds.Namespace, ds.Name, oldPod.Name, nodeName) + // record the candidate + if candidateNewNodes == nil { + candidateNewNodes = make([]string, 0, maxSurge) + } + candidateNewNodes = append(candidateNewNodes, nodeName) + } + default: + // we have already surged onto this node, determine our state + if !podutil.IsPodAvailable(newPod, ds.Spec.MinReadySeconds, metav1.Time{Time: now}) { + // we're waiting to go available here + numSurge++ + continue + } + // we're available, delete the old pod + klog.V(5).Infof("DaemonSet %s/%s pod %s on node %s is available, remove %s", ds.Namespace, ds.Name, newPod.Name, nodeName, oldPod.Name) + oldPodsToDelete = append(oldPodsToDelete, oldPod.Name) + } } - klog.V(4).Infof("Marking old pods for deletion") - for _, pod := range oldAvailablePods { - if numUnavailable >= maxUnavailable { - klog.V(4).Infof("Number of unavailable DaemonSet pods: %d, is equal to or exceeds allowed maximum: %d", numUnavailable, maxUnavailable) - break + // use any of the candidates we can, including the allowedNewNodes + klog.V(5).Infof("DaemonSet %s/%s allowing %d replacements, surge up to %d, %d are in progress, %d candidates", ds.Namespace, ds.Name, len(allowedNewNodes), maxSurge, numSurge, len(candidateNewNodes)) + remainingSurge := maxSurge - numSurge + if remainingSurge < 0 { + remainingSurge = 0 + } + if max := len(candidateNewNodes); remainingSurge > max { + remainingSurge = max + } + newNodesToCreate := append(allowedNewNodes, candidateNewNodes[:remainingSurge]...) 
+ + return dsc.syncNodes(ds, oldPodsToDelete, newNodesToCreate, hash) +} + +// findUpdatedPodsOnNode looks at non-deleted pods on a given node and returns true if there +// is at most one of each old and new pods, or false if there are multiples. We can skip +// processing the particular node in those scenarios and let the manage loop prune the +// excess pods for our next time around. +func findUpdatedPodsOnNode(ds *apps.DaemonSet, podsOnNode []*v1.Pod, hash string) (newPod, oldPod *v1.Pod, ok bool) { + for _, pod := range podsOnNode { + if pod.DeletionTimestamp != nil { + continue + } + generation, err := util.GetTemplateGeneration(ds) + if err != nil { + generation = nil + } + if util.IsPodUpdated(pod, hash, generation) { + if newPod != nil { + return nil, nil, false + } + newPod = pod + } else { + if oldPod != nil { + return nil, nil, false + } + oldPod = pod } - klog.V(4).Infof("Marking pod %s/%s for deletion", ds.Name, pod.Name) - oldPodsToDelete = append(oldPodsToDelete, pod.Name) - numUnavailable++ } - return dsc.syncNodes(ds, oldPodsToDelete, []string{}, hash) + return newPod, oldPod, true } // constructHistory finds all histories controlled by the given DaemonSet, and @@ -363,64 +515,41 @@ func (dsc *DaemonSetsController) snapshot(ds *apps.DaemonSet, revision int64) (* return history, err } -func (dsc *DaemonSetsController) getAllDaemonSetPods(ds *apps.DaemonSet, nodeToDaemonPods map[string][]*v1.Pod, hash string) ([]*v1.Pod, []*v1.Pod) { - var newPods []*v1.Pod - var oldPods []*v1.Pod - - for _, pods := range nodeToDaemonPods { - for _, pod := range pods { - // If the returned error is not nil we have a parse error. - // The controller handles this via the hash. 
- generation, err := util.GetTemplateGeneration(ds) - if err != nil { - generation = nil - } - if util.IsPodUpdated(pod, hash, generation) { - newPods = append(newPods, pod) - } else { - oldPods = append(oldPods, pod) - } - } - } - return newPods, oldPods -} - -func (dsc *DaemonSetsController) getUnavailableNumbers(ds *apps.DaemonSet, nodeList []*v1.Node, nodeToDaemonPods map[string][]*v1.Pod) (int, int, error) { - klog.V(4).Infof("Getting unavailable numbers") - var numUnavailable, desiredNumberScheduled int +// updatedDesiredNodeCounts calculates the true number of allowed unavailable or surge pods and +// updates the nodeToDaemonPods array to include an empty array for every node that is not scheduled. +func (dsc *DaemonSetsController) updatedDesiredNodeCounts(ds *apps.DaemonSet, nodeList []*v1.Node, nodeToDaemonPods map[string][]*v1.Pod) (int, int, error) { + var desiredNumberScheduled int for i := range nodeList { node := nodeList[i] - wantToRun, _, err := dsc.nodeShouldRunDaemonPod(node, ds) - if err != nil { - return -1, -1, err - } + wantToRun, _ := dsc.nodeShouldRunDaemonPod(node, ds) if !wantToRun { continue } desiredNumberScheduled++ - daemonPods, exists := nodeToDaemonPods[node.Name] - if !exists { - numUnavailable++ - continue - } - available := false - for _, pod := range daemonPods { - //for the purposes of update we ensure that the Pod is both available and not terminating - if podutil.IsPodAvailable(pod, ds.Spec.MinReadySeconds, metav1.Now()) && pod.DeletionTimestamp == nil { - available = true - break - } - } - if !available { - numUnavailable++ + + if _, exists := nodeToDaemonPods[node.Name]; !exists { + nodeToDaemonPods[node.Name] = nil } } - maxUnavailable, err := intstrutil.GetScaledValueFromIntOrPercent(ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable, desiredNumberScheduled, true) + + maxUnavailable, err := util.UnavailableCount(ds, desiredNumberScheduled) if err != nil { return -1, -1, fmt.Errorf("invalid value for MaxUnavailable: %v", 
err) } - klog.V(4).Infof(" DaemonSet %s/%s, maxUnavailable: %d, numUnavailable: %d", ds.Namespace, ds.Name, maxUnavailable, numUnavailable) - return maxUnavailable, numUnavailable, nil + + maxSurge, err := util.SurgeCount(ds, desiredNumberScheduled) + if err != nil { + return -1, -1, fmt.Errorf("invalid value for MaxSurge: %v", err) + } + + // if the daemonset returned with an impossible configuration, obey the default of unavailable=1 (in the + // event the apiserver returns 0 for both surge and unavailability) + if desiredNumberScheduled > 0 && maxUnavailable == 0 && maxSurge == 0 { + klog.Warningf("DaemonSet %s/%s is not configured for surge or unavailability, defaulting to accepting unavailability", ds.Namespace, ds.Name) + maxUnavailable = 1 + } + klog.V(5).Infof("DaemonSet %s/%s, maxSurge: %d, maxUnavailable: %d", ds.Namespace, ds.Name, maxSurge, maxUnavailable) + return maxSurge, maxUnavailable, nil } type historiesByRevision []*apps.ControllerRevision diff --git a/pkg/controller/daemon/update_test.go b/pkg/controller/daemon/update_test.go index ffbd58158955..5379f667d57b 100644 --- a/pkg/controller/daemon/update_test.go +++ b/pkg/controller/daemon/update_test.go @@ -18,12 +18,20 @@ package daemon import ( "testing" + "time" apps "k8s.io/api/apps/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/clock" "k8s.io/apimachinery/pkg/util/intstr" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/klog/v2" + podutil "k8s.io/kubernetes/pkg/api/v1/pod" + "k8s.io/kubernetes/pkg/controller/daemon/util" + "k8s.io/kubernetes/pkg/features" ) func TestDaemonSetUpdatesPods(t *testing.T) { @@ -34,67 +42,79 @@ func TestDaemonSetUpdatesPods(t *testing.T) { } maxUnavailable := 2 addNodes(manager.nodeStore, 0, 5, nil) - err = manager.dsStore.Add(ds) - if err != nil { - t.Fatal(err) - } - err = 
syncAndValidateDaemonSets(manager, ds, podControl, 5, 0, 0) - if err != nil { - t.Error(err) - } + manager.dsStore.Add(ds) + expectSyncDaemonSets(t, manager, ds, podControl, 5, 0, 0) markPodsReady(podControl.podStore) ds.Spec.Template.Spec.Containers[0].Image = "foo2/bar2" ds.Spec.UpdateStrategy.Type = apps.RollingUpdateDaemonSetStrategyType intStr := intstr.FromInt(maxUnavailable) ds.Spec.UpdateStrategy.RollingUpdate = &apps.RollingUpdateDaemonSet{MaxUnavailable: &intStr} - err = manager.dsStore.Update(ds) - if err != nil { - t.Fatal(err) - } + manager.dsStore.Update(ds) clearExpectations(t, manager, ds, podControl) - err = syncAndValidateDaemonSets(manager, ds, podControl, 0, maxUnavailable, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 0, maxUnavailable, 0) clearExpectations(t, manager, ds, podControl) - err = syncAndValidateDaemonSets(manager, ds, podControl, maxUnavailable, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, maxUnavailable, 0, 0) markPodsReady(podControl.podStore) clearExpectations(t, manager, ds, podControl) - err = syncAndValidateDaemonSets(manager, ds, podControl, 0, maxUnavailable, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 0, maxUnavailable, 0) clearExpectations(t, manager, ds, podControl) - err = syncAndValidateDaemonSets(manager, ds, podControl, maxUnavailable, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, maxUnavailable, 0, 0) markPodsReady(podControl.podStore) clearExpectations(t, manager, ds, podControl) - err = syncAndValidateDaemonSets(manager, ds, podControl, 0, 1, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 0, 1, 0) clearExpectations(t, manager, ds, podControl) - err = syncAndValidateDaemonSets(manager, ds, podControl, 1, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, 
podControl, 1, 0, 0) markPodsReady(podControl.podStore) clearExpectations(t, manager, ds, podControl) - err = syncAndValidateDaemonSets(manager, ds, podControl, 0, 0, 0) + expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0) + clearExpectations(t, manager, ds, podControl) +} + +func TestDaemonSetUpdatesPodsWithMaxSurge(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DaemonSetUpdateSurge, true)() + ds := newDaemonSet("foo") + manager, podControl, _, err := newTestController(ds) if err != nil { - t.Error(err) + t.Fatalf("error creating DaemonSets controller: %v", err) } + addNodes(manager.nodeStore, 0, 5, nil) + manager.dsStore.Add(ds) + expectSyncDaemonSets(t, manager, ds, podControl, 5, 0, 0) + markPodsReady(podControl.podStore) + + // surge is thhe controlling amount + maxSurge := 2 + ds.Spec.Template.Spec.Containers[0].Image = "foo2/bar2" + ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(maxSurge)) + manager.dsStore.Update(ds) + + clearExpectations(t, manager, ds, podControl) + expectSyncDaemonSets(t, manager, ds, podControl, maxSurge, 0, 0) + clearExpectations(t, manager, ds, podControl) + expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0) + markPodsReady(podControl.podStore) + + clearExpectations(t, manager, ds, podControl) + expectSyncDaemonSets(t, manager, ds, podControl, maxSurge, maxSurge, 0) + clearExpectations(t, manager, ds, podControl) + expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0) + markPodsReady(podControl.podStore) + clearExpectations(t, manager, ds, podControl) + expectSyncDaemonSets(t, manager, ds, podControl, 5%maxSurge, maxSurge, 0) + clearExpectations(t, manager, ds, podControl) + expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0) + markPodsReady(podControl.podStore) + + clearExpectations(t, manager, ds, podControl) + expectSyncDaemonSets(t, manager, ds, podControl, 0, 5%maxSurge, 0) + clearExpectations(t, manager, ds, podControl) + 
expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0) } func TestDaemonSetUpdatesWhenNewPosIsNotReady(t *testing.T) { @@ -109,10 +129,7 @@ func TestDaemonSetUpdatesWhenNewPosIsNotReady(t *testing.T) { if err != nil { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, 5, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 5, 0, 0) markPodsReady(podControl.podStore) ds.Spec.Template.Spec.Containers[0].Image = "foo2/bar2" @@ -126,21 +143,13 @@ func TestDaemonSetUpdatesWhenNewPosIsNotReady(t *testing.T) { // new pods are not ready numUnavailable == maxUnavailable clearExpectations(t, manager, ds, podControl) - err = syncAndValidateDaemonSets(manager, ds, podControl, 0, maxUnavailable, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 0, maxUnavailable, 0) + clearExpectations(t, manager, ds, podControl) - err = syncAndValidateDaemonSets(manager, ds, podControl, maxUnavailable, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, maxUnavailable, 0, 0) clearExpectations(t, manager, ds, podControl) - err = syncAndValidateDaemonSets(manager, ds, podControl, 0, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0) clearExpectations(t, manager, ds, podControl) } @@ -156,10 +165,7 @@ func TestDaemonSetUpdatesAllOldPodsNotReady(t *testing.T) { if err != nil { t.Fatal(err) } - err = syncAndValidateDaemonSets(manager, ds, podControl, 5, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 5, 0, 0) ds.Spec.Template.Spec.Containers[0].Image = "foo2/bar2" ds.Spec.UpdateStrategy.Type = apps.RollingUpdateDaemonSetStrategyType @@ -172,22 +178,157 @@ func TestDaemonSetUpdatesAllOldPodsNotReady(t *testing.T) { // all old pods are unavailable so should be removed clearExpectations(t, manager, ds, podControl) - err = syncAndValidateDaemonSets(manager, 
ds, podControl, 0, 5, 0) + expectSyncDaemonSets(t, manager, ds, podControl, 0, 5, 0) + + clearExpectations(t, manager, ds, podControl) + expectSyncDaemonSets(t, manager, ds, podControl, 5, 0, 0) + + clearExpectations(t, manager, ds, podControl) + expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0) + clearExpectations(t, manager, ds, podControl) +} + +func TestDaemonSetUpdatesAllOldPodsNotReadyMaxSurge(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DaemonSetUpdateSurge, true)() + ds := newDaemonSet("foo") + manager, podControl, _, err := newTestController(ds) if err != nil { - t.Error(err) + t.Fatalf("error creating DaemonSets controller: %v", err) } + addNodes(manager.nodeStore, 0, 5, nil) + manager.dsStore.Add(ds) + expectSyncDaemonSets(t, manager, ds, podControl, 5, 0, 0) + + maxSurge := 3 + ds.Spec.Template.Spec.Containers[0].Image = "foo2/bar2" + ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(maxSurge)) + manager.dsStore.Update(ds) + + // all old pods are unavailable so should be surged + manager.failedPodsBackoff.Clock = clock.NewFakeClock(time.Unix(100, 0)) clearExpectations(t, manager, ds, podControl) - err = syncAndValidateDaemonSets(manager, ds, podControl, 5, 0, 0) + expectSyncDaemonSets(t, manager, ds, podControl, 5, 0, 0) + + // waiting for pods to go ready, old pods are deleted + manager.failedPodsBackoff.Clock = clock.NewFakeClock(time.Unix(200, 0)) + clearExpectations(t, manager, ds, podControl) + expectSyncDaemonSets(t, manager, ds, podControl, 0, 5, 0) + + setPodReadiness(t, manager, true, 5, func(_ *v1.Pod) bool { return true }) + ds.Spec.MinReadySeconds = 15 + ds.Spec.Template.Spec.Containers[0].Image = "foo3/bar3" + manager.dsStore.Update(ds) + + manager.failedPodsBackoff.Clock = clock.NewFakeClock(time.Unix(300, 0)) + clearExpectations(t, manager, ds, podControl) + expectSyncDaemonSets(t, manager, ds, podControl, 3, 0, 0) + + hash, err := currentDSHash(manager, ds) 
if err != nil { - t.Error(err) + t.Fatal(err) } + currentPods := podsByNodeMatchingHash(manager, hash) + // mark two updated pods as ready at time 300 + setPodReadiness(t, manager, true, 2, func(pod *v1.Pod) bool { + return pod.Labels[apps.ControllerRevisionHashLabelKey] == hash + }) + // mark one of the old pods that is on a node without an updated pod as unready + setPodReadiness(t, manager, false, 1, func(pod *v1.Pod) bool { + nodeName, err := util.GetTargetNodeName(pod) + if err != nil { + t.Fatal(err) + } + return pod.Labels[apps.ControllerRevisionHashLabelKey] != hash && len(currentPods[nodeName]) == 0 + }) + // the new pods should still be considered waiting to hit min readiness, so one pod should be created to replace + // the deleted old pod + manager.failedPodsBackoff.Clock = clock.NewFakeClock(time.Unix(310, 0)) clearExpectations(t, manager, ds, podControl) - err = syncAndValidateDaemonSets(manager, ds, podControl, 0, 0, 0) + expectSyncDaemonSets(t, manager, ds, podControl, 1, 0, 0) + + // the new pods are now considered available, so delete the old pods + manager.failedPodsBackoff.Clock = clock.NewFakeClock(time.Unix(320, 0)) + clearExpectations(t, manager, ds, podControl) + expectSyncDaemonSets(t, manager, ds, podControl, 1, 3, 0) + + // mark all updated pods as ready at time 320 + currentPods = podsByNodeMatchingHash(manager, hash) + setPodReadiness(t, manager, true, 3, func(pod *v1.Pod) bool { + return pod.Labels[apps.ControllerRevisionHashLabelKey] == hash + }) + + // the new pods are now considered available, so delete the old pods + manager.failedPodsBackoff.Clock = clock.NewFakeClock(time.Unix(340, 0)) + clearExpectations(t, manager, ds, podControl) + expectSyncDaemonSets(t, manager, ds, podControl, 0, 2, 0) + + // controller has completed upgrade + manager.failedPodsBackoff.Clock = clock.NewFakeClock(time.Unix(350, 0)) + clearExpectations(t, manager, ds, podControl) + expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0) +} + +func 
podsByNodeMatchingHash(dsc *daemonSetsController, hash string) map[string][]string { + byNode := make(map[string][]string) + for _, obj := range dsc.podStore.List() { + pod := obj.(*v1.Pod) + if pod.Labels[apps.ControllerRevisionHashLabelKey] != hash { + continue + } + nodeName, err := util.GetTargetNodeName(pod) + if err != nil { + panic(err) + } + byNode[nodeName] = append(byNode[nodeName], pod.Name) + } + return byNode +} + +func setPodReadiness(t *testing.T, dsc *daemonSetsController, ready bool, count int, fn func(*v1.Pod) bool) { + t.Helper() + for _, obj := range dsc.podStore.List() { + if count <= 0 { + break + } + pod := obj.(*v1.Pod) + if pod.DeletionTimestamp != nil { + continue + } + if podutil.IsPodReady(pod) == ready { + continue + } + if !fn(pod) { + continue + } + condition := v1.PodCondition{Type: v1.PodReady} + if ready { + condition.Status = v1.ConditionTrue + } else { + condition.Status = v1.ConditionFalse + } + if !podutil.UpdatePodCondition(&pod.Status, &condition) { + t.Fatal("failed to update pod") + } + // TODO: workaround UpdatePodCondition calling time.Now() directly + setCondition := podutil.GetPodReadyCondition(pod.Status) + setCondition.LastTransitionTime.Time = dsc.failedPodsBackoff.Clock.Now() + klog.Infof("marked pod %s ready=%t", pod.Name, ready) + count-- + } + if count > 0 { + t.Fatalf("could not mark %d pods ready=%t", count, ready) + } +} + +func currentDSHash(dsc *daemonSetsController, ds *apps.DaemonSet) (string, error) { + // Construct histories of the DaemonSet, and get the hash of current history + cur, _, err := dsc.constructHistory(ds) if err != nil { - t.Error(err) + return "", err } - clearExpectations(t, manager, ds, podControl) + return cur.Labels[apps.DefaultDaemonSetUniqueLabelKey], nil + } func TestDaemonSetUpdatesNoTemplateChanged(t *testing.T) { @@ -198,40 +339,50 @@ func TestDaemonSetUpdatesNoTemplateChanged(t *testing.T) { } maxUnavailable := 3 addNodes(manager.nodeStore, 0, 5, nil) - err = 
manager.dsStore.Add(ds) - if err != nil { - t.Fatal(err) - } - err = syncAndValidateDaemonSets(manager, ds, podControl, 5, 0, 0) - if err != nil { - t.Error(err) - } + manager.dsStore.Add(ds) + expectSyncDaemonSets(t, manager, ds, podControl, 5, 0, 0) ds.Spec.UpdateStrategy.Type = apps.RollingUpdateDaemonSetStrategyType intStr := intstr.FromInt(maxUnavailable) ds.Spec.UpdateStrategy.RollingUpdate = &apps.RollingUpdateDaemonSet{MaxUnavailable: &intStr} - err = manager.dsStore.Update(ds) - if err != nil { - t.Fatal(err) - } + manager.dsStore.Update(ds) // template is not changed no pod should be removed clearExpectations(t, manager, ds, podControl) - err = syncAndValidateDaemonSets(manager, ds, podControl, 0, 0, 0) - if err != nil { - t.Error(err) - } + expectSyncDaemonSets(t, manager, ds, podControl, 0, 0, 0) clearExpectations(t, manager, ds, podControl) } +func newUpdateSurge(value intstr.IntOrString) apps.DaemonSetUpdateStrategy { + zero := intstr.FromInt(0) + return apps.DaemonSetUpdateStrategy{ + Type: apps.RollingUpdateDaemonSetStrategyType, + RollingUpdate: &apps.RollingUpdateDaemonSet{ + MaxUnavailable: &zero, + MaxSurge: &value, + }, + } +} + +func newUpdateUnavailable(value intstr.IntOrString) apps.DaemonSetUpdateStrategy { + return apps.DaemonSetUpdateStrategy{ + Type: apps.RollingUpdateDaemonSetStrategyType, + RollingUpdate: &apps.RollingUpdateDaemonSet{ + MaxUnavailable: &value, + }, + } +} + func TestGetUnavailableNumbers(t *testing.T) { cases := []struct { name string Manager *daemonSetsController ds *apps.DaemonSet nodeToPods map[string][]*v1.Pod + enableSurge bool + maxSurge int maxUnavailable int - numUnavailable int + emptyNodes int Err error }{ { @@ -245,13 +396,12 @@ func TestGetUnavailableNumbers(t *testing.T) { }(), ds: func() *apps.DaemonSet { ds := newDaemonSet("x") - intStr := intstr.FromInt(0) - ds.Spec.UpdateStrategy.RollingUpdate = &apps.RollingUpdateDaemonSet{MaxUnavailable: &intStr} + ds.Spec.UpdateStrategy = 
newUpdateUnavailable(intstr.FromInt(0)) return ds }(), nodeToPods: make(map[string][]*v1.Pod), maxUnavailable: 0, - numUnavailable: 0, + emptyNodes: 0, }, { name: "Two nodes with ready pods", @@ -265,8 +415,7 @@ func TestGetUnavailableNumbers(t *testing.T) { }(), ds: func() *apps.DaemonSet { ds := newDaemonSet("x") - intStr := intstr.FromInt(1) - ds.Spec.UpdateStrategy.RollingUpdate = &apps.RollingUpdateDaemonSet{MaxUnavailable: &intStr} + ds.Spec.UpdateStrategy = newUpdateUnavailable(intstr.FromInt(1)) return ds }(), nodeToPods: func() map[string][]*v1.Pod { @@ -280,7 +429,7 @@ func TestGetUnavailableNumbers(t *testing.T) { return mapping }(), maxUnavailable: 1, - numUnavailable: 0, + emptyNodes: 0, }, { name: "Two nodes, one node without pods", @@ -294,8 +443,7 @@ func TestGetUnavailableNumbers(t *testing.T) { }(), ds: func() *apps.DaemonSet { ds := newDaemonSet("x") - intStr := intstr.FromInt(0) - ds.Spec.UpdateStrategy.RollingUpdate = &apps.RollingUpdateDaemonSet{MaxUnavailable: &intStr} + ds.Spec.UpdateStrategy = newUpdateUnavailable(intstr.FromInt(0)) return ds }(), nodeToPods: func() map[string][]*v1.Pod { @@ -305,8 +453,33 @@ func TestGetUnavailableNumbers(t *testing.T) { mapping["node-0"] = []*v1.Pod{pod0} return mapping }(), - maxUnavailable: 0, - numUnavailable: 1, + maxUnavailable: 1, + emptyNodes: 1, + }, + { + name: "Two nodes, one node without pods, surge", + Manager: func() *daemonSetsController { + manager, _, _, err := newTestController() + if err != nil { + t.Fatalf("error creating DaemonSets controller: %v", err) + } + addNodes(manager.nodeStore, 0, 2, nil) + return manager + }(), + ds: func() *apps.DaemonSet { + ds := newDaemonSet("x") + ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromInt(0)) + return ds + }(), + nodeToPods: func() map[string][]*v1.Pod { + mapping := make(map[string][]*v1.Pod) + pod0 := newPod("pod-0", "node-0", simpleDaemonSetLabel, nil) + markPodReady(pod0) + mapping["node-0"] = []*v1.Pod{pod0} + return mapping + }(), + 
maxUnavailable: 1, + emptyNodes: 1, }, { name: "Two nodes with pods, MaxUnavailable in percents", @@ -320,8 +493,7 @@ func TestGetUnavailableNumbers(t *testing.T) { }(), ds: func() *apps.DaemonSet { ds := newDaemonSet("x") - intStr := intstr.FromString("50%") - ds.Spec.UpdateStrategy.RollingUpdate = &apps.RollingUpdateDaemonSet{MaxUnavailable: &intStr} + ds.Spec.UpdateStrategy = newUpdateUnavailable(intstr.FromString("50%")) return ds }(), nodeToPods: func() map[string][]*v1.Pod { @@ -335,10 +507,10 @@ func TestGetUnavailableNumbers(t *testing.T) { return mapping }(), maxUnavailable: 1, - numUnavailable: 0, + emptyNodes: 0, }, { - name: "Two nodes with pods, MaxUnavailable in percents, pod terminating", + name: "Two nodes with pods, MaxUnavailable in percents, surge", Manager: func() *daemonSetsController { manager, _, _, err := newTestController() if err != nil { @@ -349,8 +521,67 @@ func TestGetUnavailableNumbers(t *testing.T) { }(), ds: func() *apps.DaemonSet { ds := newDaemonSet("x") - intStr := intstr.FromString("50%") - ds.Spec.UpdateStrategy.RollingUpdate = &apps.RollingUpdateDaemonSet{MaxUnavailable: &intStr} + ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromString("50%")) + return ds + }(), + nodeToPods: func() map[string][]*v1.Pod { + mapping := make(map[string][]*v1.Pod) + pod0 := newPod("pod-0", "node-0", simpleDaemonSetLabel, nil) + pod1 := newPod("pod-1", "node-1", simpleDaemonSetLabel, nil) + markPodReady(pod0) + markPodReady(pod1) + mapping["node-0"] = []*v1.Pod{pod0} + mapping["node-1"] = []*v1.Pod{pod1} + return mapping + }(), + enableSurge: true, + maxSurge: 1, + maxUnavailable: 0, + emptyNodes: 0, + }, + { + name: "Two nodes with pods, MaxUnavailable is 100%, surge", + Manager: func() *daemonSetsController { + manager, _, _, err := newTestController() + if err != nil { + t.Fatalf("error creating DaemonSets controller: %v", err) + } + addNodes(manager.nodeStore, 0, 2, nil) + return manager + }(), + ds: func() *apps.DaemonSet { + ds := 
newDaemonSet("x") + ds.Spec.UpdateStrategy = newUpdateSurge(intstr.FromString("100%")) + return ds + }(), + nodeToPods: func() map[string][]*v1.Pod { + mapping := make(map[string][]*v1.Pod) + pod0 := newPod("pod-0", "node-0", simpleDaemonSetLabel, nil) + pod1 := newPod("pod-1", "node-1", simpleDaemonSetLabel, nil) + markPodReady(pod0) + markPodReady(pod1) + mapping["node-0"] = []*v1.Pod{pod0} + mapping["node-1"] = []*v1.Pod{pod1} + return mapping + }(), + enableSurge: true, + maxSurge: 2, + maxUnavailable: 0, + emptyNodes: 0, + }, + { + name: "Two nodes with pods, MaxUnavailable in percents, pod terminating", + Manager: func() *daemonSetsController { + manager, _, _, err := newTestController() + if err != nil { + t.Fatalf("error creating DaemonSets controller: %v", err) + } + addNodes(manager.nodeStore, 0, 3, nil) + return manager + }(), + ds: func() *apps.DaemonSet { + ds := newDaemonSet("x") + ds.Spec.UpdateStrategy = newUpdateUnavailable(intstr.FromString("50%")) return ds }(), nodeToPods: func() map[string][]*v1.Pod { @@ -365,26 +596,41 @@ func TestGetUnavailableNumbers(t *testing.T) { mapping["node-1"] = []*v1.Pod{pod1} return mapping }(), - maxUnavailable: 1, - numUnavailable: 1, + maxUnavailable: 2, + emptyNodes: 1, }, } for _, c := range cases { - c.Manager.dsStore.Add(c.ds) - nodeList, err := c.Manager.nodeLister.List(labels.Everything()) - if err != nil { - t.Fatalf("error listing nodes: %v", err) - } - maxUnavailable, numUnavailable, err := c.Manager.getUnavailableNumbers(c.ds, nodeList, c.nodeToPods) - if err != nil && c.Err != nil { - if c.Err != err { - t.Errorf("Test case: %s. 
Expected error: %v but got: %v", c.name, c.Err, err) + t.Run(c.name, func(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.DaemonSetUpdateSurge, c.enableSurge)() + + c.Manager.dsStore.Add(c.ds) + nodeList, err := c.Manager.nodeLister.List(labels.Everything()) + if err != nil { + t.Fatalf("error listing nodes: %v", err) } - } else if err != nil { - t.Errorf("Test case: %s. Unexpected error: %v", c.name, err) - } else if maxUnavailable != c.maxUnavailable || numUnavailable != c.numUnavailable { - t.Errorf("Test case: %s. Wrong values. maxUnavailable: %d, expected: %d, numUnavailable: %d. expected: %d", c.name, maxUnavailable, c.maxUnavailable, numUnavailable, c.numUnavailable) - } + maxSurge, maxUnavailable, err := c.Manager.updatedDesiredNodeCounts(c.ds, nodeList, c.nodeToPods) + if err != nil && c.Err != nil { + if c.Err != err { + t.Fatalf("Expected error: %v but got: %v", c.Err, err) + } + } + if err != nil && c.Err == nil { + t.Fatalf("Unexpected error: %v", err) + } + if maxSurge != c.maxSurge || maxUnavailable != c.maxUnavailable { + t.Errorf("Wrong values. 
maxSurge: %d, expected %d, maxUnavailable: %d, expected: %d", maxSurge, c.maxSurge, maxUnavailable, c.maxUnavailable) + } + var emptyNodes int + for _, pods := range c.nodeToPods { + if len(pods) == 0 { + emptyNodes++ + } + } + if emptyNodes != c.emptyNodes { + t.Errorf("expected numEmpty to be %d, was %d", c.emptyNodes, emptyNodes) + } + }) } } diff --git a/pkg/controller/daemon/util/daemonset_util.go b/pkg/controller/daemon/util/daemonset_util.go index cd576770159e..f2b58237c205 100644 --- a/pkg/controller/daemon/util/daemonset_util.go +++ b/pkg/controller/daemon/util/daemonset_util.go @@ -24,8 +24,10 @@ import ( v1 "k8s.io/api/core/v1" extensions "k8s.io/api/extensions/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - podutil "k8s.io/kubernetes/pkg/api/v1/pod" + intstrutil "k8s.io/apimachinery/pkg/util/intstr" + utilfeature "k8s.io/apiserver/pkg/util/feature" v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" + "k8s.io/kubernetes/pkg/features" ) // GetTemplateGeneration gets the template generation associated with a v1.DaemonSet by extracting it from the @@ -122,6 +124,43 @@ func CreatePodTemplate(template v1.PodTemplateSpec, generation *int64, hash stri return newTemplate } +// AllowsSurge returns true if the daemonset allows more than a single pod on any node. +func AllowsSurge(ds *apps.DaemonSet) bool { + maxSurge, err := SurgeCount(ds, 1) + return err == nil && maxSurge > 0 +} + +// SurgeCount returns 0 if surge is not requested, the expected surge number to allow +// out of numberToSchedule if surge is configured, or an error if the surge percentage +// requested is invalid. 
+func SurgeCount(ds *apps.DaemonSet, numberToSchedule int) (int, error) { + if ds.Spec.UpdateStrategy.Type != apps.RollingUpdateDaemonSetStrategyType { + return 0, nil + } + if !utilfeature.DefaultFeatureGate.Enabled(features.DaemonSetUpdateSurge) { + return 0, nil + } + r := ds.Spec.UpdateStrategy.RollingUpdate + if r == nil { + return 0, nil + } + return intstrutil.GetScaledValueFromIntOrPercent(r.MaxSurge, numberToSchedule, true) +} + +// UnavailableCount returns 0 if unavailability is not requested, the expected +// unavailability number to allow out of numberToSchedule if requested, or an error if +// the unavailability percentage requested is invalid. +func UnavailableCount(ds *apps.DaemonSet, numberToSchedule int) (int, error) { + if ds.Spec.UpdateStrategy.Type != apps.RollingUpdateDaemonSetStrategyType { + return 0, nil + } + r := ds.Spec.UpdateStrategy.RollingUpdate + if r == nil { + return 0, nil + } + return intstrutil.GetScaledValueFromIntOrPercent(r.MaxUnavailable, numberToSchedule, true) +} + // IsPodUpdated checks if pod contains label value that either matches templateGeneration or hash func IsPodUpdated(pod *v1.Pod, hash string, dsTemplateGeneration *int64) bool { // Compare with hash to see if the pod is updated, need to maintain backward compatibility of templateGeneration @@ -131,20 +170,6 @@ func IsPodUpdated(pod *v1.Pod, hash string, dsTemplateGeneration *int64) bool { return hashMatches || templateMatches } -// SplitByAvailablePods splits provided daemon set pods by availability -func SplitByAvailablePods(minReadySeconds int32, pods []*v1.Pod) ([]*v1.Pod, []*v1.Pod) { - unavailablePods := []*v1.Pod{} - availablePods := []*v1.Pod{} - for _, pod := range pods { - if podutil.IsPodAvailable(pod, minReadySeconds, metav1.Now()) { - availablePods = append(availablePods, pod) - } else { - unavailablePods = append(unavailablePods, pod) - } - } - return availablePods, unavailablePods -} - // ReplaceDaemonSetPodNodeNameNodeAffinity replaces the 
RequiredDuringSchedulingIgnoredDuringExecution // NodeAffinity of the given affinity with a new NodeAffinity that selects the given nodeName. // Note that this function assumes that no NodeAffinity conflicts with the selected nodeName. diff --git a/test/e2e/apps/daemon_set.go b/test/e2e/apps/daemon_set.go index 329688eb3b9b..db23bc1efd7f 100644 --- a/test/e2e/apps/daemon_set.go +++ b/test/e2e/apps/daemon_set.go @@ -17,10 +17,14 @@ limitations under the License. package apps import ( + "bytes" "context" "fmt" + "math/rand" "reflect" + "sort" "strings" + "text/tabwriter" "time" "github.com/onsi/ginkgo" @@ -32,6 +36,8 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" @@ -482,8 +488,334 @@ var _ = SIGDescribe("Daemon set [Serial]", func() { framework.ExpectEqual(rollbackPods[pod.Name], true, fmt.Sprintf("unexpected pod %s be restarted", pod.Name)) } }) + + // TODO: This test is expected to be promoted to conformance after the feature is promoted + ginkgo.It("should surge pods onto nodes when spec was updated and update strategy is RollingUpdate [Feature:DaemonSetUpdateSurge]", func() { + label := map[string]string{daemonsetNameLabel: dsName} + + framework.Logf("Creating surge daemon set %s", dsName) + maxSurgeOverlap := 60 * time.Second + maxSurge := 1 + surgePercent := intstr.FromString("20%") + zero := intstr.FromInt(0) + oldVersion := "1" + ds := newDaemonSet(dsName, image, label) + ds.Spec.Template.Spec.Containers[0].Env = []v1.EnvVar{ + {Name: "VERSION", Value: oldVersion}, + } + // delay shutdown by 15s to allow containers to overlap in time + ds.Spec.Template.Spec.Containers[0].Lifecycle = &v1.Lifecycle{ + PreStop: &v1.Handler{ + Exec: &v1.ExecAction{ + Command: []string{"/bin/sh", "-c", "sleep 15"}, + }, + }, + } 
+ // use a readiness probe that can be forced to fail (by changing the contents of /var/tmp/ready) + ds.Spec.Template.Spec.Containers[0].ReadinessProbe = &v1.Probe{ + Handler: v1.Handler{ + Exec: &v1.ExecAction{ + Command: []string{"/bin/sh", "-ec", `touch /var/tmp/ready; [[ "$( cat /var/tmp/ready )" == "" ]]`}, + }, + }, + InitialDelaySeconds: 7, + PeriodSeconds: 3, + SuccessThreshold: 1, + FailureThreshold: 1, + } + // use a simple surge strategy + ds.Spec.UpdateStrategy = appsv1.DaemonSetUpdateStrategy{ + Type: appsv1.RollingUpdateDaemonSetStrategyType, + RollingUpdate: &appsv1.RollingUpdateDaemonSet{ + MaxUnavailable: &zero, + MaxSurge: &surgePercent, + }, + } + // The pod must be ready for at least 10s before we delete the old pod + ds.Spec.MinReadySeconds = 10 + + ds, err := c.AppsV1().DaemonSets(ns).Create(context.TODO(), ds, metav1.CreateOptions{}) + framework.ExpectNoError(err) + + ginkgo.By("Check that daemon pods launch on every node of the cluster.") + err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds)) + framework.ExpectNoError(err, "error waiting for daemon pod to start") + + // Check history and labels + ds, err = c.AppsV1().DaemonSets(ns).Get(context.TODO(), ds.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + waitForHistoryCreated(c, ns, label, 1) + cur := curHistory(listDaemonHistories(c, ns, label), ds) + hash := cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey] + framework.ExpectEqual(cur.Revision, int64(1)) + checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), hash) + + newVersion := "2" + ginkgo.By("Update daemon pods environment var") + patch := fmt.Sprintf(`{"spec":{"template":{"spec":{"containers":[{"name":"%s","env":[{"name":"VERSION","value":"%s"}]}]}}}}`, ds.Spec.Template.Spec.Containers[0].Name, newVersion) + ds, err = c.AppsV1().DaemonSets(ns).Patch(context.TODO(), dsName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}) + framework.ExpectNoError(err) + + // Time to 
complete the rolling upgrade is proportional to the number of nodes in the cluster. + // Get the number of nodes, and set the timeout appropriately. + nodes, err := c.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) + framework.ExpectNoError(err) + nodeCount := len(nodes.Items) + retryTimeout := dsRetryTimeout + time.Duration(nodeCount*30)*time.Second + + ginkgo.By("Check that daemon pods surge and invariants are preserved during that rollout") + ageOfOldPod := make(map[string]time.Time) + deliberatelyDeletedPods := sets.NewString() + err = wait.PollImmediate(dsRetryPeriod, retryTimeout, func() (bool, error) { + podList, err := c.CoreV1().Pods(ds.Namespace).List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return false, err + } + pods := podList.Items + + var buf bytes.Buffer + pw := tabwriter.NewWriter(&buf, 1, 1, 1, ' ', 0) + fmt.Fprint(pw, "Node\tVersion\tName\tUID\tDeleted\tReady\n") + + now := time.Now() + podUIDs := sets.NewString() + deletedPodUIDs := sets.NewString() + nodes := sets.NewString() + versions := sets.NewString() + nodesToVersions := make(map[string]map[string]int) + nodesToDeletedVersions := make(map[string]map[string]int) + var surgeCount, newUnavailableCount, newDeliberatelyDeletedCount, oldUnavailableCount, nodesWithoutOldVersion int + for _, pod := range pods { + if !metav1.IsControlledBy(&pod, ds) { + continue + } + nodeName := pod.Spec.NodeName + nodes.Insert(nodeName) + podVersion := pod.Spec.Containers[0].Env[0].Value + if pod.DeletionTimestamp != nil { + if !deliberatelyDeletedPods.Has(string(pod.UID)) { + versions := nodesToDeletedVersions[nodeName] + if versions == nil { + versions = make(map[string]int) + nodesToDeletedVersions[nodeName] = versions + } + versions[podVersion]++ + } + } else { + versions := nodesToVersions[nodeName] + if versions == nil { + versions = make(map[string]int) + nodesToVersions[nodeName] = versions + } + versions[podVersion]++ + } + + ready := podutil.IsPodAvailable(&pod, 
ds.Spec.MinReadySeconds, metav1.Now()) + if podVersion == newVersion { + surgeCount++ + if !ready || pod.DeletionTimestamp != nil { + if deliberatelyDeletedPods.Has(string(pod.UID)) { + newDeliberatelyDeletedCount++ + } + newUnavailableCount++ + } + } else { + if !ready || pod.DeletionTimestamp != nil { + oldUnavailableCount++ + } + } + fmt.Fprintf(pw, "%s\t%s\t%s\t%s\t%t\t%t\n", pod.Spec.NodeName, podVersion, pod.Name, pod.UID, pod.DeletionTimestamp != nil, ready) + } + + // print a stable sorted list of pods by node for debugging + pw.Flush() + lines := strings.Split(buf.String(), "\n") + lines = lines[:len(lines)-1] + sort.Strings(lines[1:]) + for _, line := range lines { + framework.Logf("%s", line) + } + + // if there is an old and new pod at the same time, record a timestamp + deletedPerNode := make(map[string]int) + for _, pod := range pods { + if !metav1.IsControlledBy(&pod, ds) { + continue + } + // ignore deleted pods + if pod.DeletionTimestamp != nil { + deletedPodUIDs.Insert(string(pod.UID)) + if !deliberatelyDeletedPods.Has(string(pod.UID)) { + deletedPerNode[pod.Spec.NodeName]++ + } + continue + } + podUIDs.Insert(string(pod.UID)) + podVersion := pod.Spec.Containers[0].Env[0].Value + if podVersion == newVersion { + continue + } + // if this is a pod in an older version AND there is a new version of this pod, record when + // we started seeing this, otherwise delete the record (perhaps the node was drained) + if nodesToVersions[pod.Spec.NodeName][newVersion] > 0 { + if _, ok := ageOfOldPod[string(pod.UID)]; !ok { + ageOfOldPod[string(pod.UID)] = now + } + } else { + delete(ageOfOldPod, string(pod.UID)) + } + } + // purge the old pods list of any deleted pods + for uid := range ageOfOldPod { + if !podUIDs.Has(uid) { + delete(ageOfOldPod, uid) + } + } + deliberatelyDeletedPods = deliberatelyDeletedPods.Intersection(deletedPodUIDs) + + for _, versions := range nodesToVersions { + if versions[oldVersion] == 0 { + nodesWithoutOldVersion++ + } + } + + var 
errs []string + + // invariant: we should not see more than 1 deleted pod per node unless a severe node problem is occurring or the controller is misbehaving + for node, count := range deletedPerNode { + if count > 1 { + errs = append(errs, fmt.Sprintf("Node %s has %d deleted pods, which may indicate a problem on the node or a controller race condition", node, count)) + } + } + + // invariant: an old pod must not keep running alongside its newer replacement for longer than maxSurgeOverlap + for uid, firstSeen := range ageOfOldPod { + if now.Sub(firstSeen) > maxSurgeOverlap { + errs = append(errs, fmt.Sprintf("An old pod with UID %s has been running alongside a newer version for longer than %s", uid, maxSurgeOverlap)) + } + } + + // invariant: we should never have more than maxSurge + oldUnavailableCount instances of the new version unready unless a flake in the infrastructure happens, or + // if we deliberately deleted one of the new pods + if newUnavailableCount > (maxSurge + oldUnavailableCount + newDeliberatelyDeletedCount + nodesWithoutOldVersion) { + errs = append(errs, fmt.Sprintf("observed %d new unavailable pods greater than (surge count %d + old unavailable count %d + deliberately deleted new count %d + nodes without old version %d), may be infrastructure flake", newUnavailableCount, maxSurge, oldUnavailableCount, newDeliberatelyDeletedCount, nodesWithoutOldVersion)) + } + // invariant: the total number of versions created should be 2 + if versions.Len() > 2 { + errs = append(errs, fmt.Sprintf("observed %d versions running simultaneously, must have max 2", versions.Len())) + } + for _, node := range nodes.List() { + // ignore pods that haven't been scheduled yet + if len(node) == 0 { + continue + } + versionCount := make(map[string]int) + // invariant: surge should never have more than one instance of a pod per node running + for version, count := range nodesToVersions[node] { + if count > 1 { + errs = append(errs, fmt.Sprintf("node %s has %d 
instances of version %s running simultaneously, must have max 1", node, count, version)) + } + versionCount[version] += count + } + // invariant: when surging, the most number of pods we should allow to be deleted is 2 (if we are getting evicted) + for version, count := range nodesToDeletedVersions[node] { + if count > 2 { + errs = append(errs, fmt.Sprintf("node %s has %d deleted instances of version %s running simultaneously, must have max 2", node, count, version)) + } + versionCount[version] += count + } + // invariant: on any node, we should never have more than two instances of a version (if we are getting evicted) + for version, count := range versionCount { + if count > 2 { + errs = append(errs, fmt.Sprintf("node %s has %d total instances of version %s running simultaneously, must have max 2 (one deleted and one running)", node, count, version)) + } + } + } + + if len(errs) > 0 { + sort.Strings(errs) + return false, fmt.Errorf("invariants were violated during daemonset update:\n%s", strings.Join(errs, "\n")) + } + + // Make sure every daemon pod on the node has been updated + nodeNames := schedulableNodes(c, ds) + for _, node := range nodeNames { + switch { + case + // if we don't have the new version yet + nodesToVersions[node][newVersion] == 0, + // if there are more than one version on a node + len(nodesToVersions[node]) > 1, + // if there are still any deleted pods + len(nodesToDeletedVersions[node]) > 0, + // if any of the new pods are unavailable + newUnavailableCount > 0: + + // inject a failure randomly to ensure the controller recovers + switch rand.Intn(25) { + // cause a random old pod to go unready + case 0: + // select a not-deleted pod of the old version + if pod := randomPod(pods, func(pod *v1.Pod) bool { + return pod.DeletionTimestamp == nil && oldVersion == pod.Spec.Containers[0].Env[0].Value + }); pod != nil { + // make the /tmp/ready file read only, which will cause readiness to fail + if _, err := framework.RunKubectl(pod.Namespace, 
"exec", "-c", pod.Spec.Containers[0].Name, pod.Name, "--", "/bin/sh", "-ec", "echo 0 > /var/tmp/ready"); err != nil { + framework.Logf("Failed to mark pod %s as unready via exec: %v", pod.Name, err) + } else { + framework.Logf("Marked old pod %s as unready", pod.Name) + } + } + case 1: + // delete a random pod + if pod := randomPod(pods, func(pod *v1.Pod) bool { + return pod.DeletionTimestamp == nil + }); pod != nil { + if err := c.CoreV1().Pods(ds.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}); err != nil { + framework.Logf("Failed to delete pod %s early: %v", pod.Name, err) + } else { + framework.Logf("Deleted pod %s prematurely", pod.Name) + deliberatelyDeletedPods.Insert(string(pod.UID)) + } + } + } + + // then wait + return false, nil + } + } + return true, nil + }) + framework.ExpectNoError(err) + + ginkgo.By("Check that daemon pods are still running on every node of the cluster.") + err = wait.PollImmediate(dsRetryPeriod, dsRetryTimeout, checkRunningOnAllNodes(f, ds)) + framework.ExpectNoError(err, "error waiting for daemon pod to start") + + // Check history and labels + ds, err = c.AppsV1().DaemonSets(ns).Get(context.TODO(), ds.Name, metav1.GetOptions{}) + framework.ExpectNoError(err) + waitForHistoryCreated(c, ns, label, 2) + cur = curHistory(listDaemonHistories(c, ns, label), ds) + hash = cur.Labels[appsv1.DefaultDaemonSetUniqueLabelKey] + framework.ExpectEqual(cur.Revision, int64(2)) + checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), hash) + }) }) +// randomPod selects a random pod within pods that causes fn to return true, or nil +// if no pod can be found matching the criteria. 
+func randomPod(pods []v1.Pod, fn func(p *v1.Pod) bool) *v1.Pod { + podCount := len(pods) + for offset, i := rand.Intn(podCount), 0; i < (podCount - 1); i++ { + pod := &pods[(offset+i)%podCount] + if fn(pod) { + return pod + } + } + return nil +} + // getDaemonSetImagePatch generates a patch for updating a DaemonSet's container image func getDaemonSetImagePatch(containerName, containerImage string) string { return fmt.Sprintf(`{"spec":{"template":{"spec":{"containers":[{"name":"%s","image":"%s"}]}}}}`, containerName, containerImage)