Skip to content

Commit

Permalink
Implement QueueingHintFn in VolumeRestriction
Browse files Browse the repository at this point in the history
  • Loading branch information
HirazawaUi committed Dec 2, 2023
1 parent ae2e0c0 commit 107ed2d
Show file tree
Hide file tree
Showing 2 changed files with 464 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/kubernetes/pkg/scheduler/util"
)

// VolumeRestrictions is a plugin that checks volume restrictions.
Expand Down Expand Up @@ -168,7 +170,7 @@ func (pl *VolumeRestrictions) PreFilter(ctx context.Context, cycleState *framewo
}
}

pvcs, err := pl.readWriteOncePodPVCsForPod(ctx, pod)
pvcs, err := pl.readWriteOncePodPVCsForPod(pod, false)
if err != nil {
if apierrors.IsNotFound(err) {
return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
Expand Down Expand Up @@ -239,7 +241,9 @@ func (pl *VolumeRestrictions) calPreFilterState(ctx context.Context, pod *v1.Pod
}, nil
}

func (pl *VolumeRestrictions) readWriteOncePodPVCsForPod(ctx context.Context, pod *v1.Pod) (sets.Set[string], error) {
// readWriteOncePodPVCsForPod returns the name of WriteOncePod PVCs in a given Pod.
// If ignoreNotFoundError is true, it tries to check all PVCs, ignoring not found errors.
func (pl *VolumeRestrictions) readWriteOncePodPVCsForPod(pod *v1.Pod, ignoreNotFoundError bool) (sets.Set[string], error) {
pvcs := sets.New[string]()
for _, volume := range pod.Spec.Volumes {
if volume.PersistentVolumeClaim == nil {
Expand All @@ -248,6 +252,9 @@ func (pl *VolumeRestrictions) readWriteOncePodPVCsForPod(ctx context.Context, po

pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(volume.PersistentVolumeClaim.ClaimName)
if err != nil {
if ignoreNotFoundError && apierrors.IsNotFound(err) {
continue
}
return nil, err
}

Expand Down Expand Up @@ -321,13 +328,103 @@ func (pl *VolumeRestrictions) EventsToRegister() []framework.ClusterEventWithHin
// Pods may fail to schedule because of volumes conflicting with other pods on same node.
// Once running pods are deleted and volumes have been released, the unschedulable pod will be schedulable.
// Due to immutable fields `spec.volumes`, pod update events are ignored.
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}},
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}, QueueingHintFn: pl.isSchedulableAfterPodDeleted},
// A new Node may make a pod schedulable.
// We intentionally don't set QueueingHint since all Node/Add events could make Pods schedulable.
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}},
// Pods may fail to schedule because the PVC it uses has not yet been created.
// This PVC is required to exist to check its access modes.
{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}},
{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterPersistentVolumeClaimChange},
}
}

// isSchedulableAfterPodDeleted is invoked whenever a pod deleted,
// It checks whether the deleted pod will conflict with volumes of other pods on the same node
func (pl *VolumeRestrictions) isSchedulableAfterPodDeleted(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
deletedPod, _, err := util.As[*v1.Pod](oldObj, newObj)
if err != nil {
return framework.Queue, fmt.Errorf("unexpected objects in isSchedulableAfterPodDeleted: %w", err)
}

if deletedPod.Namespace != pod.Namespace {
return framework.QueueSkip, nil
}

newPodPvcs, err := pl.readWriteOncePodPVCsForPod(pod, false)
if err != nil {
if apierrors.IsNotFound(err) {
logger.V(5).Info("no PVC for the Pod is found, this Pod won't be schedulable until PVC is created", "pod", klog.KObj(pod), "err", err)
return framework.QueueSkip, nil
}
return framework.Queue, err
}

// deletedPod may contain multiple PVCs and maybe one or more PVCs have been deleted (while some still exist).
// We can always ignore a deleted PVC associated with this deleted Pod because:
// - if a new Pod has this PVC, this Pod won't be schedulable until the PVC with the same name is recreated.
// - if a new Pod doesn't have this PVC, this Pod is completely not related to that deleted PVC.
//
// But, a complex scenario is that when the deleted Pod has more than one PVCs, and PVC-1 is deleted, but PVC-x isn't deleted.
// In this case, as the above describes, PVC1 can be ignored anyway.
// But we still need to check PVC-x, whether the deletion of deletedPod could make the pod schedulable.
deletedPodPvcs, err := pl.readWriteOncePodPVCsForPod(deletedPod, true)
if err != nil {
return framework.Queue, err
}

// If oldPod and the current pod are in conflict because of readWriteOncePodPVC,
// the current pod may be scheduled in the next scheduling cycle, so we return Queue
for pvc := range deletedPodPvcs {
if newPodPvcs.Has(pvc) {
return framework.Queue, nil
}
}

nodeInfo := framework.NewNodeInfo(deletedPod)
if !satisfyVolumeConflicts(pod, nodeInfo) {
return framework.Queue, nil
}

return framework.QueueSkip, nil
}

// isSchedulableAfterPersistentVolumeClaimChange is invoked whenever a PersistentVolumeClaim added or changed, It checks whether
// that change made a previously unschedulable pod schedulable.
func (pl *VolumeRestrictions) isSchedulableAfterPersistentVolumeClaimChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
oldPersistentVolumeClaim, newPersistentVolumeClaim, err := util.As[*v1.PersistentVolumeClaim](oldObj, newObj)
if err != nil {
return framework.Queue, fmt.Errorf("unexpected objects in isSchedulableAfterPersistentVolumeClaimChange: %w", err)
}

if oldPersistentVolumeClaim != nil || newPersistentVolumeClaim.Namespace != pod.Namespace {
return framework.QueueSkip, nil
}

pvcs := sets.New[string]()

for _, volume := range pod.Spec.Volumes {
if volume.PersistentVolumeClaim == nil {
continue
}

pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(volume.PersistentVolumeClaim.ClaimName)
if err != nil {
if apierrors.IsNotFound(err) {
logger.V(5).Info("The PVC for the Pod is not created, and this Pod won't be schedulable until the PVC is created",
"pod", klog.KObj(pod), "pvc", volume.PersistentVolumeClaim.ClaimName, "err", err)
return framework.QueueSkip, nil
}
return framework.Queue, err
}

pvcs.Insert(pvc.Name)
}

// We're only interested in PVC which the Pod requests.
if oldPersistentVolumeClaim == nil && pvcs.Has(newPersistentVolumeClaim.Name) {
return framework.Queue, nil
}
return framework.QueueSkip, nil
}

// New initializes a new plugin and returns it.
Expand Down

0 comments on commit 107ed2d

Please sign in to comment.