diff --git a/pkg/constants/constants.go b/pkg/constants/constants.go
index a41fff566..a81a076b9 100644
--- a/pkg/constants/constants.go
+++ b/pkg/constants/constants.go
@@ -1,5 +1,12 @@
 package constants
 
+import (
+	"time"
+
+	corev1 "k8s.io/api/core/v1"
+	"k8s.io/apimachinery/pkg/util/wait"
+)
+
 // constants defines some file paths that are shared outside of the
 // MCO package; and thus consumed by other users
 
@@ -9,7 +16,21 @@ const (
 	APIServerURLFile = "/etc/kubernetes/apiserver-url.env"
 )
 
-// ConstantsByName is a map of constants for ease of templating
-var ConstantsByName = map[string]string{
-	"APIServerURLFile": APIServerURLFile,
-}
+var (
+	// NodeUpdateBackoff is the backoff used before asking the API server to update the node
+	// object again.
+	NodeUpdateBackoff = wait.Backoff{
+		Steps:    5,
+		Duration: 100 * time.Millisecond,
+		Jitter:   1.0,
+	}
+	// NodeUpdateInProgressTaint is a taint applied by the MCC when a node update starts.
+	NodeUpdateInProgressTaint = &corev1.Taint{
+		Key:    "UpdateInProgress",
+		Effect: corev1.TaintEffectPreferNoSchedule,
+	}
+	// ConstantsByName is a map of constants for ease of templating
+	ConstantsByName = map[string]string{
+		"APIServerURLFile": APIServerURLFile,
+	}
+)
diff --git a/pkg/controller/node/node_controller.go b/pkg/controller/node/node_controller.go
index 37b6b030e..178a94999 100644
--- a/pkg/controller/node/node_controller.go
+++ b/pkg/controller/node/node_controller.go
@@ -14,6 +14,7 @@ import (
 	"github.com/openshift/library-go/pkg/operator/v1helpers"
 	"github.com/openshift/machine-config-operator/internal"
 	mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1"
+	"github.com/openshift/machine-config-operator/pkg/constants"
 	ctrlcommon "github.com/openshift/machine-config-operator/pkg/controller/common"
 	daemonconsts "github.com/openshift/machine-config-operator/pkg/daemon/constants"
 	mcfgclientset "github.com/openshift/machine-config-operator/pkg/generated/clientset/versioned"
@@ -65,12 +66,6 @@ const (
 	masterPoolName = "master"
 )
 
-var nodeUpdateBackoff = wait.Backoff{
-	Steps:    5,
-	Duration: 100 * time.Millisecond,
-	Jitter:   1.0,
-}
-
 // Controller defines the node controller.
 type Controller struct {
 	client mcfgclientset.Interface
@@ -488,6 +483,10 @@ func (ctrl *Controller) updateNode(old, cur interface{}) {
 			ctrl.logPoolNode(pool, curNode, "changed labels")
 			changed = true
 		}
+		if !reflect.DeepEqual(oldNode.Spec.Taints, curNode.Spec.Taints) {
+			ctrl.logPoolNode(pool, curNode, "changed taints")
+			changed = true
+		}
 	}
 
 	if !changed {
@@ -766,7 +765,18 @@ func (ctrl *Controller) syncMachineConfigPool(key string) error {
 	if err := ctrl.setClusterConfigAnnotation(nodes); err != nil {
 		return goerrs.Wrapf(err, "error setting clusterConfig Annotation for node in pool %q, error: %v", pool.Name, err)
 	}
-
+	// Taint every node in the pool that is not yet at the target config, whether or not it is picked as an update candidate below.
+	ctx := context.TODO()
+	for _, node := range nodes {
+		// Every node that still needs to be upgraded should carry `NodeUpdateInProgressTaint` so that it is less likely
+		// to be chosen during the scheduling cycle.
+		targetConfig := pool.Spec.Configuration.Name
+		if node.Annotations[daemonconsts.DesiredMachineConfigAnnotationKey] != targetConfig {
+			if err := ctrl.setUpdateInProgressTaint(ctx, node.Name); err != nil {
+				return goerrs.Wrapf(err, "failed applying %s taint for node %s", constants.NodeUpdateInProgressTaint.Key, node.Name)
+			}
+		}
+	}
 	candidates, capacity := getAllCandidateMachines(pool, nodes, maxunavail)
 	if len(candidates) > 0 {
 		ctrl.logPool(pool, "%d candidate nodes for update, capacity: %d", len(candidates), capacity)
@@ -834,7 +844,7 @@ func (ctrl *Controller) setClusterConfigAnnotation(nodes []*corev1.Node) error {
 }
 
 func (ctrl *Controller) setDesiredMachineConfigAnnotation(nodeName, currentConfig string) error {
-	return clientretry.RetryOnConflict(nodeUpdateBackoff, func() error {
+	return clientretry.RetryOnConflict(constants.NodeUpdateBackoff, func() error {
 		oldNode, err := ctrl.kubeClient.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
 		if err != nil {
 			return err
@@ -871,7 +881,6 @@ func (ctrl *Controller) setDesiredMachineConfigAnnotation(nodeName, currentConfi
 // capacity. It is the reponsibility of the caller to choose a subset of the nodes given the capacity.
 func getAllCandidateMachines(pool *mcfgv1.MachineConfigPool, nodesInPool []*corev1.Node, maxUnavailable int) ([]*corev1.Node, uint) {
 	targetConfig := pool.Spec.Configuration.Name
-
 	unavail := getUnavailableMachines(nodesInPool)
 	// If we're at capacity, there's nothing to do.
 	if len(unavail) >= maxUnavailable {
@@ -888,10 +897,8 @@ func getAllCandidateMachines(pool *mcfgv1.MachineConfigPool, nodesInPool []*core
 			}
 			continue
 		}
-
 		nodes = append(nodes, node)
 	}
-
 	// Nodes which are failing to target this config also count against
 	// availability - it might be a transient issue, and if the issue
 	// clears we don't want multiple to update at once.
@@ -899,7 +906,6 @@ func getAllCandidateMachines(pool *mcfgv1.MachineConfigPool, nodesInPool []*core
 		return nil, 0
 	}
 	capacity -= failingThisConfig
-
 	return nodes, uint(capacity)
 }
 
@@ -973,6 +979,47 @@ func (ctrl *Controller) updateCandidateMachines(pool *mcfgv1.MachineConfigPool,
 	return nil
 }
 
+// setUpdateInProgressTaint applies the in-progress taint to a node that is about to be updated.
+// The taint on the individual node is removed by the MCD once the update of that node is complete.
+// This ensures that already-updated nodes are preferred over nodes that still need an update,
+// thereby reducing the number of reschedules.
+func (ctrl *Controller) setUpdateInProgressTaint(ctx context.Context, nodeName string) error {
+	return clientretry.RetryOnConflict(constants.NodeUpdateBackoff, func() error {
+		oldNode, err := ctrl.kubeClient.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
+		if err != nil {
+			return err
+		}
+		oldData, err := json.Marshal(oldNode)
+		if err != nil {
+			return err
+		}
+
+		newNode := oldNode.DeepCopy()
+		if newNode.Spec.Taints == nil {
+			newNode.Spec.Taints = []corev1.Taint{}
+		}
+
+		for _, taint := range newNode.Spec.Taints {
+			if taint.MatchTaint(constants.NodeUpdateInProgressTaint) {
+				return nil
+			}
+		}
+
+		newNode.Spec.Taints = append(newNode.Spec.Taints, *constants.NodeUpdateInProgressTaint)
+		newData, err := json.Marshal(newNode)
+		if err != nil {
+			return err
+		}
+
+		patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{})
+		if err != nil {
+			return fmt.Errorf("failed to create patch for node %q: %v", nodeName, err)
+		}
+		_, err = ctrl.kubeClient.CoreV1().Nodes().Patch(ctx, nodeName, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
+		return err
+	})
+}
+
 func maxUnavailable(pool *mcfgv1.MachineConfigPool, nodes []*corev1.Node) (int, error) {
 	intOrPercent := intstrutil.FromInt(1)
 	if pool.Spec.MaxUnavailable != nil {
diff --git a/pkg/controller/node/node_controller_test.go b/pkg/controller/node/node_controller_test.go
index b3f5258ad..4aa59e1a9 100644
--- a/pkg/controller/node/node_controller_test.go
+++ b/pkg/controller/node/node_controller_test.go
@@ -3,6 +3,7 @@ package node
 import (
 	"encoding/json"
 	"fmt"
+	"github.com/openshift/machine-config-operator/pkg/constants"
 	"reflect"
 	"testing"
 	"time"
@@ -789,9 +790,22 @@ func TestShouldMakeProgress(t *testing.T) {
 	mcpWorker := helpers.NewMachineConfigPool("worker", nil, helpers.WorkerSelector, "v1")
 	mcp.Spec.MaxUnavailable = intStrPtr(intstr.FromInt(1))
 
+	// node-2 is already at the desired config, so the controller should issue no Get or Patch calls for its taints.
+	node2 := newNodeWithLabel("node-2", "v1", "v1", map[string]string{"node-role/worker": "", "node-role/infra": ""})
+	// Give node-2 the taint up front; this should still have no effect.
+	node2.Spec.Taints = []corev1.Taint{*constants.NodeUpdateInProgressTaint}
+	node3 := newNodeWithLabel("node-3", "v0", "v0", map[string]string{"node-role/worker": "", "node-role/infra": ""})
+	// Give node-3 the taint up front; the controller will Get it but skip the taint patch, and we won't see a patch
+	// to its annotations either, as maxUnavailable is set to 1.
+	node3.Spec.Taints = []corev1.Taint{*constants.NodeUpdateInProgressTaint}
 	nodes := []*corev1.Node{
+		// node-0 is at the desired config, so it shouldn't be tainted.
 		newNodeWithLabel("node-0", "v1", "v1", map[string]string{"node-role/worker": "", "node-role/infra": ""}),
 		newNodeWithLabel("node-1", "v0", "v0", map[string]string{"node-role/worker": "", "node-role/infra": ""}),
+		// node-2 is at the desired config and already tainted; no Get or Patch calls are expected for it.
+		node2,
+		// node-3 is not at the desired config but already tainted; only a Get call is expected for it.
+		node3,
 	}
 
 	f.ccLister = append(f.ccLister, cc)
@@ -801,10 +815,10 @@ func TestShouldMakeProgress(t *testing.T) {
 	for idx := range nodes {
 		f.kubeobjects = append(f.kubeobjects, nodes[idx])
 	}
-
+	// First, expect a Get and Patch on node-1 to apply the taint.
 	f.expectGetNodeAction(nodes[1])
 	expNode := nodes[1].DeepCopy()
-	expNode.Annotations[daemonconsts.DesiredMachineConfigAnnotationKey] = "v1"
+	expNode.Spec.Taints = append(expNode.Spec.Taints, *constants.NodeUpdateInProgressTaint)
 	oldData, err := json.Marshal(nodes[1])
 	if err != nil {
 		t.Fatal(err)
@@ -818,11 +832,30 @@ func TestShouldMakeProgress(t *testing.T) {
 		t.Fatal(err)
 	}
 	f.expectPatchNodeAction(expNode, exppatch)
+
+	// Expect a Get on node-3 but no Patch, as the taint is already present.
+	f.expectGetNodeAction(node3)
+
+	// Now patch the annotations on the node object. Only node-1 is updated, as maxUnavailable is set to 1.
+	f.expectGetNodeAction(nodes[1])
+	oldData, err = json.Marshal(expNode)
+	if err != nil {
+		t.Fatal(err)
+	}
+	expNode.Annotations[daemonconsts.DesiredMachineConfigAnnotationKey] = "v1"
+	newData, err = json.Marshal(expNode)
+	if err != nil {
+		t.Fatal(err)
+	}
+	exppatch, err = strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	f.expectPatchNodeAction(expNode, exppatch)
 	expStatus := calculateStatus(mcp, nodes)
 	expMcp := mcp.DeepCopy()
 	expMcp.Status = expStatus
 	f.expectUpdateMachineConfigPoolStatus(expMcp)
-
 	f.run(getKey(mcp, t))
 }
diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go
index feca535be..03f9157f8 100644
--- a/pkg/daemon/daemon.go
+++ b/pkg/daemon/daemon.go
@@ -27,7 +27,10 @@ import (
 	"golang.org/x/time/rate"
 	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/strategicpatch"
 	"k8s.io/apimachinery/pkg/util/wait"
 	coreinformersv1 "k8s.io/client-go/informers/core/v1"
 	"k8s.io/client-go/kubernetes"
@@ -36,12 +39,14 @@ import (
 	corev1lister "k8s.io/client-go/listers/core/v1"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/tools/record"
+	clientretry "k8s.io/client-go/util/retry"
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/kubectl/pkg/drain"
 
 	configv1 "github.com/openshift/api/config/v1"
 	"github.com/openshift/machine-config-operator/lib/resourceread"
 	mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1"
+	commonconstants "github.com/openshift/machine-config-operator/pkg/constants"
 	ctrlcommon "github.com/openshift/machine-config-operator/pkg/controller/common"
 	"github.com/openshift/machine-config-operator/pkg/daemon/constants"
 	mcfginformersv1 "github.com/openshift/machine-config-operator/pkg/generated/informers/externalversions/machineconfiguration.openshift.io/v1"
@@ -1290,12 +1295,59 @@ func (dn *Daemon) completeUpdate(desiredConfigName string) error {
 		return err
 	}
 
+	ctx := context.TODO()
+	if err := dn.removeUpdateInProgressTaint(ctx); err != nil {
+		return err
+	}
+
 	dn.logSystem("Update completed for config %s and node has been successfully uncordoned", desiredConfigName)
 	dn.recorder.Eventf(getNodeRef(dn.node), corev1.EventTypeNormal, "Uncordon", fmt.Sprintf("Update completed for config %s and node has been uncordoned", desiredConfigName))
 
 	return nil
 }
 
+func (dn *Daemon) removeUpdateInProgressTaint(ctx context.Context) error {
+	return clientretry.RetryOnConflict(commonconstants.NodeUpdateBackoff, func() error {
+		oldNode, err := dn.kubeClient.CoreV1().Nodes().Get(ctx, dn.name, metav1.GetOptions{})
+		if err != nil {
+			return err
+		}
+		oldData, err := json.Marshal(oldNode)
+		if err != nil {
+			return err
+		}
+
+		newNode := oldNode.DeepCopy()
+
+		// Collect the taints that should remain after the update.
+		var taintsAfterUpgrade []corev1.Taint
+		for _, taint := range newNode.Spec.Taints {
+			if taint.MatchTaint(commonconstants.NodeUpdateInProgressTaint) {
+				continue
+			} else {
+				taintsAfterUpgrade = append(taintsAfterUpgrade, taint)
+			}
+		}
+		// The update-in-progress taint is not present, so there is no need to patch the node object; return immediately.
+		if len(taintsAfterUpgrade) == len(newNode.Spec.Taints) {
+			return nil
+		}
+		// Remove the NodeUpdateInProgressTaint.
+		newNode.Spec.Taints = taintsAfterUpgrade
+		newData, err := json.Marshal(newNode)
+		if err != nil {
+			return err
+		}
+
+		patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{})
+		if err != nil {
+			return fmt.Errorf("failed to create patch for node %q: %v", dn.name, err)
+		}
+		_, err = dn.kubeClient.CoreV1().Nodes().Patch(ctx, dn.name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
+		return err
+	})
+}
+
 // triggerUpdateWithMachineConfig starts the update. It queries the cluster for
 // the current and desired config if they weren't passed.
 func (dn *Daemon) triggerUpdateWithMachineConfig(currentConfig, desiredConfig *mcfgv1.MachineConfig) error {
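Reviewer note: the sketch below is a minimal, hypothetical client-go program (not part of this change) that lists the nodes still carrying the UpdateInProgress taint; it can help when verifying the controller and daemon behavior on a live cluster. The file name, the main wrapper, and the kubeconfig handling are assumptions for illustration; only constants.NodeUpdateInProgressTaint and the core/v1 Taint.MatchTaint helper come from this change and the Kubernetes API.

// taintcheck.go: hypothetical helper, not part of this PR. It lists the nodes
// that still carry the UpdateInProgress taint applied by the node controller.
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"

	"github.com/openshift/machine-config-operator/pkg/constants"
)

func main() {
	// Assumption: a kubeconfig at the default path (~/.kube/config).
	cfg, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	nodes, err := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	for _, node := range nodes.Items {
		for _, taint := range node.Spec.Taints {
			// MatchTaint compares key and effect, mirroring the checks in
			// setUpdateInProgressTaint and removeUpdateInProgressTaint.
			if taint.MatchTaint(constants.NodeUpdateInProgressTaint) {
				fmt.Printf("node %s is still being updated (taint %s present)\n", node.Name, taint.Key)
			}
		}
	}
}

Because the taint uses the PreferNoSchedule effect, the scheduler only deprioritizes tainted nodes rather than excluding them, so pods can still land there when no untainted node fits.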