Skip to content

Commit

Permalink
Implement QueueingHintFn in VolumeRestriction
Browse files Browse the repository at this point in the history
  • Loading branch information
HirazawaUi committed Jul 23, 2023
1 parent 16534de commit 8da238a
Show file tree
Hide file tree
Showing 2 changed files with 375 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,20 @@ import (
"context"
"fmt"

"github.com/google/go-cmp/cmp"
apiequality "k8s.io/apimachinery/pkg/api/equality"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/feature"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/names"
"k8s.io/kubernetes/pkg/scheduler/util"
)

// VolumeRestrictions is a plugin that checks volume restrictions.
Expand Down Expand Up @@ -338,13 +343,111 @@ func (pl *VolumeRestrictions) EventsToRegister() []framework.ClusterEventWithHin
// Pods may fail to schedule because of volumes conflicting with other pods on same node.
// Once running pods are deleted and volumes have been released, the unschedulable pod will be schedulable.
// Due to immutable fields `spec.volumes`, pod update events are ignored.
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}},
{Event: framework.ClusterEvent{Resource: framework.Pod, ActionType: framework.Delete}, QueueingHintFn: pl.isSchedulableAfterPodDeleted},
// A new Node may make a pod schedulable.
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}},
{Event: framework.ClusterEvent{Resource: framework.Node, ActionType: framework.Add}, QueueingHintFn: pl.isSchedulableAfterNodeAdded},
// Pods may fail to schedule because the PVC it uses has not yet been created.
// This PVC is required to exist to check its access modes.
{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}},
{Event: framework.ClusterEvent{Resource: framework.PersistentVolumeClaim, ActionType: framework.Add | framework.Update}, QueueingHintFn: pl.isSchedulableAfterPersistentVolumeClaimChange},
}
}

// isSchedulableAfterPodDeleted is the queueing hint for Pod delete events.
// It reports whether the deletion may have removed the reason the pending pod
// was rejected by this plugin:
//   - the deleted pod held a volume that conflicts with one of pod's volumes, or
//   - ReadWriteOncePod enforcement is enabled and the deleted pod, in the same
//     namespace, referenced a PersistentVolumeClaim that pod also references
//     (the claim's access mode is not looked up here).
//
// Any other deletion cannot help pod, so it is skipped. If the event objects
// are not Pods the error is logged and the pod is requeued after backoff.
func (pl *VolumeRestrictions) isSchedulableAfterPodDeleted(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
	deletedPod, _, err := util.As[*v1.Pod](oldObj, newObj)
	if err != nil {
		logger.Error(err, "unexpected objects in isSchedulableAfterPodDeleted", "oldObj", oldObj, "newObj", newObj)
		return framework.QueueAfterBackoff
	}

	// Evaluate pod against a node that hosts only the deleted pod.
	// NOTE(review): satisfyVolumeConflicts is defined outside this view; it is
	// taken to return true when pod has NO volume conflict with the pods in
	// nodeInfo — confirm. A conflict therefore means the deleted pod was
	// holding a volume that pod needs, and its removal is worth a retry.
	nodeInfo := framework.NewNodeInfo(deletedPod)
	if !satisfyVolumeConflicts(pod, nodeInfo) {
		return framework.QueueImmediately
	}

	// Claims are namespaced, so only a pod in the same namespace can share one,
	// and sharing only blocks scheduling when ReadWriteOncePod is enforced.
	if !pl.enableReadWriteOncePod || deletedPod.Namespace != pod.Namespace {
		return framework.QueueSkip
	}

	claims := make(map[string]struct{}, len(pod.Spec.Volumes))
	for i := range pod.Spec.Volumes {
		if src := pod.Spec.Volumes[i].PersistentVolumeClaim; src != nil {
			claims[src.ClaimName] = struct{}{}
		}
	}
	for i := range deletedPod.Spec.Volumes {
		src := deletedPod.Spec.Volumes[i].PersistentVolumeClaim
		if src == nil {
			continue
		}
		if _, shared := claims[src.ClaimName]; shared {
			return framework.QueueImmediately
		}
	}

	return framework.QueueSkip
}

// isSchedulableAfterNodeAdded is the queueing hint for Node add events.
// Every added node is treated as a reason to retry the pod immediately; the
// node itself is only used for logging. If the event objects are not Nodes the
// error is logged and the pod is requeued after backoff.
func (pl *VolumeRestrictions) isSchedulableAfterNodeAdded(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
	_, addedNode, convErr := util.As[*v1.Node](oldObj, newObj)
	if convErr == nil {
		logger.V(4).Info("node was added", "pod", klog.KObj(pod), "node", klog.KObj(addedNode))
		return framework.QueueImmediately
	}

	logger.Error(convErr, "unexpected objects in isSchedulableAfterNodeAdded", "oldObj", oldObj, "newObj", newObj)
	return framework.QueueAfterBackoff
}

