Skip to content

Commit

Permalink
Implement QueueingHintFn in VolumeRestriction
Browse files Browse the repository at this point in the history
  • Loading branch information
HirazawaUi committed Nov 26, 2023
1 parent ae2e0c0 commit 6b76cfb
Show file tree
Hide file tree
Showing 2 changed files with 539 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/kubernetes/pkg/scheduler/util"
)

// VolumeRestrictions is a plugin that checks volume restrictions.
Expand Down Expand Up @@ -168,7 +170,7 @@ func (pl *VolumeRestrictions) PreFilter(ctx context.Context, cycleState *framewo
}
}

pvcs, err := pl.readWriteOncePodPVCsForPod(ctx, pod)
pvcs, err := pl.readWriteOncePodPVCsForPod(pod, false)
if err != nil {
if apierrors.IsNotFound(err) {
return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
Expand Down Expand Up @@ -239,7 +241,7 @@ func (pl *VolumeRestrictions) calPreFilterState(ctx context.Context, pod *v1.Pod
}, nil
}

func (pl *VolumeRestrictions) readWriteOncePodPVCsForPod(ctx context.Context, pod *v1.Pod) (sets.Set[string], error) {
func (pl *VolumeRestrictions) readWriteOncePodPVCsForPod(pod *v1.Pod, ignoreNotFoundError bool) (sets.Set[string], error) {
pvcs := sets.New[string]()
for _, volume := range pod.Spec.Volumes {
if volume.PersistentVolumeClaim == nil {
Expand All @@ -248,6 +250,9 @@ func (pl *VolumeRestrictions) readWriteOncePodPVCsForPod(ctx context.Context, po

pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(volume.PersistentVolumeClaim.ClaimName)
if err != nil {
if ignoreNotFoundError && apierrors.IsNotFound(err) {
continue
}
return nil, err
}

Expand Down Expand Up @@ -321,13 +326,116 @@ func (pl *VolumeRestrictions) EventsToRegister() []framework.ClusterEventWithHin
// Pods may fail to schedule because of volumes conflicting with other pods on same node.
// Once running pods are deleted and volumes have been released, the unschedulable pod will be schedulable.
// Due to immutable fields `spec.volumes`, pod update events are ignored.
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}},
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}, QueueingHintFn: pl.isSchedulableAfterPodDeleted},
// A new Node may make a pod schedulable.
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}},
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}, QueueingHintFn: pl.isSchedulableAfterNodeAdded},
// Pods may fail to schedule because the PVC it uses has not yet been created.
// This PVC is required to exist to check its access modes.
{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}},
{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterPersistentVolumeClaimChange},
}
}

// isSchedulableAfterPodDeleted is invoked whenever a pod deleted,
// It checks whether the deleted pod will conflict with volumes of other pods on the same node
func (pl *VolumeRestrictions) isSchedulableAfterPodDeleted(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
deletedPod, _, err := util.As[*v1.Pod](oldObj, newObj)
if err != nil {
return framework.Queue, fmt.Errorf("unexpected objects in isSchedulableAfterPodDeleted: %w", err)
}

if deletedPod.Namespace != pod.Namespace {
return framework.QueueSkip, nil
}

newPodPvcs, err := pl.readWriteOncePodPVCsForPod(pod, false)
if err != nil {
if apierrors.IsNotFound(err) {
logger.V(5).Info("no PVC for the Pod is found, this Pod won't be schedulable until PVC is created", "pod", klog.KObj(pod), "err", err)
return framework.QueueSkip, nil
}
return framework.Queue, err
}

// deletedPod may contain multiple PVCs and one or more PVCs have been deleted (but some still exist),
// so we need to ignore not found error,
// because we also need to determine if the undeleted pvc relate to the current pod or not.
deletedPodPvcs, err := pl.readWriteOncePodPVCsForPod(deletedPod, true)
if err != nil {
return framework.Queue, err
}

// If oldPod and the current pod are in conflict because of readWriteOncePodPVC,
// the current pod may be scheduled in the next scheduling cycle, so we return Queue
for pvc := range deletedPodPvcs {
if newPodPvcs.Has(pvc) {
return framework.Queue, nil
}
}

nodeInfo := framework.NewNodeInfo(deletedPod)
if !satisfyVolumeConflicts(pod, nodeInfo) {
return framework.Queue, nil
}

return framework.QueueSkip, nil
}

// isSchedulableAfterNodeAdded is invoked whenever a node added, It checks whether
// that change made a previously unschedulable pod schedulable.
func (pl *VolumeRestrictions) isSchedulableAfterNodeAdded(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
_, newNode, err := util.As[*v1.Node](oldObj, newObj)
if err != nil {
return framework.Queue, fmt.Errorf("unexpected objects in isSchedulableAfterNodeAdded: %w", err)
}

// A new Node should not cause any volume conflicts basically.
// This Pod may be able to get a seat on this new Node.
logger.V(5).Info("node was added which the Pod may be able to get scheduled on", "pod", klog.KObj(pod), "node", klog.KObj(newNode))

return framework.Queue, nil
}

// isSchedulableAfterPersistentVolumeClaimChange is invoked whenever a PersistentVolumeClaim changed, It checks whether
// that change made a previously unschedulable pod schedulable.
func (pl *VolumeRestrictions) isSchedulableAfterPersistentVolumeClaimChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
oldPersistentVolumeClaim, newPersistentVolumeClaim, err := util.As[*v1.PersistentVolumeClaim](oldObj, newObj)
if err != nil {
return framework.Queue, fmt.Errorf("unexpected objects in isSchedulableAfterPersistentVolumeClaimChange: %w", err)
}

if newPersistentVolumeClaim.Namespace != pod.Namespace {
return framework.QueueSkip, nil
}

pvcs := sets.New[string]()

for _, volume := range pod.Spec.Volumes {
if volume.PersistentVolumeClaim == nil {
continue
}

pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(volume.PersistentVolumeClaim.ClaimName)
if err != nil {
if apierrors.IsNotFound(err) {
logger.V(5).Info("The PVC for the Pod is not created, and this Pod won't be schedulable until the PVC is created",
"pod", klog.KObj(pod), "pvc", volume.PersistentVolumeClaim.ClaimName, "err", err)
return framework.QueueSkip, nil
}
return framework.Queue, err
}

pvcs.Insert(pvc.Name)
}

if oldPersistentVolumeClaim == nil {
if !pvcs.Has(newPersistentVolumeClaim.Name) {
// We're only interested in PVC which the Pod requests.
return framework.QueueSkip, nil
}
return framework.Queue, nil
}

return framework.QueueSkip, nil
}

// New initializes a new plugin and returns it.
Expand Down

0 comments on commit 6b76cfb

Please sign in to comment.