Skip to content

Commit

Permalink
Merge pull request #113330 from jprzychodzen/automated-cherry-pick-of…
Browse files Browse the repository at this point in the history
…-#113136-origin-release-1.25

Automated cherry pick of #113136: NodeLifecycleController: Remove race condition
  • Loading branch information
k8s-ci-robot committed Oct 29, 2022
2 parents a822916 + f8b2b13 commit cc056d0
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 8 deletions.
11 changes: 8 additions & 3 deletions pkg/controller/controller_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1127,9 +1127,14 @@ func RemoveTaintOffNode(ctx context.Context, c clientset.Interface, nodeName str

// PatchNodeTaints patches node's taints.
func PatchNodeTaints(ctx context.Context, c clientset.Interface, nodeName string, oldNode *v1.Node, newNode *v1.Node) error {
oldData, err := json.Marshal(oldNode)
// Strip base diff node from RV to ensure that our Patch request will set RV to check for conflicts over .spec.taints.
// This is needed because .spec.taints does not specify patchMergeKey and patchStrategy and adding them is no longer an option for compatibility reasons.
// Using other Patch strategy works for adding new taints, however will not resolve problem with taint removal.
oldNodeNoRV := oldNode.DeepCopy()
oldNodeNoRV.ResourceVersion = ""
oldDataNoRV, err := json.Marshal(&oldNodeNoRV)
if err != nil {
return fmt.Errorf("failed to marshal old node %#v for node %q: %v", oldNode, nodeName, err)
return fmt.Errorf("failed to marshal old node %#v for node %q: %v", oldNodeNoRV, nodeName, err)
}

newTaints := newNode.Spec.Taints
Expand All @@ -1140,7 +1145,7 @@ func PatchNodeTaints(ctx context.Context, c clientset.Interface, nodeName string
return fmt.Errorf("failed to marshal new node %#v for node %q: %v", newNodeClone, nodeName, err)
}

patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldDataNoRV, newData, v1.Node{})
if err != nil {
return fmt.Errorf("failed to create patch for node %q: %v", nodeName, err)
}
Expand Down
37 changes: 37 additions & 0 deletions pkg/controller/controller_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -988,6 +988,43 @@ func TestAddOrUpdateTaintOnNode(t *testing.T) {
},
requestCount: 1,
},
{
name: "add taint to changed node",
nodeHandler: &testutil.FakeNodeHandler{
Existing: []*v1.Node{
{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
ResourceVersion: "1",
},
Spec: v1.NodeSpec{
Taints: []v1.Taint{
{Key: "key1", Value: "value1", Effect: "NoSchedule"},
},
},
},
},
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}),
AsyncCalls: []func(*testutil.FakeNodeHandler){func(m *testutil.FakeNodeHandler) {
if len(m.UpdatedNodes) == 0 {
m.UpdatedNodes = append(m.UpdatedNodes, &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node1",
ResourceVersion: "2",
},
Spec: v1.NodeSpec{
Taints: []v1.Taint{},
}})
}
}},
},
nodeName: "node1",
taintsToAdd: []*v1.Taint{{Key: "key2", Value: "value2", Effect: "NoExecute"}},
expectedTaints: []v1.Taint{
{Key: "key2", Value: "value2", Effect: "NoExecute"},
},
requestCount: 5,
},
}
for _, test := range tests {
err := AddOrUpdateTaintOnNode(context.TODO(), test.nodeHandler, test.nodeName, test.taintsToAdd...)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ const (
testNodeMonitorGracePeriod = 40 * time.Second
testNodeStartupGracePeriod = 60 * time.Second
testNodeMonitorPeriod = 5 * time.Second
testRateLimiterQPS = float32(10000)
testRateLimiterQPS = float32(100000)
testLargeClusterThreshold = 20
testUnhealthyThreshold = float32(0.55)
)
Expand Down
17 changes: 16 additions & 1 deletion pkg/controller/testutil/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/strategicpatch"
Expand Down Expand Up @@ -64,6 +65,7 @@ type FakeNodeHandler struct {
// Input: Hooks determine if request is valid or not
CreateHook func(*FakeNodeHandler, *v1.Node) bool
Existing []*v1.Node
AsyncCalls []func(*FakeNodeHandler)

// Output
CreatedNodes []*v1.Node
Expand Down Expand Up @@ -131,10 +133,11 @@ func (m *FakeNodeHandler) Create(_ context.Context, node *v1.Node, _ metav1.Crea
}

// Get returns a Node from the fake store.
func (m *FakeNodeHandler) Get(_ context.Context, name string, opts metav1.GetOptions) (*v1.Node, error) {
func (m *FakeNodeHandler) Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Node, error) {
m.lock.Lock()
defer func() {
m.RequestCount++
m.runAsyncCalls()
m.lock.Unlock()
}()
for i := range m.UpdatedNodes {
Expand All @@ -152,6 +155,12 @@ func (m *FakeNodeHandler) Get(_ context.Context, name string, opts metav1.GetOpt
return nil, nil
}

func (m *FakeNodeHandler) runAsyncCalls() {
for _, a := range m.AsyncCalls {
a(m)
}
}

// List returns a list of Nodes from the fake store.
func (m *FakeNodeHandler) List(_ context.Context, opts metav1.ListOptions) (*v1.NodeList, error) {
m.lock.Lock()
Expand Down Expand Up @@ -212,6 +221,9 @@ func (m *FakeNodeHandler) Update(_ context.Context, node *v1.Node, _ metav1.Upda
nodeCopy := *node
for i, updateNode := range m.UpdatedNodes {
if updateNode.Name == nodeCopy.Name {
if updateNode.GetObjectMeta().GetResourceVersion() != nodeCopy.GetObjectMeta().GetResourceVersion() {
return nil, apierrors.NewConflict(schema.GroupResource{}, "fake conflict", nil)
}
m.UpdatedNodes[i] = &nodeCopy
return node, nil
}
Expand Down Expand Up @@ -345,6 +357,9 @@ func (m *FakeNodeHandler) Patch(_ context.Context, name string, pt types.PatchTy
if updatedNodeIndex < 0 {
m.UpdatedNodes = append(m.UpdatedNodes, &updatedNode)
} else {
if updatedNode.GetObjectMeta().GetResourceVersion() != m.UpdatedNodes[updatedNodeIndex].GetObjectMeta().GetResourceVersion() {
return nil, apierrors.NewConflict(schema.GroupResource{}, "fake conflict", nil)
}
m.UpdatedNodes[updatedNodeIndex] = &updatedNode
}

Expand Down
11 changes: 8 additions & 3 deletions staging/src/k8s.io/cloud-provider/node/helpers/taints.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,14 @@ func AddOrUpdateTaintOnNode(c clientset.Interface, nodeName string, taints ...*v

// PatchNodeTaints patches node's taints.
func PatchNodeTaints(c clientset.Interface, nodeName string, oldNode *v1.Node, newNode *v1.Node) error {
oldData, err := json.Marshal(oldNode)
// Strip base diff node from RV to ensure that our Patch request will set RV to check for conflicts over .spec.taints.
// This is needed because .spec.taints does not specify patchMergeKey and patchStrategy and adding them is no longer an option for compatibility reasons.
// Using other Patch strategy works for adding new taints, however will not resolve problem with taint removal.
oldNodeNoRV := oldNode.DeepCopy()
oldNodeNoRV.ResourceVersion = ""
oldDataNoRV, err := json.Marshal(&oldNodeNoRV)
if err != nil {
return fmt.Errorf("failed to marshal old node %#v for node %q: %v", oldNode, nodeName, err)
return fmt.Errorf("failed to marshal old node %#v for node %q: %v", oldNodeNoRV, nodeName, err)
}

newTaints := newNode.Spec.Taints
Expand All @@ -102,7 +107,7 @@ func PatchNodeTaints(c clientset.Interface, nodeName string, oldNode *v1.Node, n
return fmt.Errorf("failed to marshal new node %#v for node %q: %v", newNodeClone, nodeName, err)
}

patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldDataNoRV, newData, v1.Node{})
if err != nil {
return fmt.Errorf("failed to create patch for node %q: %v", nodeName, err)
}
Expand Down

0 comments on commit cc056d0

Please sign in to comment.