Skip to content

Commit

Permalink
addressed comments
Browse files Browse the repository at this point in the history
  • Loading branch information
vicentefb committed May 14, 2024
1 parent 97babe7 commit de8dc04
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 49 deletions.
18 changes: 7 additions & 11 deletions pkg/controller/core/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/queue"
utilac "sigs.k8s.io/kueue/pkg/util/admissioncheck"
utileq "sigs.k8s.io/kueue/pkg/util/equality"
"sigs.k8s.io/kueue/pkg/util/limitrange"
utilslices "sigs.k8s.io/kueue/pkg/util/slices"
"sigs.k8s.io/kueue/pkg/workload"
Expand Down Expand Up @@ -240,7 +241,7 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, err
}
if features.Enabled(features.ResizableJobs) {
if err := r.downSizeJobIfNecessary(&wl, ctx); err != nil {
if err := r.downSizePodSetAssigmentsIfNecessary(&wl, ctx); err != nil {
return ctrl.Result{}, err
}

Expand Down Expand Up @@ -305,16 +306,11 @@ func (r *WorkloadReconciler) isScaledDown(wl *kueue.Workload) bool {
if !features.Enabled(features.ResizableJobs) {
return false
}
podSetSize := len(wl.Spec.PodSets)
for i := 1; i < podSetSize; i++ {
if ptr.Deref(wl.Status.Admission.PodSetAssignments[i].Count, 0) > wl.Spec.PodSets[i].Count {
return true
}
}
return false

return utileq.IsResizedPsa(wl.Status.Admission.PodSetAssignments[1:], wl.Spec.PodSets[1:])
}

func (r *WorkloadReconciler) downSizeJobIfNecessary(wl *kueue.Workload, ctx context.Context) error {
func (r *WorkloadReconciler) downSizePodSetAssigmentsIfNecessary(wl *kueue.Workload, ctx context.Context) error {
updateStatus := false
podSetSize := len(wl.Spec.PodSets)
for i := 1; i < podSetSize; i++ {
Expand All @@ -329,10 +325,10 @@ func (r *WorkloadReconciler) downSizeJobIfNecessary(wl *kueue.Workload, ctx cont
for k := range currentAssignedResourceUsage {
resourceQuantity := originalResourceRequests[k]
resourceQuantity.Mul(int64(diff))
originalResourceRequests[k] = resourceQuantity
//originalResourceRequests[k] = resourceQuantity

assignedResourceQuantity := currentAssignedResourceUsage[k]
assignedResourceQuantity.Sub(originalResourceRequests[k])
assignedResourceQuantity.Sub(resourceQuantity)
currentAssignedResourceUsage[k] = assignedResourceQuantity
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/jobframework/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ type JobWithReclaimablePods interface {
ReclaimablePods() ([]kueue.ReclaimablePod, error)
}

type ReizableJobs interface {
// IsResizable returns the true/false depending if the job is being downsized.
type ResizableJob interface {
// IsResizable reports whether the job supports resizing.
// It only supports downsizing for now, see KEP:77
IsResizable(wl *kueue.Workload) bool
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
}

// 4.1 update podSetCount for RayCluster resize (downsize)
if jobSizeable, implementsSizable := job.(ReizableJobs); implementsSizable && jobSizeable.IsResizable(wl) && workload.IsAdmitted(wl) {
if jobSizeable, implementsSizable := job.(ResizableJob); implementsSizable && jobSizeable.IsResizable(wl) && workload.IsAdmitted(wl) {
toUpdate := wl
_, err := r.updateWorkloadToMatchJob(ctx, job, object, toUpdate, "Updated Workload due to resize: %v")
if err != nil {
Expand Down Expand Up @@ -709,7 +709,7 @@ func equivalentToWorkload(ctx context.Context, c client.Client, job GenericJob,
jobPodSets := clearMinCountsIfFeatureDisabled(job.PodSets())

if runningPodSets := expectedRunningPodSets(ctx, c, wl); runningPodSets != nil {
jobSizeable, implementsSizable := job.(ReizableJobs)
jobSizeable, implementsSizable := job.(ResizableJob)
if equality.ComparePodSetSlices(jobPodSets, runningPodSets) || (implementsSizable && jobSizeable.IsResizable(wl)) {
return true
}
Expand Down
8 changes: 2 additions & 6 deletions pkg/controller/jobs/raycluster/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"sigs.k8s.io/kueue/pkg/controller/jobframework"
"sigs.k8s.io/kueue/pkg/features"
"sigs.k8s.io/kueue/pkg/podset"
"sigs.k8s.io/kueue/pkg/util/equality"
)

var (
Expand Down Expand Up @@ -93,13 +94,8 @@ func (j *RayCluster) IsResizable(wl *kueue.Workload) bool {
return false
}
pods := j.PodSets()
for i := 1; i < len(pods); i++ {
if wl.Spec.PodSets[i].Count > pods[i].Count {
return true
}
}
return equality.IsResized(wl.Spec.PodSets[1:], pods[1:])

return false
}

func (j *RayCluster) PodSets() []kueue.PodSet {
Expand Down
32 changes: 32 additions & 0 deletions pkg/util/equality/podset.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,35 @@ func ComparePodSetSlices(a, b []kueue.PodSet) bool {
}
return true
}

// isDownsized reports whether any pod set in oldPodsets has a larger Count
// than the pod set at the same index in newPodSets.
//
// Only indexes present in both slices are compared, so slices of different
// lengths never trigger an index-out-of-range panic; pod sets without a
// counterpart are ignored.
func isDownsized(oldPodsets, newPodSets []kueue.PodSet) bool {
	// Bound the scan by the shorter slice: callers pass the arguments in
	// both orders, so either one may be the shorter.
	n := len(oldPodsets)
	if len(newPodSets) < n {
		n = len(newPodSets)
	}
	for i := 0; i < n; i++ {
		if oldPodsets[i].Count > newPodSets[i].Count {
			return true
		}
	}
	return false
}

// IsResized reports whether newPodSets is a pure downsize of oldPodsets:
// no pod set count grew, and at least one pod set count shrank.
func IsResized(oldPodsets, newPodSets []kueue.PodSet) bool {
	// A count that grew is an upsize, which is not supported yet.
	if isDownsized(newPodSets, oldPodsets) {
		return false
	}
	// No count grew, so the result is whether any count shrank.
	return isDownsized(oldPodsets, newPodSets)
}

// IsResizedPsa reports whether newPodSets is a pure downsize of the counts
// recorded in oldPodsetsAssignments. It is intended for comparing a
// workload's PodSetAssignments with its PodSets, index by index.
//
// It returns false if any pod set count is larger than its assigned count
// (an upsize, which is not supported yet); otherwise it returns true when at
// least one assigned count is larger than the pod set count. An assignment
// whose Count is nil is compared as 0.
//
// Only indexes present in both slices are compared, so slices of different
// lengths never trigger an index-out-of-range panic.
func IsResizedPsa(oldPodsetsAssignments []kueue.PodSetAssignment, newPodSets []kueue.PodSet) bool {
	n := len(oldPodsetsAssignments)
	if len(newPodSets) < n {
		n = len(newPodSets)
	}

	downsized := false
	for i := 0; i < n; i++ {
		assigned := ptr.Deref(oldPodsetsAssignments[i].Count, 0)
		requested := newPodSets[i].Count
		if assigned < requested {
			// Any upsize disqualifies the whole comparison, even if an
			// earlier pod set was downsized.
			return false
		}
		if assigned > requested {
			downsized = true
		}
	}
	return downsized
}
28 changes: 0 additions & 28 deletions pkg/util/testing/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,29 +74,6 @@ func MakeWorkload(name, ns string) *WorkloadWrapper {
}}
}

// MakeWorkloadWithMorePodSets creates a wrapper for a Workload with the given
// name and namespace whose spec holds a single pod set, the one produced by
// MakePodSet("main", 3).
func MakeWorkloadWithMorePodSets(name, ns string) *WorkloadWrapper {
	wl := kueue.Workload{
		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: ns},
	}
	wl.Spec = kueue.WorkloadSpec{
		PodSets: []kueue.PodSet{*MakePodSet("main", 3).Obj()},
	}
	return &WorkloadWrapper{wl}
}

// ScaleDown decrements the Count of the first pod set by one and returns the
// wrapper so calls can be chained.
func (w *WorkloadWrapper) ScaleDown() *WorkloadWrapper {
	first := &w.Spec.PodSets[0]
	first.Count--
	return w
}

// IncreasePodSetCount increments the Count of the first pod set by one and
// returns the wrapper so calls can be chained.
func (w *WorkloadWrapper) IncreasePodSetCount() *WorkloadWrapper {
	first := &w.Spec.PodSets[0]
	first.Count++
	return w
}

// Obj returns a pointer to the Workload held by the wrapper (not a copy).
func (w *WorkloadWrapper) Obj() *kueue.Workload {
return &w.Workload
}
Expand Down Expand Up @@ -131,11 +108,6 @@ func (w *WorkloadWrapper) Request(r corev1.ResourceName, q string) *WorkloadWrap
return w
}

// RequestScaleDown sets the request for resource r to the quantity parsed
// from q on the first container of the second pod set (index 1), and returns
// the wrapper so calls can be chained.
//
// The workload must already have a second pod set with at least one
// container; q must be a valid quantity, because resource.MustParse panics
// on input it cannot parse.
func (w *WorkloadWrapper) RequestScaleDown(r corev1.ResourceName, q string) *WorkloadWrapper {
	res := &w.Spec.PodSets[1].Template.Spec.Containers[0].Resources
	if res.Requests == nil {
		// Assigning into a nil map panics, so allocate it on first use.
		res.Requests = corev1.ResourceList{}
	}
	res.Requests[r] = resource.MustParse(q)
	return w
}

func (w *WorkloadWrapper) Limit(r corev1.ResourceName, q string) *WorkloadWrapper {
res := &w.Spec.PodSets[0].Template.Spec.Containers[0].Resources
if res.Limits == nil {
Expand Down

0 comments on commit de8dc04

Please sign in to comment.