diff --git a/pkg/scheduler/apis/config/testing/defaults/defaults.go b/pkg/scheduler/apis/config/testing/defaults/defaults.go index 0aeb66187d22..f321b229f3bd 100644 --- a/pkg/scheduler/apis/config/testing/defaults/defaults.go +++ b/pkg/scheduler/apis/config/testing/defaults/defaults.go @@ -32,6 +32,7 @@ var PluginsV1beta1 = &config.Plugins{ Enabled: []config.Plugin{ {Name: names.NodeResourcesFit}, {Name: names.NodePorts}, + {Name: names.VolumeRestrictions}, {Name: names.PodTopologySpread}, {Name: names.InterPodAffinity}, {Name: names.VolumeBinding}, @@ -162,6 +163,7 @@ var PluginsV1beta2 = &config.Plugins{ Enabled: []config.Plugin{ {Name: names.NodeResourcesFit}, {Name: names.NodePorts}, + {Name: names.VolumeRestrictions}, {Name: names.PodTopologySpread}, {Name: names.InterPodAffinity}, {Name: names.VolumeBinding}, diff --git a/pkg/scheduler/apis/config/v1beta1/default_plugins.go b/pkg/scheduler/apis/config/v1beta1/default_plugins.go index 71cc0cf6dc87..9813720202b7 100644 --- a/pkg/scheduler/apis/config/v1beta1/default_plugins.go +++ b/pkg/scheduler/apis/config/v1beta1/default_plugins.go @@ -38,6 +38,7 @@ func getDefaultPlugins() *v1beta1.Plugins { Enabled: []v1beta1.Plugin{ {Name: names.NodeResourcesFit}, {Name: names.NodePorts}, + {Name: names.VolumeRestrictions}, {Name: names.PodTopologySpread}, {Name: names.InterPodAffinity}, {Name: names.VolumeBinding}, diff --git a/pkg/scheduler/apis/config/v1beta1/default_plugins_test.go b/pkg/scheduler/apis/config/v1beta1/default_plugins_test.go index 837881941319..10932ffd0565 100644 --- a/pkg/scheduler/apis/config/v1beta1/default_plugins_test.go +++ b/pkg/scheduler/apis/config/v1beta1/default_plugins_test.go @@ -47,6 +47,7 @@ func TestApplyFeatureGates(t *testing.T) { Enabled: []v1beta1.Plugin{ {Name: names.NodeResourcesFit}, {Name: names.NodePorts}, + {Name: names.VolumeRestrictions}, {Name: names.PodTopologySpread}, {Name: names.InterPodAffinity}, {Name: names.VolumeBinding}, @@ -129,6 +130,7 @@ func TestApplyFeatureGates(t *testing.T) { Enabled: []v1beta1.Plugin{ {Name: names.NodeResourcesFit}, {Name: names.NodePorts}, + {Name: names.VolumeRestrictions}, {Name: names.PodTopologySpread}, {Name: names.InterPodAffinity}, {Name: names.VolumeBinding}, diff --git a/pkg/scheduler/apis/config/v1beta1/defaults_test.go b/pkg/scheduler/apis/config/v1beta1/defaults_test.go index b094603ecce6..82ddc702d8b3 100644 --- a/pkg/scheduler/apis/config/v1beta1/defaults_test.go +++ b/pkg/scheduler/apis/config/v1beta1/defaults_test.go @@ -337,6 +337,7 @@ func TestSchedulerDefaults(t *testing.T) { Enabled: []v1beta1.Plugin{ {Name: names.NodeResourcesFit}, {Name: names.NodePorts}, + {Name: names.VolumeRestrictions}, {Name: names.PodTopologySpread}, {Name: names.InterPodAffinity}, {Name: names.VolumeBinding}, diff --git a/pkg/scheduler/apis/config/v1beta2/default_plugins.go b/pkg/scheduler/apis/config/v1beta2/default_plugins.go index accbddd1eaf7..182394a68a91 100644 --- a/pkg/scheduler/apis/config/v1beta2/default_plugins.go +++ b/pkg/scheduler/apis/config/v1beta2/default_plugins.go @@ -38,6 +38,7 @@ func getDefaultPlugins() *v1beta2.Plugins { Enabled: []v1beta2.Plugin{ {Name: names.NodeResourcesFit}, {Name: names.NodePorts}, + {Name: names.VolumeRestrictions}, {Name: names.PodTopologySpread}, {Name: names.InterPodAffinity}, {Name: names.VolumeBinding}, diff --git a/pkg/scheduler/apis/config/v1beta2/default_plugins_test.go b/pkg/scheduler/apis/config/v1beta2/default_plugins_test.go index d1f609d0758a..67cb3e9d6df2 100644 --- 
a/pkg/scheduler/apis/config/v1beta2/default_plugins_test.go +++ b/pkg/scheduler/apis/config/v1beta2/default_plugins_test.go @@ -47,6 +47,7 @@ func TestApplyFeatureGates(t *testing.T) { Enabled: []v1beta2.Plugin{ {Name: names.NodeResourcesFit}, {Name: names.NodePorts}, + {Name: names.VolumeRestrictions}, {Name: names.PodTopologySpread}, {Name: names.InterPodAffinity}, {Name: names.VolumeBinding}, @@ -128,6 +129,7 @@ func TestApplyFeatureGates(t *testing.T) { Enabled: []v1beta2.Plugin{ {Name: names.NodeResourcesFit}, {Name: names.NodePorts}, + {Name: names.VolumeRestrictions}, {Name: names.PodTopologySpread}, {Name: names.InterPodAffinity}, {Name: names.VolumeBinding}, diff --git a/pkg/scheduler/apis/config/v1beta2/defaults_test.go b/pkg/scheduler/apis/config/v1beta2/defaults_test.go index 42a84030c077..0ca6c82169b1 100644 --- a/pkg/scheduler/apis/config/v1beta2/defaults_test.go +++ b/pkg/scheduler/apis/config/v1beta2/defaults_test.go @@ -320,6 +320,7 @@ func TestSchedulerDefaults(t *testing.T) { Enabled: []v1beta2.Plugin{ {Name: names.NodeResourcesFit}, {Name: names.NodePorts}, + {Name: names.VolumeRestrictions}, {Name: names.PodTopologySpread}, {Name: names.InterPodAffinity}, {Name: names.VolumeBinding}, diff --git a/pkg/scheduler/framework/plugins/feature/feature.go b/pkg/scheduler/framework/plugins/feature/feature.go index 4f4a10807902..7987f5a476dc 100644 --- a/pkg/scheduler/framework/plugins/feature/feature.go +++ b/pkg/scheduler/framework/plugins/feature/feature.go @@ -23,4 +23,5 @@ type Features struct { EnablePodAffinityNamespaceSelector bool EnablePodDisruptionBudget bool EnablePodOverhead bool + EnableReadWriteOncePod bool } diff --git a/pkg/scheduler/framework/plugins/registry.go b/pkg/scheduler/framework/plugins/registry.go index d7e37167dc96..c4794cec28b9 100644 --- a/pkg/scheduler/framework/plugins/registry.go +++ b/pkg/scheduler/framework/plugins/registry.go @@ -53,6 +53,7 @@ func NewInTreeRegistry() runtime.Registry { EnablePodAffinityNamespaceSelector: feature.DefaultFeatureGate.Enabled(features.PodAffinityNamespaceSelector), EnablePodDisruptionBudget: feature.DefaultFeatureGate.Enabled(features.PodDisruptionBudget), EnablePodOverhead: feature.DefaultFeatureGate.Enabled(features.PodOverhead), + EnableReadWriteOncePod: feature.DefaultFeatureGate.Enabled(features.ReadWriteOncePod), } return runtime.Registry{ @@ -80,8 +81,10 @@ func NewInTreeRegistry() runtime.Registry { noderesources.RequestedToCapacityRatioName: func(plArgs apiruntime.Object, fh framework.Handle) (framework.Plugin, error) { return noderesources.NewRequestedToCapacityRatio(plArgs, fh, fts) }, - volumebinding.Name: volumebinding.New, - volumerestrictions.Name: volumerestrictions.New, + volumebinding.Name: volumebinding.New, + volumerestrictions.Name: func(plArgs apiruntime.Object, fh framework.Handle) (framework.Plugin, error) { + return volumerestrictions.New(plArgs, fh, fts) + }, volumezone.Name: volumezone.New, nodevolumelimits.CSIName: nodevolumelimits.NewCSI, nodevolumelimits.EBSName: nodevolumelimits.NewEBS, diff --git a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go index b4ee829d583c..53856c26eda5 100644 --- a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go +++ b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go @@ -18,17 +18,29 @@ package volumerestrictions import ( "context" + "sync/atomic" v1 "k8s.io/api/core/v1" + apierrors 
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" + corelisters "k8s.io/client-go/listers/core/v1" + v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" "k8s.io/kubernetes/pkg/scheduler/framework/plugins/names" + "k8s.io/kubernetes/pkg/scheduler/internal/parallelize" ) // VolumeRestrictions is a plugin that checks volume restrictions. -type VolumeRestrictions struct{} +type VolumeRestrictions struct { + parallelizer parallelize.Parallelizer + pvcLister corelisters.PersistentVolumeClaimLister + nodeInfoLister framework.SharedLister + enableReadWriteOncePod bool +} +var _ framework.PreFilterPlugin = &VolumeRestrictions{} var _ framework.FilterPlugin = &VolumeRestrictions{} var _ framework.EnqueueExtensions = &VolumeRestrictions{} @@ -38,6 +50,8 @@ const Name = names.VolumeRestrictions const ( // ErrReasonDiskConflict is used for NoDiskConflict predicate error. ErrReasonDiskConflict = "node(s) had no available disk" + // ErrReasonReadWriteOncePodConflict is used when a pod is found using the same PVC with the ReadWriteOncePod access mode. + ErrReasonReadWriteOncePodConflict = "node has pod using PersistentVolumeClaim with the same name and ReadWriteOncePod access mode" ) // Name returns name of the plugin. It is used in logs, etc. @@ -106,6 +120,72 @@ func haveOverlap(a1, a2 []string) bool { return false } +func (pl *VolumeRestrictions) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status { + if pl.enableReadWriteOncePod { + return pl.isReadWriteOncePodAccessModeConflict(pod) + } + return framework.NewStatus(framework.Success) +} + +// isReadWriteOncePodAccessModeConflict checks if a pod uses a PVC with the ReadWriteOncePod access mode. +// This access mode restricts volume access to a single pod on a single node. Since only a single pod can +// use a ReadWriteOncePod PVC, mark any other pods attempting to use this PVC as UnschedulableAndUnresolvable. +// TODO(#103132): Mark pod as Unschedulable and add preemption logic. +func (pl *VolumeRestrictions) isReadWriteOncePodAccessModeConflict(pod *v1.Pod) *framework.Status { + nodeInfos, err := pl.nodeInfoLister.NodeInfos().List() + if err != nil { + return framework.NewStatus(framework.Error, "error while getting node info") + } + + var pvcKeys []string + for _, volume := range pod.Spec.Volumes { + if volume.PersistentVolumeClaim == nil { + continue + } + + pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(volume.PersistentVolumeClaim.ClaimName) + if err != nil { + if apierrors.IsNotFound(err) { + return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error()) + } + return framework.AsStatus(err) + } + + if !v1helper.ContainsAccessMode(pvc.Spec.AccessModes, v1.ReadWriteOncePod) { + continue + } + + key := pod.Namespace + "/" + volume.PersistentVolumeClaim.ClaimName + pvcKeys = append(pvcKeys, key) + } + + ctx, cancel := context.WithCancel(context.Background()) + var conflicts uint32 + + processNode := func(i int) { + nodeInfo := nodeInfos[i] + for _, key := range pvcKeys { + refCount := nodeInfo.PVCRefCounts[key] + if refCount > 0 { + atomic.AddUint32(&conflicts, 1) + cancel() + } + } + } + pl.parallelizer.Until(ctx, len(nodeInfos), processNode) + + // Enforce ReadWriteOncePod access mode. This is also enforced during volume mount in kubelet. 
+ if conflicts > 0 { + return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonReadWriteOncePodConflict) + } + + return nil +} + +func (pl *VolumeRestrictions) PreFilterExtensions() framework.PreFilterExtensions { + return nil +} + // Filter invoked at the filter extension point. // It evaluates if a pod can fit due to the volumes it requests, and those that // are already mounted. If there is already a volume mounted on that node, another pod that uses the same volume @@ -142,10 +222,22 @@ func (pl *VolumeRestrictions) EventsToRegister() []framework.ClusterEvent { {Resource: framework.Pod, ActionType: framework.Delete}, // A new Node may make a pod schedulable. {Resource: framework.Node, ActionType: framework.Add}, + // Pods may fail to schedule because the PVC it uses has not yet been created. + // This PVC is required to exist to check its access modes. + {Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}, } } // New initializes a new plugin and returns it. -func New(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) { - return &VolumeRestrictions{}, nil +func New(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) { + informerFactory := handle.SharedInformerFactory() + pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister() + nodeInfoLister := handle.SnapshotSharedLister() + + return &VolumeRestrictions{ + parallelizer: handle.Parallelizer(), + pvcLister: pvcLister, + nodeInfoLister: nodeInfoLister, + enableReadWriteOncePod: fts.EnableReadWriteOncePod, + }, nil } diff --git a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go index a7208a4c76a4..2e3d3e7c0916 100644 --- a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go +++ b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions_test.go @@ -22,7 +22,16 @@ import ( "testing" v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/scheduler/apis/config" "k8s.io/kubernetes/pkg/scheduler/framework" + "k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature" + plugintesting "k8s.io/kubernetes/pkg/scheduler/framework/plugins/testing" + "k8s.io/kubernetes/pkg/scheduler/internal/cache" ) func TestGCEDiskConflicts(t *testing.T) { @@ -64,7 +73,7 @@ func TestGCEDiskConflicts(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - p, _ := New(nil, nil) + p := newPlugin(t) gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) @@ -112,7 +121,7 @@ func TestAWSDiskConflicts(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - p, _ := New(nil, nil) + p := newPlugin(t) gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) @@ -166,7 +175,7 @@ func TestRBDDiskConflicts(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - 
p, _ := New(nil, nil) + p := newPlugin(t) gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) @@ -220,7 +229,7 @@ func TestISCSIDiskConflicts(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - p, _ := New(nil, nil) + p := newPlugin(t) gotStatus := p.(framework.FilterPlugin).Filter(context.Background(), nil, test.pod, test.nodeInfo) if !reflect.DeepEqual(gotStatus, test.wantStatus) { t.Errorf("status does not match: %v, want: %v", gotStatus, test.wantStatus) @@ -228,3 +237,150 @@ func TestISCSIDiskConflicts(t *testing.T) { }) } } + +func TestAccessModeConflicts(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.ReadWriteOncePod, true)() + + podWithReadWriteOncePodPVC := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + // Required for querying lister for PVCs in the same namespace. + Namespace: "default", + Name: "pod-with-rwop", + }, + Spec: v1.PodSpec{ + NodeName: "node-1", + Volumes: []v1.Volume{ + { + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: "claim-with-rwop", + }, + }, + }, + }, + }, + } + podWithReadWriteManyPVC := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + // Required for querying lister for PVCs in the same namespace. + Namespace: "default", + Name: "pod-with-rwx", + }, + Spec: v1.PodSpec{ + NodeName: "node-1", + Volumes: []v1.Volume{ + { + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: "claim-with-rwx", + }, + }, + }, + }, + }, + } + + node := &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "node-1", + }, + } + + readWriteOncePodPVC := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "claim-with-rwop", + }, + Spec: v1.PersistentVolumeClaimSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod}, + }, + } + readWriteManyPVC := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "claim-with-rwx", + }, + Spec: v1.PersistentVolumeClaimSpec{ + AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteMany}, + }, + } + + tests := []struct { + name string + pod *v1.Pod + existingPods []*v1.Pod + existingNodes []*v1.Node + existingPVCs []*v1.PersistentVolumeClaim + enableReadWriteOncePod bool + wantStatus *framework.Status + }{ + { + name: "nothing", + pod: &v1.Pod{}, + existingPods: []*v1.Pod{}, + existingNodes: []*v1.Node{}, + existingPVCs: []*v1.PersistentVolumeClaim{}, + enableReadWriteOncePod: true, + wantStatus: nil, + }, + { + name: "failed to get PVC", + pod: podWithReadWriteOncePodPVC, + existingPods: []*v1.Pod{}, + existingNodes: []*v1.Node{}, + existingPVCs: []*v1.PersistentVolumeClaim{}, + enableReadWriteOncePod: true, + wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, "persistentvolumeclaim \"claim-with-rwop\" not found"), + }, + { + name: "no access mode conflict", + pod: podWithReadWriteOncePodPVC, + existingPods: []*v1.Pod{podWithReadWriteManyPVC}, + existingNodes: []*v1.Node{node}, + existingPVCs: []*v1.PersistentVolumeClaim{readWriteOncePodPVC, readWriteManyPVC}, + enableReadWriteOncePod: true, + wantStatus: nil, + }, + { + name: "access mode conflict", + pod: podWithReadWriteOncePodPVC, + existingPods: []*v1.Pod{podWithReadWriteOncePodPVC, 
podWithReadWriteManyPVC}, + existingNodes: []*v1.Node{node}, + existingPVCs: []*v1.PersistentVolumeClaim{readWriteOncePodPVC, readWriteManyPVC}, + enableReadWriteOncePod: true, + wantStatus: framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonReadWriteOncePodConflict), + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + p := newPluginWithListers(t, test.existingPods, test.existingNodes, test.existingPVCs, test.enableReadWriteOncePod) + gotStatus := p.(framework.PreFilterPlugin).PreFilter(context.Background(), nil, test.pod) + if !reflect.DeepEqual(gotStatus, test.wantStatus) { + t.Errorf("status does not match: %+v, want: %+v", gotStatus, test.wantStatus) + } + }) + } +} + +func newPlugin(t *testing.T) framework.Plugin { + return newPluginWithListers(t, nil, nil, nil, true) +} + +func newPluginWithListers(t *testing.T, pods []*v1.Pod, nodes []*v1.Node, pvcs []*v1.PersistentVolumeClaim, enableReadWriteOncePod bool) framework.Plugin { + ctx := context.Background() + pluginFactory := func(plArgs runtime.Object, fh framework.Handle) (framework.Plugin, error) { + return New(plArgs, fh, feature.Features{ + EnableReadWriteOncePod: enableReadWriteOncePod, + }) + } + snapshot := cache.NewSnapshot(pods, nodes) + + objects := make([]runtime.Object, 0, len(pvcs)) + for _, pvc := range pvcs { + objects = append(objects, pvc) + } + + return plugintesting.SetupPluginWithInformers(ctx, t, pluginFactory, &config.InterPodAffinityArgs{}, snapshot, objects) +} diff --git a/pkg/scheduler/framework/types.go b/pkg/scheduler/framework/types.go index da8346ca25f6..a37b27b45ce2 100644 --- a/pkg/scheduler/framework/types.go +++ b/pkg/scheduler/framework/types.go @@ -387,6 +387,10 @@ type NodeInfo struct { // state information. ImageStates map[string]*ImageStateSummary + // PVCRefCounts contains a mapping of PVC names to the number of pods on the node using it. + // Keys are in the format "namespace/name". + PVCRefCounts map[string]int + // Whenever NodeInfo changes, generation is bumped. // This is used to avoid cloning it if the object didn't change. Generation int64 @@ -512,6 +516,7 @@ func NewNodeInfo(pods ...*v1.Pod) *NodeInfo { Generation: nextGeneration(), UsedPorts: make(HostPortInfo), ImageStates: make(map[string]*ImageStateSummary), + PVCRefCounts: make(map[string]int), } for _, pod := range pods { ni.AddPod(pod) @@ -536,6 +541,7 @@ func (n *NodeInfo) Clone() *NodeInfo { Allocatable: n.Allocatable.Clone(), UsedPorts: make(HostPortInfo), ImageStates: n.ImageStates, + PVCRefCounts: n.PVCRefCounts, Generation: n.Generation, } if len(n.Pods) > 0 { @@ -595,6 +601,7 @@ func (n *NodeInfo) AddPodInfo(podInfo *PodInfo) { // Consume ports when pods added. n.updateUsedPorts(podInfo.Pod, true) + n.updatePVCRefCounts(podInfo.Pod, true) n.Generation = nextGeneration() } @@ -672,6 +679,7 @@ func (n *NodeInfo) RemovePod(pod *v1.Pod) error { // Release ports when remove Pods. n.updateUsedPorts(pod, false) + n.updatePVCRefCounts(pod, false) n.Generation = nextGeneration() n.resetSlicesIfEmpty() @@ -749,6 +757,25 @@ func (n *NodeInfo) updateUsedPorts(pod *v1.Pod, add bool) { } } +// updatePVCRefCounts updates the PVCRefCounts of NodeInfo. 
+func (n *NodeInfo) updatePVCRefCounts(pod *v1.Pod, add bool) { + for _, v := range pod.Spec.Volumes { + if v.PersistentVolumeClaim == nil { + continue + } + + key := pod.Namespace + "/" + v.PersistentVolumeClaim.ClaimName + if add { + n.PVCRefCounts[key] += 1 + } else { + n.PVCRefCounts[key] -= 1 + if n.PVCRefCounts[key] <= 0 { + delete(n.PVCRefCounts, key) + } + } + } +} + // SetNode sets the overall node information. func (n *NodeInfo) SetNode(node *v1.Node) { n.node = node diff --git a/pkg/scheduler/framework/types_test.go b/pkg/scheduler/framework/types_test.go index f028139b868c..7b1a0b420906 100644 --- a/pkg/scheduler/framework/types_test.go +++ b/pkg/scheduler/framework/types_test.go @@ -211,7 +211,7 @@ type testingMode interface { Fatalf(format string, args ...interface{}) } -func makeBasePod(t testingMode, nodeName, objName, cpu, mem, extended string, ports []v1.ContainerPort) *v1.Pod { +func makeBasePod(t testingMode, nodeName, objName, cpu, mem, extended string, ports []v1.ContainerPort, volumes []v1.Volume) *v1.Pod { req := v1.ResourceList{} if cpu != "" { req = v1.ResourceList{ @@ -240,6 +240,7 @@ func makeBasePod(t testingMode, nodeName, objName, cpu, mem, extended string, po Ports: ports, }}, NodeName: nodeName, + Volumes: volumes, }, } } @@ -247,8 +248,8 @@ func makeBasePod(t testingMode, nodeName, objName, cpu, mem, extended string, po func TestNewNodeInfo(t *testing.T) { nodeName := "test-node" pods := []*v1.Pod{ - makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}), - makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}), + makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}, nil), + makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}, nil), } expected := &NodeInfo{ @@ -274,7 +275,8 @@ func TestNewNodeInfo(t *testing.T) { {Protocol: "TCP", Port: 8080}: {}, }, }, - ImageStates: map[string]*ImageStateSummary{}, + ImageStates: map[string]*ImageStateSummary{}, + PVCRefCounts: map[string]int{}, Pods: []*PodInfo{ { Pod: &v1.Pod{ @@ -366,7 +368,8 @@ func TestNodeInfoClone(t *testing.T) { {Protocol: "TCP", Port: 8080}: {}, }, }, - ImageStates: map[string]*ImageStateSummary{}, + ImageStates: map[string]*ImageStateSummary{}, + PVCRefCounts: map[string]int{}, Pods: []*PodInfo{ { Pod: &v1.Pod{ @@ -439,7 +442,8 @@ func TestNodeInfoClone(t *testing.T) { {Protocol: "TCP", Port: 8080}: {}, }, }, - ImageStates: map[string]*ImageStateSummary{}, + ImageStates: map[string]*ImageStateSummary{}, + PVCRefCounts: map[string]int{}, Pods: []*PodInfo{ { Pod: &v1.Pod{ @@ -548,6 +552,15 @@ func TestNodeInfoAddPod(t *testing.T) { Overhead: v1.ResourceList{ v1.ResourceCPU: resource.MustParse("500m"), }, + Volumes: []v1.Volume{ + { + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: "pvc-1", + }, + }, + }, + }, }, }, { @@ -578,6 +591,15 @@ func TestNodeInfoAddPod(t *testing.T) { v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("500"), }, + Volumes: []v1.Volume{ + { + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: "pvc-1", + }, + }, + }, + }, }, }, { @@ -618,6 +640,15 @@ func TestNodeInfoAddPod(t *testing.T) { v1.ResourceCPU: resource.MustParse("500m"), 
v1.ResourceMemory: resource.MustParse("500"), }, + Volumes: []v1.Volume{ + { + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: "pvc-2", + }, + }, + }, + }, }, }, } @@ -649,7 +680,8 @@ func TestNodeInfoAddPod(t *testing.T) { {Protocol: "TCP", Port: 8080}: {}, }, }, - ImageStates: map[string]*ImageStateSummary{}, + ImageStates: map[string]*ImageStateSummary{}, + PVCRefCounts: map[string]int{"node_info_cache_test/pvc-1": 2, "node_info_cache_test/pvc-2": 1}, Pods: []*PodInfo{ { Pod: &v1.Pod{ @@ -680,6 +712,15 @@ func TestNodeInfoAddPod(t *testing.T) { Overhead: v1.ResourceList{ v1.ResourceCPU: resource.MustParse("500m"), }, + Volumes: []v1.Volume{ + { + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: "pvc-1", + }, + }, + }, + }, }, }, }, @@ -712,6 +753,15 @@ func TestNodeInfoAddPod(t *testing.T) { v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("500"), }, + Volumes: []v1.Volume{ + { + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: "pvc-1", + }, + }, + }, + }, }, }, }, @@ -754,6 +804,15 @@ func TestNodeInfoAddPod(t *testing.T) { v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("500"), }, + Volumes: []v1.Volume{ + { + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: "pvc-2", + }, + }, + }, + }, }, }, }, @@ -779,9 +838,10 @@ func TestNodeInfoAddPod(t *testing.T) { func TestNodeInfoRemovePod(t *testing.T) { nodeName := "test-node" pods := []*v1.Pod{ - makeBasePod(t, nodeName, "test-1", "100m", "500", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}), - - makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}), + makeBasePod(t, nodeName, "test-1", "100m", "500", "", + []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 80, Protocol: "TCP"}}, + []v1.Volume{{VolumeSource: v1.VolumeSource{PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: "pvc-1"}}}}), + makeBasePod(t, nodeName, "test-2", "200m", "1Ki", "", []v1.ContainerPort{{HostIP: "127.0.0.1", HostPort: 8080, Protocol: "TCP"}}, nil), } // add pod Overhead @@ -798,7 +858,7 @@ func TestNodeInfoRemovePod(t *testing.T) { expectedNodeInfo *NodeInfo }{ { - pod: makeBasePod(t, nodeName, "non-exist", "0", "0", "", []v1.ContainerPort{{}}), + pod: makeBasePod(t, nodeName, "non-exist", "0", "0", "", []v1.ContainerPort{{}}, []v1.Volume{}), errExpected: true, expectedNodeInfo: &NodeInfo{ node: &v1.Node{ @@ -828,7 +888,8 @@ func TestNodeInfoRemovePod(t *testing.T) { {Protocol: "TCP", Port: 8080}: {}, }, }, - ImageStates: map[string]*ImageStateSummary{}, + ImageStates: map[string]*ImageStateSummary{}, + PVCRefCounts: map[string]int{"node_info_cache_test/pvc-1": 1}, Pods: []*PodInfo{ { Pod: &v1.Pod{ @@ -860,6 +921,15 @@ func TestNodeInfoRemovePod(t *testing.T) { v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("500"), }, + Volumes: []v1.Volume{ + { + VolumeSource: v1.VolumeSource{ + PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: "pvc-1", + }, + }, + }, + }, }, }, }, @@ -929,6 +999,15 @@ func TestNodeInfoRemovePod(t *testing.T) { v1.ResourceCPU: resource.MustParse("500m"), v1.ResourceMemory: resource.MustParse("500"), }, + Volumes: []v1.Volume{ + { + VolumeSource: v1.VolumeSource{ + 
PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ + ClaimName: "pvc-1", + }, + }, + }, + }, }, }, errExpected: false, @@ -959,7 +1038,8 @@ func TestNodeInfoRemovePod(t *testing.T) { {Protocol: "TCP", Port: 8080}: {}, }, }, - ImageStates: map[string]*ImageStateSummary{}, + ImageStates: map[string]*ImageStateSummary{}, + PVCRefCounts: map[string]int{}, Pods: []*PodInfo{ { Pod: &v1.Pod{
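For readers following the types.go hunks: the new NodeInfo.PVCRefCounts field is plain reference counting keyed by "namespace/name" — AddPodInfo increments a key for every PVC-backed volume of the pod, RemovePod decrements it and drops entries that reach zero. Below is a minimal stand-alone sketch of that bookkeeping; the pvcRefCounts type and its update helper are illustrative only, not scheduler API.

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// pvcRefCounts mirrors the bookkeeping the patch adds to NodeInfo:
// keys are "namespace/name", values are the number of pods on the node
// referencing that claim.
type pvcRefCounts map[string]int

func (c pvcRefCounts) update(pod *v1.Pod, add bool) {
	for _, vol := range pod.Spec.Volumes {
		if vol.PersistentVolumeClaim == nil {
			continue
		}
		key := pod.Namespace + "/" + vol.PersistentVolumeClaim.ClaimName
		if add {
			c[key]++
			continue
		}
		c[key]--
		if c[key] <= 0 {
			delete(c, key) // entries at zero are removed, as in the patch
		}
	}
}

func main() {
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "writer"},
		Spec: v1.PodSpec{
			Volumes: []v1.Volume{{
				VolumeSource: v1.VolumeSource{
					PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
						ClaimName: "claim-with-rwop",
					},
				},
			}},
		},
	}

	counts := pvcRefCounts{}
	counts.update(pod, true)
	fmt.Println(counts) // map[default/claim-with-rwop:1]
	counts.update(pod, false)
	fmt.Println(counts) // map[] — the key disappears once its count hits zero
}
```

Keying by namespace and claim name is what lets the plugin's PreFilter compare a pod's claims against every node's counts without re-listing pods.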
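On the plugin side, the first half of isReadWriteOncePodAccessModeConflict only collects the claims that matter: PVC-backed volumes whose claim carries the ReadWriteOncePod access mode, turned into the same "namespace/name" keys. The sketch below shows that selection with a plain map standing in for the PVC lister and a local containsAccessMode in place of the in-tree v1helper.ContainsAccessMode; it assumes an API version that defines v1.ReadWriteOncePod, as the diff does.

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// containsAccessMode is a stand-in for v1helper.ContainsAccessMode.
func containsAccessMode(modes []v1.PersistentVolumeAccessMode, mode v1.PersistentVolumeAccessMode) bool {
	for _, m := range modes {
		if m == mode {
			return true
		}
	}
	return false
}

// readWriteOncePodKeys returns the "namespace/name" keys of the pod's claims
// that carry ReadWriteOncePod; only those participate in the conflict scan.
// pvcs stands in for the PVC lister.
func readWriteOncePodKeys(pod *v1.Pod, pvcs map[string]*v1.PersistentVolumeClaim) []string {
	var keys []string
	for _, vol := range pod.Spec.Volumes {
		if vol.PersistentVolumeClaim == nil {
			continue
		}
		key := pod.Namespace + "/" + vol.PersistentVolumeClaim.ClaimName
		pvc, ok := pvcs[key]
		if !ok {
			continue // the real plugin returns UnschedulableAndUnresolvable here
		}
		if containsAccessMode(pvc.Spec.AccessModes, v1.ReadWriteOncePod) {
			keys = append(keys, key)
		}
	}
	return keys
}

func main() {
	pvc := &v1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "claim-with-rwop"},
		Spec: v1.PersistentVolumeClaimSpec{
			AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOncePod},
		},
	}
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "writer"},
		Spec: v1.PodSpec{Volumes: []v1.Volume{{
			VolumeSource: v1.VolumeSource{
				PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{ClaimName: "claim-with-rwop"},
			},
		}}},
	}
	fmt.Println(readWriteOncePodKeys(pod, map[string]*v1.PersistentVolumeClaim{"default/claim-with-rwop": pvc}))
	// Output: [default/claim-with-rwop]
}
```

The actual plugin treats a missing claim as UnschedulableAndUnresolvable instead of skipping it, which is why the "failed to get PVC" test case above expects the "persistentvolumeclaim \"claim-with-rwop\" not found" status.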
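The second half of the check then scans every node's PVCRefCounts in parallel and stops as soon as any conflict is found. The hunk above does this with the scheduler's internal parallelize.Parallelizer; the sketch below substitutes a plain goroutine fan-out (one goroutine per node rather than chunked work) and a deferred cancel, so it is a shape-of-the-logic illustration under those assumptions, not the plugin's code.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// hasRWOPConflict reports whether any node already has a pod referencing one
// of the candidate "namespace/name" keys.
func hasRWOPConflict(ctx context.Context, perNodePVCRefCounts []map[string]int, pvcKeys []string) bool {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	var conflicts uint32
	var wg sync.WaitGroup
	for i := range perNodePVCRefCounts {
		wg.Add(1)
		go func(counts map[string]int) {
			defer wg.Done()
			select {
			case <-ctx.Done(): // another worker already found a conflict
				return
			default:
			}
			for _, key := range pvcKeys {
				if counts[key] > 0 {
					atomic.AddUint32(&conflicts, 1)
					cancel() // stop the remaining workers early
					return
				}
			}
		}(perNodePVCRefCounts[i])
	}
	wg.Wait()
	return atomic.LoadUint32(&conflicts) > 0
}

func main() {
	nodes := []map[string]int{
		{"default/other-claim": 1},
		{"default/claim-with-rwop": 1}, // conflicting reference
	}
	fmt.Println(hasRWOPConflict(context.Background(), nodes, []string{"default/claim-with-rwop"})) // true
}
```

Because only one pod may use a ReadWriteOncePod claim, a single positive reference count anywhere in the snapshot is enough to reject the pod as UnschedulableAndUnresolvable; the TODO(#103132) in the patch notes that preemption support is intended to relax this to Unschedulable later.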
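Finally, the registry.go hunk shows why volumerestrictions.New grew a third parameter: the registry's factory signature takes only (args, handle), so constructors that need feature flags are wrapped in a closure that captures fts. Below is a self-contained sketch of that wiring pattern with stand-in Plugin, Handle, and Features types rather than the real framework interfaces.

```go
package main

import "fmt"

// Plugin and Handle stand in for the scheduler framework's interfaces; the
// point is only the registration pattern: wrap a constructor that needs
// feature flags in a closure so it still fits the two-argument factory shape.
type Plugin interface{ Name() string }
type Handle interface{}
type PluginFactory func(args interface{}, h Handle) (Plugin, error)

type Features struct{ EnableReadWriteOncePod bool }

type volumeRestrictions struct{ enableReadWriteOncePod bool }

func (v *volumeRestrictions) Name() string { return "VolumeRestrictions" }

// newVolumeRestrictions mirrors the three-argument New added by the diff.
func newVolumeRestrictions(_ interface{}, _ Handle, fts Features) (Plugin, error) {
	return &volumeRestrictions{enableReadWriteOncePod: fts.EnableReadWriteOncePod}, nil
}

func main() {
	fts := Features{EnableReadWriteOncePod: true}
	registry := map[string]PluginFactory{
		// The closure captures fts, just as NewInTreeRegistry does.
		"VolumeRestrictions": func(args interface{}, h Handle) (Plugin, error) {
			return newVolumeRestrictions(args, h, fts)
		},
	}
	p, _ := registry["VolumeRestrictions"](nil, nil)
	fmt.Println(p.Name()) // VolumeRestrictions
}
```

An out-of-tree registry can use the same closure trick whenever a plugin constructor needs configuration beyond the standard factory arguments.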