Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enforce ReadWriteOncePod during scheduling #103082

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/scheduler/apis/config/testing/defaults/defaults.go
Expand Up @@ -32,6 +32,7 @@ var PluginsV1beta1 = &config.Plugins{
Enabled: []config.Plugin{
{Name: names.NodeResourcesFit},
{Name: names.NodePorts},
{Name: names.VolumeRestrictions},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},
Expand Down Expand Up @@ -113,6 +114,7 @@ var PluginsV1beta2 = &config.Plugins{
Enabled: []config.Plugin{
{Name: names.NodeResourcesFit},
{Name: names.NodePorts},
{Name: names.VolumeRestrictions},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},
Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/apis/config/v1beta1/default_plugins.go
Expand Up @@ -38,6 +38,7 @@ func getDefaultPlugins() *v1beta1.Plugins {
Enabled: []v1beta1.Plugin{
{Name: names.NodeResourcesFit},
{Name: names.NodePorts},
{Name: names.VolumeRestrictions},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},
Expand Down
2 changes: 2 additions & 0 deletions pkg/scheduler/apis/config/v1beta1/default_plugins_test.go
Expand Up @@ -47,6 +47,7 @@ func TestApplyFeatureGates(t *testing.T) {
Enabled: []v1beta1.Plugin{
{Name: names.NodeResourcesFit},
{Name: names.NodePorts},
{Name: names.VolumeRestrictions},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},
Expand Down Expand Up @@ -129,6 +130,7 @@ func TestApplyFeatureGates(t *testing.T) {
Enabled: []v1beta1.Plugin{
{Name: names.NodeResourcesFit},
{Name: names.NodePorts},
{Name: names.VolumeRestrictions},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},
Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/apis/config/v1beta1/defaults_test.go
Expand Up @@ -329,6 +329,7 @@ func TestSchedulerDefaults(t *testing.T) {
Enabled: []v1beta1.Plugin{
{Name: names.NodeResourcesFit},
{Name: names.NodePorts},
{Name: names.VolumeRestrictions},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},
Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/apis/config/v1beta2/default_plugins.go
Expand Up @@ -38,6 +38,7 @@ func getDefaultPlugins() *v1beta2.Plugins {
Enabled: []v1beta2.Plugin{
{Name: names.NodeResourcesFit},
{Name: names.NodePorts},
{Name: names.VolumeRestrictions},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},
Expand Down
2 changes: 2 additions & 0 deletions pkg/scheduler/apis/config/v1beta2/default_plugins_test.go
Expand Up @@ -47,6 +47,7 @@ func TestApplyFeatureGates(t *testing.T) {
Enabled: []v1beta2.Plugin{
{Name: names.NodeResourcesFit},
{Name: names.NodePorts},
{Name: names.VolumeRestrictions},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},
Expand Down Expand Up @@ -128,6 +129,7 @@ func TestApplyFeatureGates(t *testing.T) {
Enabled: []v1beta2.Plugin{
{Name: names.NodeResourcesFit},
{Name: names.NodePorts},
{Name: names.VolumeRestrictions},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},
Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/apis/config/v1beta2/defaults_test.go
Expand Up @@ -332,6 +332,7 @@ func TestSchedulerDefaults(t *testing.T) {
Enabled: []v1beta2.Plugin{
{Name: names.NodeResourcesFit},
{Name: names.NodePorts},
{Name: names.VolumeRestrictions},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},
Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/framework/plugins/feature/feature.go
Expand Up @@ -23,4 +23,5 @@ type Features struct {
EnablePodAffinityNamespaceSelector bool
EnablePodDisruptionBudget bool
EnablePodOverhead bool
EnableReadWriteOncePod bool
}
7 changes: 5 additions & 2 deletions pkg/scheduler/framework/plugins/registry.go
Expand Up @@ -53,6 +53,7 @@ func NewInTreeRegistry() runtime.Registry {
EnablePodAffinityNamespaceSelector: feature.DefaultFeatureGate.Enabled(features.PodAffinityNamespaceSelector),
EnablePodDisruptionBudget: feature.DefaultFeatureGate.Enabled(features.PodDisruptionBudget),
EnablePodOverhead: feature.DefaultFeatureGate.Enabled(features.PodOverhead),
EnableReadWriteOncePod: feature.DefaultFeatureGate.Enabled(features.ReadWriteOncePod),
}

return runtime.Registry{
Expand Down Expand Up @@ -80,8 +81,10 @@ func NewInTreeRegistry() runtime.Registry {
noderesources.RequestedToCapacityRatioName: func(plArgs apiruntime.Object, fh framework.Handle) (framework.Plugin, error) {
return noderesources.NewRequestedToCapacityRatio(plArgs, fh, fts)
},
volumebinding.Name: volumebinding.New,
volumerestrictions.Name: volumerestrictions.New,
volumebinding.Name: volumebinding.New,
volumerestrictions.Name: func(plArgs apiruntime.Object, fh framework.Handle) (framework.Plugin, error) {
return volumerestrictions.New(plArgs, fh, fts)
},
volumezone.Name: volumezone.New,
nodevolumelimits.CSIName: nodevolumelimits.NewCSI,
nodevolumelimits.EBSName: nodevolumelimits.NewEBS,
Expand Down
Expand Up @@ -18,17 +18,29 @@ package volumerestrictions

import (
"context"
"sync/atomic"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
corelisters "k8s.io/client-go/listers/core/v1"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
)

// VolumeRestrictions is a plugin that checks volume restrictions.
type VolumeRestrictions struct{}
type VolumeRestrictions struct {
// parallelizer fans the per-node ReadWriteOncePod conflict check out over the listed nodes.
parallelizer parallelize.Parallelizer
// pvcLister looks up the PersistentVolumeClaims referenced by a pod so their access modes can be inspected.
pvcLister corelisters.PersistentVolumeClaimLister
// nodeInfoLister provides the NodeInfo list whose PVCRefCounts are checked for conflicts.
nodeInfoLister framework.SharedLister
// enableReadWriteOncePod gates the ReadWriteOncePod check in PreFilter; it is set from feature.Features in New.
enableReadWriteOncePod bool
}

// Compile-time assertions that *VolumeRestrictions implements the PreFilter,
// Filter and EnqueueExtensions plugin interfaces.
var _ framework.PreFilterPlugin = &VolumeRestrictions{}
var _ framework.FilterPlugin = &VolumeRestrictions{}
var _ framework.EnqueueExtensions = &VolumeRestrictions{}

Expand All @@ -38,6 +50,8 @@ const Name = names.VolumeRestrictions
const (
// ErrReasonDiskConflict is used for NoDiskConflict predicate error.
ErrReasonDiskConflict = "node(s) had no available disk"
// ErrReasonReadWriteOncePodConflict is the reason attached to the UnschedulableAndUnresolvable status
// returned when some node already has a pod referencing the same PVC with the ReadWriteOncePod access mode.
ErrReasonReadWriteOncePodConflict = "node has pod using PersistentVolumeClaim with the same name and ReadWriteOncePod access mode"
)

// Name returns name of the plugin. It is used in logs, etc.
Expand Down Expand Up @@ -106,6 +120,72 @@ func haveOverlap(a1, a2 []string) bool {
return false
}

// PreFilter is invoked at the prefilter extension point.
// When the ReadWriteOncePod feature is disabled it reports Success right away;
// otherwise the result is whatever the ReadWriteOncePod conflict check returns
// for the given pod.
func (pl *VolumeRestrictions) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
	// Guard clause: nothing to enforce unless the feature gate is on.
	if !pl.enableReadWriteOncePod {
		return framework.NewStatus(framework.Success)
	}
	return pl.isReadWriteOncePodAccessModeConflict(pod)
}

// isReadWriteOncePodAccessModeConflict checks if a pod uses a PVC with the ReadWriteOncePod access mode.
// This access mode restricts volume access to a single pod on a single node. Since only a single pod can
// use a ReadWriteOncePod PVC, mark any other pods attempting to use this PVC as UnschedulableAndUnresolvable.
// TODO(#103132): Mark pod as Unschedulable and add preemption logic.
func (pl *VolumeRestrictions) isReadWriteOncePodAccessModeConflict(pod *v1.Pod) *framework.Status {
nodeInfos, err := pl.nodeInfoLister.NodeInfos().List()
if err != nil {
return framework.NewStatus(framework.Error, "error while getting node info")
}

var pvcKeys []string
for _, volume := range pod.Spec.Volumes {
if volume.PersistentVolumeClaim == nil {
continue
}

pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(volume.PersistentVolumeClaim.ClaimName)
if err != nil {
chrishenzie marked this conversation as resolved.
Show resolved Hide resolved
if apierrors.IsNotFound(err) {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
}
return framework.AsStatus(err)
}

if !v1helper.ContainsAccessMode(pvc.Spec.AccessModes, v1.ReadWriteOncePod) {
continue
}

key := pod.Namespace + "/" + volume.PersistentVolumeClaim.ClaimName
pvcKeys = append(pvcKeys, key)
}

ctx, cancel := context.WithCancel(context.Background())
var conflicts uint32

processNode := func(i int) {
nodeInfo := nodeInfos[i]
for _, key := range pvcKeys {
refCount := nodeInfo.PVCRefCounts[key]
if refCount > 0 {
atomic.AddUint32(&conflicts, 1)
cancel()
}
}
}
pl.parallelizer.Until(ctx, len(nodeInfos), processNode)

// Enforce ReadWriteOncePod access mode. This is also enforced during volume mount in kubelet.
if conflicts > 0 {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonReadWriteOncePodConflict)
}

return nil
}

func (pl *VolumeRestrictions) PreFilterExtensions() framework.PreFilterExtensions {
chrishenzie marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

// Filter invoked at the filter extension point.
// It evaluates if a pod can fit due to the volumes it requests, and those that
// are already mounted. If there is already a volume mounted on that node, another pod that uses the same volume
Expand Down Expand Up @@ -142,10 +222,22 @@ func (pl *VolumeRestrictions) EventsToRegister() []framework.ClusterEvent {
{Resource: framework.Pod, ActionType: framework.Delete},
// A new Node may make a pod schedulable.
{Resource: framework.Node, ActionType: framework.Add},
// A pod may fail to schedule because a PVC it uses has not been created yet.
// The PVC must exist so that its access modes can be checked.
chrishenzie marked this conversation as resolved.
Show resolved Hide resolved
{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update},
}
}

// New initializes a new plugin and returns it.
func New(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
return &VolumeRestrictions{}, nil
func New(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
	// Wire the plugin straight from the framework handle: the PVC lister comes
	// from the shared informer factory, the node infos from the snapshot lister,
	// and the ReadWriteOncePod switch from the supplied feature set.
	plugin := &VolumeRestrictions{
		pvcLister:              handle.SharedInformerFactory().Core().V1().PersistentVolumeClaims().Lister(),
		nodeInfoLister:         handle.SnapshotSharedLister(),
		parallelizer:           handle.Parallelizer(),
		enableReadWriteOncePod: fts.EnableReadWriteOncePod,
	}
	return plugin, nil
}