// isSchedulableAfterPersistentVolumeClaimChange is the queueing hint for
// PersistentVolumeClaim add and update events. It decides whether the change
// may have made the pending pod schedulable:
//   - QueueSkip if the claim is not one the pod references, or if it was
//     updated without any change to spec.accessModes;
//   - QueueImmediately if ReadWriteOncePod enforcement is disabled, or the
//     relevant claim was added / had its access modes changed;
//   - QueueAfterBackoff if the claim carries the ReadWriteOncePod access mode
//     and is reported as already used by pods, or if the event objects are not
//     PersistentVolumeClaims.
func (pl *VolumeRestrictions) isSchedulableAfterPersistentVolumeClaimChange(logger klog.Logger, pod *v1.Pod, oldObj, newObj interface{}) framework.QueueingHint {
	originPersistentVolumeClaim, modifiedPersistentVolumeClaim, err := util.As[*v1.PersistentVolumeClaim](oldObj, newObj)
	if err != nil {
		logger.Error(err, "unexpected objects in isSchedulableAfterPersistentVolumeClaimChange", "oldObj", oldObj, "newObj", newObj)
		return framework.QueueAfterBackoff
	}

	// Only claims referenced by the pod can affect its schedulability.
	if !pl.FilterPVCFromPod(pod, modifiedPersistentVolumeClaim) {
		// This was not the PersistentVolumeClaim the pod was waiting for.
		logger.V(6).Info("unrelated PersistentVolumeClaim got modified", "pod", klog.KObj(pod), "PersistentVolumeClaim", klog.KObj(modifiedPersistentVolumeClaim))
		return framework.QueueSkip
	}

	if !pl.enableReadWriteOncePod {
		logger.V(7).Info("spec of PersistentVolumeClaim for pod got updated", "pod", klog.KObj(pod), "PersistentVolumeClaim", klog.KObj(modifiedPersistentVolumeClaim))
		return framework.QueueImmediately
	}

	// An update that leaves the access modes untouched cannot change the
	// outcome for the pod. The typed pointer is checked rather than oldObj: an
	// interface wrapping a nil *PersistentVolumeClaim compares non-nil and
	// would otherwise be dereferenced here. A nil old claim means an add event.
	if originPersistentVolumeClaim != nil && apiequality.Semantic.DeepEqual(originPersistentVolumeClaim.Spec.AccessModes, modifiedPersistentVolumeClaim.Spec.AccessModes) {
		if loggerV := logger.V(7); loggerV.Enabled() {
			// The diff is only computed when the verbose level is enabled.
			loggerV.Info("PersistentVolumeClaim for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "PersistentVolumeClaim", klog.KObj(modifiedPersistentVolumeClaim), "diff", cmp.Diff(originPersistentVolumeClaim, modifiedPersistentVolumeClaim))
		} else {
			logger.V(6).Info("PersistentVolumeClaim for pod got modified where the pod doesn't care", "pod", klog.KObj(pod), "PersistentVolumeClaim", klog.KObj(modifiedPersistentVolumeClaim))
		}
		return framework.QueueSkip
	}

	// A ReadWriteOncePod claim that is already in use by pods gets a backoff
	// instead of an immediate retry.
	key := framework.GetNamespacedName(pod.Namespace, modifiedPersistentVolumeClaim.Name)
	if pl.sharedLister.StorageInfos().IsPVCUsedByPods(key) && v1helper.ContainsAccessMode(modifiedPersistentVolumeClaim.Spec.AccessModes, v1.ReadWriteOncePod) {
		logger.V(4).Info("Pod uses PVCs with the ReadWriteOncePod access mode", "pod", klog.KObj(pod), "PersistentVolumeClaim", klog.KObj(modifiedPersistentVolumeClaim))
		return framework.QueueAfterBackoff
	}

	logger.V(4).Info("spec of PersistentVolumeClaim for pod got updated", "pod", klog.KObj(pod), "PersistentVolumeClaim", klog.KObj(modifiedPersistentVolumeClaim))
	return framework.QueueImmediately
}

// FilterPVCFromPod reports whether pod references pvc through one of its
// persistentVolumeClaim volumes, i.e. pvc is in the pod's namespace and some
// volume's claimName equals pvc's name.
//
// The match is made on the names alone, without fetching the claim from the
// lister: a claim is identified by namespace and name, so a lookup adds no
// information, and a lookup failure would wrongly report the claim as unrelated.
// A nil pod or pvc yields false.
func (pl *VolumeRestrictions) FilterPVCFromPod(pod *v1.Pod, pvc *v1.PersistentVolumeClaim) bool {
	if pod == nil || pvc == nil {
		return false
	}
	// A pod can only reference claims in its own namespace.
	if pvc.Namespace != pod.Namespace {
		return false
	}

	for i := range pod.Spec.Volumes {
		src := pod.Spec.Volumes[i].PersistentVolumeClaim
		if src != nil && src.ClaimName == pvc.Name {
			return true
		}
	}

	return false
}

// New initializes a new plugin and returns it.
Expand Down

0 comments on commit 8da238a

Please sign in to comment.