Skip to content

Commit

Permalink
kubelet: Refactor tryUpdateNodeStatus() into smaller functions
Browse files Browse the repository at this point in the history
  • Loading branch information
jingyuanliang committed Nov 7, 2022
1 parent 3efd107 commit 3dc172a
Showing 1 changed file with 58 additions and 32 deletions.
90 changes: 58 additions & 32 deletions pkg/kubelet/kubelet_node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,21 +479,40 @@ func (kl *Kubelet) tryUpdateNodeStatus(ctx context.Context, tryNumber int) error
if tryNumber == 0 {
util.FromApiserverCache(&opts)
}
node, err := kl.heartbeatClient.CoreV1().Nodes().Get(ctx, string(kl.nodeName), opts)
originalNode, err := kl.heartbeatClient.CoreV1().Nodes().Get(ctx, string(kl.nodeName), opts)
if err != nil {
return fmt.Errorf("error getting node %q: %v", kl.nodeName, err)
}

originalNode := node.DeepCopy()
if originalNode == nil {
return fmt.Errorf("nil %q node object", kl.nodeName)
}

node, changed := kl.updateNode(ctx, originalNode)
shouldPatchNodeStatus := changed || kl.clock.Since(kl.lastStatusReportTime) >= kl.nodeStatusReportFrequency

if !shouldPatchNodeStatus {
kl.markVolumesFromNode(node)
return nil
}

updatedNode, err := kl.patchNodeStatus(originalNode, node)
if err == nil {
kl.markVolumesFromNode(updatedNode)
}
return err
}

// updateNode creates a copy of originalNode and runs update logic on it.
// It returns the updated node object and a bool indicating if anything has been changed.
func (kl *Kubelet) updateNode(ctx context.Context, originalNode *v1.Node) (*v1.Node, bool) {
node := originalNode.DeepCopy()

podCIDRChanged := false
if len(node.Spec.PodCIDRs) != 0 {
// Pod CIDR could have been updated before, so we cannot rely on
// node.Spec.PodCIDR being non-empty. We also need to know if pod CIDR is
// actually changed.
var err error
podCIDRs := strings.Join(node.Spec.PodCIDRs, ",")
if podCIDRChanged, err = kl.updatePodCIDR(ctx, podCIDRs); err != nil {
klog.ErrorS(err, "Error updating pod CIDR")
Expand Down Expand Up @@ -521,41 +540,48 @@ func (kl *Kubelet) tryUpdateNodeStatus(ctx context.Context, tryNumber int) error

kl.setNodeStatus(ctx, node)

now := kl.clock.Now()
if now.Before(kl.lastStatusReportTime.Add(kl.nodeStatusReportFrequency)) {
if !podCIDRChanged && !nodeStatusHasChanged(&originalNode.Status, &node.Status) && !areRequiredLabelsNotPresent {
// We must mark the volumes as ReportedInUse in volume manager's dsw even
// if no changes were made to the node status (no volumes were added or removed
// from the VolumesInUse list).
//
// The reason is that on a kubelet restart, the volume manager's dsw is
// repopulated and the volume ReportedInUse is initialized to false, while the
// VolumesInUse list from the Node object still contains the state from the
// previous kubelet instantiation.
//
// Once the volumes are added to the dsw, the ReportedInUse field needs to be
// synced from the VolumesInUse list in the Node.Status.
//
// The MarkVolumesAsReportedInUse() call cannot be performed in dsw directly
// because it does not have access to the Node object.
// This also cannot be populated on node status manager init because the volume
// may not have been added to dsw at that time.
kl.volumeManager.MarkVolumesAsReportedInUse(node.Status.VolumesInUse)
return nil
}
}
changed := podCIDRChanged || nodeStatusHasChanged(&originalNode.Status, &node.Status) || areRequiredLabelsNotPresent
return node, changed
}

// patchNodeStatus patches node on the API server based on originalNode.
// It returns any potential error, or an updatedNode and refreshes the state of kubelet when successful.
func (kl *Kubelet) patchNodeStatus(originalNode, node *v1.Node) (*v1.Node, error) {
// Patch the current status on the API server
updatedNode, _, err := nodeutil.PatchNodeStatus(kl.heartbeatClient.CoreV1(), types.NodeName(kl.nodeName), originalNode, node)
if err != nil {
return err
return nil, err
}
kl.lastStatusReportTime = now
kl.lastStatusReportTime = kl.clock.Now()
kl.setLastObservedNodeAddresses(updatedNode.Status.Addresses)
// If update finishes successfully, mark the volumeInUse as reportedInUse to indicate
// those volumes are already updated in the node's status
kl.volumeManager.MarkVolumesAsReportedInUse(updatedNode.Status.VolumesInUse)
return nil
return updatedNode, nil
}

// markVolumesFromNode updates volumeManager with VolumesInUse status from node.
//
// In the case of node status update being unnecessary, call with the fetched node.
// We must mark the volumes as ReportedInUse in volume manager's dsw even
// if no changes were made to the node status (no volumes were added or removed
// from the VolumesInUse list).
//
// The reason is that on a kubelet restart, the volume manager's dsw is
// repopulated and the volume ReportedInUse is initialized to false, while the
// VolumesInUse list from the Node object still contains the state from the
// previous kubelet instantiation.
//
// Once the volumes are added to the dsw, the ReportedInUse field needs to be
// synced from the VolumesInUse list in the Node.Status.
//
// The MarkVolumesAsReportedInUse() call cannot be performed in dsw directly
// because it does not have access to the Node object.
// This also cannot be populated on node status manager init because the volume
// may not have been added to dsw at that time.
//
// Or, after a successful node status update, call with updatedNode returned from
// the patch call, to mark the volumeInUse as reportedInUse to indicate
// those volumes are already updated in the node's status
func (kl *Kubelet) markVolumesFromNode(node *v1.Node) {
kl.volumeManager.MarkVolumesAsReportedInUse(node.Status.VolumesInUse)
}

// recordNodeStatusEvent records an event of the given type with the given
Expand Down

0 comments on commit 3dc172a

Please sign in to comment.