Skip to content

Commit

Permalink
Merge pull request #100034 from AliceZhang2016/nodeaffinity-cleanup-o…
Browse files Browse the repository at this point in the history
…nly-scheduler

move nodeaffinity helpers to component-helpers package (only impact s…
  • Loading branch information
k8s-ci-robot committed Mar 10, 2021
2 parents aea6f9f + fd8128d commit f170049
Show file tree
Hide file tree
Showing 5 changed files with 740 additions and 36 deletions.
39 changes: 8 additions & 31 deletions pkg/scheduler/framework/plugins/nodeaffinity/node_affinity.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
Expand Down Expand Up @@ -63,8 +62,7 @@ func (pl *NodeAffinity) Name() string {
}

type preFilterState struct {
requiredNodeSelector labels.Selector
requiredNodeAffinity *nodeaffinity.LazyErrorNodeSelector
requiredNodeSelectorAndAffinity nodeaffinity.RequiredNodeAffinity
}

// Clone just returns the same state because it is not affected by pod additions or deletions.
Expand All @@ -74,7 +72,7 @@ func (s *preFilterState) Clone() framework.StateData {

// PreFilter builds and writes cycle state used by Filter.
func (pl *NodeAffinity) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
state := getPodRequiredNodeSelectorAndAffinity(pod)
state := &preFilterState{requiredNodeSelectorAndAffinity: nodeaffinity.GetRequiredNodeAffinity(pod)}
cycleState.Write(preFilterStateKey, state)
return nil
}
Expand All @@ -99,21 +97,15 @@ func (pl *NodeAffinity) Filter(ctx context.Context, state *framework.CycleState,
if err != nil {
// Fallback to calculate requiredNodeSelector and requiredNodeAffinity
// here when PreFilter is disabled.
s = getPodRequiredNodeSelectorAndAffinity(pod)
s = &preFilterState{requiredNodeSelectorAndAffinity: nodeaffinity.GetRequiredNodeAffinity(pod)}
}

if s.requiredNodeSelector != nil {
if !s.requiredNodeSelector.Matches(labels.Set(node.Labels)) {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonPod)
}
}
if s.requiredNodeAffinity != nil {
// Ignore parsing errors for backwards compatibility.
matches, _ := s.requiredNodeAffinity.Match(node)
if !matches {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonPod)
}
// Ignore parsing errors for backwards compatibility.
match, _ := s.requiredNodeSelectorAndAffinity.Match(node)
if !match {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonPod)
}

return nil
}

Expand Down Expand Up @@ -248,21 +240,6 @@ func getPreScoreState(cycleState *framework.CycleState) (*preScoreState, error)
return s, nil
}

func getPodRequiredNodeSelectorAndAffinity(pod *v1.Pod) *preFilterState {
var selector labels.Selector
if len(pod.Spec.NodeSelector) > 0 {
selector = labels.SelectorFromSet(pod.Spec.NodeSelector)
}
// Use LazyErrorNodeSelector for backwards compatibility of parsing errors.
var affinity *nodeaffinity.LazyErrorNodeSelector
if pod.Spec.Affinity != nil &&
pod.Spec.Affinity.NodeAffinity != nil &&
pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {
affinity = nodeaffinity.NewLazyErrorNodeSelector(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution)
}
return &preFilterState{requiredNodeSelector: selector, requiredNodeAffinity: affinity}
}

func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error) {
c, err := cycleState.Read(preFilterStateKey)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import (

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
)

const preFilterStateKey = "PreFilter" + Name
Expand Down Expand Up @@ -222,6 +222,7 @@ func (pl *PodTopologySpread) calPreFilterState(pod *v1.Pod) (*preFilterState, er
TpKeyToCriticalPaths: make(map[string]*criticalPaths, len(constraints)),
TpPairToMatchNum: make(map[topologyPair]*int32, sizeHeuristic(len(allNodes), constraints)),
}
requiredSchedulingTerm := nodeaffinity.GetRequiredNodeAffinity(pod)
for _, n := range allNodes {
node := n.Node()
if node == nil {
Expand All @@ -230,7 +231,9 @@ func (pl *PodTopologySpread) calPreFilterState(pod *v1.Pod) (*preFilterState, er
}
// In accordance to design, if NodeAffinity or NodeSelector is defined,
// spreading is applied to nodes that pass those filters.
if !helper.PodMatchesNodeSelectorAndAffinityTerms(pod, node) {
// Ignore parsing errors for backwards compatibility.
match, _ := requiredSchedulingTerm.Match(node)
if !match {
continue
}
// Ensure current node's labels contains all topologyKeys in 'Constraints'.
Expand Down
8 changes: 5 additions & 3 deletions pkg/scheduler/framework/plugins/podtopologyspread/scoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/component-helpers/scheduling/corev1/nodeaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework"
pluginhelper "k8s.io/kubernetes/pkg/scheduler/framework/plugins/helper"
)

const preScoreStateKey = "PreScore" + Name
Expand Down Expand Up @@ -136,6 +136,8 @@ func (pl *PodTopologySpread) PreScore(
return nil
}

// Ignore parsing errors for backwards compatibility.
requiredNodeAffinity := nodeaffinity.GetRequiredNodeAffinity(pod)
processAllNode := func(i int) {
nodeInfo := allNodes[i]
node := nodeInfo.Node()
Expand All @@ -144,8 +146,8 @@ func (pl *PodTopologySpread) PreScore(
}
// (1) `node` should satisfy incoming pod's NodeSelector/NodeAffinity
// (2) All topologyKeys need to be present in `node`
if !pluginhelper.PodMatchesNodeSelectorAndAffinityTerms(pod, node) ||
!nodeLabelsMatchSpreadConstraints(node.Labels, state.Constraints) {
match, _ := requiredNodeAffinity.Match(node)
if !match || !nodeLabelsMatchSpreadConstraints(node.Labels, state.Constraints) {
return
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,3 +287,38 @@ type preferredSchedulingTerm struct {
nodeSelectorTerm
weight int
}

type RequiredNodeAffinity struct {
labelSelector labels.Selector
nodeSelector *LazyErrorNodeSelector
}

// GetRequiredNodeAffinity returns the parsing result of pod's nodeSelector and nodeAffinity.
func GetRequiredNodeAffinity(pod *v1.Pod) RequiredNodeAffinity {
var selector labels.Selector
if len(pod.Spec.NodeSelector) > 0 {
selector = labels.SelectorFromSet(pod.Spec.NodeSelector)
}
// Use LazyErrorNodeSelector for backwards compatibility of parsing errors.
var affinity *LazyErrorNodeSelector
if pod.Spec.Affinity != nil &&
pod.Spec.Affinity.NodeAffinity != nil &&
pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {
affinity = NewLazyErrorNodeSelector(pod.Spec.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution)
}
return RequiredNodeAffinity{labelSelector: selector, nodeSelector: affinity}
}

// Match checks whether the pod is schedulable onto nodes according to
// the requirements in both nodeSelector and nodeAffinity.
func (s RequiredNodeAffinity) Match(node *v1.Node) (bool, error) {
if s.labelSelector != nil {
if !s.labelSelector.Matches(labels.Set(node.Labels)) {
return false, nil
}
}
if s.nodeSelector != nil {
return s.nodeSelector.Match(node)
}
return true, nil
}

0 comments on commit f170049

Please sign in to comment.