hotfix: when a plugin (in-tree or out-of-tree) returns non-existent/illegal nodes, the pod scheduling flow aborts immediately. #124559

Merged

23 changes: 14 additions & 9 deletions pkg/scheduler/schedule_one.go
@@ -439,13 +439,16 @@ func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework
 // filter plugins and filter extenders.
 func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
     logger := klog.FromContext(ctx)
-    diagnosis := framework.Diagnosis{
-        NodeToStatusMap: make(framework.NodeToStatusMap),
-    }
 
     allNodes, err := sched.nodeInfoSnapshot.NodeInfos().List()
     if err != nil {
-        return nil, diagnosis, err
+        return nil, framework.Diagnosis{
+            NodeToStatusMap: make(framework.NodeToStatusMap),
+        }, err
     }
+
+    diagnosis := framework.Diagnosis{
+        NodeToStatusMap: make(framework.NodeToStatusMap, len(allNodes)),
+    }
     // Run "prefilter" plugins.
     preRes, s := fwk.RunPreFilterPlugins(ctx, state, pod)
@@ -483,12 +486,14 @@ func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.F
     nodes := allNodes
     if !preRes.AllNodes() {
         nodes = make([]*framework.NodeInfo, 0, len(preRes.NodeNames))
-        for n := range preRes.NodeNames {
-            nInfo, err := sched.nodeInfoSnapshot.NodeInfos().Get(n)
-            if err != nil {
-                return nil, diagnosis, err
+        for _, n := range allNodes {
+            if !preRes.NodeNames.Has(n.Node().Name) {
+                // We consider Nodes that are filtered out by PreFilterResult as rejected via UnschedulableAndUnresolvable.
+                // We have to record them in NodeToStatusMap so that they won't be considered as candidates in the preemption.
+                diagnosis.NodeToStatusMap[n.Node().Name] = framework.NewStatus(framework.UnschedulableAndUnresolvable, "node is filtered out by the prefilter result")
+                continue
             }
-            nodes = append(nodes, nInfo)
+            nodes = append(nodes, n)
         }
     }
     feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, &diagnosis, nodes)
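For context, a minimal sketch of an out-of-tree PreFilter plugin that can put non-existent node names into a PreFilterResult, which is exactly the situation the change above now tolerates instead of aborting the scheduling cycle. The plugin and the example.com/candidate-nodes annotation are hypothetical and not part of this PR; only the framework types are the real ones used in the diff.

package example

import (
    "context"
    "strings"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/util/sets"
    "k8s.io/kubernetes/pkg/scheduler/framework"
)

// hypotheticalPreFilter is an illustrative out-of-tree plugin.
type hypotheticalPreFilter struct{}

var _ framework.PreFilterPlugin = &hypotheticalPreFilter{}

func (pl *hypotheticalPreFilter) Name() string { return "HypotheticalPreFilter" }

// PreFilter narrows the candidate set to node names taken from a (hypothetical) pod
// annotation. Nothing guarantees those nodes still exist in the scheduler snapshot;
// previously a stale name made findNodesThatFitPod return an error and abort the
// whole scheduling cycle, while with the change above unknown names are recorded in
// the Diagnosis as UnschedulableAndUnresolvable and skipped.
func (pl *hypotheticalPreFilter) PreFilter(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (*framework.PreFilterResult, *framework.Status) {
    raw := pod.Annotations["example.com/candidate-nodes"] // hypothetical annotation
    if raw == "" {
        return nil, nil // no restriction; every node stays a candidate
    }
    return &framework.PreFilterResult{NodeNames: sets.New(strings.Split(raw, ",")...)}, nil
}

func (pl *hypotheticalPreFilter) PreFilterExtensions() framework.PreFilterExtensions { return nil }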
2 changes: 1 addition & 1 deletion pkg/scheduler/schedule_one_test.go
@@ -2181,7 +2181,7 @@ func TestSchedulerSchedulePod(t *testing.T) {
nodes: []string{"node1", "node2", "node3"},
pod: st.MakePod().Name("test-prefilter").UID("test-prefilter").Obj(),
wantNodes: sets.New("node2"),
-wantEvaluatedNodes: ptr.To[int32](1),
+wantEvaluatedNodes: ptr.To[int32](3),
},
{
name: "test prefilter plugin returning non-intersecting nodes",
14 changes: 14 additions & 0 deletions pkg/scheduler/testing/wrappers.go
@@ -851,6 +851,20 @@ func (p *PersistentVolumeWrapper) HostPathVolumeSource(src *v1.HostPathVolumeSou
return p
}

// NodeAffinityIn creates a HARD node affinity (with the operator In)
// and injects into the pv.
func (p *PersistentVolumeWrapper) NodeAffinityIn(key string, vals []string) *PersistentVolumeWrapper {
if p.Spec.NodeAffinity == nil {
p.Spec.NodeAffinity = &v1.VolumeNodeAffinity{}
}
if p.Spec.NodeAffinity.Required == nil {
p.Spec.NodeAffinity.Required = &v1.NodeSelector{}
}
nodeSelector := MakeNodeSelector().In(key, vals).Obj()
p.Spec.NodeAffinity.Required.NodeSelectorTerms = append(p.Spec.NodeAffinity.Required.NodeSelectorTerms, nodeSelector.NodeSelectorTerms...)
return p
}

// ResourceClaimWrapper wraps a ResourceClaim inside.
type ResourceClaimWrapper struct{ resourcev1alpha2.ResourceClaim }

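A quick usage sketch for the new NodeAffinityIn wrapper, mirroring how the integration tests below use it; the PV name, hostnames, and capacity are made up for illustration, and the snippet assumes the usual test imports (v1, resource, and st for pkg/scheduler/testing).

// Build a test PV pinned, via a hard In node affinity, to two hostnames,
// one of which intentionally does not exist in the cluster under test.
volType := v1.HostPathDirectoryOrCreate
pv := st.MakePersistentVolume().
    Name("example-pv").
    AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}).
    Capacity(v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}).
    HostPathVolumeSource(&v1.HostPathVolumeSource{Path: "/tmp", Type: &volType}).
    NodeAffinityIn("kubernetes.io/hostname", []string{"existing-node", "no-such-node"}).
    Obj()
_ = pv // typically handed to testutils.CreatePV in an integration test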
73 changes: 73 additions & 0 deletions test/e2e/scheduling/predicates.go
@@ -39,6 +39,7 @@ import (
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
e2eruntimeclass "k8s.io/kubernetes/test/e2e/framework/node/runtimeclass"
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
e2erc "k8s.io/kubernetes/test/e2e/framework/rc"
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
testutils "k8s.io/kubernetes/test/utils"
@@ -857,6 +858,78 @@ var _ = SIGDescribe("SchedulerPredicates", framework.WithSerial(), func() {
ginkgo.By("Expect all pods are scheduled and running")
framework.ExpectNoError(e2epod.WaitForPodsRunning(cs, ns, replicas, time.Minute))
})

// Regression test for an extended scenario for https://issues.k8s.io/123465
ginkgo.It("when PVC has node-affinity to non-existent/illegal nodes, the pod should be scheduled normally if suitable nodes exist", func(ctx context.Context) {
nodeName := GetNodeThatCanRunPod(ctx, f)
nonExistentNodeName1 := string(uuid.NewUUID())
nonExistentNodeName2 := string(uuid.NewUUID())
hostLabel := "kubernetes.io/hostname"
localPath := "/tmp"
podName := "bind-pv-with-non-existent-nodes"
pvcName := "pvc-" + string(uuid.NewUUID())
_, pvc, err := e2epv.CreatePVPVC(ctx, cs, f.Timeouts, e2epv.PersistentVolumeConfig{
PVSource: v1.PersistentVolumeSource{
Local: &v1.LocalVolumeSource{
Path: localPath,
},
},
Prebind: &v1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{Name: pvcName, Namespace: ns},
},
NodeAffinity: &v1.VolumeNodeAffinity{
Required: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: hostLabel,
Operator: v1.NodeSelectorOpIn,
// add non-existent nodes to the list
Values: []string{nodeName, nonExistentNodeName1, nonExistentNodeName2},
},
},
},
},
},
},
}, e2epv.PersistentVolumeClaimConfig{
Name: pvcName,
}, ns, true)
framework.ExpectNoError(err)
bindPvPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "pause",
Image: imageutils.GetE2EImage(imageutils.Pause),
VolumeMounts: []v1.VolumeMount{
{
Name: "data",
MountPath: "/tmp",
},
},
},
},
Volumes: []v1.Volume{
{
Name: "data",
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: pvc.Name,
},
},
},
},
},
}
_, err = f.ClientSet.CoreV1().Pods(ns).Create(ctx, bindPvPod, metav1.CreateOptions{})
framework.ExpectNoError(err)
framework.ExpectNoError(e2epod.WaitForPodNotPending(ctx, f.ClientSet, ns, podName))
})
})

func patchPod(cs clientset.Interface, old, new *v1.Pod) (*v1.Pod, error) {
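To connect this e2e scenario with the scheduler change above: a bound PV's hard node affinity is what feeds hostnames into the scheduler's candidate set (presumably through the VolumeBinding plugin's PreFilterResult), so a deleted or never-created node name can reach the code path patched in schedule_one.go. The function below is a rough, simplified sketch of that extraction, not the plugin's actual implementation; among other things, it ignores that NodeSelectorTerms are ORed while the requirements inside a term are ANDed. It assumes the v1 core API and k8s.io/apimachinery/pkg/util/sets imports.

// candidateNodesFromPV collects hostname values from a PV's hard node affinity.
// If the PV lists a node that has been deleted (or never existed), the resulting
// set contains a name the scheduler's snapshot does not know about.
func candidateNodesFromPV(pv *v1.PersistentVolume) sets.Set[string] {
    names := sets.New[string]()
    if pv.Spec.NodeAffinity == nil || pv.Spec.NodeAffinity.Required == nil {
        return names
    }
    for _, term := range pv.Spec.NodeAffinity.Required.NodeSelectorTerms {
        for _, req := range term.MatchExpressions {
            if req.Key == "kubernetes.io/hostname" && req.Operator == v1.NodeSelectorOpIn {
                names.Insert(req.Values...)
            }
        }
    }
    return names
}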
106 changes: 106 additions & 0 deletions test/integration/scheduler/filters/filters_test.go
@@ -43,6 +43,7 @@ var (
createAndWaitForNodesInCache = testutils.CreateAndWaitForNodesInCache
createNamespacesWithLabels = testutils.CreateNamespacesWithLabels
createNode = testutils.CreateNode
updateNode = testutils.UpdateNode
createPausePod = testutils.CreatePausePod
deletePod = testutils.DeletePod
getPod = testutils.GetPod
@@ -2042,6 +2043,111 @@ func TestUnschedulablePodBecomesSchedulable(t *testing.T) {
return deletePod(cs, "pod-to-be-deleted", ns)
},
},
{
name: "pod with pvc has node-affinity to non-existent/illegal nodes",

Contributor Author:
hi @Huang-Wei, in addition, the test/integration/scheduler/filters/filters_test.go test is added in this PR to enhance stability.

Member:
Please add this test to the master branch first.

Contributor Author:
> Please add this test to the master branch first

done

init: func(cs kubernetes.Interface, ns string) error {
storage := v1.VolumeResourceRequirements{Requests: v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}}
volType := v1.HostPathDirectoryOrCreate
pv, err := testutils.CreatePV(cs, st.MakePersistentVolume().
Name("pv-has-non-existent-nodes").
AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}).
Capacity(storage.Requests).
HostPathVolumeSource(&v1.HostPathVolumeSource{Path: "/tmp", Type: &volType}).
NodeAffinityIn("kubernetes.io/hostname", []string{"node-available", "non-existing"}). // one node exist, one doesn't
Obj())
if err != nil {
return fmt.Errorf("cannot create pv: %w", err)
}
_, err = testutils.CreatePVC(cs, st.MakePersistentVolumeClaim().
Name("pvc-has-non-existent-nodes").
Namespace(ns).
Annotation(volume.AnnBindCompleted, "true").
VolumeName(pv.Name).
AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}).
Resources(storage).
Obj())
if err != nil {
return fmt.Errorf("cannot create pvc: %w", err)
}
return nil
},
pod: &testutils.PausePodConfig{
Name: "pod-with-pvc-has-non-existent-nodes",
Volumes: []v1.Volume{{
Name: "volume",
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: "pvc-has-non-existent-nodes",
},
},
}},
},
update: func(cs kubernetes.Interface, ns string) error {
_, err := createNode(cs, st.MakeNode().Label("kubernetes.io/hostname", "node-available").Name("node-available").Obj())
if err != nil {
return fmt.Errorf("cannot create node: %w", err)
}
return nil
},
},
{
name: "pod with pvc got scheduled after node updated it's label",
init: func(cs kubernetes.Interface, ns string) error {
_, err := createNode(cs, st.MakeNode().Label("foo", "foo").Name("node-foo").Obj())
if err != nil {
return fmt.Errorf("cannot create node: %w", err)
}
storage := v1.VolumeResourceRequirements{Requests: v1.ResourceList{v1.ResourceStorage: resource.MustParse("1Mi")}}
volType := v1.HostPathDirectoryOrCreate
pv, err := testutils.CreatePV(cs, st.MakePersistentVolume().
Name("pv-foo").
AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}).
Capacity(storage.Requests).
HostPathVolumeSource(&v1.HostPathVolumeSource{Path: "/tmp", Type: &volType}).
NodeAffinityIn("foo", []string{"bar"}).
Obj())
if err != nil {
return fmt.Errorf("cannot create pv: %w", err)
}
_, err = testutils.CreatePVC(cs, st.MakePersistentVolumeClaim().
Name("pvc-foo").
Namespace(ns).
Annotation(volume.AnnBindCompleted, "true").
VolumeName(pv.Name).
AccessModes([]v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}).
Resources(storage).
Obj())
if err != nil {
return fmt.Errorf("cannot create pvc: %w", err)
}
return nil
},
pod: &testutils.PausePodConfig{
Name: "pod-with-pvc-foo",
Volumes: []v1.Volume{{
Name: "volume",
VolumeSource: v1.VolumeSource{
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
ClaimName: "pvc-foo",
},
},
}},
},
update: func(cs kubernetes.Interface, ns string) error {
_, err := updateNode(cs, &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node-foo",
Labels: map[string]string{
"foo": "bar",
},
},
})
if err != nil {
return fmt.Errorf("cannot update node: %w", err)
}
return nil
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
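For readers unfamiliar with this table, a rough sketch of how TestUnschedulablePodBecomesSchedulable presumably drives each entry: init creates the objects the case needs, the pod is expected to be unschedulable at first, update supplies the missing node or label, and the pod must then schedule. The wait helpers named below are assumptions for illustration, not the literal harness code.

// Simplified sketch of the harness loop (assumed, not the actual code).
for _, tt := range tests {
    if err := tt.init(cs, ns); err != nil {
        t.Fatal(err)
    }
    pod, err := createPausePod(cs, testutils.InitPausePod(tt.pod))
    if err != nil {
        t.Fatal(err)
    }
    // The pod should first be reported unschedulable, e.g. because the PV's only
    // valid node does not exist yet or the node's label does not match.
    waitForPodUnschedulable(cs, pod) // assumed helper
    // The update step creates the missing node or fixes its label; the resulting
    // cluster event should requeue the pod and let it schedule.
    if err := tt.update(cs, ns); err != nil {
        t.Fatal(err)
    }
    waitForPodScheduled(cs, pod) // assumed helper
}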
31 changes: 25 additions & 6 deletions test/integration/scheduler/plugins/plugins_test.go
@@ -32,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
@@ -80,9 +81,10 @@ type PreEnqueuePlugin struct {
}

type PreFilterPlugin struct {
-numPreFilterCalled int
-failPreFilter      bool
-rejectPreFilter    bool
+numPreFilterCalled   int
+failPreFilter        bool
+rejectPreFilter      bool
+preFilterResultNodes sets.Set[string]
}

type ScorePlugin struct {
@@ -524,6 +526,9 @@ func (pp *PreFilterPlugin) PreFilter(ctx context.Context, state *framework.Cycle
if pp.rejectPreFilter {
return nil, framework.NewStatus(framework.Unschedulable, fmt.Sprintf("reject pod %v", pod.Name))
}
if len(pp.preFilterResultNodes) != 0 {
return &framework.PreFilterResult{NodeNames: pp.preFilterResultNodes}, nil
}
return nil, nil
}

@@ -627,9 +632,10 @@ func TestPreFilterPlugin(t *testing.T) {
testContext := testutils.InitTestAPIServer(t, "prefilter-plugin", nil)

tests := []struct {
-name   string
-fail   bool
-reject bool
+name                 string
+fail                 bool
+reject               bool
+preFilterResultNodes sets.Set[string]
}{
{
name: "disable fail and reject flags",
@@ -646,6 +652,18 @@
fail: false,
reject: true,
},
{
name: "inject legal node names in PreFilterResult",
fail: false,
reject: false,
preFilterResultNodes: sets.New[string]("test-node-0", "test-node-1"),
},
{
name: "inject legal and illegal node names in PreFilterResult",
fail: false,
reject: false,
preFilterResultNodes: sets.New[string]("test-node-0", "non-existent-node"),
},
}

for _, test := range tests {
@@ -661,6 +679,7 @@

preFilterPlugin.failPreFilter = test.fail
preFilterPlugin.rejectPreFilter = test.reject
preFilterPlugin.preFilterResultNodes = test.preFilterResultNodes
// Create a best effort pod.
pod, err := testutils.CreatePausePod(testCtx.ClientSet,
testutils.InitPausePod(&testutils.PausePodConfig{Name: "test-pod", Namespace: testCtx.NS.Name}))
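To make the expectation behind the two new cases explicit, a small sketch of the PreFilterResult semantics involved (the node names mirror the test's, and the snippet assumes the framework, sets, and fmt imports): with the fix, the non-existent name only shrinks the candidate set and is reported as UnschedulableAndUnresolvable, rather than failing the whole scheduling cycle.

preRes := &framework.PreFilterResult{NodeNames: sets.New("test-node-0", "non-existent-node")}
fmt.Println(preRes.AllNodes())                   // false: the result restricts the candidate set
fmt.Println(preRes.NodeNames.Has("test-node-0")) // true: this node is still evaluated by Filter plugins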