Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement QueueingHintFn in VolumeRestriction #119405

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/kubernetes/pkg/scheduler/util"
)

// VolumeRestrictions is a plugin that checks volume restrictions.
Expand Down Expand Up @@ -168,7 +170,7 @@ func (pl *VolumeRestrictions) PreFilter(ctx context.Context, cycleState *framewo
}
}

pvcs, err := pl.readWriteOncePodPVCsForPod(ctx, pod)
pvcs, err := pl.readWriteOncePodPVCsForPod(pod, false)
if err != nil {
if apierrors.IsNotFound(err) {
return nil, framework.NewStatus(framework.UnschedulableAndUnresolvable, err.Error())
Expand Down Expand Up @@ -239,7 +241,9 @@ func (pl *VolumeRestrictions) calPreFilterState(ctx context.Context, pod *v1.Pod
}, nil
}

func (pl *VolumeRestrictions) readWriteOncePodPVCsForPod(ctx context.Context, pod *v1.Pod) (sets.Set[string], error) {
// readWriteOncePodPVCsForPod returns the names of the ReadWriteOncePod PVCs used by the given Pod.
// If ignoreNotFoundError is true, PVCs that cannot be found are skipped instead of causing an error.
func (pl *VolumeRestrictions) readWriteOncePodPVCsForPod(pod *v1.Pod, ignoreNotFoundError bool) (sets.Set[string], error) {
HirazawaUi marked this conversation as resolved.
Show resolved Hide resolved
pvcs := sets.New[string]()
for _, volume := range pod.Spec.Volumes {
if volume.PersistentVolumeClaim == nil {
Expand All @@ -248,6 +252,9 @@ func (pl *VolumeRestrictions) readWriteOncePodPVCsForPod(ctx context.Context, po

pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(volume.PersistentVolumeClaim.ClaimName)
if err != nil {
if ignoreNotFoundError && apierrors.IsNotFound(err) {
continue
}
return nil, err
}

Expand Down Expand Up @@ -321,13 +328,103 @@ func (pl *VolumeRestrictions) EventsToRegister() []framework.ClusterEventWithHin
// Pods may fail to schedule because of volumes conflicting with other pods on same node.
// Once running pods are deleted and volumes have been released, the unschedulable pod will be schedulable.
// Because `spec.volumes` is immutable, Pod update events are ignored.
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}},
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}, QueueingHintFn: pl.isSchedulableAfterPodDeleted},
// A new Node may make a pod schedulable.
// We intentionally don't set QueueingHint since all Node/Add events could make Pods schedulable.
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}},
// A Pod may fail to schedule because a PVC it uses has not been created yet.
// This PVC is required to exist to check its access modes.
{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}},
{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterPersistentVolumeClaimChange},
}
}

// isSchedulableAfterPodDeleted is invoked whenever a pod deleted,
// It checks whether the deleted pod will conflict with volumes of other pods on the same node
func (pl *VolumeRestrictions) isSchedulableAfterPodDeleted(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
deletedPod, _, err := util.As[*v1.Pod](oldObj, newObj)
if err != nil {
return framework.Queue, fmt.Errorf("unexpected objects in isSchedulableAfterPodDeleted: %w", err)
}

if deletedPod.Namespace != pod.Namespace {
return framework.QueueSkip, nil
}

newPodPvcs, err := pl.readWriteOncePodPVCsForPod(pod, false)
if err != nil {
if apierrors.IsNotFound(err) {
HirazawaUi marked this conversation as resolved.
Show resolved Hide resolved
logger.V(5).Info("no PVC for the Pod is found, this Pod won't be schedulable until PVC is created", "pod", klog.KObj(pod), "err", err)
return framework.QueueSkip, nil
}
return framework.Queue, err
}

// deletedPod may contain multiple PVCs and maybe one or more PVCs have been deleted (while some still exist).
// We can always ignore a deleted PVC associated with this deleted Pod because:
// - if a new Pod has this PVC, this Pod won't be schedulable until the PVC with the same name is recreated.
// - if a new Pod doesn't have this PVC, this Pod is completely not related to that deleted PVC.
//
// But, a complex scenario is that when the deleted Pod has more than one PVCs, and PVC-1 is deleted, but PVC-x isn't deleted.
// In this case, as the above describes, PVC1 can be ignored anyway.
// But we still need to check PVC-x, whether the deletion of deletedPod could make the pod schedulable.
deletedPodPvcs, err := pl.readWriteOncePodPVCsForPod(deletedPod, true)
if err != nil {
return framework.Queue, err
}

// If oldPod and the current pod are in conflict because of readWriteOncePodPVC,
// the current pod may be scheduled in the next scheduling cycle, so we return Queue
for pvc := range deletedPodPvcs {
if newPodPvcs.Has(pvc) {
return framework.Queue, nil
}
}
HirazawaUi marked this conversation as resolved.
Show resolved Hide resolved

nodeInfo := framework.NewNodeInfo(deletedPod)
if !satisfyVolumeConflicts(pod, nodeInfo) {
HirazawaUi marked this conversation as resolved.
Show resolved Hide resolved
HirazawaUi marked this conversation as resolved.
Show resolved Hide resolved
return framework.Queue, nil
}

return framework.QueueSkip, nil
}

// isSchedulableAfterPersistentVolumeClaimChange is invoked whenever a PersistentVolumeClaim added or changed, It checks whether
// that change made a previously unschedulable pod schedulable.
func (pl *VolumeRestrictions) isSchedulableAfterPersistentVolumeClaimChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) (framework.QueueingHint, error) {
HirazawaUi marked this conversation as resolved.
Show resolved Hide resolved
oldPersistentVolumeClaim, newPersistentVolumeClaim, err := util.As[*v1.PersistentVolumeClaim](oldObj, newObj)
if err != nil {
return framework.Queue, fmt.Errorf("unexpected objects in isSchedulableAfterPersistentVolumeClaimChange: %w", err)
}

if oldPersistentVolumeClaim != nil || newPersistentVolumeClaim.Namespace != pod.Namespace {
return framework.QueueSkip, nil
}

pvcs := sets.New[string]()

for _, volume := range pod.Spec.Volumes {
if volume.PersistentVolumeClaim == nil {
continue
}

pvc, err := pl.pvcLister.PersistentVolumeClaims(pod.Namespace).Get(volume.PersistentVolumeClaim.ClaimName)
if err != nil {
if apierrors.IsNotFound(err) {
logger.V(5).Info("The PVC for the Pod is not created, and this Pod won't be schedulable until the PVC is created",
"pod", klog.KObj(pod), "pvc", volume.PersistentVolumeClaim.ClaimName, "err", err)
return framework.QueueSkip, nil
}
return framework.Queue, err
}

pvcs.Insert(pvc.Name)
}

HirazawaUi marked this conversation as resolved.
Show resolved Hide resolved
// We're only interested in PVC which the Pod requests.
if oldPersistentVolumeClaim == nil && pvcs.Has(newPersistentVolumeClaim.Name) {
return framework.Queue, nil
HirazawaUi marked this conversation as resolved.
Show resolved Hide resolved
}
return framework.QueueSkip, nil
}

// New initializes a new plugin and returns it.
Expand Down