Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Indexed completionMode to Job API #98441

Merged
merged 1 commit into from Mar 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 8 additions & 0 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion pkg/apis/batch/fuzzer/fuzzer.go
Expand Up @@ -18,7 +18,6 @@ package fuzzer

import (
fuzz "github.com/google/gofuzz"

runtimeserializer "k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/kubernetes/pkg/apis/batch"
)
Expand Down Expand Up @@ -53,6 +52,11 @@ var Funcs = func(codecs runtimeserializer.CodecFactory) []interface{} {
} else {
j.ManualSelector = nil
}
if c.Rand.Int31()%2 == 0 {
j.CompletionMode = batch.NonIndexedCompletion
} else {
j.CompletionMode = batch.IndexedCompletion
}
},
func(sj *batch.CronJobSpec, c fuzz.Continue) {
c.FuzzNoCustom(sj)
Expand Down
48 changes: 48 additions & 0 deletions pkg/apis/batch/types.go
Expand Up @@ -85,6 +85,22 @@ type JobTemplateSpec struct {
Spec JobSpec
}

// CompletionMode specifies how Pod completions of a Job are tracked.
type CompletionMode string

const (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting that the comment describes completion from the pod perspective. Since the mode is a job level parameter, it might be clearer if we describe it from the job perspective?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any suggested wording?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps:

NonIndexedMode: a job is completed when the number of successfully completed pods is equal to Completions, if completions is nil, then the success of any pod signals the success of the job.

IndexedMode: a job is completed when at least one pod for each unique index between 0 to Completions - 1 successfully completed.

When we start describing the mode from the perspective of the job, I think it becomes more important to document the restrictions on Completions/Parallelism values here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sadly, comments for constants don't make it to the reference documentation https://kubernetes.io/docs/reference/kubernetes-api/

So leaving the comments here lean and putting all details in the field.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, that is bad!

// NonIndexedCompletion is a Job completion mode. In this mode, the Job is
// considered complete when there have been .spec.completions
// successfully completed Pods. Pod completions are homologous to each other.
NonIndexedCompletion CompletionMode = "NonIndexed"

// IndexedCompletion is a Job completion mode. In this mode, the Pods of a
// Job get an associated completion index from 0 to (.spec.completions - 1).
// The Job is considered complete when a Pod completes for each completion
// index.
IndexedCompletion CompletionMode = "Indexed"
)

// JobSpec describes how the job execution will look like.
type JobSpec struct {

Expand Down Expand Up @@ -149,6 +165,28 @@ type JobSpec struct {
// TTLAfterFinished feature.
// +optional
TTLSecondsAfterFinished *int32

// CompletionMode specifies how Pod completions are tracked. It can be
// `NonIndexed` (default) or `Indexed`.
//
// `NonIndexed` means that the Job is considered complete when there have
// been .spec.completions successfully completed Pods. Each Pod completion is
// homologous to each other.
//
// `Indexed` means that the Pods of a
// Job get an associated completion index from 0 to (.spec.completions - 1),
// available in the annotation batch.alpha.kubernetes.io/job-completion-index.
// The Job is considered complete when there is one successfully completed Pod
// for each index.
// When value is `Indexed`, .spec.completions must be specified and
// `.spec.parallelism` must be less than or equal to 10^5.
//
// This field is alpha-level and is only honored by servers that enable the
// IndexedJob feature gate. More completion modes can be added in the future.
// If the Job controller observes a mode that it doesn't recognize, the
// controller skips updates for the Job.
// +optional
CompletionMode CompletionMode
}

// JobStatus represents the current state of a Job.
Expand Down Expand Up @@ -183,6 +221,16 @@ type JobStatus struct {
// The number of pods which reached phase Failed.
// +optional
Failed int32

// CompletedIndexes holds the completed indexes when .spec.completionMode =
// "Indexed" in a text format. The indexes are represented as decimal integers
// separated by commas. The numbers are listed in increasing order. Three or
// more consecutive numbers are compressed and represented by the first and
// last element of the series, separated by a hyphen.
// For example, if the completed indexes are 1, 3, 4, 5 and 7, they are
// represented as "1,3-5,7".
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mention the limit on this field if there is one.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no limit here. It's just bounded by parallelism.

// +optional
CompletedIndexes string
}

// JobConditionType is a valid value for JobCondition.Type
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/batch/v1/defaults.go
Expand Up @@ -46,4 +46,7 @@ func SetDefaults_Job(obj *batchv1.Job) {
if labels != nil && len(obj.Labels) == 0 {
obj.Labels = labels
}
if len(obj.Spec.CompletionMode) == 0 {
obj.Spec.CompletionMode = batchv1.NonIndexedCompletion
}
}
116 changes: 65 additions & 51 deletions pkg/apis/batch/v1/defaults_test.go
Expand Up @@ -21,15 +21,15 @@ import (
"testing"

batchv1 "k8s.io/api/batch/v1"

"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/api/legacyscheme"
_ "k8s.io/kubernetes/pkg/apis/batch/install"
. "k8s.io/kubernetes/pkg/apis/batch/v1"
_ "k8s.io/kubernetes/pkg/apis/core/install"
utilpointer "k8s.io/utils/pointer"

. "k8s.io/kubernetes/pkg/apis/batch/v1"
)

func TestSetDefaultJob(t *testing.T) {
Expand All @@ -49,9 +49,10 @@ func TestSetDefaultJob(t *testing.T) {
},
expected: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: utilpointer.Int32Ptr(1),
Parallelism: utilpointer.Int32Ptr(1),
BackoffLimit: utilpointer.Int32Ptr(6),
Completions: utilpointer.Int32Ptr(1),
Parallelism: utilpointer.Int32Ptr(1),
BackoffLimit: utilpointer.Int32Ptr(6),
CompletionMode: batchv1.NonIndexedCompletion,
},
},
expectLabels: true,
Expand All @@ -69,9 +70,10 @@ func TestSetDefaultJob(t *testing.T) {
},
expected: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: utilpointer.Int32Ptr(1),
Parallelism: utilpointer.Int32Ptr(1),
BackoffLimit: utilpointer.Int32Ptr(6),
Completions: utilpointer.Int32Ptr(1),
Parallelism: utilpointer.Int32Ptr(1),
BackoffLimit: utilpointer.Int32Ptr(6),
CompletionMode: batchv1.NonIndexedCompletion,
},
},
},
Expand All @@ -86,8 +88,9 @@ func TestSetDefaultJob(t *testing.T) {
},
expected: &batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: utilpointer.Int32Ptr(0),
BackoffLimit: utilpointer.Int32Ptr(6),
Parallelism: utilpointer.Int32Ptr(0),
BackoffLimit: utilpointer.Int32Ptr(6),
CompletionMode: batchv1.NonIndexedCompletion,
},
},
expectLabels: true,
Expand All @@ -103,8 +106,9 @@ func TestSetDefaultJob(t *testing.T) {
},
expected: &batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: utilpointer.Int32Ptr(2),
BackoffLimit: utilpointer.Int32Ptr(6),
Parallelism: utilpointer.Int32Ptr(2),
BackoffLimit: utilpointer.Int32Ptr(6),
CompletionMode: batchv1.NonIndexedCompletion,
},
},
expectLabels: true,
Expand All @@ -120,9 +124,10 @@ func TestSetDefaultJob(t *testing.T) {
},
expected: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: utilpointer.Int32Ptr(2),
Parallelism: utilpointer.Int32Ptr(1),
BackoffLimit: utilpointer.Int32Ptr(6),
Completions: utilpointer.Int32Ptr(2),
Parallelism: utilpointer.Int32Ptr(1),
BackoffLimit: utilpointer.Int32Ptr(6),
CompletionMode: batchv1.NonIndexedCompletion,
},
},
expectLabels: true,
Expand All @@ -138,29 +143,32 @@ func TestSetDefaultJob(t *testing.T) {
},
expected: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: utilpointer.Int32Ptr(1),
Parallelism: utilpointer.Int32Ptr(1),
BackoffLimit: utilpointer.Int32Ptr(5),
Completions: utilpointer.Int32Ptr(1),
Parallelism: utilpointer.Int32Ptr(1),
BackoffLimit: utilpointer.Int32Ptr(5),
CompletionMode: batchv1.NonIndexedCompletion,
},
},
expectLabels: true,
},
"All set -> no change": {
original: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: utilpointer.Int32Ptr(8),
Parallelism: utilpointer.Int32Ptr(9),
BackoffLimit: utilpointer.Int32Ptr(10),
Completions: utilpointer.Int32Ptr(8),
Parallelism: utilpointer.Int32Ptr(9),
BackoffLimit: utilpointer.Int32Ptr(10),
CompletionMode: batchv1.NonIndexedCompletion,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: defaultLabels},
},
},
},
expected: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: utilpointer.Int32Ptr(8),
Parallelism: utilpointer.Int32Ptr(9),
BackoffLimit: utilpointer.Int32Ptr(10),
Completions: utilpointer.Int32Ptr(8),
Parallelism: utilpointer.Int32Ptr(9),
BackoffLimit: utilpointer.Int32Ptr(10),
CompletionMode: batchv1.NonIndexedCompletion,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: defaultLabels},
},
Expand All @@ -171,57 +179,63 @@ func TestSetDefaultJob(t *testing.T) {
"All set, flipped -> no change": {
original: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: utilpointer.Int32Ptr(11),
Parallelism: utilpointer.Int32Ptr(10),
BackoffLimit: utilpointer.Int32Ptr(9),
Completions: utilpointer.Int32Ptr(11),
Parallelism: utilpointer.Int32Ptr(10),
BackoffLimit: utilpointer.Int32Ptr(9),
CompletionMode: batchv1.IndexedCompletion,
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: defaultLabels},
},
},
},
expected: &batchv1.Job{
Spec: batchv1.JobSpec{
Completions: utilpointer.Int32Ptr(11),
Parallelism: utilpointer.Int32Ptr(10),
BackoffLimit: utilpointer.Int32Ptr(9),
Completions: utilpointer.Int32Ptr(11),
Parallelism: utilpointer.Int32Ptr(10),
BackoffLimit: utilpointer.Int32Ptr(9),
CompletionMode: batchv1.IndexedCompletion,
},
},
expectLabels: true,
},
}

