kubelet: Refactor tryUpdateNodeStatus() into smaller functions
jingyuanliang committed Nov 2, 2022
1 parent 0a689af · commit 18312cb
Showing 1 changed file with 56 additions and 32 deletions.
pkg/kubelet/kubelet_node_status.go (56 additions, 32 deletions)
@@ -478,21 +478,39 @@ func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
 	if tryNumber == 0 {
 		util.FromApiserverCache(&opts)
 	}
-	node, err := kl.heartbeatClient.CoreV1().Nodes().Get(context.TODO(), string(kl.nodeName), opts)
+	originalNode, err := kl.heartbeatClient.CoreV1().Nodes().Get(context.TODO(), string(kl.nodeName), opts)
 	if err != nil {
 		return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
 	}
-
-	originalNode := node.DeepCopy()
 	if originalNode == nil {
 		return fmt.Errorf("nil %q node object", kl.nodeName)
 	}
 
+	node, changed := kl.updateNode(originalNode)
+
+	if kl.clock.Since(kl.lastStatusReportTime) < kl.nodeStatusReportFrequency && !changed {
+		kl.markVolumesFromNode(node)
+		return nil
+	}
+
+	updatedNode, err := kl.patchNodeStatus(originalNode, node)
+	if err == nil {
+		kl.markVolumesFromNode(updatedNode)
+	}
+	return err
+}
+
+// updateNode creates a copy of originalNode and runs update logic on it.
+// It returns the updated node object and a bool indicating if anything has been changed.
+func (kl *Kubelet) updateNode(originalNode *v1.Node) (*v1.Node, bool) {
+	node := originalNode.DeepCopy()
+
 	podCIDRChanged := false
 	if len(node.Spec.PodCIDRs) != 0 {
 		// Pod CIDR could have been updated before, so we cannot rely on
 		// node.Spec.PodCIDR being non-empty. We also need to know if pod CIDR is
 		// actually changed.
 		var err error
 		podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
 		if podCIDRChanged, err = kl.updatePodCIDR(podCIDRs); err != nil {
 			klog.ErrorS(err, "Error updating pod CIDR")
@@ -520,41 +538,47 @@ func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
 
 	kl.setNodeStatus(node)
 
-	now := kl.clock.Now()
-	if now.Before(kl.lastStatusReportTime.Add(kl.nodeStatusReportFrequency)) {
-		if !podCIDRChanged && !nodeStatusHasChanged(&originalNode.Status, &node.Status) && !areRequiredLabelsNotPresent {
-			// We must mark the volumes as ReportedInUse in volume manager's dsw even
-			// if no changes were made to the node status (no volumes were added or removed
-			// from the VolumesInUse list).
-			//
-			// The reason is that on a kubelet restart, the volume manager's dsw is
-			// repopulated and the volume ReportedInUse is initialized to false, while the
-			// VolumesInUse list from the Node object still contains the state from the
-			// previous kubelet instantiation.
-			//
-			// Once the volumes are added to the dsw, the ReportedInUse field needs to be
-			// synced from the VolumesInUse list in the Node.Status.
-			//
-			// The MarkVolumesAsReportedInUse() call cannot be performed in dsw directly
-			// because it does not have access to the Node object.
-			// This also cannot be populated on node status manager init because the volume
-			// may not have been added to dsw at that time.
-			kl.volumeManager.MarkVolumesAsReportedInUse(node.Status.VolumesInUse)
-			return nil
-		}
-	}
+	return node, podCIDRChanged || nodeStatusHasChanged(&originalNode.Status, &node.Status) || areRequiredLabelsNotPresent
+}
 
+// patchNodeStatus patches node on the API server based on originalNode.
+// It returns any potential error, or an updatedNode and refreshes the state of kubelet when successful.
+func (kl *Kubelet) patchNodeStatus(originalNode, node *v1.Node) (*v1.Node, error) {
 	// Patch the current status on the API server
 	updatedNode, _, err := nodeutil.PatchNodeStatus(kl.heartbeatClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, node)
 	if err != nil {
-		return err
+		return nil, err
 	}
-	kl.lastStatusReportTime = now
+	kl.lastStatusReportTime = kl.clock.Now()
 	kl.setLastObservedNodeAddresses(updatedNode.Status.Addresses)
 	// If update finishes successfully, mark the volumeInUse as reportedInUse to indicate
 	// those volumes are already updated in the node's status
 	kl.volumeManager.MarkVolumesAsReportedInUse(updatedNode.Status.VolumesInUse)
-	return nil
+	return updatedNode, nil
 }
 
+// markVolumesFromNode updates volumeManager with VolumesInUse status from node.
+//
+// In the case of node status update being unnecessary, call with the fetched node.
+// We must mark the volumes as ReportedInUse in volume manager's dsw even
+// if no changes were made to the node status (no volumes were added or removed
+// from the VolumesInUse list).
+//
+// The reason is that on a kubelet restart, the volume manager's dsw is
+// repopulated and the volume ReportedInUse is initialized to false, while the
+// VolumesInUse list from the Node object still contains the state from the
+// previous kubelet instantiation.
+//
+// Once the volumes are added to the dsw, the ReportedInUse field needs to be
+// synced from the VolumesInUse list in the Node.Status.
+//
+// The MarkVolumesAsReportedInUse() call cannot be performed in dsw directly
+// because it does not have access to the Node object.
+// This also cannot be populated on node status manager init because the volume
+// may not have been added to dsw at that time.
+//
+// Or, after a successful node status update, call with updatedNode returned from
+// the patch call, to mark the volumeInUse as reportedInUse to indicate
+// those volumes are already updated in the node's status
+func (kl *Kubelet) markVolumesFromNode(node *v1.Node) {
+	kl.volumeManager.MarkVolumesAsReportedInUse(node.Status.VolumesInUse)
+}
+
 // recordNodeStatusEvent records an event of the given type with the given
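For readers who want the net effect without walking the hunks: the old monolithic tryUpdateNodeStatus() is split into three helpers named in the diff, updateNode() (compute the new node object plus a changed flag), patchNodeStatus() (the API-server patch and kubelet state refresh), and markVolumesFromNode() (the ReportedInUse sync). Below is a condensed, self-contained sketch of the resulting control flow; the Node struct, clock handling, and patch call are simplified stand-ins for the real kubelet types, not the actual implementation.

// Condensed sketch of the refactored flow; types here are stand-ins,
// only the call structure mirrors the commit.
package main

import (
	"fmt"
	"time"
)

// Node stands in for v1.Node; only the fields the sketch touches are kept.
type Node struct {
	Ready        bool
	VolumesInUse []string
}

type kubelet struct {
	lastStatusReportTime      time.Time
	nodeStatusReportFrequency time.Duration
}

// updateNode mirrors the new helper: copy the fetched node, apply status
// logic to the copy, and report whether anything changed.
func (kl *kubelet) updateNode(originalNode *Node) (*Node, bool) {
	node := *originalNode // stand-in for originalNode.DeepCopy()
	node.Ready = true     // stand-in for kl.setNodeStatus(node)
	changed := node.Ready != originalNode.Ready
	return &node, changed
}

// patchNodeStatus mirrors the new helper: push the status, then refresh
// kubelet-side state only on success. The real helper calls
// nodeutil.PatchNodeStatus against the API server.
func (kl *kubelet) patchNodeStatus(originalNode, node *Node) (*Node, error) {
	kl.lastStatusReportTime = time.Now()
	return node, nil
}

// markVolumesFromNode mirrors the new helper: sync ReportedInUse from the
// VolumesInUse list of whichever node object we ended up with.
func (kl *kubelet) markVolumesFromNode(node *Node) {
	fmt.Println("mark as reported-in-use:", node.VolumesInUse)
}

// tryUpdateNodeStatus is now just the driver: compute, maybe short-circuit,
// otherwise patch and mark.
func (kl *kubelet) tryUpdateNodeStatus(originalNode *Node) error {
	node, changed := kl.updateNode(originalNode)

	if time.Since(kl.lastStatusReportTime) < kl.nodeStatusReportFrequency && !changed {
		kl.markVolumesFromNode(node) // nothing to report: mark from the fetched node
		return nil
	}

	updatedNode, err := kl.patchNodeStatus(originalNode, node)
	if err == nil {
		kl.markVolumesFromNode(updatedNode) // patched: mark from the server's copy
	}
	return err
}

func main() {
	kl := &kubelet{nodeStatusReportFrequency: 5 * time.Minute}
	node := &Node{VolumesInUse: []string{"kubernetes.io/csi/vol-1"}}
	if err := kl.tryUpdateNodeStatus(node); err != nil {
		fmt.Println("node status update failed:", err)
	}
}

Note how both the short-circuit branch and the post-patch branch funnel through markVolumesFromNode, which is what lets the long VolumesInUse/ReportedInUse comment block live in one place instead of inline in tryUpdateNodeStatus.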
