Skip to content

Commit

Permalink
Merge pull request #103082 from chrishenzie/read-write-once-pod-acces…
Browse files Browse the repository at this point in the history
…s-mode-scheduler

Enforce ReadWriteOncePod during scheduling
  • Loading branch information
k8s-ci-robot committed Jun 30, 2021
2 parents 0dad7d1 + 7ad44d0 commit 385402d
Show file tree
Hide file tree
Showing 13 changed files with 391 additions and 22 deletions.
2 changes: 2 additions & 0 deletions pkg/scheduler/apis/config/testing/defaults/defaults.go
Expand Up @@ -32,6 +32,7 @@ var PluginsV1beta1 = &config.Plugins{
Enabled: []config.Plugin{
{Name: names.NodeResourcesFit},
{Name: names.NodePorts},
{Name: names.VolumeRestrictions},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},
Expand Down Expand Up @@ -162,6 +163,7 @@ var PluginsV1beta2 = &config.Plugins{
Enabled: []config.Plugin{
{Name: names.NodeResourcesFit},
{Name: names.NodePorts},
{Name: names.VolumeRestrictions},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},
Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/apis/config/v1beta1/default_plugins.go
Expand Up @@ -38,6 +38,7 @@ func getDefaultPlugins() *v1beta1.Plugins {
Enabled: []v1beta1.Plugin{
{Name: names.NodeResourcesFit},
{Name: names.NodePorts},
{Name: names.VolumeRestrictions},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},
Expand Down
2 changes: 2 additions & 0 deletions pkg/scheduler/apis/config/v1beta1/default_plugins_test.go
Expand Up @@ -47,6 +47,7 @@ func TestApplyFeatureGates(t *testing.T) {
Enabled: []v1beta1.Plugin{
{Name: names.NodeResourcesFit},
{Name: names.NodePorts},
{Name: names.VolumeRestrictions},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},
Expand Down Expand Up @@ -129,6 +130,7 @@ func TestApplyFeatureGates(t *testing.T) {
Enabled: []v1beta1.Plugin{
{Name: names.NodeResourcesFit},
{Name: names.NodePorts},
{Name: names.VolumeRestrictions},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},
Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/apis/config/v1beta1/defaults_test.go
Expand Up @@ -337,6 +337,7 @@ func TestSchedulerDefaults(t *testing.T) {
Enabled: []v1beta1.Plugin{
{Name: names.NodeResourcesFit},
{Name: names.NodePorts},
{Name: names.VolumeRestrictions},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},
Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/apis/config/v1beta2/default_plugins.go
Expand Up @@ -38,6 +38,7 @@ func getDefaultPlugins() *v1beta2.Plugins {
Enabled: []v1beta2.Plugin{
{Name: names.NodeResourcesFit},
{Name: names.NodePorts},
{Name: names.VolumeRestrictions},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},
Expand Down
2 changes: 2 additions & 0 deletions pkg/scheduler/apis/config/v1beta2/default_plugins_test.go
Expand Up @@ -47,6 +47,7 @@ func TestApplyFeatureGates(t *testing.T) {
Enabled: []v1beta2.Plugin{
{Name: names.NodeResourcesFit},
{Name: names.NodePorts},
{Name: names.VolumeRestrictions},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},
Expand Down Expand Up @@ -128,6 +129,7 @@ func TestApplyFeatureGates(t *testing.T) {
Enabled: []v1beta2.Plugin{
{Name: names.NodeResourcesFit},
{Name: names.NodePorts},
{Name: names.VolumeRestrictions},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},
Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/apis/config/v1beta2/defaults_test.go
Expand Up @@ -320,6 +320,7 @@ func TestSchedulerDefaults(t *testing.T) {
Enabled: []v1beta2.Plugin{
{Name: names.NodeResourcesFit},
{Name: names.NodePorts},
{Name: names.VolumeRestrictions},
{Name: names.PodTopologySpread},
{Name: names.InterPodAffinity},
{Name: names.VolumeBinding},
Expand Down
1 change: 1 addition & 0 deletions pkg/scheduler/framework/plugins/feature/feature.go
Expand Up @@ -23,4 +23,5 @@ type Features struct {
EnablePodAffinityNamespaceSelector bool
EnablePodDisruptionBudget bool
EnablePodOverhead bool
EnableReadWriteOncePod bool
}
7 changes: 5 additions & 2 deletions pkg/scheduler/framework/plugins/registry.go
Expand Up @@ -53,6 +53,7 @@ func NewInTreeRegistry() runtime.Registry {
EnablePodAffinityNamespaceSelector: feature.DefaultFeatureGate.Enabled(features.PodAffinityNamespaceSelector),
EnablePodDisruptionBudget: feature.DefaultFeatureGate.Enabled(features.PodDisruptionBudget),
EnablePodOverhead: feature.DefaultFeatureGate.Enabled(features.PodOverhead),
EnableReadWriteOncePod: feature.DefaultFeatureGate.Enabled(features.ReadWriteOncePod),
}

return runtime.Registry{
Expand Down Expand Up @@ -80,8 +81,10 @@ func NewInTreeRegistry() runtime.Registry {
noderesources.RequestedToCapacityRatioName: func(plArgs apiruntime.Object, fh framework.Handle) (framework.Plugin, error) {
return noderesources.NewRequestedToCapacityRatio(plArgs, fh, fts)
},
volumebinding.Name: volumebinding.New,
volumerestrictions.Name: volumerestrictions.New,
volumebinding.Name: volumebinding.New,
volumerestrictions.Name: func(plArgs apiruntime.Object, fh framework.Handle) (framework.Plugin, error) {
return volumerestrictions.New(plArgs, fh, fts)
},
volumezone.Name: volumezone.New,
nodevolumelimits.CSIName: nodevolumelimits.NewCSI,
nodevolumelimits.EBSName: nodevolumelimits.NewEBS,
Expand Down
Expand Up @@ -18,17 +18,29 @@ package volumerestrictions

import (
"context"
"sync/atomic"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
corelisters "k8s.io/client-go/listers/core/v1"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/kubernetes/pkg/scheduler/internal/parallelize"
)

// VolumeRestrictions is a plugin that checks volume restrictions.
type VolumeRestrictions struct{}
type VolumeRestrictions struct {
parallelizer parallelize.Parallelizer
pvcLister corelisters.PersistentVolumeClaimLister
nodeInfoLister framework.SharedLister
enableReadWriteOncePod bool
}

var _ framework.PreFilterPlugin = &VolumeRestrictions{}
var _ framework.FilterPlugin = &VolumeRestrictions{}
var _ framework.EnqueueExtensions = &VolumeRestrictions{}

Expand All @@ -38,6 +50,8 @@ const Name = names.VolumeRestrictions
const (
// ErrReasonDiskConflict is used for NoDiskConflict predicate error.
ErrReasonDiskConflict = "node(s) had no available disk"
// ErrReasonReadWriteOncePodConflict is used when a pod is found using the same PVC with the ReadWriteOncePod access mode.
ErrReasonReadWriteOncePodConflict = "node has pod using PersistentVolumeClaim with the same name and ReadWriteOncePod access mode"
)

// Name returns name of the plugin. It is used in logs, etc.
Expand Down Expand Up @@ -106,6 +120,72 @@ func haveOverlap(a1, a2 []string) bool {
return false
}

func (pl *VolumeRestrictions) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
if pl.enableReadWriteOncePod {
return pl.isReadWriteOncePodAccessModeConflict(pod)
}
return framework.NewStatus(framework.Success)
}

// isReadWriteOncePodAccessModeConflict checks if a pod uses a PVC with the ReadWriteOncePod access mode.
// This access mode restricts volume access to a single pod on a single node. Since only a single pod can
// use a ReadWriteOncePod PVC, mark any other pods attempting to use this PVC as UnschedulableAndUnresolvable.
// TODO(#103132): Mark pod as Unschedulable and add preemption logic.
func (pl *VolumeRestrictions) isReadWriteOncePodAccessModeConflict(pod *v1.Pod) *framework.Status {
nodeInfos, err := pl.nodeInfoLister.NodeInfos().List()
if err != nil {
return framework.NewStatus(framework.Error, "error while getting node info")
}

var pvcKeys []string
for _, volume := range pod.Spec.Volumes {
if volume.PersistentVolumeClaim == nil {
continue
}

pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(volume.PersistentVolumeClaim.ClaimName)
if err != nil {
if apierrors.IsNotFound(err) {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
}
return framework.AsStatus(err)
}

if !v1helper.ContainsAccessMode(pvc.Spec.AccessModes, v1.ReadWriteOncePod) {
continue
}

key := pod.Namespace + "/" + volume.PersistentVolumeClaim.ClaimName
pvcKeys = append(pvcKeys, key)
}

ctx, cancel := context.WithCancel(context.Background())
var conflicts uint32

processNode := func(i int) {
nodeInfo := nodeInfos[i]
for _, key := range pvcKeys {
refCount := nodeInfo.PVCRefCounts[key]
if refCount > 0 {
atomic.AddUint32(&conflicts, 1)
cancel()
}
}
}
pl.parallelizer.Until(ctx, len(nodeInfos), processNode)

// Enforce ReadWriteOncePod access mode. This is also enforced during volume mount in kubelet.
if conflicts > 0 {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, ErrReasonReadWriteOncePodConflict)
}

return nil
}

func (pl *VolumeRestrictions) PreFilterExtensions() framework.PreFilterExtensions {
return nil
}

// Filter invoked at the filter extension point.
// It evaluates if a pod can fit due to the volumes it requests, and those that
// are already mounted. If there is already a volume mounted on that node, another pod that uses the same volume
Expand Down Expand Up @@ -142,10 +222,22 @@ func (pl *VolumeRestrictions) EventsToRegister() []framework.ClusterEvent {
{Resource: framework.Pod, ActionType: framework.Delete},
// A new Node may make a pod schedulable.
{Resource: framework.Node, ActionType: framework.Add},
// Pods may fail to schedule because the PVC it uses has not yet been created.
// This PVC is required to exist to check its access modes.
{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update},
}
}

// New initializes a new plugin and returns it.
func New(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
return &VolumeRestrictions{}, nil
func New(_ runtime.Object, handle framework.Handle, fts feature.Features) (framework.Plugin, error) {
informerFactory := handle.SharedInformerFactory()
pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
nodeInfoLister := handle.SnapshotSharedLister()

return &VolumeRestrictions{
parallelizer: handle.Parallelizer(),
pvcLister: pvcLister,
nodeInfoLister: nodeInfoLister,
enableReadWriteOncePod: fts.EnableReadWriteOncePod,
}, nil
}

0 comments on commit 385402d

Please sign in to comment.