Merge pull request #104613 from ravisantoshgudimetla/reconcile-labels
[kubelet]: Reconcile OS and arch labels periodically
k8s-ci-robot committed Nov 8, 2021
2 parents 03fc2ee + 3af5d37 commit cda360c
Showing 7 changed files with 374 additions and 11 deletions.
21 changes: 20 additions & 1 deletion pkg/kubelet/kubelet_node_status.go
@@ -509,11 +509,30 @@ func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
}
}

areRequiredLabelsNotPresent := false
osName, osLabelExists := node.Labels[v1.LabelOSStable]
if !osLabelExists || osName != goruntime.GOOS {
if len(node.Labels) == 0 {
node.Labels = make(map[string]string)
}
node.Labels[v1.LabelOSStable] = goruntime.GOOS
areRequiredLabelsNotPresent = true
}
// Set the arch if there is a mismatch
arch, archLabelExists := node.Labels[v1.LabelArchStable]
if !archLabelExists || arch != goruntime.GOARCH {
if len(node.Labels) == 0 {
node.Labels = make(map[string]string)
}
node.Labels[v1.LabelArchStable] = goruntime.GOARCH
areRequiredLabelsNotPresent = true
}

kl.setNodeStatus(node)

now := kl.clock.Now()
if now.Before(kl.lastStatusReportTime.Add(kl.nodeStatusReportFrequency)) {
if !podCIDRChanged && !nodeStatusHasChanged(&originalNode.Status, &node.Status) {
if !podCIDRChanged && !nodeStatusHasChanged(&originalNode.Status, &node.Status) && !areRequiredLabelsNotPresent {
// We must mark the volumes as ReportedInUse in volume manager's dsw even
// if no changes were made to the node status (no volumes were added or removed
// from the VolumesInUse list).
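
The hunk above adds an OS/arch label reconcile step to tryUpdateNodeStatus, plus a flag that forces a node update whenever a required label was missing or stale. As a reading aid, here is a condensed, self-contained sketch of that pattern; the constants and the reconcileOSArchLabels helper are illustrative stand-ins, not kubelet API (the real code uses v1.LabelOSStable / v1.LabelArchStable and also allocates node.Labels first when it is nil):

package main

import (
	"fmt"
	goruntime "runtime"
)

// Stand-ins for v1.LabelOSStable and v1.LabelArchStable from k8s.io/api/core/v1.
const (
	labelOSStable   = "kubernetes.io/os"
	labelArchStable = "kubernetes.io/arch"
)

// reconcileOSArchLabels overwrites missing or stale OS/arch labels with the
// values reported by the Go runtime and returns true when anything changed,
// mirroring the areRequiredLabelsNotPresent flag that forces a node update
// even if the node status itself is unchanged. Assumes a non-nil map.
func reconcileOSArchLabels(labels map[string]string) bool {
	required := map[string]string{
		labelOSStable:   goruntime.GOOS,
		labelArchStable: goruntime.GOARCH,
	}
	changed := false
	for key, want := range required {
		if labels[key] != want {
			labels[key] = want
			changed = true
		}
	}
	return changed
}

func main() {
	labels := map[string]string{labelOSStable: "dummyOS"}
	fmt.Println(reconcileOSArchLabels(labels)) // true: both labels were missing or stale
	fmt.Println(labels)                        // e.g. map[kubernetes.io/arch:amd64 kubernetes.io/os:linux]
	fmt.Println(reconcileOSArchLabels(labels)) // false: nothing left to reconcile
}
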
18 changes: 9 additions & 9 deletions pkg/kubelet/kubelet_node_status_test.go
@@ -217,7 +217,7 @@ func TestUpdateNewNodeStatus(t *testing.T) {
kubelet.setCachedMachineInfo(machineInfo)

expectedNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{v1.LabelOSStable: goruntime.GOOS, v1.LabelArchStable: goruntime.GOARCH}},
Spec: v1.NodeSpec{},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
@@ -395,7 +395,7 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
kubelet.setCachedMachineInfo(machineInfo)

expectedNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{v1.LabelOSStable: goruntime.GOOS, v1.LabelArchStable: goruntime.GOARCH}},
Spec: v1.NodeSpec{},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
@@ -601,7 +601,7 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) {
kubelet.setCachedMachineInfo(machineInfo)

expectedNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{v1.LabelOSStable: goruntime.GOOS, v1.LabelArchStable: goruntime.GOARCH}},
Spec: v1.NodeSpec{},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
@@ -822,7 +822,7 @@ func TestUpdateNodeStatusWithLease(t *testing.T) {

now := metav1.NewTime(clock.Now()).Rfc3339Copy()
expectedNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{v1.LabelOSStable: goruntime.GOOS, v1.LabelArchStable: goruntime.GOARCH}},
Spec: v1.NodeSpec{},
Status: v1.NodeStatus{
Conditions: []v1.NodeCondition{
@@ -1033,13 +1033,13 @@ func TestUpdateNodeStatusAndVolumesInUseWithNodeLease(t *testing.T) {
}{
{
desc: "no volumes and no update",
existingNode: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}},
existingNode: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{v1.LabelOSStable: goruntime.GOOS, v1.LabelArchStable: goruntime.GOARCH}}},
},
{
desc: "volumes inuse on node and volumeManager",
existingVolumes: []v1.UniqueVolumeName{"vol1"},
existingNode: &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{v1.LabelOSStable: goruntime.GOOS, v1.LabelArchStable: goruntime.GOARCH}},
Status: v1.NodeStatus{
VolumesInUse: []v1.UniqueVolumeName{"vol1"},
},
@@ -1054,14 +1054,14 @@ func TestUpdateNodeStatusAndVolumesInUseWithNodeLease(t *testing.T) {
VolumesInUse: []v1.UniqueVolumeName{"vol1"},
},
},
expectedNode: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}},
expectedNode: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{v1.LabelOSStable: goruntime.GOOS, v1.LabelArchStable: goruntime.GOARCH}}},
},
{
desc: "volumes inuse in volumeManager but not on node",
existingVolumes: []v1.UniqueVolumeName{"vol1"},
existingNode: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname}},
expectedNode: &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{v1.LabelOSStable: goruntime.GOOS, v1.LabelArchStable: goruntime.GOARCH}},
Status: v1.NodeStatus{
VolumesInUse: []v1.UniqueVolumeName{"vol1"},
},
@@ -2819,7 +2819,7 @@ func TestUpdateNodeAddresses(t *testing.T) {
},
}
expectedNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Labels: map[string]string{v1.LabelOSStable: goruntime.GOOS, v1.LabelArchStable: goruntime.GOARCH}},
Spec: v1.NodeSpec{},
Status: v1.NodeStatus{
Addresses: test.After,
158 changes: 158 additions & 0 deletions pkg/kubelet/kubelet_test.go
@@ -22,6 +22,7 @@ import (
"io/ioutil"
"os"
"reflect"
goruntime "runtime"
"sort"
"strconv"
"testing"
@@ -30,6 +31,7 @@ import (
cadvisorapi "github.com/google/cadvisor/info/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
core "k8s.io/client-go/testing"
"k8s.io/mount-utils"

v1 "k8s.io/api/core/v1"
@@ -858,6 +860,71 @@ func TestHandleNodeSelector(t *testing.T) {
checkPodStatus(t, kl, fittingPod, v1.PodPending)
}

// Tests that a pod whose OS node selector does not match the node's OS label is rejected by setting the failed status in the status map.
func TestHandleNodeSelectorBasedOnOS(t *testing.T) {
tests := []struct {
name string
nodeLabels map[string]string
podSelector map[string]string
podStatus v1.PodPhase
}{
{
name: "correct OS label, wrong pod selector, admission denied",
nodeLabels: map[string]string{v1.LabelOSStable: goruntime.GOOS, v1.LabelArchStable: goruntime.GOARCH},
podSelector: map[string]string{v1.LabelOSStable: "dummyOS"},
podStatus: v1.PodFailed,
},
{
name: "correct OS label, correct pod selector, pod admitted",
nodeLabels: map[string]string{v1.LabelOSStable: goruntime.GOOS, v1.LabelArchStable: goruntime.GOARCH},
podSelector: map[string]string{v1.LabelOSStable: goruntime.GOOS},
podStatus: v1.PodPending,
},
{
// Expect no patching to happen; label B should be preserved and can be used for nodeAffinity.
name: "new node label, correct pod selector, admitted",
nodeLabels: map[string]string{v1.LabelOSStable: goruntime.GOOS, v1.LabelArchStable: goruntime.GOARCH, "key": "B"},
podSelector: map[string]string{"key": "B"},
podStatus: v1.PodPending,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
defer testKubelet.Cleanup()
kl := testKubelet.kubelet
nodes := []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname, Labels: test.nodeLabels},
Status: v1.NodeStatus{
Allocatable: v1.ResourceList{
v1.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI),
},
},
},
}
kl.nodeLister = testNodeLister{nodes: nodes}

recorder := record.NewFakeRecorder(20)
nodeRef := &v1.ObjectReference{
Kind: "Node",
Name: string("testNode"),
UID: types.UID("testNode"),
Namespace: "",
}
testClusterDNSDomain := "TEST"
kl.dnsConfigurer = dns.NewConfigurer(recorder, nodeRef, nil, nil, testClusterDNSDomain, "")

pod := podWithUIDNameNsSpec("123456789", "podA", "foo", v1.PodSpec{NodeSelector: test.podSelector})

kl.HandlePodAdditions([]*v1.Pod{pod})

// Check pod status stored in the status map.
checkPodStatus(t, kl, pod, test.podStatus)
})
}
}

// Tests that we handle exceeded resources correctly by setting the failed status in status map.
func TestHandleMemExceeded(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
@@ -2291,6 +2358,97 @@ func TestPreInitRuntimeService(t *testing.T) {
}
}

func TestSyncLabels(t *testing.T) {
tests := []struct {
name string
existingNode *v1.Node
isPatchingNeeded bool
}{
{
name: "no labels",
existingNode: &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{}}},
isPatchingNeeded: true,
},
{
name: "wrong labels",
existingNode: &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1.LabelOSStable: "dummyOS", v1.LabelArchStable: "dummyArch"}}},
isPatchingNeeded: true,
},
{
name: "correct labels",
existingNode: &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1.LabelOSStable: goruntime.GOOS, v1.LabelArchStable: goruntime.GOARCH}}},
isPatchingNeeded: false,
},
{
name: "partially correct labels",
existingNode: &v1.Node{ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{v1.LabelOSStable: goruntime.GOOS, v1.LabelArchStable: "dummyArch"}}},
isPatchingNeeded: true,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
testKubelet := newTestKubelet(t, false)
defer testKubelet.Cleanup()
kl := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient

test.existingNode.Name = string(kl.nodeName)

kl.nodeLister = testNodeLister{nodes: []*v1.Node{test.existingNode}}
go func() { kl.syncNodeStatus() }()

err := retryWithExponentialBackOff(
100*time.Millisecond,
func() (bool, error) {
var savedNode *v1.Node
if test.isPatchingNeeded {
actions := kubeClient.Actions()
if len(actions) < 2 {
t.Logf("No patch action yet")
return false, nil
}
action := actions[1]
if action.GetVerb() == "patch" {
patchAction := action.(core.PatchActionImpl)
var err error
savedNode, err = applyNodeStatusPatch(test.existingNode, patchAction.GetPatch())
if err != nil {
t.Logf("node patching failed, %v", err)
return false, nil
}
}
} else {
savedNode = test.existingNode
}
val, ok := savedNode.Labels[v1.LabelOSStable]
if !ok {
t.Logf("expected kubernetes.io/os label to be present")
return false, nil
}
if val != goruntime.GOOS {
t.Logf("expected kubernetes.io/os to match runtime.GOOS but got %v", val)
return false, nil
}
val, ok = savedNode.Labels[v1.LabelArchStable]
if !ok {
t.Logf("expected kubernetes.io/arch label to be present")
return false, nil
}
if val != goruntime.GOARCH {
t.Logf("expected kubernetes.io/arch to match runtime.GOARCH but got %v", val)
return false, nil
}
return true, nil
},
)
if err != nil {
t.Fatalf("expected labels to be reconciled but it failed with %v", err)
}
})
}
}

func waitForVolumeUnmount(
volumeManager kubeletvolume.VolumeManager,
pod *v1.Pod) error {
29 changes: 29 additions & 0 deletions pkg/kubelet/lifecycle/predicate.go
@@ -18,6 +18,7 @@ package lifecycle

import (
"fmt"
"runtime"

v1 "k8s.io/api/core/v1"
"k8s.io/apiserver/pkg/util/feature"
@@ -153,11 +154,39 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult
Message: message,
}
}
if rejectPodAdmissionBasedOnOSSelector(admitPod, node) {
return PodAdmitResult{
Admit: false,
Reason: "PodOSSelectorNodeLabelDoesNotMatch",
Message: "Failed to admit pod as the `kubernetes.io/os` label doesn't match node label",
}
}
return PodAdmitResult{
Admit: true,
}
}

// rejectPodAdmissionBasedOnOSSelector rejects the pod if its kubernetes.io/os label doesn't match the OS the node is running.
// We expect the kubelet status reconcile, which happens every 10s, to update the node labels if there is a mismatch.
func rejectPodAdmissionBasedOnOSSelector(pod *v1.Pod, node *v1.Node) bool {
labels := node.Labels
osName, osLabelExists := labels[v1.LabelOSStable]
if !osLabelExists || osName != runtime.GOOS {
if len(labels) == 0 {
labels = make(map[string]string)
}
labels[v1.LabelOSStable] = runtime.GOOS
}
podLabelSelector, podOSLabelExists := pod.Labels[v1.LabelOSStable]
if !podOSLabelExists {
// If the pod doesn't carry the OS label, keep the current behavior and admit it
return false
} else if podOSLabelExists && podLabelSelector != labels[v1.LabelOSStable] {
return true
}
return false
}

func removeMissingExtendedResources(pod *v1.Pod, nodeInfo *schedulerframework.NodeInfo) *v1.Pod {
podCopy := pod.DeepCopy()
for i, c := range pod.Spec.Containers {
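
For context on the new admission check: rejectPodAdmissionBasedOnOSSelector compares the pod's kubernetes.io/os label against the node's, and trusts runtime.GOOS whenever the node label is missing or stale, since the periodic status reconcile added in kubelet_node_status.go is what eventually corrects the Node object itself. Below is a minimal standalone sketch of that decision; the shouldRejectForOS helper and the label constant are illustrative stand-ins, not the kubelet's actual types:

package main

import (
	"fmt"
	"runtime"
)

// Stand-in for v1.LabelOSStable.
const labelOSStable = "kubernetes.io/os"

// shouldRejectForOS mirrors the admission decision: a pod that carries a
// kubernetes.io/os label is rejected only when that label disagrees with the
// OS the node is actually running; pods without the label keep the previous
// behavior and are admitted.
func shouldRejectForOS(podLabels, nodeLabels map[string]string) bool {
	nodeOS, ok := nodeLabels[labelOSStable]
	if !ok || nodeOS != runtime.GOOS {
		// Trust the runtime over a missing or stale node label; the periodic
		// node-status reconcile will correct the label on the Node object.
		nodeOS = runtime.GOOS
	}
	podOS, ok := podLabels[labelOSStable]
	if !ok {
		return false
	}
	return podOS != nodeOS
}

func main() {
	fmt.Println(shouldRejectForOS(map[string]string{labelOSStable: "dummyOS"}, nil)) // true: mismatched pod OS label
	fmt.Println(shouldRejectForOS(nil, map[string]string{labelOSStable: "dummyOS"})) // false: pod has no OS label, admitted
}
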