for name, test := range tests {
original := test.original
expected := test.expected
obj2 := roundTrip(t, runtime.Object(original))
actual, ok := obj2.(*batchv1.Job)
if !ok {
t.Errorf("%s: unexpected object: %v", name, actual)
t.FailNow()
}
t.Run(name, func(t *testing.T) {

validateDefaultInt32(t, name, "Completions", actual.Spec.Completions, expected.Spec.Completions)
validateDefaultInt32(t, name, "Parallelism", actual.Spec.Parallelism, expected.Spec.Parallelism)
validateDefaultInt32(t, name, "BackoffLimit", actual.Spec.BackoffLimit, expected.Spec.BackoffLimit)

if test.expectLabels != reflect.DeepEqual(actual.Labels, actual.Spec.Template.Labels) {
if test.expectLabels {
t.Errorf("%s: expected: %v, got: %v", name, actual.Spec.Template.Labels, actual.Labels)
} else {
t.Errorf("%s: unexpected equality: %v", name, actual.Labels)
original := test.original
expected := test.expected
obj2 := roundTrip(t, runtime.Object(original))
actual, ok := obj2.(*batchv1.Job)
if !ok {
t.Fatalf("Unexpected object: %v", actual)
}
}

validateDefaultInt32(t, "Completions", actual.Spec.Completions, expected.Spec.Completions)
validateDefaultInt32(t, "Parallelism", actual.Spec.Parallelism, expected.Spec.Parallelism)
validateDefaultInt32(t, "BackoffLimit", actual.Spec.BackoffLimit, expected.Spec.BackoffLimit)

if test.expectLabels != reflect.DeepEqual(actual.Labels, actual.Spec.Template.Labels) {
if test.expectLabels {
t.Errorf("Expected labels: %v, got: %v", actual.Spec.Template.Labels, actual.Labels)
} else {
t.Errorf("Unexpected equality: %v", actual.Labels)
}
}
if actual.Spec.CompletionMode != expected.Spec.CompletionMode {
t.Errorf("Got CompletionMode: %v, want: %v", actual.Spec.CompletionMode, expected.Spec.CompletionMode)
}
})
}
}

func validateDefaultInt32(t *testing.T, name string, field string, actual *int32, expected *int32) {
func validateDefaultInt32(t *testing.T, field string, actual *int32, expected *int32) {
if (actual == nil) != (expected == nil) {
t.Errorf("%s: got different *%s than expected: %v %v", name, field, actual, expected)
t.Errorf("Got different *%s than expected: %v %v", field, actual, expected)
}
if actual != nil && expected != nil {
if *actual != *expected {
t.Errorf("%s: got different %s than expected: %d %d", name, field, *actual, *expected)
t.Errorf("Got different %s than expected: %d %d", field, *actual, *expected)
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/apis/batch/v1/zz_generated.conversion.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.